ConsumerConfig.scala 储存Consumer的配置
按照我的理解,0.10的Kafka没有专门的SimpleConsumer,仍然是沿用0.8版本的。
消费的规则如下:
一个partition只能被同一个ConsumersGroup的一个线程所消费.线程数小于partition数,某些线程会消费多个partition.线程数等于partition数,一个线程正好消费一个线程.当添加消费者线程时,会触发rebalance,partition的分配发送变化.同一个partition的offset保证消费有序,不同的partition消费不保证顺序.Consumers编程的用法:
private final KafkaConsumer<Long, String> consumer; // 与Kafka进行通信的consumer ... consumer = new KafkaConsumer<Long, String>(props); consumer.subscribe(Collections.singletonList(this.topic)); ConsumerRecords<Long, String> records = consumer.poll(512); ...consumer,是一个纯粹的单线程程序,后面所讲的所有机制(包括coordinator,rebalance, heartbeat等),都是在这个单线程的poll函数里面完成的。也因此,在consumer的代码内部,没有锁的出现。
从KafkaConsumer的构造函数可以看出,KafkaConsumer有以下几个核心部件:
Metadata: 存储Topic/Partion与broker的映射关系NetworkClient:网络层 A network client for asynchronous request/response network i/o.ConsumerNetworkClient: Higher level consumer access to the network layer //对NetworkClient的封装,非线程安全ConsumerCoordinator:只是client端的类,只是和服务端的GroupCoordinator通信的介质。(broker端的Coordinator 负责reblance、Offset提交、心跳)SubscriptionState: consumer的Topic、Partition的offset状态维护Fetcher: manage the fetching process with the brokers. //获取消息后面会分组件讲解Consumers的工作流程
见Producer里面的分析。
补充一下,KafkaConsumer、KafkaProducer都是在构造函数中获取metadata信息,通过调用metadata.update方法来获取信息。
为什么?后面讲
提问:为什么在一个group内部,1个parition只能被1个consumer拥有?
给定一个topic,有4个partition: p0, p1, p2, p3, 一个group有3个consumer: c0, c1, c2。
那么,如果按RangeAssignor策略,分配结果是: c0: p0, c1: p1, c2: p2, p3如果按RoundRobinAssignor策略: c0: p1, p3, c1: p1, c2: p2partition.assignment.strategy=RangeAssignor,默认值(到底是哪种分配状态呢) 那这整个分配过程是如何进行的呢?见下图所示:
1. 步骤1:对于每1个consumer group,Kafka集群为其从broker集群中选择一个broker作为其coordinator。因此,第1步就是找到这个coordinator。(1个consumer group对应一个coordinattor)
GroupCoordinatorRequest: GCR,由ConsumerNetworkClient发送请求去寻找coordinator。
2. 步骤2:找到coordinator之后,发送JoinGroup请求 consumer在这里会被划分leader、follower(无责任的说:选择第一个consumer)
leader作用:perform the leader synchronization and send back the assignment for the group(负责发送partition分配的结果)follower作用:send follower's sync group with an empty assignment3. 步骤3:JoinGroup返回之后,发送SyncGroup,得到自己所分配到的partition SyncGroupRequest
consumer leader发送 SyncGroupRequest给Coordinator,Coordinator回给它nullfollower发送 null的 SyncGroupRequest 给Coordinator,Coordinator回给它partition分配的结果。注意,在上面3步中,有一个关键点:
partition的分配策略和分配结果其实是由client决定的,而不是由coordinator决定的。什么意思呢?在第2步,所有consumer都往coordinator发送JoinGroup消息之后,coordinator会指定其中一个consumer作为leader,其他consumer作为follower。然后由这个leader进行partition分配。然后在第3步,leader通过SyncGroup消息,把分配结果发给coordinator,其他consumer也发送SyncGroup消息,获得这个分配结果。接下来就到Fetcher拉取数据了
四个步骤
步骤0:获取consumer的offset步骤1:生成FetchRequest,并放入发送队列步骤2:网络poll步骤3:获取结果当consumer初次启动的时候,面临的一个首要问题就是:从offset为多少的位置开始消费。
poll之前,给集群发送请求,让集群告知客户端,当前该TopicPartition的offset是多少。通过SubscriptionState来实现, 通过ConsumerCoordinator
if (!subscriptions.hasAllFetchPositions()) updateFetchPositions(this.subscriptions.missingFetchPositions());核心是:向Coordinator发了一个OffsetFetchRequest,并且是同步调用,直到获取到初始的offset,再开始接下来的poll.(也就是说Offset的信息如果存在Kafka里,是存在GroupCoordinator里面)
consumer的每个TopicPartition都有了初始的offset,接下来就可以进行不断循环取消息了,这也就是Fetch的过程:
核心就是生成FetchRequest: 假设一个consumer订阅了3个topic: t0, t1, t2,为其分配的partition分别是: t0: p0; t1: p1, p2; t2: p2
即总共4个TopicPartition,即t0p0, t0p1, t1p1, t2p2。这4个TopicPartition可能分布在2台机器n0, n1上面: n0: t0p0, t1p1 n1: t0p1, t2p2
则会分别针对每台机器生成一个FetchRequest,即Map<Node, FetchRequest>。所以会有一个方法把所有属于同一个Node的TopicPartition放在一起,生成一个FetchRequest。
调用ConsumerNetworkClient.poll发送网络请求。向服务器发 送响应请求和获取服务器的响应。(默认值:executeDelayedTasks=true)
获取Broker返回的Response,里面包含了List<ConsumerRecord> records
下面从自动消费确认来分析,Offset自动确认是由ConsumerCoordinator的AutoCommitTask来实现的。
其调用在ConsumerNetworkClient的 DelayedTaskQueue delayedTasks里面,然后被周期性的调用。 周期性的发送确认消息,类似HeartBeat,其实现机制也就是前面所讲的DelayedQueue + DelayedTask.
poll函数中的注释: // execute delayed tasks (e.g. autocommits and heartbeats) prior to fetching records
可以这样理解:第二次poll调用的时候,提交上一次poll的offset和心跳发送。先提交offset,再去拉取record。那么这次Offset其实是上一次poll的Record的offset。因此,当你把按照下面的逻辑写程序的时候,可能会导致Consumer与Coordinator的心跳超时。
while(true) { consumer.poll(); do process message // 假如这个耗时过长,那么这个consumer就无法发送心跳给coordinator,导致它错误认为这个consumer失去联系了,引起不必要的rebalance。槽糕的情况下,会丢重复消费数据。 }