Python 算法交易实验55 ADBS:QuantData
创始人
2024-06-02 07:20:26
0

说明

更快的切入实战

我希望在开发的过程中也可以看到一些信号或者模拟结果在当前市场下的表现,这样有助于更好的评估方法的健壮性,特性等。

例如,我希望持续的看到
这样的图(to 2023-3-6)
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
在这里插入图片描述
从上图,以及模拟交易的结果中,我可以对于信号的效果作出评估,也会产生一些新的想法(例如统计过去一段时间内买卖信号的多寡),同时也可以看到按信号产生的订单在最近市场下是否能起到较好的效果。

所以,先要保证持续的数据流入,才可能自动的给出实时的结果。

  • 1 建立ADBS(QuantData),通过Sniffer持续取数
  • 2 衔接ADBS(QuantData)和ADBS(MyQuantBase),完成数据进入量化ADBS链的动作
  • 3 衔接ADBS(MyQuantBase)和ADBS(MyQuantBaseStep2Signals),完成初步的信号提取
  • 4 衔接ADBS(MyQuantBaseStep2Signals)和ADBS(MyQuantBaseStep3Decisions),形成决策
  • 5 衔接ADBS(MyQuantBaseStep3Decisions)和ADBS(MyQuantBaseStep4Trades),形成交易

实际上就是在认为的观察Trades的结果,最终提取出一些规则,然后交给最后一个ADBS替代人进行观察

  • 6 衔接ADBS(MyQuantBaseStep4Trades)和ADBS(MyQuantBaseStep5Watch)

未来,真实的发出交易指令是在决策环节。决策决定要不要买,交易则是具体成没成交。

上面是把真个链条的内容串起来了,本次要做的是第一步的内容。

内容

1 建立ADBS

  • 1 拷贝其他项目的配置文件(config_base.py), 一般来说修改项目名(同时也是数据库名)就可以了;如果是是基于其他服务,还需要改目标服务
  • 2 拷贝初始化文件(init_projects_base.py),一般来说不太需要修改,但是要根据新增的字段建立索引。

本次,在之前market、code和data_slot之外,还要加上 data_source字段的索引。

使用通用的镜像挂载容器进行执行,很快就可以执行完毕,这种操作比较安全(只要项目名也就是数据库名别给错)

在这里插入图片描述

2 适配对应的Sniffer

Sniffer的主要职责是拉数(要生成主键)。先试一下效果

速度还是挺快的,除了没有成交单数,和rq的数据看起来一样。所以将镜像升级一下,把包装上

具体的做法是用当前镜像打开一个容器,执行pip,然后tag新版本,再次提交

pip install  akshare
pip install easyquotation

docker的镜像是有层数限制的(好像是110多层),虽然目前还没有达到,还是要稍微注意

