ThingsBoard源码解析-消息队列
创始人
2024-02-16 09:50:44
0

概述

消息队列是thingsboard支持集群的基础,比如使用Kafka可实现消息在整个服务集群中共同处理,提高性能。如果是内存队列,则无法再服务实例间共享消息。

定义

在module【common/cluster-api】的org.thingsboard.server.queue包定义了消息队列,其中接口

TbQueueProducer定义了生产者,实现类有:

接口TbQueueConsumer定义了消费者,实现类有:

可以看到除了抽象类外,生产者和消费者的具体实现时一一对应的,应用在启用根据配置

queue.type确定哪种实现,默认是内存

使用

生产者

在module【common】的package【transport】中实现了各种不同的传输协议,无论哪种协议当接收到遥测数据时,都会通过抽象父类【DefaultTransportService】的process方法处理:

其中resolve tpi的方法,通过TB_RULE_ENGINT解析出topic是 tb_rule_engine

消费者

当配置consumer-per-partition=true时,则开启了分区模式,分区可均分给消费者,实现集群处理;如果为开启分区模式,则一个队列就是一个队列,每个服务都完全消费自己的队列。

分区数通过partitions参数配置,默认是10

无分区模式

消费者服务类【DefaultTbRuleEngineConsumerService】,其init方法是PostContruct方法

根据消息队列的配置加载所有队列的所有分区。

默认有Main队列,因此会加载所有队列的消费者并存入consumers列表;

而DefaultTbRuleEngineConsumerService的抽象父类【AbstractConsumerService】中定义了应用启动完成事件【ApplicationReadyEvent】监听方法,在应用就绪后被执行

因此服务启动后会自动执行DefaultTbRuleEngineConsumerService的lauchMainConsumers方法:

在线程池中异步执行consumerLoop方法

consumerLoop方法中consumer始终循环拉取数据,然后再消费,当consumer stop时会结束循环。

分区模式(默认)

TB可使用Zookeeper实现服务发现,默认情况下ZK是未开启的

当不开启zk时,是无法感知其他服务实例的,此时每个服务都会消费全部分区;

当开启zk后,能感知其他服务实例,才会真正的实现独立处理。

注意当开启zk后,就不能再使用内存型消息队列了,否则会导致消息虽然发送到了(内存)队列的某个分区,但是该分区应该被其他服务实例消费,因此该消息将永远不会被消费。

实现类是ZkDiscoveryService:

服务启动

服务启动完成后,想zk注册当前服务,使得其他服务实例可以发现新增服务实例事件:

均匀分配分区

通过取余将队列均匀的分配给所有服务节点;

例如全部服务节点有2个,节点ID分别是:s0,s1

当判断p1由谁消费时,1%2=1,则由s1消费

当判断p4由谁消费时,4%2=0,则由s0消费

新增服务实例事件处理

ZkDiscoveryService中定义了当收到zk child path变化事件时则回调recalculatePartitions方法进行重新分区:

先debug模式启动一个服务实例A,然后再另外启动一个服务B,服务A收到新服务启动的事件,接下来debug看一下分区是如何重新分配的,首先可以看到oldPartions中每个队列下的value都是10个,即10个分区:

而经过重新分配后,还属于当前服务消费的分区都变成了5个:

然后发送分区变化事件到消费者服务:

消费者服务(DefaultTbRuleEngineConsumerService)收到事件:

将要所有需要消费的分区,存储到临时队列subscribeQueue中

通过将当前消费分区有权消费分区比较,计算需要增加消费的分区:

当前消费分区是10个,而重新分配后有权消费的分区减半,只剩5个,因此不需要增加,所以计算后addedPartitions是空的:

通过将当前消费分区和有权消费分区比较,计算需要移除消费的分区,由于现在有权消费的只剩5个分区(另外5个被其他服务实例消费),因此计算后removedPartitions中有5个分区

然后对这个5个要移除消费的分区执行removeConsumerForTopicByTpi

因为消费者poll是在一个循环中不断执行的,当设置consumer stop时,则退出循环。

API请求异步处理

设备连接也是基于消息队列实现异步的,详情参考《设备连接debug解析 》

 

 

相关内容

热门资讯

进入高三励志标语 进入高三励志标语(精选180句)  在日常的学习、工作、生活中,大家最不陌生的就是标语了吧,标语在一...
大学毕业班级鉴定评语 大学毕业班级鉴定评语(精选120条)  无论是身处学校还是步入社会,大家都有写评语的经历,对评语很是...
中考横幅标语大全   1.春播秋收近十载,一朝收获终有成。  2.不论你在什么时候开始,重要的是开始之后不要停止。  ...
学校六一活动标语 学校六一活动标语(精选50句)  在现实生活或工作学习中,大家对标语都再熟悉不过了吧,标语以其时间性...
家装实木高档油漆门广告语 家装实木高档油漆门广告语  1、一“门”心思只为您。  2、打开木门,开启幸福。  3、独门工艺,专...
厕所文明标语 厕所文明标语(通用185句)  在现实生活或工作学习中,大家都对那些朗朗上口的标语很是熟悉吧,标语是...
宣传稿格式 宣传稿格式  一、宣传稿的介绍  首先,新闻宣传稿要重视宣传的本身意义和作用。  其次,要懂得如何在...
校园文明标语 校园文明标语  在学习、工作、生活中,大家都接触过比较经典的标语吧,标语的作用是便于“造势”,形成一...
打折促销标语 打折促销标语大全  XX真情回馈理惠X折  终极X小时终极抄底价  心动,不如行动!  不做不休用暴...
最新学习小组口号 最新学习小组口号  在日常的学习、工作、生活中,大家都经常接触到口号吧,口号作为意识的表现形式之一,...
垃圾分类标语 垃圾分类标语(精选140句)  在平时的学习、工作或生活中,大家最不陌生的就是宣传语了吧,宣传语具有...
安全生产横幅标语口号 安全生产横幅标语口号大全  ●安全人人抓,幸福千万家  ●安全生产 人人有责  ●安全生产 重在预防...
迎新的标语 迎新的标语(精选150句)  在日常学习、工作和生活中,大家都接触过比较经典的标语吧,标语不但折射着...
2022年村委会普法宣传简报 2022年村委会普法宣传简报  一、村委会相关介绍  村委会的全称为村民委员会,为中国大陆地区乡(镇...
广告公司广告语 广告公司广告语大全  广告公司广告语大全广告语指通过各种传播媒体和招贴形式向公众介绍商品、文化、娱乐...
经典安全标语 经典安全标语大全  导语:安全是宝藏,安全是生命,安全是金钱。下面是由小编为你整理的经典安全标语大全...
奶茶广告语 奶茶广告语大全  广告,顾名思义,就是广而告之,向社会广大公众告知某件事物。那么奶茶店的广告又该是怎...
2班军训押韵口号_口号 2班军训押韵口号_口号  无论在学习、工作或是生活中,说到口号,大家肯定都不陌生吧,口号是供口头呼喊...
煤矿安全生产的标语 煤矿安全生产的标语(通用255句)  标语是指:文字简练、意义鲜明的宣传、鼓动口号。标语是用简短文字...
光盘行动标语 关于光盘行动标语(精选100句)  在平凡的学习、工作、生活中,许多人对一些广为流传的标语都不陌生吧...