Python 算法交易实验57 ADBS:QuantData到MyQuantBase
创始人
2025-05-30 11:11:33
0

说明

QuantData开始运行了,效果还是挺好的,每分钟都有了实时数据之后,应用就不远了。重要的关键节点通路已经都好了:

  • 1 数据系统获得实时数据(分钟)
  • 2 对实时数据进行必要(通用)的加工(Step1 ~ MyQuantBase)
  • 3 基于基础数据加工出信号(Step2 ~ MyQuantBaseStep2Signals)
  • 4 基于信号(以及特征)作出决策(Step3 ~ MyQuantBaseStep3Decisions)
  • 5 基于决策进行交易,发出交易指令(Step4 ~ MyQuantBaseStep4Trade)
    • 手工操作:系统会发出短信提示,收到提示后登录软件操作
      • 交易成功后,在portal中记录实际交易结果
    • 自动交易:未来条件成熟时,决策将通过微引擎发出交易指令,自动记录交易结果
  • 6 在Portal中观察交易的结果,利润,回撤等指标,以图表的方式直观的展示结果。观察者可以依据规则终止某些策略的生产状态。策略一般不会删除,会持续的运行,以提供更多的参考依据。未来,这部分(Watch)会演变为算法自动管理,而实际的观察者将会在更高的层级上观察。(Step5 ~ MyQuantBaseStep5Watch)

内容

本篇就定位在第二步:基础特征加工利用已经进入系统的数据加工基础特征

1 基础数据

数据的历史数据已经沉淀,无缺失日期,并且在以每分钟的频度更新新的数据。 处于严谨性,未来要考虑数据丢失等的问题,以及数据运算顺序的问题,但现在暂时忽略。

原来MyQuantBase还考虑了从离线数据取数,而现在已经确定,只要锚向ADBS:QuantData就可以了。

目前在QuantData中有两个数据源,一个是RQ,一个是AK,RQ的优先级高于AK。

所以现在需要进行修改,增加sniffer部分进行取数。

2 行动

  • 1 将MyQuantBase现有的数据清空(Clear Collection)
  • 2 停止MyQuantBase服务,重新进入开发

2.1 多个数据源问题

第一个问题产生了

某只标的,某个时刻的数据可能超过2条,这样在进行基础数据加工时容易出问题。例如,我要往前回看240分钟,在只有一个数据源的情况下,往前读取,只要限制240个数据就可以了。

而现在如果只选择240个数据,因为多个数据源的存在,实际拿到的有效数据会小于240个。

**究其根源,是因为交易不是按照自然时间进行的,所以自然时间轴失去了作用。**如果交易时间是特别规律的,那么我们通过固定的起始时隙和结束时隙就能知道应该选择哪些数据,选取记录条数的limit仅仅是为了防止过大的数据溢出。

在QuantData读取数据源数据时,在某个时间点,所有的交易时间是确定的,并且会随着时间推移逐渐变长,所以,我们可以称这个为交易时间轴。

需要的辅助:交易时间轴。

这个时间轴的量级有多长呢?过去十几年也就产生了不到一百万分钟,所以可以按照百万量级来估计时间轴序列的长度。

因为未来哪天会停止交易我们并不确定,所以没法(准确)预先计算未来的交易时间,但是,已发生的历史是可以被准确记录的,这个应该是一个通用的轴

所以可以采取:Mongo - Redis - Pickle 联合的方式来完成高效的存储、同步和访问。

  • Mongo 存储最完整的时间轴,访问量不高
  • Redis 缓存了最近一段时间的轴,例如最近1万个交易时隙(可以做成队列)
  • Pickle 则缓存了每个应用自己保存的交易时隙

当程序第一次运行时(或者被强制时),发现自己没有本地交易时间轴,就会向Mongo(mymeta集群)发起一次请求,拉取完整的时间轴。拉取过程注意分批次(根据最大最小时间,划分批次获取)

再次运行时,去Redis读取增量时隙,以确保按照当前最新的时隙工作。所以这里要确保Redis的这个队列的时隙总是最新的。

QuantData在获取新数据时同步将交易时隙同步到交易时隙队列,以及元数据库。未来交易时间轴会在整个算网内保持同步。

