Python 算法交易实验55 ADBS:QuantData
创始人
2024-06-02 07:20:26
0

说明

更快的切入实战

我希望在开发的过程中也可以看到一些信号或者模拟结果在当前市场下的表现,这样有助于更好的评估方法的健壮性,特性等。

例如,我希望持续的看到
这样的图(to 2023-3-6)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
从上图,以及模拟交易的结果中,我可以对于信号的效果作出评估,也会产生一些新的想法(例如统计过去一段时间内买卖信号的多寡),同时也可以看到按信号产生的订单在最近市场下是否能起到较好的效果。

所以,先要保证持续的数据流入,才可能自动的给出实时的结果。

  • 1 建立ADBS(QuantData),通过Sniffer持续取数
  • 2 衔接ADBS(QuantData)和ADBS(MyQuantBase),完成数据进入量化ADBS链的动作
  • 3 衔接ADBS(MyQuantBase)和ADBS(MyQuantBaseStep2Signals),完成初步的信号提取
  • 4 衔接ADBS(MyQuantBaseStep2Signals)和ADBS(MyQuantBaseStep3Decisions),形成决策
  • 5 衔接ADBS(MyQuantBaseStep3Decisions)和ADBS(MyQuantBaseStep4Trades),形成交易

实际上就是在认为的观察Trades的结果,最终提取出一些规则,然后交给最后一个ADBS替代人进行观察

  • 6 衔接ADBS(MyQuantBaseStep4Trades)和ADBS(MyQuantBaseStep5Watch)

未来,真实的发出交易指令是在决策环节。决策决定要不要买,交易则是具体成没成交。

上面是把真个链条的内容串起来了,本次要做的是第一步的内容。

内容

1 建立ADBS

  • 1 拷贝其他项目的配置文件(config_base.py), 一般来说修改项目名(同时也是数据库名)就可以了;如果是是基于其他服务,还需要改目标服务
  • 2 拷贝初始化文件(init_projects_base.py),一般来说不太需要修改,但是要根据新增的字段建立索引。

本次,在之前market、code和data_slot之外,还要加上 data_source字段的索引。

使用通用的镜像挂载容器进行执行,很快就可以执行完毕,这种操作比较安全(只要项目名也就是数据库名别给错)

在这里插入图片描述

2 适配对应的Sniffer

Sniffer的主要职责是拉数(要生成主键)。先试一下效果

速度还是挺快的,除了没有成交单数,和rq的数据看起来一样。所以将镜像升级一下,把包装上

具体的做法是用当前镜像打开一个容器,执行pip,然后tag新版本,再次提交

pip install  akshare
pip install easyquotation

docker的镜像是有层数限制的(好像是110多层),虽然目前还没有达到,还是要稍微注意

