更快的切入实战
我希望在开发的过程中也可以看到一些信号或者模拟结果在当前市场下的表现,这样有助于更好的评估方法的健壮性,特性等。
例如,我希望持续的看到
这样的图(to 2023-3-6)
从上图,以及模拟交易的结果中,我可以对于信号的效果作出评估,也会产生一些新的想法(例如统计过去一段时间内买卖信号的多寡),同时也可以看到按信号产生的订单在最近市场下是否能起到较好的效果。
所以,先要保证持续的数据流入,才可能自动的给出实时的结果。
实际上就是在认为的观察Trades的结果,最终提取出一些规则,然后交给最后一个ADBS替代人进行观察
未来,真实的发出交易指令是在决策环节。决策决定要不要买,交易则是具体成没成交。
上面是把真个链条的内容串起来了,本次要做的是第一步的内容。
本次,在之前market、code和data_slot之外,还要加上 data_source字段的索引。
使用通用的镜像挂载容器进行执行,很快就可以执行完毕,这种操作比较安全(只要项目名也就是数据库名别给错)
Sniffer的主要职责是拉数(要生成主键)。先试一下效果
速度还是挺快的,除了没有成交单数,和rq的数据看起来一样。所以将镜像升级一下,把包装上
具体的做法是用当前镜像打开一个容器,执行pip,然后tag新版本,再次提交
pip install akshare
pip install easyquotation
docker的镜像是有层数限制的(好像是110多层),虽然目前还没有达到,还是要稍微注意
...
7a694df0ad6c: Layer already exists
3fd9df553184: Layer already exists
805802706667: Layer already exists'''.count('Layer already exists')30
然后对于ak,分钟数据似乎只能提供5天之内的,我觉得也足够了。历史的数据我可以用rq的数据一次手动补满,然后还可以不定期的手动上传进行校验。
总体上rq的数据质量应该还是更可靠的,不过初期不必考虑,等我年收益过10万了再说。还有一个很重要的点,我怀疑去做个股是劳多而功少的行为。可能存在这样的假设:对于大量的个股,少部分投资者能获益是因为随机偏差,而其管理的繁琐,以及数据的不及时/不可靠带来的成本过高。
所以ETF类的标的是更好的选择,当然,如果不存在这样的ETF,本质上是要构造这样一个组合的,这时候就会买入短期内固定配比的一篮子股票。
我还是希望找到一组或者一对,周期互补的ETF组合来进行交易的。
回到实际的工作,接下来怎么做呢?
实时的sniffer,在工作日的上午9:25分开始,到11:35;下午12:55 到15:05分为止,每个时隙执行查询。
有个细节是,9:30分的数据要不要?因为容易产生噪声,暂时先保留吧,反正我滚动几百分钟,这个可以消弭掉。
既然需要定时执行的部分确定了,那么整个ADBS的就可以开始开发了。手动的部分我觉得可以采用独立与ADBS体系外的单独脚本,调试好,按照规则发往系统入队列就行。
**关于原始数据,其实还有一个比较麻烦的地方,就是除权除息。**这个问题押后考虑吧,主要的工作也就是获得对应的除权除息日和折算关系,然后提前进行计算上的准备,在新的时点进行切换。实时性不高的甚至可以提前清仓,除权日过后重新开仓。(我怀疑可能真有机构这么搞的,至少会减仓)
比起ADBS之间的衔接Sniffer,这种按简单规则从接口取数的Sniffer显然要简单的多。
交易时间限制(先不管节假日,反正每分钟只要10条数据,没关系):
# 获取当前的分钟数
cur_slot = ts2ord()# ts = inverse_time_str('2000-01-01 09:25:00')
# ts = inverse_time_str('2000-01-01 11:35:00')
# ts = inverse_time_str('2000-01-01 12:55:00')
# ts = inverse_time_str('2000-01-01 15:05:00')
# slot = ts2ord(ts) % 1440
morning_start = 565
morning_end = 695
afternoon_start = 775
afternoon_end = 905slot_hour = cur_slot % 1440if (slot_hour >= morning_start and slot_hour = afternoon_start and slot_hour
在调试的时候转了个圈,发现队列一会有消息,一会没消息。原因是默认的函数里含有了一个另外一个服务器的地址(感觉很像走进科学…)
将sniffer部分进行重写
import osprint('>>>sniffer is Running ')
runcode =''
for some_app in ['sniffer01_query_akshare.py']:runcode += str(os.system('python3 %s' % some_app))
print('>>>sniffer RunCode :%s ' % runcode)
将sniffer和sniffer01_query_akshare在启动时挂载(覆盖)就完成了Sniffer部分。(注意要调整sniffer的执行周期,之前是默认按天的…)
主要就是修改一下使用的Redis Buffer的名称,这个是最早设计的时候没有考虑redis buffer 统计入流量。
这里也是同上,修改名字后挂载就可以。
在这个Case中,暂时想不到还要做什么ETL,所以可以写一个透传Worker。原来的DummyWorker有些地方写死,并不好,所以做一个微调,这样就可以实现输入的透传了。
明天先实验将这个ADBS运行起来(虽然worker不通,但是可以入库的)
这里碰到一个小问题,就是AETL默认是不允许数据有空值的,这个在当时的设计有当时的考虑(高稳定),放在这里倒是给自己设了一道坎。
比较合适的解决方式是在sniffer阶段就进行字段的框定和空值的处理,值得注意的是,如果多个数据源都要进入一个ADBS,那么这些数据源只能保持相同的列。
在Sniffer阶段要设计schema,后续的Worker可以透传这个schema。
TransWorker.py
from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import * gs_id = 'rec_id'
sample_listofdict = [{'amt': '3171687.0','close': '4.025','code': '510300','data_dt': '2023-03-09 13:49:00','data_slot': '27972829','data_source': 'AK','high': '4.025','low': '4.024','market': 'SH','open': '4.025','rec_id': 'AK_SH_510300_27972829','vol': '7880','_ch001': 0}]af = APIFunc('apifunc_database_model1', listofdict= sample_listofdict, key_id=gs_id)chain_funcname_dict = {}# 透传
@af.route('/MergeOut', is_force=True)
def MergeOut(input_dict = None, para_dict = None):msg = RuleMSG('MergeOut')# 1 【不检查字段】# need_cols = ['some_str']# input_cols = list(input_dict.keys())# if not (set(need_cols) <= set(input_cols)):# msg.status = False # msg.rule_result = None # msg.msg = 'InputSetError'# msg.data = None # return msg.to_dict()try:msg.msg ='ok'msg.status = Truemsg.data = input_dictmsg.rule_result = 1except:msg.status = Falsemsg.rule_result = Nonemsg.data = Nonereturn msg.to_dict()# >>> 添加到chain_funcname_dict
MergeOut = {}
MergeOut['subline_dict'] =None
MergeOut['main_dict'] = {'key_id':gs_id, 'depend_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot'],'input_cols':['data_dt', 'open', 'close', 'high', 'low', 'vol', 'amt', 'data_source', 'code', 'market', 'data_slot']}
MergeOut['para_dict'] = None
chain_funcname_dict['MergeOut'] = MergeOutTransPassChain_session = ['MergeOut']TransPassChain_list = TransPassChain_session
TransPassChain_dict = {}
for k in TransPassChain_list:TransPassChain_dict[k] = chain_funcname_dict[k]if __name__ =='__main__':af.run_chain(chain_funcname_list=TransPassChain_list,chain_funcname_dict=TransPassChain_dict)af.g['ruleresult_frame'][ [x for x in af.g['ruleresult_frame'].columns if x !=gs_id]].sum()
这里似乎有一个ADBS设计上需要修改的地方:变量类型转换。
原来默认都是字符串,似乎在进行Redis批量写入的时候都转为字符串了。
现在希望能够将变量进行数值类型的转换,因为在使用时,筛选起来比较容易。
在两个地方可以做这个操作,一个是拉到MongoIn的过程,还有一个是拉到MongoOut的过程
保险起见,我们在这两个过程都对数值型的变量进行声明(为Float)
具体的做法就是通过重写一个方法S2M( Stream To Mongo)
...# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False ):must_cols = ['tier1','tier2','sniffer_name','keyname']assert all(must_cols), ','.join(must_cols) + ' 不能为空'cur_w = w or self.w cur_redis_agent = redis_agent or self.redis_agentbatch_num = batch_num or 1msg =''log_tier1 = log_tier1 or tier1 log_tier2 = log_tier2 or 'log_sniffer' + get_time_str2()# 默认三十秒超时time_out = time_out or 30tick1 = time.time()qname = stream_namecur_len_resp = req.post(redis_agent + 'len_of_queue/',json ={'stream_name':qname,'connection_hash':connection_hash}).json()q_len = cur_len_resp['data']print('{} Q has {} Messages' .format (qname,q_len))if q_len :q_data_resp = self.xrange(qname, count = batch_num, redis_agent = redis_agent, connection_hash = connection_hash)q_data = pd.DataFrame(q_data_resp['data'])# q_data = pd.DataFrame(cur_lq.xrange(qname, count=batch_num))msg_id_list = list(q_data['_msg_id'])# 去重q_data1 = q_data.drop_duplicates([keyname], keep='last')# 脱掉自动变量:在这种情况,不需要声明其他参数(通道是默认的_ch001)q_data2 = self.filter_and_add_msg(q_data1)q_listofdict = q_data2.to_dict(orient='records')q_db_resp = cur_w.insert_or_update_with_key(tier1 = tier1, tier2 = tier2, data_listofdict = q_listofdict, key_name=keyname)# 完成后删除消息del_cnt = self.xdel(qname, mid_or_list = msg_id_list)msg = 'ok,deliver(del) {} messages from {}' .format(del_cnt['data'] ,qname)else:msg ='no data source {}' .format(qname)tick2 = time.time()duration = round(tick2 -tick1,2)log_dict = {'sniffer': sniffer_name,'duration':duration,'msg': msg }return cur_w.insert_recs(tier1=log_tier1, tier2=log_tier2, data_listofdict =[log_dict])
要修改的部分其实不多,增加一个toDoubleVarList,将对应的变量做一个转换即可。使用python对象的继承方法,将这个对象
class StreamsIO_X(StreamsIO):# Pattern: 从队列获取数据,存储到Mongo,然后删除。适用于新增数据。 Stream到Mongo【队列Write相关-删除消息】def S2M(self, stream_name,keyname =None, tier1 = None, tier2 = None,w = None, batch_num = None,sniffer_name = None,log_tier1 = None, log_tier2 = None,time_out = None,redis_agent = None,connection_hash =None,is_return_msg_id_list=False, toDoubleVarList = [] ):...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)
app03_PullToStep1MongoOut.py
...
sio = StreamsIO_X('sio', w = cur_w ,redis_agent = redis_agent_host)
toDoubleVarList = ['open', 'close', 'high', 'low', 'vol', 'amt','data_slot']sio.S2M(work_out_stream,keyname= keyname, tier1= tier1,tier2=tier2,w =cur_w, batch_num =batch_num,sniffer_name=sniffer_name,log_tier2 =log_tier2, redis_agent = redis_agent_host,toDoubleVarList=toDoubleVarList)
同理,也可以修改app01_PullToStep1MongoIn.py
(当时app01并没有直接使用S2M方法)
...# - toDoubleVarListif len(toDoubleVarList):for _var in toDoubleVarList:q_data2[_var] = q_data2[_var].apply(float)
如果一开始不小心将mongo的变量格式搞错了,那么可以这么修改
import pymongo
lmongo=pymongo.MongoClient(host='172.17.0.1',port=111,username='aaa',password='bbb',authSource='admin',authMechanism='SCRAM-SHA-1')lmongo.list_database_names()# 声明数据库和集合
the_collection = lmongo['MyQuantBase']['step1_mongo_in']for varname in sorted(['amt','close','data_slot','high','low','open','vol']):command_dict = {'%s' % varname :{'$exists': True, '$type': 'string'}} command_list = [{'$set':{'%s' % varname : { '$toDouble': '$%s' % varname }}}]res = the_collection.update_many(command_dict, command_list)print(res.acknowledged)
上一篇:彻底搞懂promise