消息队列是thingsboard支持集群的基础,比如使用Kafka可实现消息在整个服务集群中共同处理,提高性能。如果是内存队列,则无法再服务实例间共享消息。
在module【common/cluster-api】的org.thingsboard.server.queue包定义了消息队列,其中接口
TbQueueProducer定义了生产者,实现类有:
接口TbQueueConsumer定义了消费者,实现类有:
可以看到除了抽象类外,生产者和消费者的具体实现时一一对应的,应用在启用根据配置
queue.type确定哪种实现,默认是内存
在module【common】的package【transport】中实现了各种不同的传输协议,无论哪种协议当接收到遥测数据时,都会通过抽象父类【DefaultTransportService】的process方法处理:
其中resolve tpi的方法,通过TB_RULE_ENGINT解析出topic是 tb_rule_engine
当配置consumer-per-partition=true时,则开启了分区模式,分区可均分给消费者,实现集群处理;如果为开启分区模式,则一个队列就是一个队列,每个服务都完全消费自己的队列。
分区数通过partitions参数配置,默认是10
消费者服务类【DefaultTbRuleEngineConsumerService】,其init方法是PostContruct方法
根据消息队列的配置加载所有队列的所有分区。
默认有Main队列,因此会加载所有队列的消费者并存入consumers列表;
而DefaultTbRuleEngineConsumerService的抽象父类【AbstractConsumerService】中定义了应用启动完成事件【ApplicationReadyEvent】监听方法,在应用就绪后被执行
因此服务启动后会自动执行DefaultTbRuleEngineConsumerService的lauchMainConsumers方法:
在线程池中异步执行consumerLoop方法
consumerLoop方法中consumer始终循环拉取数据,然后再消费,当consumer stop时会结束循环。
TB可使用Zookeeper实现服务发现,默认情况下ZK是未开启的
当不开启zk时,是无法感知其他服务实例的,此时每个服务都会消费全部分区;
当开启zk后,能感知其他服务实例,才会真正的实现独立处理。
注意当开启zk后,就不能再使用内存型消息队列了,否则会导致消息虽然发送到了(内存)队列的某个分区,但是该分区应该被其他服务实例消费,因此该消息将永远不会被消费。
实现类是ZkDiscoveryService:
服务启动
服务启动完成后,想zk注册当前服务,使得其他服务实例可以发现新增服务实例事件:
均匀分配分区
通过取余将队列均匀的分配给所有服务节点;
例如全部服务节点有2个,节点ID分别是:s0,s1
当判断p1由谁消费时,1%2=1,则由s1消费
当判断p4由谁消费时,4%2=0,则由s0消费
新增服务实例事件处理
ZkDiscoveryService中定义了当收到zk child path变化事件时则回调recalculatePartitions方法进行重新分区:
先debug模式启动一个服务实例A,然后再另外启动一个服务B,服务A收到新服务启动的事件,接下来debug看一下分区是如何重新分配的,首先可以看到oldPartions中每个队列下的value都是10个,即10个分区:
而经过重新分配后,还属于当前服务消费的分区都变成了5个:
然后发送分区变化事件到消费者服务:
消费者服务(DefaultTbRuleEngineConsumerService)收到事件:
将要所有需要消费的分区,存储到临时队列subscribeQueue中
通过将当前消费分区和有权消费分区比较,计算需要增加消费的分区:
当前消费分区是10个,而重新分配后有权消费的分区减半,只剩5个,因此不需要增加,所以计算后addedPartitions是空的:
通过将当前消费分区和有权消费分区比较,计算需要移除消费的分区,由于现在有权消费的只剩5个分区(另外5个被其他服务实例消费),因此计算后removedPartitions中有5个分区
然后对这个5个要移除消费的分区执行removeConsumerForTopicByTpi
因为消费者poll是在一个循环中不断执行的,当设置consumer stop时,则退出循环。
设备连接也是基于消息队列实现异步的,详情参考《设备连接debug解析 》
下一篇:含“举”的成语有哪些