消费者的程序入口类是 KafkaConsumer
KafkaConsumer.poll
创建poll 创建 TCP 的地方 :
协调者 (Coordinator) : 驻留在 Broker 的内存中
FindCoordinator
请求到任意个Broker (负载最小) 发送请求,并告知 Broker 的协调者负载评估 : 消费者连接所有 Broker 中,待发送请求最少
Broker 处理完 FindCoordinator
请求后,会返回 Broker 的协调者
消费者给每个要消费的分区创建与该分区领导者副本所在 Broker 连接的 TCP
消费者创建 3 类 TCP 连接:
Kafka 日志:
# 消费者程序创建的第一个 TCP 连接,用于发送 FindCoordinator 请求
# 消费者创建第一个连接,它连接的 Broker 节点的 ID 是 -1 :
# 消费者不知道 Kafka Broker 的任何信息
[2019-05-27 10:00:54,142] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: -1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者复用上次创建 Socket 连接
# 向 Kafka 发送元数据请求,获取整个集群的信息
[2019-05-27 10:00:54,188] DEBUG [Consumer clientId=consumer-1, groupId=test] Sending metadata request MetadataRequestData(topics=[MetadataRequestTopic(name=‘t4’)], allowAutoTopicCreation=true, includeClusterAuthorizedOperations=false, includeTopicAuthorizedOperations=false) to node localhost:9092 (id: -1 rack: null) (org.apache.kafka.clients.NetworkClient:1097)# 消费者开始发送 FindCoordinator 请求里的 Broker
# 即 localhost:9092,nodeId = -1
[2019-05-27 10:00:54,188] TRACE [Consumer clientId=consumer-1, groupId=test] Sending FIND_COORDINATOR {key=test,key_type=0} with correlation id 0 to node -1 (org.apache.kafka.clients.NetworkClient:496)# 消费者成功协调者的 Broker 信息(node_id = 2) 后,
# 消费者就知道协调者 Broker 的连接信息
[2019-05-27 10:00:54,203] TRACE [Consumer clientId=consumer-1, groupId=test] Completed receive from node -1 for FIND_COORDINATOR with correlation id 0, received {throttle_time_ms=0,error_code=0,error_message=null, node_id=2,host=localhost,port=9094} (org.apache.kafka.clients.NetworkClient:837)# 发第二个 Socket 连接,TCP连接 localhost:9094
# 只有连接协调者后,消费者才能开启消费组的各种功能
[2019-05-27 10:00:54,204] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2147483645 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)# 消费者要创建新 TCP 连接,用于实际的消息获取
# 消费分区的领导者副本在哪台 Broker,消费者连接那个 Broker
# 消费者创建 3 个 TCP 连接:
# localhost:9092,localhost:9093 和 localhost:9094
[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9094 (id: 2 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,237] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9092 (id: 0 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)[2019-05-27 10:00:54,238] DEBUG [Consumer clientId=consumer-1, groupId=test] Initiating connection to node localhost:9093 (id: 1 rack: null) using address localhost/127.0.0.1 (org.apache.kafka.clients.NetworkClient:944)
ID = -1 原因 :
ID = 2147483645 原因 :
Integer.MAX_VALUE
- 协调者的 Broker IDInteger.MAX_VALUE - 2
= 2147483645消费者关闭 Socket :
KafkaConsumer.close()
,或执行 kill
connection.max.idle.ms
控制 (默认值: 9 分钟),当某个 Socket 连续 9 分钟都没有任何请求,消费者杀掉该 Socket 连接
上一篇:黑马程序最后