既然确定了要进行改造,那么应该怎么改,在哪里改呢?

  • Rule1: 每个标的维持自己的交易时间轴(不同标的是不同的)
  • Rule2: 由读取数据的sniffer来维持数据轴的更新

sniffer在读取新数据时,会多回看几个时隙的数据,这样会造成存储时有一定的冗余。从简单的开发方式考虑,直接使用Redis的键值存储和Mongo就可以了。

其实比较麻烦的是在Redis中开辟一个专用的变量名称来储存这个,不过这个问题已经很好的解决了。

当sniffer在获取新的时隙数据时,从Redis中获取这个变量,然后进行集合运算后放回,再加上ttl。当集合长度超过限制(例如1万),那么对集合进行排序,保留最新的一半,然后再变为集合重新存储。

然后,同时向Mongo存储最新的数据。Mongo是作为二级缓存的,访问频次会很低,但是访问时调用的数据量比较大。

以上两个操作,应该在1秒内可以完成。

结论:修改QuantData.sniffer01_query_akshare.py, 增加标的的Redis缓存变量,增加元数据集群的连接(ttl=10秒);增加与Redis的操作部分;增加与元数据集群的交互。在元数据的meta下创建trade_slot_axis表,并创建对应的索引。以market、code和data_slot作为联合,创建主键。在获得数据后同时抽离时间信息,写到元数据集群。

2.2 前置问题解决

先切回QuantData,然后确定Redis变量名称。严格来说,只要做到运行的主机变量不重复就行。

在这里插入图片描述
先按照规则,对想要使用的变量名做一下测试

redis_buff_var = {'redis_class':'BUFF','project':'QuantData','subproject':'step1_mongo_in','subapp':'sniffer01_query_akshare','var':'sh_510300_recent_trade_slot_set'}
req.post(redis_agent_host + 'redis_naming_test/',json =  redis_buff_var).json(){'data': None,'msg': 'NOT Existed,  <<<<<<  BUFF.QuantData.step1_mongo_in.pf.app.sniffer01_query_akshare.af.gp.0.uf.sh_510300_recent_trade_slot_set','status': True}

然后这个变量就固定下来使用了BUFF.QuantData.step1_mongo_in.pf.app.sniffer01_query_akshare.af.gp.0.uf.sh_510300_recent_trade_slot_set

同时,还要在元数据集群建立索引。

序号名字解释
1rec_id联合主键 market、code、data_slot
2marketSH
3code510300
4data_slot交易时隙
target_server = 'mymeta'
project_name ='meta'
gs_id = 'rec_id'
tier2 = 'trade_slot_axis'try:target_w = from_pickle('%s_w' % target_server)color_print('【Loading cur_w】from pickle')
except:w = WMongo('w')target_w = w.TryConnectionOnceAndForever(server_name =target_server)to_pickle(target_w, '%s_w' % target_server)# 保证基本索引target_w.ensure_mongo_index(tier1 =project_name, tier2 =tier2, key_index=gs_id)# 保证其他索引target_w.set_a_index(tier1 =project_name, tier2 =tier2, idx_var='market')
target_w.set_a_index(tier1 =project_name, tier2 =tier2, idx_var='code')
target_w.set_a_index(tier1 =project_name, tier2 =tier2, idx_var='data_slot')

最后在sniffer中增加一些逻辑和存储方法。

冷启动,先将已经存在的交易时隙全部提取出来,存到mymeta。这时候就能体现_task_rand的价值了,直接进行千以内的循环就能遍历到全部数据。

声明源mongo和目标mongo,然后使用预埋的随机数将数据进行遍历。这样在执行冷启动任务时非常方便。

