当前位置: 技术文章>> Kafka延迟操作-DelayedJoin

文章标题:Kafka延迟操作-DelayedJoin
文章标签: kafka

DelayedJoin是协助组协调器在消费组准备平衡操作时进行相应的处理。当消费组的状态转换为PreparingRebalance时,即准备进行平衡操作,在组协调器的prepareRebalance()方法中会创建一个DelayedJoin对象,并交由DelayedOperationPurgatory负责监视管理。在消费组进行平衡操作时之所以需要DelayedJoin处理,是为了让组协调器等待当前消费组下所有的消费者都请求加入消费组,即发起了JoinGroupRequest请求。每次组协调器处理完JoinGroupRequest时都会检测DelayedJoin是否满足了完成执行的条件。

在消费组进行平衡操作时之所以需要DelayedJoin处理,是为了让组协调器等待当前消费组下所有的消费者都请求加入消费组,即发起了JoinGroupRequest请求。每次组协调器处理完JoinGroupRequest时都会检测DelayedJoin是否满足了完成执行的条件。

DelayedJoin相应方法的实现是调用GroupCoordinator相关方法来完成。DelayedJoin.tryComplete()调用的是GroupCoordinator.tryCompleteJoin()方法,该方法判断是否还有未申请加入消费组的消费者,若所有消费者均已申请加入消费组,则表示DelayedJoin满足了完成执行的条件,否则继续等待,直到满足执行条件或超时。而DelayedJoin.onComplete()方法调用的是GroupCoordinator.onCompleteJoin()方法,onCompleteJoin()方法的主要执行逻辑如下。

2)若消费组的状态不为Dead(消费组的状态会在3.3节进行相应介绍),则先初始化与协调器对应的一个轮值标识generationId,然后根据该消费组下的成员列表是否为空分别做相应处理。若该消费组没有任何成员,则需要构造消息的Value为空的消费组相关元数据消息,即该消费组对应的元数据信息为空,这里是利用Kafka消息压缩清除的原理,当某消息的Value为空时则表示将要删除同Key的消息,组协调器通过这种方式将消费组相应数据从Kafka内部主题(“__consumer_offsets”)中清除。否则,遍历消费组下的每个成员构造JoinGroupResult,不过Leader消费者比Follower消费者多一个当前消费组的元数据信息字段。最后通过回调函数将JoinGroupResult发送给消费者,并对当前和下一次的心跳检测做相应处理。

(3)将第2步消费组元数据消息写入Kafka内部主题,即在第2步若消费组下已没有任何成员时,只是构造了一条与消费组元数据信息相关的消息,该消息的Value为空,这样当经由本步操作之后,会将该消费组在Kafka内部主题保存的消息删除。DelayedJoin的功能及相应方法已介绍完毕,DelayedJoin.onExpiration()的方法也是调用GroupCoordinator.onExpireJoin()方法,不过该方法没有做任何实现。


推荐文章