目標:在中國的股票市場上盈利,每周都有單個股票盈利2%,月總盈利超過2%
計劃實現方式:Pycharm + Anaconda3 + Python3 + Django + AKShare + MongoDB
目前采用的實現方式:Pycharm + Anaconda3 + Python3 + Flask + AKShare
以后可能會用到 :MongoDB , SQLAlchemy ,baostock ,Tushare
機器學習 會在以后的實踐中逐步用到。
實現方式
上一篇文章寫了采集的方法。本篇文章包含完整代碼和調用代碼。
采用后臺執行的方式。
gupiao.py 如下:
import akshare as ak
import threading
import datetime
import os
from threading import Thread
def get_start():
start_stock_daily()
# 這里就是核心了,調用這部分就會自動下載 深圳A股 的所有股票的歷史記錄
def start_stock_daily(indicator="A股列表", folder="sz_a", prefix="sz"):
file_path = "D:/work/data/" + folder + "/"
file_path_name = get_sz_a(file_path, indicator)
print(file_path_name)
num = 0
with open(file_path_name, "r", encoding='UTF-8') as stock_lines:
for stock_line in stock_lines.readlines():
num = num + 1
if num == 1:
continue
stock_line_arr = stock_line.split("|")
symbol = prefix + stock_line_arr[5]
print("股票信息=" + symbol + "||" + stock_line_arr[6])
stock_csv = get_stock_daily(file_path, symbol)
print("stock_csv=" + stock_csv)
# 獲得深圳主板A股列表,每天獲取一次不重復獲取
# file_path 需要全路徑,以 | 進行間隔
# indicator 可選參數 "A股列表", "B股列表", "AB股列表", "上市公司列表", "主板", "中小企業板", "創業板"
def get_sz_a(file_path, indicator="A股列表"):
today = datetime.datetime.today()
file_name = "sz_a_" + today.strftime('%Y%m%d') + ".csv"
if not os.path.exists(file_path): # 如果路徑不存在則創建
os.makedirs(file_path)
if os.path.exists(file_path + file_name):
print("今日已經獲取無需再次獲取," + today.strftime('%Y%m%d'))
return file_path + file_name
stock_info_sz_df = ak.stock_info_sz_name_code(indicator=indicator)
stock_info_sz_df.to_csv(file_path + file_name, sep="|")
print('獲取深圳主板A股列表并存儲為CSV!' + today.strftime('%Y%m%d'))
return file_path + file_name
# 根據股票代碼獲取股票歷史數據
# symbol 股票代碼 需要前綴 sh 上海 sz 深圳,例如:sz300846
def get_stock_daily(file_path, symbol):
stock_zh_a_daily_hfq_df = ak.stock_zh_a_daily(symbol=symbol) # 返回不復權的數據
file_name = symbol + '.csv'
stock_zh_a_daily_hfq_df.to_csv(file_path + file_name)
return file_path + file_name
調用下載的部分,注意命名我隨便寫的,請根據情況自己修改,App.py 如下:
from flask import Flask
import akshare as ak
import gupiao
import datetime
import os
from concurrent.futures import ThreadPoolExecutor
import time
executor = ThreadPoolExecutor(2)
app = Flask(__name__)
@app.route('/test_thread')
def test_thread():
executor.submit(gupiao.get_start)
return "thread is running at background !!!"
if __name__ == '__main__':
app.run()
使用 Flask 框架,生成一個項目,然后創建一個gupiao.py 在 app.py 中調用,然后運行項目。
在瀏覽器里面訪問 http://127.0.0.1:5000/test_thread
就能在后臺看到如圖的畫面,整個深圳A股的下載時間大約在2個小時到3個小時。
股票歷史數據
下載到本地如圖
股票歷史數據
爬取數據部分就完成了,之后就是篩選了。