from funcs_apifunc_database_model1_6810f9d37e89e5e1f33e1b8f4defa22e import *from configs_base import redis_agent_host,project_name,cur_wsource_w = cur_wtarget_server = 'mymeta'
project_name ='meta'
gs_id = 'rec_id'
tier2 = 'trade_slot_axis'try:target_w = from_pickle('%s_w' % target_server)color_print('【Loading cur_w】from pickle')
except:w = WMongo('w')target_w = w.TryConnectionOnceAndForever(server_name =target_server)to_pickle(target_w, '%s_w' % target_server)s_tier1 = 'QuantData'
s_tier2 = 'step1_mongo_out't_tier1 = 'meta'
t_tier2 = 'trade_slot_axis'market = 'SH'
code = '510300'# 使用随机数简单穷尽
import tqdm 
# tem_task_rand = 0for tem_task_rand in tqdm.tqdm(list(range(1000))):source_recs = source_w.query_recs(tier1 = s_tier1, tier2 = s_tier2,filter_dict = {'market':market,'code':code, '_task_rand':tem_task_rand},keep_cols = ['market','code','data_slot'] ,limits=100000)['data']if len(source_recs):source_df = pd.DataFrame(source_recs)source_df['rec_id'] = source_df['market'] + '_' + source_df['code'] + '_' + \source_df['data_slot'].apply(int).apply(str)q_db_resp = target_w.insert_or_update_with_key(tier1 = t_tier1, tier2 = t_tier2, data_listofdict = source_df.to_dict(orient='records'), key_name=gs_id)

看起来精确完成了
在这里插入图片描述
接下来要进行运行时的调整

  • 1 Sniffer运行时会触碰到最新的数据
  • 2 Sniffer会读取约定的redis缓存变量,此时有三种情况
    • 1 不存在:那么直接按约定创建新集合存入
    • 2 存在:
      • 长度小于1万:直接将新的集合合并,保存
      • 长度大于1万:将集合转为队列,保留较新的1000个,然后合并新的集合保存
            # ---- 保存新的交易时隙new_axis_df = df1[['market','code','data_slot']]new_axis_df['rec_id'] = new_axis_df['market'] + '_' + new_axis_df['code'] + '_' + \new_axis_df['data_slot'].apply(int).apply(str)# 保存到mymetatime_axis_resp = time_axis_w.insert_or_update_with_key(tier1 = time_axis_tier1, tier2 = time_axis_tier2, data_listofdict = new_axis_df.to_dict(orient='records'), key_name=gs_id)            # 保存到redisredis_var = sh_510300_time_axis_redis_varredis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data']if redis_buff is None:print('TimeAxis Redis Buff None')redis_buff = list(new_axis_df['data_slot'])else:print('TimeAxis Redis Buff ', len(redis_buff))if len(redis_buff) <10000:new_slots = list(new_axis_df['data_slot'])redis_buff = sorted(list(set(redis_buff)|set(new_slots)))else:new_slots = list(new_axis_df['data_slot'])redis_buff = sorted(list(set(redis_buff[-1000:])|set(new_slots)))# 存入redis_buff_resp  = req.post(redis_agent_host + 'setv/',json ={'k':redis_var, 'v':redis_buff,'ex_seconds':10000}).json()

做的时候突然想到集合不好序列化,所以用了列表,并且排序。

然后将QuantData关掉,同步代码后重启。

2.2 基于交易时间轴加工特征

目前看起来,修改生效且按预期的设计在执行,现在终于回归本篇的正题:使用MyQuantBase去向QuantData读取数据,并加工基础特征。只不过这次的筛选要基于时间轴进行筛选。

从逻辑上,先通过当前数据时隙确定起止时隙位置,然后筛选并处理

在使用时基本是逆过程,使用本地的pickle文件。

  • 1 当前时隙最多比历史时隙多1(最新工作时隙)
  • 2 确定当前时隙的索引的位置(终点,不含),基于索引-周期的方式找到起点。

存量时隙的读取,一般只会读取一次,之后就读取最新的更新