...
7a694df0ad6c: Layer already exists
3fd9df553184: Layer already exists
805802706667: Layer already exists'''.count('Layer already exists')30

然后对于ak,分钟数据似乎只能提供5天之内的,我觉得也足够了。历史的数据我可以用rq的数据一次手动补满,然后还可以不定期的手动上传进行校验。

  • ak : 数天内的实时源
  • rq : 数年内的历史源

总体上rq的数据质量应该还是更可靠的,不过初期不必考虑,等我年收益过10万了再说。还有一个很重要的点,我怀疑去做个股是劳多而功少的行为。可能存在这样的假设:对于大量的个股,少部分投资者能获益是因为随机偏差,而其管理的繁琐,以及数据的不及时/不可靠带来的成本过高

所以ETF类的标的是更好的选择,当然,如果不存在这样的ETF,本质上是要构造这样一个组合的,这时候就会买入短期内固定配比的一篮子股票。

我还是希望找到一组或者一对,周期互补的ETF组合来进行交易的。

回到实际的工作,接下来怎么做呢?

实时的sniffer,在工作日的上午9:25分开始,到11:35;下午12:55 到15:05分为止,每个时隙执行查询。

有个细节是,9:30分的数据要不要?因为容易产生噪声,暂时先保留吧,反正我滚动几百分钟,这个可以消弭掉。

既然需要定时执行的部分确定了,那么整个ADBS的就可以开始开发了。手动的部分我觉得可以采用独立与ADBS体系外的单独脚本,调试好,按照规则发往系统入队列就行。

**关于原始数据,其实还有一个比较麻烦的地方,就是除权除息。**这个问题押后考虑吧,主要的工作也就是获得对应的除权除息日和折算关系,然后提前进行计算上的准备,在新的时点进行切换。实时性不高的甚至可以提前清仓,除权日过后重新开仓。(我怀疑可能真有机构这么搞的,至少会减仓)

比起ADBS之间的衔接Sniffer,这种按简单规则从接口取数的Sniffer显然要简单的多。

交易时间限制(先不管节假日,反正每分钟只要10条数据,没关系):


# 获取当前的分钟数
cur_slot = ts2ord()# ts = inverse_time_str('2000-01-01 09:25:00')
# ts = inverse_time_str('2000-01-01 11:35:00')
# ts = inverse_time_str('2000-01-01 12:55:00')
# ts = inverse_time_str('2000-01-01 15:05:00')
# slot = ts2ord(ts) % 1440
morning_start = 565
morning_end = 695
afternoon_start = 775
afternoon_end = 905slot_hour = cur_slot % 1440if (slot_hour >= morning_start and slot_hour = afternoon_start and slot_hour

在调试的时候转了个圈,发现队列一会有消息,一会没消息。原因是默认的函数里含有了一个另外一个服务器的地址(感觉很像走进科学…)

将sniffer部分进行重写

import osprint('>>>sniffer is Running  ')
runcode =''
for some_app in ['sniffer01_query_akshare.py']:runcode += str(os.system('python3 %s' % some_app))
print('>>>sniffer RunCode :%s ' % runcode)

将sniffer和sniffer01_query_akshare在启动时挂载(覆盖)就完成了Sniffer部分。(注意要调整sniffer的执行周期,之前是默认按天的…)

3 适配 app01_PullToStep1MongoIn

主要就是修改一下使用的Redis Buffer的名称,这个是最早设计的时候没有考虑redis buffer 统计入流量。

这里也是同上,修改名字后挂载就可以。

4 透传 TransWorker

在这个Case中,暂时想不到还要做什么ETL,所以可以写一个透传Worker。原来的DummyWorker有些地方写死,并不好,所以做一个微调,这样就可以实现输入的透传了。

明天先实验将这个ADBS运行起来(虽然worker不通,但是可以入库的)

这里碰到一个小问题,就是AETL默认是不允许数据有空值的,这个在当时的设计有当时的考虑(高稳定),放在这里倒是给自己设了一道坎。

比较合适的解决方式是在sniffer阶段就进行字段的框定和空值的处理,值得注意的是,如果多个数据源都要进入一个ADBS,那么这些数据源只能保持相同的列。

在Sniffer阶段要设计schema,后续的Worker可以透传这个schema。

  • Sniffer有一个特殊的变量 vars_to_handle(代码上非必须,流程上必须)。后续Worker会透传其中的内容。

TransWorker.py

from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import * gs_id = 'rec_id'
sample_listofdict = [{'amt': '3171687.0','close': '4.025','code': '510300','data_dt': '2023-03-09 13:49:00','data_slot': '27972829','data_source': 'AK','high': '4.025','low': '4.024','market': 'SH','open': '4.025','rec_id': 'AK_SH_510300_27972829','vol': '7880','_ch001': 0}]af = APIFunc('apifunc_database_model1', listofdict= sample_listofdict, key_id=gs_id)chain_funcname_dict = {}# 透传
@af.route('/MergeOut', is_force=True)
def MergeOut(input_dict = None, para_dict = None):msg = RuleMSG('MergeOut')# 1 【不检查字段】# need_cols = ['some_str']# input_cols = list(input_dict.keys())# if not (set(need_cols) <= set(input_cols)):#     msg.status = False #     msg.rule_result = None #     msg.msg = 'InputSetError'#     msg.data = None  #     return msg.to_dict()try:msg.msg ='ok'msg.status = Truemsg.data = input_dictmsg.rule_result = 1except:msg.status = Falsemsg.rule_result = Nonemsg.data = Nonereturn msg.to_dict()# >>> 添加到chain_funcname_dict
MergeOut = {}
MergeOut['subline_dict'] =None
MergeOut['main_dict'] = {'key_id':gs_id, 'depend_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot'],'input_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot']}
MergeOut['para_dict'] = None
chain_funcname_dict['MergeOut'] = MergeOutTransPassChain_session = ['MergeOut']TransPassChain_list = TransPassChain_session
TransPassChain_dict = {}
for k in TransPassChain_list:TransPassChain_dict[k] = chain_funcname_dict[k]if __name__ =='__main__':af.run_chain(chain_funcname_list=TransPassChain_list,chain_funcname_dict=TransPassChain_dict)af.g['ruleresult_frame'][ [x for x in af.g['ruleresult_frame'].columns if x !=gs_id]].sum()

5 修改 app03_PullToStep1MongoOut.py

这里似乎有一个ADBS设计上需要修改的地方:变量类型转换。

原来默认都是字符串,似乎在进行Redis批量写入的时候都转为字符串了。

现在希望能够将变量进行数值类型的转换,因为在使用时,筛选起来比较容易。

在两个地方可以做这个操作,一个是拉到MongoIn的过程,还有一个是拉到MongoOut的过程

保险起见,我们在这两个过程都对数值型的变量进行声明(为Float)

具体的做法就是通过重写一个方法S2M( Stream To Mongo)

...# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False ):must_cols = ['tier1','tier2','sniffer_name','keyname']assert all(must_cols), ','.join(must_cols) + ' 不能为空'cur_w = w or self.w cur_redis_agent = redis_agent or self.redis_agentbatch_num = batch_num or 1msg =''log_tier1 = log_tier1 or tier1 log_tier2 = log_tier2 or 'log_sniffer' + get_time_str2()# 默认三十秒超时time_out = time_out or 30tick1 = time.time()qname = stream_namecur_len_resp = req.post(redis_agent + 'len_of_queue/',json ={'stream_name':qname,'connection_hash':connection_hash}).json()q_len = cur_len_resp['data']print('{} Q has {} Messages' .format (qname,q_len))if q_len :q_data_resp = self.xrange(qname, count = batch_num, redis_agent = redis_agent, connection_hash = connection_hash)q_data = pd.DataFrame(q_data_resp['data'])# q_data = pd.DataFrame(cur_lq.xrange(qname, count=batch_num))msg_id_list = list(q_data['_msg_id'])# 去重q_data1 = q_data.drop_duplicates([keyname], keep='last')# 脱掉自动变量:在这种情况,不需要声明其他参数(通道是默认的_ch001)q_data2 = self.filter_and_add_msg(q_data1)q_listofdict = q_data2.to_dict(orient='records')q_db_resp = cur_w.insert_or_update_with_key(tier1 = tier1, tier2 = tier2, data_listofdict = q_listofdict, key_name=keyname)# 完成后删除消息del_cnt = self.xdel(qname, mid_or_list = msg_id_list)msg = 'ok,deliver(del) {} messages from {}' .format(del_cnt['data'] ,qname)else:msg ='no data source {}' .format(qname)tick2 = time.time()duration = round(tick2 -tick1,2)log_dict = {'sniffer': sniffer_name,'duration':duration,'msg': msg }return cur_w.insert_recs(tier1=log_tier1, tier2=log_tier2, data_listofdict =[log_dict])

要修改的部分其实不多,增加一个toDoubleVarList,将对应的变量做一个转换即可。使用python对象的继承方法,将这个对象

class StreamsIO_X(StreamsIO):# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False, toDoubleVarList = [] ):...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)

app03_PullToStep1MongoOut.py

...
sio = StreamsIO_X('sio', w = cur_w ,redis_agent = redis_agent_host)
toDoubleVarList = ['open', 'close', 'high', 'low', 'vol', 'amt','data_slot']sio.S2M(work_out_stream,keyname= keyname, tier1= tier1,tier2=tier2,w =cur_w, batch_num =batch_num,sniffer_name=sniffer_name,log_tier2 =log_tier2, redis_agent = redis_agent_host,toDoubleVarList=toDoubleVarList)

同理,也可以修改app01_PullToStep1MongoIn.py(当时app01并没有直接使用S2M方法)

...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)

在这里插入图片描述

6 补充

如果一开始不小心将mongo的变量格式搞错了,那么可以这么修改

import pymongo
lmongo=pymongo.MongoClient(host='172.17.0.1',port=111,username='aaa',password='bbb',authSource='admin',authMechanism='SCRAM-SHA-1')lmongo.list_database_names()# 声明数据库和集合
the_collection = lmongo['MyQuantBase']['step1_mongo_in']for varname in sorted(['amt','close','data_slot','high','low','open','vol']):command_dict = {'%s' % varname :{'$exists': True, '$type': 'string'}} command_list = [{'$set':{'%s' % varname  :  { '$toDouble': '$%s' % varname }}}]res = the_collection.update_many(command_dict, command_list)print(res.acknowledged)

7 总结

  • 1 本次打通了实时的分钟级取数
  • 2 对ADBS的规范进行了一些强化:例如入数据的变量要确定,不可缺失。同时对透传的Worker进行了一些适配性更改。
  • 3 考虑到有一些字段是要作为过滤器的,而经过Redis队列,很多变量似乎都变字符了。所以在两个ToMongo的App(01,03)增加了转浮点的部分。(在以后的ADBS升级中可以考虑将这些固化到配置文件中)

相关内容

热门资讯

常用商务英语口语   商务英语是以适应职场生活的语言要求为目的,内容涉及到商务活动的方方面面。下面是小编收集的常用商务...
六年级上册英语第一单元练习题   一、根据要求写单词。  1.dry(反义词)__________________  2.writ...
复活节英文怎么说 复活节英文怎么说?复活节的英语翻译是什么?复活节:Easter;"Easter,anniversar...
2008年北京奥运会主题曲 2008年北京奥运会(第29届夏季奥林匹克运动会),2008年8月8日到2008年8月24日在中华人...
英语道歉信 英语道歉信15篇  在日常生活中,道歉信的使用频率越来越高,通过道歉信,我们可以更好地解释事情发生的...
六年级英语专题训练(连词成句... 六年级英语专题训练(连词成句30题)  1. have,playhouse,many,I,toy,i...
上班迟到情况说明英语   每个人都或多或少的迟到过那么几次,因为各种原因,可能生病,可能因为交通堵车,可能是因为天气冷,有...
小学英语教学论文 小学英语教学论文范文  引导语:英语教育一直都是每个家长所器重的,那么有关小学英语教学论文要怎么写呢...
英语口语学习必看的方法技巧 英语口语学习必看的方法技巧如何才能说流利的英语? 说外语时,我们主要应做到四件事:理解、回答、提问、...
四级英语作文选:Birth ... 四级英语作文范文选:Birth controlSince the Chinese Governmen...
金融专业英语面试自我介绍 金融专业英语面试自我介绍3篇  金融专业的学生面试时,面试官要求用英语做自我介绍该怎么说。下面是小编...
我的李老师走了四年级英语日记... 我的李老师走了四年级英语日记带翻译  我上了五个学期的小学却换了六任老师,李老师是带我们班最长的语文...
小学三年级英语日记带翻译捡玉... 小学三年级英语日记带翻译捡玉米  今天,我和妈妈去外婆家,外婆家有刚剥的`玉米棒上带有玉米籽,好大的...
七年级英语优秀教学设计 七年级英语优秀教学设计  作为一位兢兢业业的人民教师,常常要写一份优秀的教学设计,教学设计是把教学原...
我的英语老师作文 我的英语老师作文(通用21篇)  在日常生活或是工作学习中,大家都有写作文的经历,对作文很是熟悉吧,...
英语老师教学经验总结 英语老师教学经验总结(通用19篇)  总结是指社会团体、企业单位和个人对某一阶段的学习、工作或其完成...
初一英语暑假作业答案 初一英语暑假作业答案  英语练习一(基础训练)第一题1.D2.H3.E4.F5.I6.A7.J8.C...
大学生的英语演讲稿 大学生的英语演讲稿范文(精选10篇)  使用正确的写作思路书写演讲稿会更加事半功倍。在现实社会中,越...
VOA美国之音英语学习网址 VOA美国之音英语学习推荐网址 美国之音网站已经成为语言学习最重要的资源站点,在互联网上还有若干网站...
商务英语期末试卷 Part I Term Translation (20%)Section A: Translate ...