大师网-带你快速走向大师之路 解决你在学习过程中的疑惑,带你快速进入大师之门。节省时间,提升效率

kafka消费组设计

kafka消费组

0.8版本后offsize存储到了__consumer_offsets队列里,这个队列有50个分区。先看下offsize在__consumer_offsets的存储情况。

bin/kafka-simple-consumer-shell.sh --topic __consumer_offsets --partition 49 --broker-list localhost:9092,localhost:9093,localhost:9094 --formatter "kafka.coordinator.GroupMetadataManager\$OffsetsMessageFormatter"

1、localhost要替换成服务器的ip

2、partition位置的计算方法是:Math.abs(消费组groupID.hashCode()) % 50


指定的消费组,在__consumer_offsets存储情况

1.能看出结构是个map,key是topic+partition,value是offsets

2.有过期时间,会有定时定位去压缩清理__consumer_offsets队列,使保持最近的状态


消费组Rebalance

什么情况下会出发rebalance?

1.topic的partition数发生变化(broker宕机或者增加partition)

2.当增加或减少consumer时

比如3个broker,topic有3个partition,有一个consumer group里有3个consumer去消费这个topic。如图所示(此处缺个图),此时会分配每个consumer去消费一个partition。当consumer减少为1个时,就会触发rebalance,去消费3个分区。


rebalance的设计过程是怎样的呢?参考博文

1。kafka设计里有一个coordinator的协调角色负责rebalance

coordinator是服务端角色程序

每个consumer group对应一个coordinator实例。

coordinator的所在broker的确定,之前说明Math.abs(消费组groupID.hashCode()) % 50能确认consumer group的offsets所在的分区,选这个分区的leader所在机器。

交互的过程简化版,举例子consumer增减的情况。

1.每个consumer定时发送心跳给coordinator,coordinator发现有变化。给所有的consumer发response,协议里说明要重新rebalance了,把自己的消费分区信息发给coordinator。

2.coordinator从所有的consumer里选出一个leader角色(目的是消费组的rebalance过程是设计发生在客户端去做的,这样设计是为了灵活),把所有的consumer消费分区信息发给leader。

3.这个consumer leader决定分配方案,发给coordinator

4.然后coordinator再把具体的分配发给对应的consumer,完成rebalance


请求协调者,协调者指定consumer leader

分配方案,下发方案

上述过程多了个consumer leader角色,使交互变得复杂些,目的就是想让rebalance的过程

交给客户端去做,增加灵活性。实现方式是需要覆盖consumer的参数:partition.assignment.strategy来实现自己分配策略就好了。应用场景还

比如可以为consumer挑选同一个机架下的分区数据,减少网络传输的开销。不去覆盖自定义策略的话,Kafka默认为你提供了两种分配策略:range和round-robin。

交互时序编码细节可以看下状态定义,基本跟订单状态类似,系统的实现必要的东西。