# 读取交易时隙
trade_time_axis_name = 'trade_time_axis_%s_%s' %(market, code)
try:# 直接读取本地文件trade_time_axis = from_pickle(trade_time_axis_name)
except:# 不存在本地文件,向集群读取time_axis_list = []min_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='min',attrname='data_slot')['data']min_slot -=1max_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='max',attrname='data_slot')['data']max_slot +=1slice_list = slice_list_by_batch1(int(min_slot), int(max_slot), 10000)for tem_slice in tqdm.tqdm(slice_list):tem_list = mymeta_w.query_recs(tier1 = m_tier1, tier2 =m_tier2, filter_dict = {'market':market,'code':code,'data_slot':{'$gte':tem_slice[0],'$lt':tem_slice[1]}},keep_cols=['data_slot'],limits=100000, silent=True)['data']if len(tem_list):tem_list1 = list(pd.DataFrame(tem_list)['data_slot'].apply(int))time_axis_list = time_axis_list +tem_list1to_pickle(time_axis_list,trade_time_axis_name)

读取缓存以及差值同步

考虑到服务可能会经历短暂的停机,那么使用本地文件+缓存的方式就会产生差值。因此,在使用断点读写的方式,一定要确保连续。

目前可以基于的一个点是:无论是Mongo还是Redis数据,均是以有序的方式存储的。

在执行每一次的操作之前,向Mongo请求当前data_slot的最大值。如果最大值在Redis缓存的列表中,那么表示当前服务没有和缓存脱节,可以直接使用缓存数据。否则就像服务器发起增量读取,不理会缓存。

这里有一个设计上的矛盾点:使用Redis的本义是减少对Mongo零碎的访问,但是每次如果要向Mongo请求最大data_slot这个计算,又仍然耗费了Mongo的开销(虽然很小)。当前这个问题在很长一段时间不是问题,所以这里不打算解决。但是可以有一个新概念:长效缓存

长效缓存是指(Redis)在内存中尽可能永久保存的元数据,例如本次的最大data_slot。程序在每次写mongo的时候更新这个值即可。这个缓存只有当失效时程序才会去mongo重新读取一次。原来的大列表不能用这种方式存储,必须遵守最小范围、最小时段占用的原则,那个更符合缓存本身的意义。

所以,看起来要用两级的缓存设计才能解决最初的目标:让Redis完全承担零散的Mongo请求。


嗯,我发现有个小误区,在维护时间轴的时候不需要考虑Mongo(除了第一次啥都没有的时候)。不过上面提的Redis两级缓存与Mongo搭配的说法是成立的。

速速写了一版时间轴的更新、维护

# 读取交易时隙
trade_time_axis_name = 'trade_time_axis_%s_%s' %(market, code)
try:# 直接读取本地文件trade_time_axis = from_pickle(trade_time_axis_name)
except:# 不存在本地文件,向集群读取time_axis_list = []min_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='min',attrname='data_slot')['data']min_slot -=1max_slot = mymeta_w.minmax(tier1 = m_tier1, tier2 =m_tier2,query_dict = {'market':market,'code':code},minmax='max',attrname='data_slot')['data']max_slot +=1slice_list = slice_list_by_batch1(int(min_slot), int(max_slot), 10000)for tem_slice in tqdm.tqdm(slice_list):tem_list = mymeta_w.query_recs(tier1 = m_tier1, tier2 =m_tier2, filter_dict = {'market':market,'code':code,'data_slot':{'$gte':tem_slice[0],'$lt':tem_slice[1]}},keep_cols=['data_slot'],limits=100000, silent=True)['data']if len(tem_list):tem_list1 = list(pd.DataFrame(tem_list)['data_slot'].apply(int))time_axis_list = time_axis_list +tem_list1to_pickle(time_axis_list,trade_time_axis_name)trade_time_axis = time_axis_list# 【不会频繁访问mongo】加载最新的时隙数据 --- Redis 
sh_510300_time_axis_redis_var ='BUFF.QuantData.step1_mongo_in.pf.app.sniffer01_query_akshare.af.gp.0.uf.sh_510300_recent_trade_slot_set'
redis_var = sh_510300_time_axis_redis_varredis_agent_host = 'http://172.17.0.1:24021/'
redis_agent = redis_agent_host
redis_connection_hash =None
redis_buff = req.post(redis_agent_host + 'getv/',json ={'k':redis_var}).json()['data'] or []# 合并 ==>
already_max_slot = trade_time_axis[-1]# 当前的最大时隙位于缓存中,服务器未脱节
if already_max_slot in redis_buff:print('>>> On Tracking , Read From Buffer')new_trade_time_axis = sorted_set_merge_list(trade_time_axis,redis_buff)
else:print('>>> Off Line , Read From Mongo')cur_slot = ts2ord()slice_list = slice_list_by_batch1(int(already_max_slot), int(cur_slot) +1 , 10000)time_axis_list = []for tem_slice in tqdm.tqdm(slice_list):tem_list = mymeta_w.query_recs(tier1 = m_tier1, tier2 =m_tier2, filter_dict = {'market':market,'code':code,'data_slot':{'$gte':tem_slice[0],'$lt':tem_slice[1]}},keep_cols=['data_slot'],limits=100000, silent=True)['data']if len(tem_list):tem_list1 = list(pd.DataFrame(tem_list)['data_slot'].apply(int))time_axis_list = time_axis_list +tem_list1new_trade_time_axis = sorted_set_merge_list(trade_time_axis,time_axis_list)# 将最新的时间轴保存在本地
to_pickle(new_trade_time_axis, trade_time_axis_name)# ===================== 时间轴 END - Worker就不用再去管时间轴了

