当前位置: 技术文章>> kafka延迟操作

文章标题:kafka延迟操作
文章标签: kafka

关联小册:


链接:

https://www.maxiaoke.com/manual/kafka_base.html


本节先简要介绍Kafka延迟操作的组件,该组件可以辅助Kafka其他组件完成相应的功能,如协助客户端处理创建主题操作、协助组协调器(GroupCoordinator)处理JoinGroupRequest和HeartbeatRequest请求、协助副本管理器(ReplicaManager)处理ProduceRequest和FetchRequest请求。因此在讲解Kafka其他组件之前,先介绍Kafka的延迟操作组。



 DelayedOperationKafka将一些不立即执行而要等待满足一定条件之后才触发完成的操作称为延迟操作,并将这类操作定义为一个抽象类DelayedOperation, DelayedOperation是一个基于事件启动有失效时间的TimerTask。TimerTask实现了Runnable接口,维护了一个TimerTaskEntry对象,TimerTaskEntry绑定了一个TimerTask,TimerTaskEntry被添加到TimerTaskList中,TimerTaskList是一个环形双向链表,按失效时间排序。DelayedOperation是一个抽象类,具体的延迟操作类继承于该抽象类,分别用来协助相应组件对不同的请求完成延迟处理操作,类图如图3-1所示。



DelayedOperation只有一个AtomicBoolean类型的completed属性,用来控制某个延迟操作。在延迟时间(delayMs)内,onComplete()方法只被调用一次。DelayedOperation主要方法如下。● tryComplete()方法:一个抽象方法,由子类来实现,负责检测执行条件是否满足。若满足执行条件,则调用forceComplete()方法完成延迟操作。● forceCompete()方法:该方法在条件满足时,检测延迟任务是否未被执行。若未被执行,则先调用TimerTask.cancel()方法解除该延迟操作与TimerTaskEntry的绑定,将该延迟操作从TimerTaskList链表中移除,然后调用onComplete()方法让延迟操作执行完成。


通过completed的CAS原子操作(completed.compareAndSet),可以保证并发操作时只有第一个调用该方法的线程能够顺利调用onComplete()完成延迟操作,其他线程获取的completed属性为false,即不会调用onComplete()方法,这就保证了onComplete()只会被调用一次。


● onComplete()方法:是一个抽象方法,由子类来实现,执行延迟操作满足执行条件后需要执行的实际业务逻辑。例如,DelayedProduce和DelayedFetch都是在该方法内调用responseCallback向客户端做出响应。


● safeTryComplete()方法:以synchronized同步锁调用onComplete()方法,供外部调用。


● onExpiration()方法:也是一个抽象方法,由子类来实现当延迟操作已达失效时间的相应逻辑处理。Kafka通过SystemTimer来定期检测请求是否超时。SystemTimer是Kafka实现的底层基于层级时间轮和DelayQueue定时器,维护了一个newFixedThreadPool线程池,用于提交相应的线程执行。


例如,当检测到延迟操作已失效时则将延迟操作提交到该线程池,即执行线程的run()方法的逻辑。DelayedOperation覆盖了TimerTask的run()方法,在该方法中先调用forceCompete()方法,当该方法返回true后再调用onExpiration()方法。


Kafka当前的设计onComplete()方法是向客户端做出响应的唯一出口,当延迟操作达到失效时间时也是先执行forceCompete()方法,让onComplete()方法执行之后再调用onExpiration()方法,在onExpiration()方法中仅是进行相应的过期信息收集之类的操作。


DelayedOperationPurgatory是一个对DelayedOperation管理的辅助类,为了书写简便,我们将其简称为Purgatory。Purgatory以泛型的形式将一个DelayedOperation添加到其内部维护的Pool[Any, Watchers]类型watchersForKey对象中,同时将DelayedOperation添加到SystemTimer中。


其中,Watchers是Purgatory的内部类,底层是一个ConcurrentLinkedQueue,该类定义了一个ConcurrentLinkedQueue类型的operations属性,用于保存DelayedOperation。从Watchers类名可以看出,该类的作用就是对DelayedOperation进行监视。Watchers提供了以下3个对DelayedOperation操作的方法。


● watch()方法:用于将DelayedOperation添加到operations集合中。


● tryCompleteWatched()方法:用于迭代operations集合中的DelayedOperation,通过DelayedOperation.isCompleted检测该DelayedOperation是否已执行完成。若已执行完成,则从operations集合中移除该DelayedOperation。否则调用DelayedOperation.safeTryComplete()方法尝试让该DelayedOperation执行完成,若执行完成,即safeTryComplete()方法返回true,则将该DelayedOperation从operations集合中移除。最后检测operations集合是否为空,如果operations为空,则表示该operations所关联的DelayedOperation已全部执行完成,因此将该Watchers从Purgatory的Pool中移除。


● purgeCompleted()方法:与tryCompleteWatched()方法基本功能类似,区别在于purgeCompleted()方法只单纯地将该operations集合中已完成的DelayedOperation移除,对未完成的DelayedOperation并不尝试将其执行完成。


我们可以简单地将Purgatory与Spring Quartz类比,这样对Purgatory的作用就不难理解了。Purgatory相当于Quartz的SchedulerFactoryBean,而DelayedOperation相当于ScheduleFactoryBean所管理的具体Schedule,由Purgatory负责调度。只不过Purgatory除了管理调度DelayedOperation之外,还负责DelayedOperation超时的管理。下面简要介绍Purgatory的两个主要方法。


● tryCompleteElseWatch()方法:该方法首先调用待检测的DelayedOperation.safeTryComplete()方法,检测是否能执行完成,若未执行完成,则迭代watchersForKey对应的DelayedOperation检测DelayedOperation是否已完成,若未完成,则将其添加到Watchers中。添加完成后,再调用safeTryComplete()方法再次尝试让DelayedOpeartion执行完成,若还是未完成,再将其添加到SystemTimer中。添加完后再次检测是否执行完成,若已执行完成则将其从SystemTimer中移除可以看到,在整个操作逻辑中多次执行safeTryComplete()方法以及多次检测是否已完成,是以防在操作过程中可能已被其他线程触发执行完成。同时在将DelayedOperation添加到Watchers操作时并没有将原来的Key清理掉,这是因为Purgatory在启动时会同时启动一个ExpiredOperationReaper线程,该线程除了推进时间轮的指针外还会定期清理watchersForKey已完成的DelayedOperation。


checkAndComplete()方法:根据所传入的Key,检测该Key对应的Watchers是否执行完成,若未完成,再调用Watchers.tryCompleteWatched()方法进行处理。


由此可见,Purgatory对DelayedOperation的管理是通过Watchers来完成的,通过Watchers调用DelayedOperation相应的方法,让DelayedOperation要么在delayMs时间内完成,要么超时。在对Purgatory有了基本了解之后,下面将逐一介绍DelayedOperation实现类的具体作用及实现细节。


推荐文章