<p>DelayedJoin是协助组协调器在消费组准备平衡操作时进行相应的处理。当消费组的状态转换为PreparingRebalance时,即准备进行平衡操作,在组协调器的prepareRebalance()方法中会创建一个DelayedJoin对象,并交由DelayedOperationPurgatory负责监视管理。在消费组进行平衡操作时之所以需要DelayedJoin处理,是为了让组协调器等待当前消费组下所有的消费者都请求加入消费组,即发起了JoinGroupRequest请求。每次组协调器处理完JoinGroupRequest时都会检测DelayedJoin是否满足了完成执行的条件。</p><p>在消费组进行平衡操作时之所以需要DelayedJoin处理,是为了让组协调器等待当前消费组下所有的消费者都请求加入消费组,即发起了JoinGroupRequest请求。每次组协调器处理完JoinGroupRequest时都会检测DelayedJoin是否满足了完成执行的条件。</p><p>DelayedJoin相应方法的实现是调用GroupCoordinator相关方法来完成。DelayedJoin.tryComplete()调用的是GroupCoordinator.tryCompleteJoin()方法,该方法判断是否还有未申请加入消费组的消费者,若所有消费者均已申请加入消费组,则表示DelayedJoin满足了完成执行的条件,否则继续等待,直到满足执行条件或超时。而DelayedJoin.onComplete()方法调用的是GroupCoordinator.onCompleteJoin()方法,onCompleteJoin()方法的主要执行逻辑如下。</p><p>2)若消费组的状态不为Dead(消费组的状态会在3.3节进行相应介绍),则先初始化与协调器对应的一个轮值标识generationId,然后根据该消费组下的成员列表是否为空分别做相应处理。若该消费组没有任何成员,则需要构造消息的Value为空的消费组相关元数据消息,即该消费组对应的元数据信息为空,这里是利用Kafka消息压缩清除的原理,当某消息的Value为空时则表示将要删除同Key的消息,组协调器通过这种方式将消费组相应数据从Kafka内部主题(“__consumer_offsets”)中清除。否则,遍历消费组下的每个成员构造JoinGroupResult,不过Leader消费者比Follower消费者多一个当前消费组的元数据信息字段。最后通过回调函数将JoinGroupResult发送给消费者,并对当前和下一次的心跳检测做相应处理。</p><p>(3)将第2步消费组元数据消息写入Kafka内部主题,即在第2步若消费组下已没有任何成员时,只是构造了一条与消费组元数据信息相关的消息,该消息的Value为空,这样当经由本步操作之后,会将该消费组在Kafka内部主题保存的消息删除。DelayedJoin的功能及相应方法已介绍完毕,DelayedJoin.onExpiration()的方法也是调用GroupCoordinator.onExpireJoin()方法,不过该方法没有做任何实现。</p><p><br/></p>
文章列表
<p>关联小册:</p><p><img src="/uploads/images/20230710/abfeea8b3fa115a76a11593ab8f2072a.jpg" title="1.jpg" alt=""/></p><p><br/></p><p>链接:</p><p><a href="https://www.maxiaoke.com/manual/kafka_base.html" target="_blank" title="kafka入门到实战">https://www.maxiaoke.com/manual/kafka_base.html</a></p><p><br/></p><p>本节先简要介绍Kafka延迟操作的组件,该组件可以辅助Kafka其他组件完成相应的功能,如协助客户端处理创建主题操作、协助组协调器(GroupCoordinator)处理JoinGroupRequest和HeartbeatRequest请求、协助副本管理器(ReplicaManager)处理ProduceRequest和FetchRequest请求。因此在讲解Kafka其他组件之前,先介绍Kafka的延迟操作组。</p><p><br/></p><p><br/> DelayedOperationKafka将一些不立即执行而要等待满足一定条件之后才触发完成的操作称为延迟操作,并将这类操作定义为一个抽象类DelayedOperation, DelayedOperation是一个基于事件启动有失效时间的TimerTask。TimerTask实现了Runnable接口,维护了一个TimerTaskEntry对象,TimerTaskEntry绑定了一个TimerTask,TimerTaskEntry被添加到TimerTaskList中,TimerTaskList是一个环形双向链表,按失效时间排序。DelayedOperation是一个抽象类,具体的延迟操作类继承于该抽象类,分别用来协助相应组件对不同的请求完成延迟处理操作,类图如图3-1所示。</p><p><br/></p><p><img src="/uploads/images/20230713/29541f0e893bd107c69e632fc156409c.png" title="1.png" alt="" width="886" height="458"/></p><p><br/></p><p>DelayedOperation只有一个AtomicBoolean类型的completed属性,用来控制某个延迟操作。在延迟时间(delayMs)内,onComplete()方法只被调用一次。DelayedOperation主要方法如下。● tryComplete()方法:一个抽象方法,由子类来实现,负责检测执行条件是否满足。若满足执行条件,则调用forceComplete()方法完成延迟操作。● forceCompete()方法:该方法在条件满足时,检测延迟任务是否未被执行。若未被执行,则先调用TimerTask.cancel()方法解除该延迟操作与TimerTaskEntry的绑定,将该延迟操作从TimerTaskList链表中移除,然后调用onComplete()方法让延迟操作执行完成。</p><p><br/></p><p>通过completed的CAS原子操作(completed.compareAndSet),可以保证并发操作时只有第一个调用该方法的线程能够顺利调用onComplete()完成延迟操作,其他线程获取的completed属性为false,即不会调用onComplete()方法,这就保证了onComplete()只会被调用一次。</p><p><br/></p><p>● onComplete()方法:是一个抽象方法,由子类来实现,执行延迟操作满足执行条件后需要执行的实际业务逻辑。例如,DelayedProduce和DelayedFetch都是在该方法内调用responseCallback向客户端做出响应。</p><p><br/></p><p>● safeTryComplete()方法:以synchronized同步锁调用onComplete()方法,供外部调用。</p><p><br/></p><p>● onExpiration()方法:也是一个抽象方法,由子类来实现当延迟操作已达失效时间的相应逻辑处理。Kafka通过SystemTimer来定期检测请求是否超时。SystemTimer是Kafka实现的底层基于层级时间轮和DelayQueue定时器,维护了一个newFixedThreadPool线程池,用于提交相应的线程执行。</p><p><br/></p><p>例如,当检测到延迟操作已失效时则将延迟操作提交到该线程池,即执行线程的run()方法的逻辑。DelayedOperation覆盖了TimerTask的run()方法,在该方法中先调用forceCompete()方法,当该方法返回true后再调用onExpiration()方法。</p><p><br/></p><p>Kafka当前的设计onComplete()方法是向客户端做出响应的唯一出口,当延迟操作达到失效时间时也是先执行forceCompete()方法,让onComplete()方法执行之后再调用onExpiration()方法,在onExpiration()方法中仅是进行相应的过期信息收集之类的操作。</p><p><br/></p><p>DelayedOperationPurgatory是一个对DelayedOperation管理的辅助类,为了书写简便,我们将其简称为Purgatory。Purgatory以泛型的形式将一个DelayedOperation添加到其内部维护的Pool[Any, Watchers]类型watchersForKey对象中,同时将DelayedOperation添加到SystemTimer中。</p><p><br/></p><p>其中,Watchers是Purgatory的内部类,底层是一个ConcurrentLinkedQueue,该类定义了一个ConcurrentLinkedQueue类型的operations属性,用于保存DelayedOperation。从Watchers类名可以看出,该类的作用就是对DelayedOperation进行监视。Watchers提供了以下3个对DelayedOperation操作的方法。</p><p><br/></p><p>● watch()方法:用于将DelayedOperation添加到operations集合中。</p><p><br/></p><p>● tryCompleteWatched()方法:用于迭代operations集合中的DelayedOperation,通过DelayedOperation.isCompleted检测该DelayedOperation是否已执行完成。若已执行完成,则从operations集合中移除该DelayedOperation。否则调用DelayedOperation.safeTryComplete()方法尝试让该DelayedOperation执行完成,若执行完成,即safeTryComplete()方法返回true,则将该DelayedOperation从operations集合中移除。最后检测operations集合是否为空,如果operations为空,则表示该operations所关联的DelayedOperation已全部执行完成,因此将该Watchers从Purgatory的Pool中移除。</p><p><br/></p><p>● purgeCompleted()方法:与tryCompleteWatched()方法基本功能类似,区别在于purgeCompleted()方法只单纯地将该operations集合中已完成的DelayedOperation移除,对未完成的DelayedOperation并不尝试将其执行完成。</p><p><br/></p><p>我们可以简单地将Purgatory与Spring Quartz类比,这样对Purgatory的作用就不难理解了。Purgatory相当于Quartz的SchedulerFactoryBean,而DelayedOperation相当于ScheduleFactoryBean所管理的具体Schedule,由Purgatory负责调度。只不过Purgatory除了管理调度DelayedOperation之外,还负责DelayedOperation超时的管理。下面简要介绍Purgatory的两个主要方法。</p><p><br/></p><p>● tryCompleteElseWatch()方法:该方法首先调用待检测的DelayedOperation.safeTryComplete()方法,检测是否能执行完成,若未执行完成,则迭代watchersForKey对应的DelayedOperation检测DelayedOperation是否已完成,若未完成,则将其添加到Watchers中。添加完成后,再调用safeTryComplete()方法再次尝试让DelayedOpeartion执行完成,若还是未完成,再将其添加到SystemTimer中。添加完后再次检测是否执行完成,若已执行完成则将其从SystemTimer中移除可以看到,在整个操作逻辑中多次执行safeTryComplete()方法以及多次检测是否已完成,是以防在操作过程中可能已被其他线程触发执行完成。同时在将DelayedOperation添加到Watchers操作时并没有将原来的Key清理掉,这是因为Purgatory在启动时会同时启动一个ExpiredOperationReaper线程,该线程除了推进时间轮的指针外还会定期清理watchersForKey已完成的DelayedOperation。</p><p><br/></p><p>checkAndComplete()方法:根据所传入的Key,检测该Key对应的Watchers是否执行完成,若未完成,再调用Watchers.tryCompleteWatched()方法进行处理。</p><p><br/></p><p>由此可见,Purgatory对DelayedOperation的管理是通过Watchers来完成的,通过Watchers调用DelayedOperation相应的方法,让DelayedOperation要么在delayMs时间内完成,要么超时。在对Purgatory有了基本了解之后,下面将逐一介绍DelayedOperation实现类的具体作用及实现细节。</p><p><br/></p>