修改Worker

这章写太多了,下次再续吧…

相关内容

热门资讯

中学德育工作计划 中学德育工作计划(通用10篇)  时间一晃而过,我们又将续写新的诗篇,展开新的旅程,现在就让我们好好...
班长个人工作计划 班长个人工作计划范文(精选3篇)  光阴如水,我们又将迎来新一轮的努力,为此需要好好地写一份工作计划...
办公室年度工作总结及工作计划 办公室年度工作总结及工作计划办公室年度工作总结及工作计划1  20xx年是公司各项工作都步入新阶段的...
LeetCode143. 重排... 题目 LeetCode143. 重排链表 给定一个单链表 L 的头节点 head ,单...
union大小端模式 在c中,联合体(共用体)的数据成员都是从低地址开始存放。 ...
Javascript学习笔记—... 文章目录数据类型通过控制台颜色判断数据类型数字类型特殊数据类型字符串转义字符字符串的不可变性不同数据...
5自由度串联机械臂实现搬运物品... 1. 功能描述     本文提供的示例所实现的功能为:实现5自由度串联机械臂搬运物品的...
Application使用内核... 环境 $ cat /etc/os-release PRETTY_NAME="Ubuntu ...
监督年度工作计划 监督年度工作计划汇总10篇  时间的脚步是无声的,它在不经意间流逝,我们又将迎来新的喜悦、新的收获,...
平安建设工作计划 精选平安建设工作计划(通用9篇)  时间过得可真快,从来都不等人,成绩已属于过去,新一轮的工作即将来...
小学美术教学工作计划 小学美术教学工作计划兴隆小学美术第四册教学工作计划兴隆小学:曾素梅又一期来临,我们将继续以课堂教学研...
C++构造函数详解 在C++中,有一种特殊的成员函数,它的名字和类名相同&#...
windows安装jenkin... 官网 安装jdk环境 需要jdk11或者jdk17 下载jdk17 下载解压设置环境变量 JAV...
远程教育工作计划 精选远程教育工作计划范文集合5篇  时间就如同白驹过隙般的流逝,成绩已属于过去,新一轮的工作即将来临...
秋季初中德育工作计划 秋季初中德育工作计划(精选3篇)  日子在弹指一挥间就毫无声息的流逝,迎接我们的将是新的生活,新的挑...
驻村工作计划 驻村工作计划集合六篇  时光在流逝,从不停歇,我们的工作又进入新的阶段,为了今后更好的工作发展,该为...
MatBox—基于PyQt快速... MatBox—基于PyQt快速入门matplotlib的教程库 __ __ ...
公司职员个人年度工作计划 公司职员个人年度工作计划  日子如同白驹过隙,迎接我们的将是新的生活,新的挑战,写一份工作计划,为接...
XICE-HUAWEI-超级完... XICE-HUAWEI-超级完整的BGP-2 上一章的总结 BGP-大型路由协议,即可...
小学教育教学工作计划 小学教育教学工作计划范文(精选3篇)  时间流逝得如此之快,我们又将奔赴下一阶段的教学,该好好计划一...