Kafka消费者 TCP管理
创始人
2024-05-31 06:52:59
0

Kafka消费者 TCP管理

  • 创建 TCP
    • FindCoordinator
    • 连接协调者
    • 消费数据
  • TCP 连接数
  • 关闭 TCP 连接

消费者的程序入口类是 KafkaConsumer

  • 构建 KafkaConsumer 时 ,不会创建任何 TCP 连接
  • TCP 连接是用 KafkaConsumer.poll 创建

创建 TCP

poll 创建 TCP 的地方 :

  • 发起 FindCoordinator 请求时
  • 连接协调者时
  • 消费数据时

FindCoordinator

协调者 (Coordinator) : 驻留在 Broker 的内存中

  • 负责消费组的组成员管理和各个消费者的位移提交管理
  • 当消费者首次用 poll 时,发送 FindCoordinator 请求到任意个Broker (负载最小) 发送请求,并告知 Broker 的协调者

负载评估 : 消费者连接所有 Broker 中,待发送请求最少

连接协调者

Broker 处理完 FindCoordinator 请求后,会返回 Broker 的协调者

  • 消费者知道协调者后,就对该 Broker 进行 Socket 连接
  • 成功连接协调者后,就能组协调操作,如 : 加入组、等待组分配方案、心跳请求处理、位移获取、位移提交

消费数据

消费者给每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP

  • 例子 : 消费者要消费 5 个分区的数据,这 5 个分区的领导者副本分布在 4 台 Broker 上,那消费者在消费时 ,会与这 4 台 Broker 的创建 Socket 连接

TCP 连接数

消费者创建 3 类 TCP 连接:

  • 确定协调者和获取集群元数据
  • 连接协调者,令其执行组成员管理操作
  • 执行实际的消息获取

Kafka 日志:

# 消费者程序创建的第一个 TCP 连接,用于发送 FindCoordinator 请求
# 消费者创建第一个连接,它连接的 Broker 节点的 ID 是 -1 :
# 消费者不知道 Kafka Broker 的任何信息
[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者复用上次创建 Socket 连接
# 向 Kafka 发送元数据请求,获取整个集群的信息
[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=‘t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)# 消费者开始发送 FindCoordinator 请求里的 Broker
# 即 localhost:9092,nodeId = -1
[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)# 消费者成功协调者的 Broker 信息(node_id = 2) 后,
# 消费者就知道协调者 Broker 的连接信息
[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094} (org.apache.kafka.clients.NetworkClient:837)# 发第二个 Socket 连接,TCP连接 localhost:9094
# 只有连接协调者后,消费者才能开启消费组的各种功能
[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者要创建新 TCP 连接,用于实际的消息获取
# 消费分区的领导者副本在哪台 Broker,消费者连接那个 Broker
# 消费者创建 3 个 TCP 连接: 
# 	localhost:9092,localhost:9093 和 localhost:9094
[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)

ID = -1 原因 :

  • 消费者程序(其实也不光是消费者,生产者也是这样的机制)首次启动时,对 Kafka 集群一无所知,因此用 -1 来表示尚未获取到 Broker 数据

ID = 2147483645 原因 :

  • Integer.MAX_VALUE - 协调者的 Broker ID
  • 协调者 ID 是 2,Socket 连接节点 ID = Integer.MAX_VALUE - 2 = 2147483645
  • 这种节点 ID 目的 : 让组协调请求和真正的数据获取请求使用不同的 Socket 连接

关闭 TCP 连接

消费者关闭 Socket :

  • 主动关闭 : 调用 KafkaConsumer.close() ,或执行 kill
  • Kafka 自动关闭 : 由 connection.max.idle.ms 控制 (默认值: 9 分钟),当某个 Socket 连续 9 分钟都没有任何请求,消费者杀掉该 Socket 连接

相关内容

热门资讯

《秋水时至》文言文阅读及答案 《秋水时至》文言文阅读及答案  阅读并回答问题。  秋水时至,百川灌河。泾流之大,两涘渚崖之间,不辩...
《岳阳楼记》的原文及译文 《岳阳楼记》的原文及译文  《岳阳楼记》是北宋文学家范仲淹于庆历六年九月十五日(1046年10月17...
诗经取名女孩 诗经取名女孩大全  利用诗经取名,一直是中国人非常擅长和喜欢的一件事,诗经中含有哲理,含有思想,含有...
杜甫《春夜喜雨》全诗以及解释 杜甫《春夜喜雨》全诗以及解释  杜甫的《春夜喜雨》是描绘春夜雨景,表现喜悦心情的名作。下面我们为大家...
桃花源记读后感 桃花源记读后感(精选15篇)  认真品味一部名著后,相信你一定有很多值得分享的收获,为此需要认真地写...
花落人亡,红楼梦断-读《红楼... 花落人亡,红楼梦断-读《红楼梦》有感作文900字  一曲《红楼梦》,将人世间哀情道遍;一首《葬花吟》...
本草纲目律草文言文 本草纲目律草文言文  作者:李时珍  释名  勒草、葛勒蔓、来莓草。  气味  甘、苦、寒、无毒。 ...
北人生而不识菱者文言文翻译注... 北人生而不识菱者文言文翻译注释及道理  1、文言文  北人生而不识菱者,仕于南方。席上啖菱,并壳入口...
李白传读后感 李白传读后感(精选5篇)  读完一本名著以后,你心中有什么感想呢?这时候,最关键的读后感怎么能落下!...
梦游天姥吟留别高考语文考点 梦游天姥吟留别高考语文考点  原文:  海客谈瀛洲,烟涛微茫信难求,越人语天姥,云霞明灭或可睹。天姥...
乡愁体的作文600字 乡愁体的作文600字  《乡愁》以朴素、简明、隽永的语言,高超的艺术技巧,表达了台湾人民盼望海峡两岸...
《长干曲其二》的原文赏析及翻... 《长干曲其二》的原文赏析及翻译注释  长干曲 其二  崔颢  家临九江水,来去九江侧。  同是长干人...
《山海经》异兽介绍 《山海经》异兽介绍  山海经里的奇珍异兽很多,是集齐古人智慧与想象力的书。小编整理了《山海经》异兽介...
红楼梦读后感300字 红楼梦读后感300字(通用11篇)  当看完一本著作后,想必你有不少可以分享的东西,此时需要认真地做...
《名人传》简介   《名人传》简介  作者介绍:  罗曼·罗兰(Romain Rolland,1866——1944)...
《小石潭记》中考真题 历年语文高考真题与答案推荐度:历年英语高考真题与答案推荐度:高考语文全国乙卷真题和答案推荐度:新高考...
林采薇简介 林采薇简介  林采薇(Yumi ),3月5日出生于台湾台北。就读于台湾辅仁大学韩语系,现为伊林模特儿...
《水调歌头再用韵呈南涧》诗词 《水调歌头再用韵呈南涧》诗词  水调歌头 再用韵呈南涧 辛弃疾 宋  千古老蟾口,云洞插天开。涨痕当...
将进酒 李白拼音版 将进酒 李白拼音版  qiāng jìn jiǔ  将 进 酒  jūn bú jiàn huáng...
三字经全文句句赏析 三字经全文句句赏析  rén zhī chū xìng běn shàn xìng xiāng jì...