...
7a694df0ad6c: Layer already exists
3fd9df553184: Layer already exists
805802706667: Layer already exists'''.count('Layer already exists')30

然后对于ak,分钟数据似乎只能提供5天之内的,我觉得也足够了。历史的数据我可以用rq的数据一次手动补满,然后还可以不定期的手动上传进行校验。

  • ak : 数天内的实时源
  • rq : 数年内的历史源

总体上rq的数据质量应该还是更可靠的,不过初期不必考虑,等我年收益过10万了再说。还有一个很重要的点,我怀疑去做个股是劳多而功少的行为。可能存在这样的假设:对于大量的个股,少部分投资者能获益是因为随机偏差,而其管理的繁琐,以及数据的不及时/不可靠带来的成本过高

所以ETF类的标的是更好的选择,当然,如果不存在这样的ETF,本质上是要构造这样一个组合的,这时候就会买入短期内固定配比的一篮子股票。

我还是希望找到一组或者一对,周期互补的ETF组合来进行交易的。

回到实际的工作,接下来怎么做呢?

实时的sniffer,在工作日的上午9:25分开始,到11:35;下午12:55 到15:05分为止,每个时隙执行查询。

有个细节是,9:30分的数据要不要?因为容易产生噪声,暂时先保留吧,反正我滚动几百分钟,这个可以消弭掉。

既然需要定时执行的部分确定了,那么整个ADBS的就可以开始开发了。手动的部分我觉得可以采用独立与ADBS体系外的单独脚本,调试好,按照规则发往系统入队列就行。

**关于原始数据,其实还有一个比较麻烦的地方,就是除权除息。**这个问题押后考虑吧,主要的工作也就是获得对应的除权除息日和折算关系,然后提前进行计算上的准备,在新的时点进行切换。实时性不高的甚至可以提前清仓,除权日过后重新开仓。(我怀疑可能真有机构这么搞的,至少会减仓)

比起ADBS之间的衔接Sniffer,这种按简单规则从接口取数的Sniffer显然要简单的多。

交易时间限制(先不管节假日,反正每分钟只要10条数据,没关系):


# 获取当前的分钟数
cur_slot = ts2ord()# ts = inverse_time_str('2000-01-01 09:25:00')
# ts = inverse_time_str('2000-01-01 11:35:00')
# ts = inverse_time_str('2000-01-01 12:55:00')
# ts = inverse_time_str('2000-01-01 15:05:00')
# slot = ts2ord(ts) % 1440
morning_start = 565
morning_end = 695
afternoon_start = 775
afternoon_end = 905slot_hour = cur_slot % 1440if (slot_hour >= morning_start and slot_hour = afternoon_start and slot_hour

在调试的时候转了个圈,发现队列一会有消息,一会没消息。原因是默认的函数里含有了一个另外一个服务器的地址(感觉很像走进科学…)

将sniffer部分进行重写

import osprint('>>>sniffer is Running  ')
runcode =''
for some_app in ['sniffer01_query_akshare.py']:runcode += str(os.system('python3 %s' % some_app))
print('>>>sniffer RunCode :%s ' % runcode)

将sniffer和sniffer01_query_akshare在启动时挂载(覆盖)就完成了Sniffer部分。(注意要调整sniffer的执行周期,之前是默认按天的…)

3 适配 app01_PullToStep1MongoIn

主要就是修改一下使用的Redis Buffer的名称,这个是最早设计的时候没有考虑redis buffer 统计入流量。

这里也是同上,修改名字后挂载就可以。

4 透传 TransWorker

在这个Case中,暂时想不到还要做什么ETL,所以可以写一个透传Worker。原来的DummyWorker有些地方写死,并不好,所以做一个微调,这样就可以实现输入的透传了。

明天先实验将这个ADBS运行起来(虽然worker不通,但是可以入库的)

这里碰到一个小问题,就是AETL默认是不允许数据有空值的,这个在当时的设计有当时的考虑(高稳定),放在这里倒是给自己设了一道坎。

比较合适的解决方式是在sniffer阶段就进行字段的框定和空值的处理,值得注意的是,如果多个数据源都要进入一个ADBS,那么这些数据源只能保持相同的列。

在Sniffer阶段要设计schema,后续的Worker可以透传这个schema。

  • Sniffer有一个特殊的变量 vars_to_handle(代码上非必须,流程上必须)。后续Worker会透传其中的内容。

TransWorker.py

from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import * gs_id = 'rec_id'
sample_listofdict = [{'amt': '3171687.0','close': '4.025','code': '510300','data_dt': '2023-03-09 13:49:00','data_slot': '27972829','data_source': 'AK','high': '4.025','low': '4.024','market': 'SH','open': '4.025','rec_id': 'AK_SH_510300_27972829','vol': '7880','_ch001': 0}]af = APIFunc('apifunc_database_model1', listofdict= sample_listofdict, key_id=gs_id)chain_funcname_dict = {}# 透传
@af.route('/MergeOut', is_force=True)
def MergeOut(input_dict = None, para_dict = None):msg = RuleMSG('MergeOut')# 1 【不检查字段】# need_cols = ['some_str']# input_cols = list(input_dict.keys())# if not (set(need_cols) <= set(input_cols)):#     msg.status = False #     msg.rule_result = None #     msg.msg = 'InputSetError'#     msg.data = None  #     return msg.to_dict()try:msg.msg ='ok'msg.status = Truemsg.data = input_dictmsg.rule_result = 1except:msg.status = Falsemsg.rule_result = Nonemsg.data = Nonereturn msg.to_dict()# >>> 添加到chain_funcname_dict
MergeOut = {}
MergeOut['subline_dict'] =None
MergeOut['main_dict'] = {'key_id':gs_id, 'depend_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot'],'input_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot']}
MergeOut['para_dict'] = None
chain_funcname_dict['MergeOut'] = MergeOutTransPassChain_session = ['MergeOut']TransPassChain_list = TransPassChain_session
TransPassChain_dict = {}
for k in TransPassChain_list:TransPassChain_dict[k] = chain_funcname_dict[k]if __name__ =='__main__':af.run_chain(chain_funcname_list=TransPassChain_list,chain_funcname_dict=TransPassChain_dict)af.g['ruleresult_frame'][ [x for x in af.g['ruleresult_frame'].columns if x !=gs_id]].sum()

5 修改 app03_PullToStep1MongoOut.py

这里似乎有一个ADBS设计上需要修改的地方:变量类型转换。

原来默认都是字符串,似乎在进行Redis批量写入的时候都转为字符串了。

现在希望能够将变量进行数值类型的转换,因为在使用时,筛选起来比较容易。

在两个地方可以做这个操作,一个是拉到MongoIn的过程,还有一个是拉到MongoOut的过程

保险起见,我们在这两个过程都对数值型的变量进行声明(为Float)

具体的做法就是通过重写一个方法S2M( Stream To Mongo)

...# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False ):must_cols = ['tier1','tier2','sniffer_name','keyname']assert all(must_cols), ','.join(must_cols) + ' 不能为空'cur_w = w or self.w cur_redis_agent = redis_agent or self.redis_agentbatch_num = batch_num or 1msg =''log_tier1 = log_tier1 or tier1 log_tier2 = log_tier2 or 'log_sniffer' + get_time_str2()# 默认三十秒超时time_out = time_out or 30tick1 = time.time()qname = stream_namecur_len_resp = req.post(redis_agent + 'len_of_queue/',json ={'stream_name':qname,'connection_hash':connection_hash}).json()q_len = cur_len_resp['data']print('{} Q has {} Messages' .format (qname,q_len))if q_len :q_data_resp = self.xrange(qname, count = batch_num, redis_agent = redis_agent, connection_hash = connection_hash)q_data = pd.DataFrame(q_data_resp['data'])# q_data = pd.DataFrame(cur_lq.xrange(qname, count=batch_num))msg_id_list = list(q_data['_msg_id'])# 去重q_data1 = q_data.drop_duplicates([keyname], keep='last')# 脱掉自动变量:在这种情况,不需要声明其他参数(通道是默认的_ch001)q_data2 = self.filter_and_add_msg(q_data1)q_listofdict = q_data2.to_dict(orient='records')q_db_resp = cur_w.insert_or_update_with_key(tier1 = tier1, tier2 = tier2, data_listofdict = q_listofdict, key_name=keyname)# 完成后删除消息del_cnt = self.xdel(qname, mid_or_list = msg_id_list)msg = 'ok,deliver(del) {} messages from {}' .format(del_cnt['data'] ,qname)else:msg ='no data source {}' .format(qname)tick2 = time.time()duration = round(tick2 -tick1,2)log_dict = {'sniffer': sniffer_name,'duration':duration,'msg': msg }return cur_w.insert_recs(tier1=log_tier1, tier2=log_tier2, data_listofdict =[log_dict])

要修改的部分其实不多,增加一个toDoubleVarList,将对应的变量做一个转换即可。使用python对象的继承方法,将这个对象

class StreamsIO_X(StreamsIO):# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False, toDoubleVarList = [] ):...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)

app03_PullToStep1MongoOut.py

...
sio = StreamsIO_X('sio', w = cur_w ,redis_agent = redis_agent_host)
toDoubleVarList = ['open', 'close', 'high', 'low', 'vol', 'amt','data_slot']sio.S2M(work_out_stream,keyname= keyname, tier1= tier1,tier2=tier2,w =cur_w, batch_num =batch_num,sniffer_name=sniffer_name,log_tier2 =log_tier2, redis_agent = redis_agent_host,toDoubleVarList=toDoubleVarList)

同理,也可以修改app01_PullToStep1MongoIn.py(当时app01并没有直接使用S2M方法)

...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)

在这里插入图片描述

6 补充

如果一开始不小心将mongo的变量格式搞错了,那么可以这么修改

import pymongo
lmongo=pymongo.MongoClient(host='172.17.0.1',port=111,username='aaa',password='bbb',authSource='admin',authMechanism='SCRAM-SHA-1')lmongo.list_database_names()# 声明数据库和集合
the_collection = lmongo['MyQuantBase']['step1_mongo_in']for varname in sorted(['amt','close','data_slot','high','low','open','vol']):command_dict = {'%s' % varname :{'$exists': True, '$type': 'string'}} command_list = [{'$set':{'%s' % varname  :  { '$toDouble': '$%s' % varname }}}]res = the_collection.update_many(command_dict, command_list)print(res.acknowledged)

7 总结

  • 1 本次打通了实时的分钟级取数
  • 2 对ADBS的规范进行了一些强化:例如入数据的变量要确定,不可缺失。同时对透传的Worker进行了一些适配性更改。
  • 3 考虑到有一些字段是要作为过滤器的,而经过Redis队列,很多变量似乎都变字符了。所以在两个ToMongo的App(01,03)增加了转浮点的部分。(在以后的ADBS升级中可以考虑将这些固化到配置文件中)

相关内容

热门资讯

经典搞笑诗句 经典搞笑诗句  1、《清明》  唐·杜牧  清明时节雨纷纷  孤家寡人欲断魂  借问美女何处有  牧...
《长恨歌·汉皇重色思倾国》翻... 《长恨歌·汉皇重色思倾国》翻译赏析  《长恨歌·汉皇重色思倾国》出自唐诗三百首全集,其作者是唐朝文学...
“春去能忘诗共赋,客来应是酒... 寄刘禹锡戴叔伦谢相园西石径斜,知君习隐暂为家。有时出郭行芳草,长日临池看落花。春去能忘诗共赋,客来应...
晚春唐韩愈的古诗 晚春唐韩愈的古诗  《晚春》是唐代文学家韩愈的诗作,这是一首写暮春景物的七绝,表达了诗人惜春的思想感...
盛开的荷花诗句 盛开的荷花诗句  瞧,在一张张绿色的大玉盘似的荷叶中冒出了一朵朵粉嫩的荷花,是那么的.美丽。  荷风...
咏柳的诗句 咏柳的诗句  篇一:古诗 咏柳  咏柳  贺知章  碧玉妆成一树高,万条垂下绿丝绦。 不知细叶谁裁出...
元宵赏花灯的诗句 元宵赏花灯的诗句(精选16首)  在现实生活或工作学习中,大家一定没少看到经典的.诗句吧,诗句节奏上...
讽刺人的诗句 讽刺人的诗句  讽刺人的诗句(第一部分)  1、《七步诗》  作者:魏、曹植  煮豆持作羹,漉豉以为...
马李贺拼音 马李贺拼音  李贺所写的咏马的诗句有哪些呢?诗人李贺通过马的形象表达了自己心中的`情感。下面是小编分...
和荷花的诗句 和荷花有关的诗句  无论是身处学校还是步入社会,大家都对那些朗朗上口的诗句很是熟悉吧,诗句具有语言高...
形容夏季的诗句 形容夏季的诗句  夏天是生机勃勃的,夏天是美丽的,关于夏天的诗句有哪些呢?  1、仲夏苦夜短,开轩纳...
繁华事散逐香尘 “繁华事散逐香尘”出处 出自 唐代 杜牧 的《金谷园》“繁华事散逐香尘”平仄韵脚 拼音:fán hu...
女子思念心上人的诗句 女子思念心上人的诗句  引导语:自古以来,亦流传下来了不少诗句,那么其中有哪些是写女子思念心上人的呢...
描与春天的花的诗句 描与春天的花的诗句  草色青青柳色黄, 桃花历乱李花香。 (贾至《春思》)  池塘生春草,园柳变鸣禽...
慨当初,倚飞何重,后来何酷 “慨当初,倚飞何重,后来何酷。”出处 出自 明代 文征明 的《满江红·拂拭残碑》“慨当初,倚飞何重,...
《王安石诗全集》(9)   《耿天骘惠梨次韵奉酬三首》  桐乡山远复川长,紫翠连城碧满隍。  今日桐乡谁爱我,当时我自爱桐乡...
莫言名言名句 莫言名言名句  导语:“思念一个人的滋味,就象是喝了一杯冰冷的水,然后一滴一滴凝成热泪。”下面是应届...
东指羲和能走马,海尘新生石山... “东指羲和能走马,海尘新生石山下。”出处 出自 唐代 李贺 的《天上谣》“东指羲和能走马,海尘新生石...
寻胡隐君古诗-寻胡隐君 高启 寻胡隐君古诗-寻胡隐君 高启寻胡隐君古诗,是一首无言绝句诗,主要借景色叙述作者的好心情。本文由unj...
描写知己的抒情诗句 描写知己的抒情诗句  1、婴其呜矣,求其友声。——《诗经·小雅》  2、四海皆兄弟,谁为行路人。——...