# 放假在家无聊,写了一个程序:一键将 BigQuant 选出的股票池放入萝卜投研,查看其近期研报,供大家参考。
import requests
import json
from datetime import datetime
# Read the clock exactly once so year/month/day always describe the same
# calendar day; three separate now() calls could straddle midnight.
_today = datetime.now()
year = _today.year
month = _today.month
day = _today.day
class Signal(object):
    """Client for the BigQuant ``planned_orders`` endpoint.

    ``token`` is sent as the Bearer token and ``ids`` as the ``id_list`` form
    field; both start out empty and must be filled in before use.
    """

    # Seconds to wait for the server before giving up; without a timeout
    # ``requests`` may block forever on an unresponsive connection.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        self.token = ''
        self.ids = ''

    def request_plan_order(self):
        """POST ``ids`` to the planned-orders endpoint and return its ``data`` field.

        Raises:
            requests.HTTPError: the server answered with a 4xx/5xx status.
            requests.Timeout: no answer within ``REQUEST_TIMEOUT`` seconds.
            KeyError: the JSON reply has no ``data`` field.
        """
        url = 'https://bigquant.com/bigwebapi/algo_info/planned_orders'
        # Drop surrounding whitespace and any embedded newlines from the token.
        token = self.token.strip().replace("\n", "")
        headers = {'Authorization': 'Bearer {}'.format(token)}
        data = {
            'id_list': self.ids
        }
        r = requests.post(url=url, data=data, headers=headers,
                          timeout=self.REQUEST_TIMEOUT)
        # Fail with a clear HTTP error instead of an opaque KeyError below.
        r.raise_for_status()
        return json.loads(r.text)['data']

    def get_stock_code(self):
        """Return the ticker codes of every planned order, exchange suffix removed.

        Each ``sid`` is cut at its first ``.`` and only the leading part is kept.
        A null payload, or an entry whose ``planned_orders`` is null or missing,
        contributes no codes instead of raising.
        """
        res = self.request_plan_order()
        stock_list = []
        for entry in res or []:
            for order in entry.get('planned_orders') or []:
                stock_list.append(order['sid'].split('.')[0])
        return stock_list
class Main(object):
    """Create a dated stock group on the Datayes gateway and dump its announcements.

    The workflow lives in :meth:`get_Notice`; the private helpers below each
    perform one HTTP step against ``gw.datayes.com``.
    """

    # Seconds to wait for the server before giving up; without a timeout
    # ``requests`` may block forever on an unresponsive connection.
    REQUEST_TIMEOUT = 30

    def __init__(self):
        # Blank placeholders to be filled in before running; these headers
        # are sent with every request made by this class.
        self.headers = {
            'cookie': '',
            'User-Agent': ''
        }

    def get_Notice(self):
        """Create today's stock group, fill it, and append its announcements to a file.

        Steps, in order: create a group named ``year-month-day``, add every
        ticker returned by ``Signal().get_stock_code()``, fetch the group's
        announcement list, then resolve each announcement's download URL and
        print/append one line per announcement to
        ``<year>.<month>.<day>股票池公告.txt`` (UTF-8, append mode).

        Raises:
            requests.HTTPError: group creation, the announcement list, or an
                announcement detail request answered with a 4xx/5xx status.
        """
        group_id = self._create_group("{}-{}-{}".format(year, month, day))
        self._add_stocks(group_id, Signal().get_stock_code())
        notice_list = self._fetch_notices(group_id)
        with open('{}.{}.{}股票池公告.txt'.format(year, month, day), 'a', encoding='utf-8') as f:
            for notice in notice_list:
                text = self._format_notice(notice, self._download_url(notice['id']))
                print(text)
                f.write(text)

    def _json_headers(self):
        """Return a copy of ``self.headers`` with a JSON Content-Type added."""
        # Work on a copy so the shared headers are never left in a modified
        # state, even if the request that uses them raises.
        headers = dict(self.headers)
        headers['Content-Type'] = 'application/json'
        return headers

    def _post_json(self, url, payload):
        """POST *payload* as a JSON body and return the reply's ``data`` field."""
        resp = requests.post(url, data=json.dumps(payload), headers=self._json_headers(),
                             timeout=self.REQUEST_TIMEOUT)
        resp.raise_for_status()
        return json.loads(resp.content.decode())['data']

    def _create_group(self, name):
        """Create a non-public stock group called *name* and return its id."""
        url = 'https://gw.datayes.com/rrp_adventure/web/stockgroup'
        payload = {
            'groupName': name,
            'isPublic': 'false',
            'name': name}
        return self._post_json(url, payload)['id']

    def _add_stocks(self, group_id, stock_list):
        """POST each distinct ticker in *stock_list* to the group, form-encoded."""
        url = 'https://gw.datayes.com/rrp_adventure/web/stockgroup/{}/stock'.format(group_id)
        # dict.fromkeys drops repeated tickers while keeping first-seen order,
        # so the same code is not submitted twice.
        for ticker in dict.fromkeys(stock_list):
            # NOTE(review): the reply is not inspected, so a rejected ticker is
            # skipped silently -- confirm this best-effort behaviour is intended.
            requests.post(url, data={'tickers': ticker}, headers=self.headers,
                          timeout=self.REQUEST_TIMEOUT)

    def _fetch_notices(self, group_id):
        """Return the first page (up to 120 entries) of the group's announcements."""
        url = 'https://gw.datayes.com/rrp_adventure/web/getAnnouncementsByStockGroup?'
        payload = {
            'pageNow': '1',
            'groupId': group_id,
            'category': "",
            'important': 'false',
            'pageSize': '120'
        }
        return self._post_json(url, payload)['list']

    def _download_url(self, notice_id):
        """Look up one announcement by id and return its ``downloadUrl``."""
        url = 'https://gw.datayes.com/rrp_adventure/web/announcement/{}'.format(notice_id)
        resp = requests.get(url, headers=self.headers, timeout=self.REQUEST_TIMEOUT)
        resp.raise_for_status()
        return json.loads(resp.content.decode())['data']['downloadUrl']

    @staticmethod
    def _format_notice(notice, notice_url):
        """Render one announcement as the text line written to the output file."""
        # scannedTimestamp is divided by 1000 before conversion, i.e. it is
        # treated as milliseconds; the result is a naive local-time datetime.
        scanned_at = datetime.fromtimestamp(int(notice['scannedTimestamp']) / 1000)
        return "时间:{},股票代码:{},股票名称:{},标题:{},链接:{}\n".format(
            scanned_at, notice['stockId'], notice['stockName'], notice['title'],
            notice_url)
if __name__ == '__main__':
    # Run the full workflow only when this file is executed as a script.
    app = Main()
    app.get_Notice()