QuantData开始运行了,效果还是挺好的,每分钟都有了实时数据之后,应用就不远了。重要的关键节点通路已经都好了:
本篇就定位在第二步:基础特征加工利用已经进入系统的数据加工基础特征
数据的历史数据已经沉淀,无缺失日期,并且在以每分钟的频度更新新的数据。 处于严谨性,未来要考虑数据丢失等的问题,以及数据运算顺序的问题,但现在暂时忽略。
原来MyQuantBase还考虑了从离线数据取数,而现在已经确定,只要锚向ADBS:QuantData就可以了。
目前在QuantData中有两个数据源,一个是RQ,一个是AK,RQ的优先级高于AK。
所以现在需要进行修改,增加sniffer部分进行取数。
第一个问题产生了
某只标的,某个时刻的数据可能超过2条,这样在进行基础数据加工时容易出问题。例如,我要往前回看240分钟,在只有一个数据源的情况下,往前读取,只要限制240个数据就可以了。
而现在如果只选择240个数据,因为多个数据源的存在,实际拿到的有效数据会小于240个。
**究其根源,是因为交易不是按照自然时间进行的,所以自然时间轴失去了作用。**如果交易时间是特别规律的,那么我们通过固定的起始时隙和结束时隙就能知道应该选择哪些数据,选取记录条数的limit仅仅是为了防止过大的数据溢出。
在QuantData读取数据源数据时,在某个时间点,所有的交易时间是确定的,并且会随着时间推移逐渐变长,所以,我们可以称这个为交易时间轴。
需要的辅助:交易时间轴。
这个时间轴的量级有多长呢?过去十几年也就产生了不到一百万分钟,所以可以按照百万量级来估计时间轴序列的长度。
因为未来哪天会停止交易我们并不确定,所以没法(准确)预先计算未来的交易时间,但是,已发生的历史是可以被准确记录的,这个应该是一个通用的轴。
所以可以采取:Mongo - Redis - Pickle 联合的方式来完成高效的存储、同步和访问。
当程序第一次运行时(或者被强制时),发现自己没有本地交易时间轴,就会向Mongo(mymeta集群)发起一次请求,拉取完整的时间轴。拉取过程注意分批次(根据最大最小时间,划分批次获取)
再次运行时,去Redis读取增量时隙,以确保按照当前最新的时隙工作。所以这里要确保Redis的这个队列的时隙总是最新的。
QuantData在获取新数据时同步将交易时隙同步到交易时隙队列,以及元数据库。未来交易时间轴会在整个算网内保持同步。
既然确定了要进行改造,那么应该怎么改,在哪里改呢?
sniffer在读取新数据时,会多回看几个时隙的数据,这样会造成存储时有一定的冗余。从简单的开发方式考虑,直接使用Redis的键值存储和Mongo就可以了。
其实比较麻烦的是在Redis中开辟一个专用的变量名称来储存这个,不过这个问题已经很好的解决了。
当sniffer在获取新的时隙数据时,从Redis中获取这个变量,然后进行集合运算后放回,再加上ttl。当集合长度超过限制(例如1万),那么对集合进行排序,保留最新的一半,然后再变为集合重新存储。
然后,同时向Mongo存储最新的数据。Mongo是作为二级缓存的,访问频次会很低,但是访问时调用的数据量比较大。
以上两个操作,应该在1秒内可以完成。
结论:修改QuantData.sniffer01_query_akshare.py
, 增加标的的Redis缓存变量,增加元数据集群的连接(ttl=10秒);增加与Redis的操作部分;增加与元数据集群的交互。在元数据的meta下创建trade_slot_axis表,并创建对应的索引。以market、code和data_slot作为联合,创建主键。在获得数据后同时抽离时间信息,写到元数据集群。
先切回QuantData,然后确定Redis变量名称。严格来说,只要做到运行的主机变量不重复就行。
先按照规则,对想要使用的变量名做一下测试
redis_buff_var = {'redis_class':'BUFF','project':'QuantData','subproject':'step1_mongo_in','subapp':'sniffer01_query_akshare','var':'sh_510300_recent_trade_slot_set'}
req.post(redis_agent_host + 'redis_naming_test/',json = redis_buff_var).json(){'data': None,'msg': 'NOT Existed, <<<<<< BUFF.QuantData.step1_mongo_in.pf.app.sniffer01_query_akshare.af.gp.0.uf.sh_510300_recent_trade_slot_set','status': True}
然后这个变量就固定下来使用了BUFF.QuantData.step1_mongo_in.pf.app.sniffer01_query_akshare.af.gp.0.uf.sh_510300_recent_trade_slot_set
。
同时,还要在元数据集群建立索引。
序号 | 名字 | 解释 |
---|---|---|
1 | rec_id | 联合主键 market、code、data_slot |
2 | market | SH |
3 | code | 510300 |
4 | data_slot | 交易时隙 |
target_server = 'mymeta'
project_name ='meta'
gs_id = 'rec_id'
tier2 = 'trade_slot_axis'try:target_w = from_pickle('%s_w' % target_server)color_print('【Loading cur_w】from pickle')
except:w = WMongo('w')target_w = w.TryConnectionOnceAndForever(server_name =target_server)to_pickle(target_w, '%s_w' % target_server)# 保证基本索引target_w.ensure_mongo_index(tier1 =project_name, tier2 =tier2, key_index=gs_id)# 保证其他索引target_w.set_a_index(tier1 =project_name, tier2 =tier2, idx_var='market')
target_w.set_a_index(tier1 =project_name, tier2 =tier2, idx_var='code')
target_w.set_a_index(tier1 =project_name, tier2 =tier2, idx_var='data_slot')
最后在sniffer中增加一些逻辑和存储方法。
冷启动,先将已经存在的交易时隙全部提取出来,存到mymeta。这时候就能体现_task_rand
的价值了,直接进行千以内的循环就能遍历到全部数据。
声明源mongo和目标mongo,然后使用预埋的随机数将数据进行遍历。这样在执行冷启动任务时非常方便。
from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import *from configs_base import redis_agent_host,project_name,cur_wsource_w = cur_wtarget_server = 'mymeta'
project_name ='meta'
gs_id = 'rec_id'
tier2 = 'trade_slot_axis'try:target_w = from_pickle('%s_w' % target_server)color_print('【Loading cur_w】from pickle')
except:w = WMongo('w')target_w = w.TryConnectionOnceAndForever(server_name =target_server)to_pickle(target_w, '%s_w' % target_server)s_tier1 = 'QuantData'
s_tier2 = 'step1_mongo_out't_tier1 = 'meta'
t_tier2 = 'trade_slot_axis'market = 'SH'
code = '510300'# 使用随机数简单穷尽
import tqdm
# tem_task_rand = 0for tem_task_rand in tqdm.tqdm(list(range(1000))):source_recs = source_w.query_recs(tier1 = s_tier1, tier2 = s_tier2,filter_dict = {'market':market,'code':code, '_task_rand':tem_task_rand},keep_cols = ['market','code','data_slot'] ,limits=100000)['data']if len(source_recs):source_df = pd.DataFrame(source_recs)source_df['rec_id'] = source_df['market'] + '_' + source_df['code'] + '_' + \source_df['data_slot'].apply(int).apply(str)q_db_resp = target_w.insert_or_update_with_key(tier1 = t_tier1, tier2 = t_tier2, data_listofdict = source_df.to_dict(orient='records'), key_name=gs_id)
看起来精确完成了
接下来要进行运行时的调整
# ---- 保存新的交易时隙new_axis_df = df1[['market','code','data_slot']]new_axis_df['rec_id'] = new_axis_df['market'] + '_' + new_axis_df['code'] + '_' + \new_axis_df['data_slot'].apply(int).apply(str)# 保存到mymetatime_axis_resp = time_axis_w.insert_or_update_with_key(tier1 = time_axis_tier1, tier2 = time_axis_tier2, data_listofdict = new_axis_df.to_dict(orient='records'), key_name=gs_id) # 保存到redisredis_var = sh_510300_time_axis_redis_varredis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data']if redis_buff is None:print('TimeAxis Redis Buff None')redis_buff = list(new_axis_df['data_slot'])else:print('TimeAxis Redis Buff ', len(redis_buff))if len(redis_buff) <10000:new_slots = list(new_axis_df['data_slot'])redis_buff = sorted(list(set(redis_buff)|set(new_slots)))else:new_slots = list(new_axis_df['data_slot'])redis_buff = sorted(list(set(redis_buff[-1000:])|set(new_slots)))# 存入redis_buff_resp = req.post(redis_agent_host + 'setv/',json ={'k':redis_var, 'v':redis_buff,'ex_seconds':10000}).json()
做的时候突然想到集合不好序列化,所以用了列表,并且排序。
然后将QuantData关掉,同步代码后重启。
目前看起来,修改生效且按预期的设计在执行,现在终于回归本篇的正题:使用MyQuantBase去向QuantData读取数据,并加工基础特征。只不过这次的筛选要基于时间轴进行筛选。
从逻辑上,先通过当前数据时隙确定起止时隙位置,然后筛选并处理
在使用时基本是逆过程,使用本地的pickle文件。
存量时隙的读取,一般只会读取一次,之后就读取最新的更新
# 读取交易时隙
trade_time_axis_name = 'trade_time_axis_%s_%s' %(market, code)
try:# 直接读取本地文件trade_time_axis = from_pickle(trade_time_axis_name)
except:# 不存在本地文件,向集群读取time_axis_list = []min_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='min',attrname='data_slot')['data']min_slot -=1max_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='max',attrname='data_slot')['data']max_slot +=1slice_list = slice_list_by_batch1(int(min_slot), int(max_slot), 10000)for tem_slice in tqdm.tqdm(slice_list):tem_list = mymeta_w.query_recs(tier1 = m_tier1, tier2 =m_tier2, filter_dict = {'market':market,'code':code,'data_slot':{'$gte':tem_slice[0],'$lt':tem_slice[1]}},keep_cols=['data_slot'],limits=100000, silent=True)['data']if len(tem_list):tem_list1 = list(pd.DataFrame(tem_list)['data_slot'].apply(int))time_axis_list = time_axis_list +tem_list1to_pickle(time_axis_list,trade_time_axis_name)
读取缓存以及差值同步
考虑到服务可能会经历短暂的停机,那么使用本地文件+缓存的方式就会产生差值。因此,在使用断点读写的方式,一定要确保连续。
目前可以基于的一个点是:无论是Mongo还是Redis数据,均是以有序的方式存储的。
在执行每一次的操作之前,向Mongo请求当前data_slot的最大值。如果最大值在Redis缓存的列表中,那么表示当前服务没有和缓存脱节,可以直接使用缓存数据。否则就像服务器发起增量读取,不理会缓存。
这里有一个设计上的矛盾点:使用Redis的本义是减少对Mongo零碎的访问,但是每次如果要向Mongo请求最大data_slot这个计算,又仍然耗费了Mongo的开销(虽然很小)。当前这个问题在很长一段时间不是问题,所以这里不打算解决。但是可以有一个新概念:长效缓存。
长效缓存是指(Redis)在内存中尽可能永久保存的元数据,例如本次的最大data_slot。程序在每次写mongo的时候更新这个值即可。这个缓存只有当失效时程序才会去mongo重新读取一次。原来的大列表不能用这种方式存储,必须遵守最小范围、最小时段占用的原则,那个更符合缓存本身的意义。
所以,看起来要用两级的缓存设计才能解决最初的目标:让Redis完全承担零散的Mongo请求。
嗯,我发现有个小误区,在维护时间轴的时候不需要考虑Mongo(除了第一次啥都没有的时候)。不过上面提的Redis两级缓存与Mongo搭配的说法是成立的。
速速写了一版时间轴的更新、维护
# 读取交易时隙
trade_time_axis_name = 'trade_time_axis_%s_%s' %(market, code)
try:# 直接读取本地文件trade_time_axis = from_pickle(trade_time_axis_name)
except:# 不存在本地文件,向集群读取time_axis_list = []min_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='min',attrname='data_slot')['data']min_slot -=1max_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='max',attrname='data_slot')['data']max_slot +=1slice_list = slice_list_by_batch1(int(min_slot), int(max_slot), 10000)for tem_slice in tqdm.tqdm(slice_list):tem_list = mymeta_w.query_recs(tier1 = m_tier1, tier2 =m_tier2, filter_dict = {'market':market,'code':code,'data_slot':{'$gte':tem_slice[0],'$lt':tem_slice[1]}},keep_cols=['data_slot'],limits=100000, silent=True)['data']if len(tem_list):tem_list1 = list(pd.DataFrame(tem_list)['data_slot'].apply(int))time_axis_list = time_axis_list +tem_list1to_pickle(time_axis_list,trade_time_axis_name)trade_time_axis = time_axis_list# 【不会频繁访问mongo】加载最新的时隙数据 --- Redis
sh_510300_time_axis_redis_var ='BUFF.QuantData.step1_mongo_in.pf.app.sniffer01_query_akshare.af.gp.0.uf.sh_510300_recent_trade_slot_set'
redis_var = sh_510300_time_axis_redis_varredis_agent_host = 'http://172.17.0.1:24021/'
redis_agent = redis_agent_host
redis_connection_hash =None
redis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data'] or []# 合并 ==>
already_max_slot = trade_time_axis[-1]# 当前的最大时隙位于缓存中,服务器未脱节
if already_max_slot in redis_buff:print('>>> On Tracking , Read From Buffer')new_trade_time_axis = sorted_set_merge_list(trade_time_axis,redis_buff)
else:print('>>> Off Line , Read From Mongo')cur_slot = ts2ord()slice_list = slice_list_by_batch1(int(already_max_slot), int(cur_slot) +1 , 10000)time_axis_list = []for tem_slice in tqdm.tqdm(slice_list):tem_list = mymeta_w.query_recs(tier1 = m_tier1, tier2 =m_tier2, filter_dict = {'market':market,'code':code,'data_slot':{'$gte':tem_slice[0],'$lt':tem_slice[1]}},keep_cols=['data_slot'],limits=100000, silent=True)['data']if len(tem_list):tem_list1 = list(pd.DataFrame(tem_list)['data_slot'].apply(int))time_axis_list = time_axis_list +tem_list1new_trade_time_axis = sorted_set_merge_list(trade_time_axis,time_axis_list)# 将最新的时间轴保存在本地
to_pickle(new_trade_time_axis, trade_time_axis_name)# ===================== 时间轴 END - Worker就不用再去管时间轴了
这章写太多了,下次再续吧…
下一篇: 观察小白菜的日记