2.4.2 TimerService时间服务

对于需要依赖时间定时器进行数据处理的算子来讲,需要借助TimerService组件实现对定时器的管理,其中定时器执行的具体处理逻辑主要通过回调函数定义。每个StreamOperator在创建和初始化的过程中,都会通过InternalTimeServiceManager创建TimerService实例,这里的InternalTimeServiceManager管理了Task内所有和时间相关的服务,并向所有Operator提供创建和获取TimerService的方法。

1.TimerService的设计与实现

我们先来看下TimerService的设计与实现,如图2-14所示,在DataStream API中提供了TimerService接口,用于获取和操作时间相关的信息。TimerService接口的默认实现有SimpleTimerService,在Flink Table API模块的AbstractProcessStreamOperator.ContextImpl内部类中也实现了TimerService接口。从图中可以看出,SimpleTimerService会将InternalTimerService接口作为内部成员变量,因此在SimpleTimerService中提供的方法基本上都是借助InternalTimerService实现的。

InternalTimerService实际上是TimerService接口的内部版本,而TimerService接口是专门供用户使用的外部接口。InternalTimerService需要按照Key和命名空间进行划分,并提供操作时间和定时器的内部方法,因此不仅是SimpleTimerService通过InternalTimerService操作和获取时间信息以及定时器,其他还有如WindowOperator、IntervalJoinOperator等内置算子也都会通过InternalTimerService提供的方法执行时间相关的操作。

图2-14 TimerService UML关系图

2.TimerService应用举例

接下来我们以KeyedProcessFunction实现类DeduplicateKeepLastRowFunction为例,详细说明在自定义函数中如何通过调用和操作TimerService服务实现时间信息的获取和定时器的注册。

如代码清单2-28所示,KeyedProcessOperator.open()方法主要包括如下逻辑。

·调用getInternalTimerService()方法创建和获取InternalTimerService实例。实际上最终调用的是AbstractStreamOperator.getInternalTimerService()方法获取InternalTimervService实例。

·基于InternalTimerService实例创建SimpleTimerService实例。

·将创建好的SimpleTimerService封装在ContextImpl和OnTimerContextImpl上下文对象中,此时KeyedProcessFunction的实现类就可以通过上下文获取SimpleTimerService实例了。

代码清单2-28 KeyedProcessOperator.open()方法定义


public void open() throws Exception {
   super.open();
   collector = new TimestampedCollector<>(output);
   InternalTimerService<VoidNamespace> internalTimerService =
      getInternalTimerService("user-timers", VoidNamespaceSerializer.INSTANCE, 
         this);
   TimerService timerService = new SimpleTimerService(internalTimerService);
   context = new ContextImpl(userFunction, timerService);
   onTimerContext = new OnTimerContextImpl(userFunction, timerService);
}

DeduplicateKeepLastRowFunction继承并实现了KeyedProcessFunction接口,如代码清单2-29所示,在DeduplicateKeepLastRowFunction.processElement()方法定义中可以看出,调用Context.timerService()方法获取TimerService实现类,然后调用TimerService.currentProcessingTime()方法获取当前的处理时间,接下来调用registerProcessingCleanupTimer()方法注册状态数据清理定时器。

代码清单2-29 DeduplicateKeepLastRowFunction.processElement()方法定义


public void processElement(BaseRow input, Context ctx, Collector<BaseRow> out) 
   throws Exception {
   if (generateRetraction) {
      long currentTime = ctx.timerService().currentProcessingTime();
      // 注册状态数据清理定时器
      registerProcessingCleanupTimer(ctx, currentTime);
   }
   processLastRow(input, generateRetraction, state, out);
}

对于registerProcessingCleanupTimer()方法,实际上就是调用timerService.registerProcessingTimeTimer(cleanupTime)注册基于处理时间的定时器。

系统时间到达Timer指定的时间后,TimerService会调用和触发注册的定时器,然后调用DeduplicateKeepLastRowFunction.onTimer()方法。从onTimer()方法定义中可以看出,调用cleanupState()方法完成了对指定状态数据的清理操作,如代码清单2-30所示。

代码清单2-30 DeduplicateKeepLastRowFunction.onTimer()方法定义


public void onTimer(long timestamp, OnTimerContext ctx, Collector<BaseRow> 
   out) throws Exception {
   if (stateCleaningEnabled) {
      cleanupState(state);
   }
}

3.InternalTimerService详解

TimerService实际上将InternalTimerService进行了封装,然后供StreamOperator中的KeyedProcessFunction调用,接下来我们看InternalTimerService的具体实现。

如图2-15所示,从InternalTimerService的UML关系图中可以看出,InternalTimerService接口实现了如下方法。

·currentProcessingTime():获取当前的处理时间。

·currentWatermark():获取当前算子基于事件时间的Watermark。

·registerProcessingTimeTimer(...):注册基于处理时间的定时器。

·deleteProcessingTimeTimer(...):删除基于处理时间的定时器。

·registerEventTimeTimer(...):注册基于事件时间的定时器。

·deleteEventTimeTimer(...):删除基于事件时间的定时器。

图2-15 InternalTimerService UML关系图

InternalTimerService接口具有InternalTimerServiceImpl的默认实现类,在InternalTimerServiceImpl中,实际上包含了两个比较重要的成员变量,分别为processingTimeService和KeyGroupedInternalPriorityQueue<TimerHeapInternalTimer<K, N>>队列。其中processingTimeService是基于系统处理时间提供的TimerService,也就是说,基于ProcessingTimeService的实现类可以注册基于处理时间的定时器。TimerHeapInternalTimer队列主要分为processingTimeTimersQueue和eventTimeTimersQueue两种类型,用于存储相应类型的定时器队列。TimerHeapInternalTimer基于Heap堆内存存储定时器,并通过HeapPriorityQueueSet结构存储注册好的定时器。

在InternalTimerServiceImpl中,会记录currentWatermark信息,用于表示当前算子的最新Watermark,实际上InternalTimerServiceImpl实现了基于Watermark的时钟,此时算子会递增更新InternalTimerServiceImpl中Watermark对应的时间戳。此时InternalTimerService会判断eventTimeTimersQueue队列中是否有定时器、是否满足触发条件,如果满足则将相应的TimerHeapInternalTimer取出,并执行对应算子中的onEventTime()回调方法,此时就和ProcessFunction中的onTimer()方法联系在一起了。

这里我们以IntervalJoinOperator为例说明内部算子如何直接调用InternalTimerService注册定时器。

如代码清单2-31所示,在IntervalJoinOperator.processElement()方法中,实际上会调用internalTimerService.registerEventTimeTimer()方法注册基于事件时间的定时器,专门用于数据清理任务。随后internalTimerService会根据指定的cleanupTime完成对窗口中历史状态数据的清理。

代码清单2-31 IntervalJoinOperator.processElement方法


long cleanupTime = (relativeUpperBound > 0L) ? ourTimestamp + 
   relativeUpperBound : ourTimestamp;
if (isLeft) {
   internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_LEFT, 
      cleanupTime);
} else {
   internalTimerService.registerEventTimeTimer(CLEANUP_NAMESPACE_RIGHT, 
      cleanupTime);
}

如代码清单2-32所示,当StreamOperator算子中的Watermark更新时,就会通过InternalTimeServiceManager通知所有的InternalTimerService实例,这里实际上就是调用InternalTimerServiceImpl.advanceWatermark()方法实现的。从advanceWatermark()方法中可以看出,首先会通过最新的时间更新currentWatermark,然后从eventTimeTimersQueue队列中获取事件时间定时器,最后判断timer.getTimestamp()是否小于接入的time变量,如果小于,则说明当前算子的时间大于定时器中设定的时间,此时就会执行triggerTarget.onEventTime (timer)方法,这里的triggerTarget实际上就是StreamOperator的具体实现类。

代码清单2-32 InternalTimerServiceImpl.advanceWatermark()方法定义


public void advanceWatermark(long time) throws Exception {
   currentWatermark = time;
   InternalTimer<K, N> timer;
   while ((timer = eventTimeTimersQueue.peek()) != null && timer.
      getTimestamp() <= time) {
      eventTimeTimersQueue.poll();
      keyContext.setCurrentKey(timer.getKey());
      triggerTarget.onEventTime(timer);
   }
}

接下来我们看看在算子中如何通过调用AbstractStreamOperator.getInternalTimerService()方法创建和获取InternalTimerService实例。

如代码清单2-33所示,在AbstractStreamOperator.getInternalTimerService()方法中,实际上会调用InternalTimeServiceManager.getInternalTimerService()方法获取InternalTimerService实例。在一个Operator中可以同时创建多个TimerService实例,且必须具有相应的KeySerializer和NamespaceSerializer序列化类,如果不需要区分Namespace类型,也可以使用VoidNamespaceSerializer。

除了name和timerSerializer参数外,getInternalTimerService()方法还需要传递triggerable回调函数作为参数。当触发定时器时会调用Triggerable接口的onEventTime()或onProcessingTime()方法,以触发定时调度需要执行的逻辑,这里的Triggerable接口实现类实际上就是StreamOperator接口的实现类。

代码清单2-33 InternalTimeServiceManager.getInternalTimerService()方法


//获取InternalTimerService
public <N> InternalTimerService<N> getInternalTimerService(
   String name,
   TimerSerializer<K, N> timerSerializer,
   Triggerable<K, N> triggerable) {
   InternalTimerServiceImpl<K, N> timerService = registerOrGetTimerService
      (name, timerSerializer);
   timerService.startTimerService(
      timerSerializer.getKeySerializer(),
      timerSerializer.getNamespaceSerializer(),
      triggerable);
   return timerService;
}

如代码清单2-34所示,在getInternalTimerService()方法中实际上会调用registerOrGetTimerService()方法注册和获取InternalTimerService实例。在InternalTimeServiceManager.registerOrGetTimerService中可以看出,会事先根据名称从timerServices的HashMap获取已经注册的InternalTimerService,如果没有获取到,则实例化InternalTimerServiceImpl类,创建新的TimerService。

代码清单2-34 InternalTimeServiceManager.registerOrGetTimerService()方法定义


// 注册及获取TimerService
<N> InternalTimerServiceImpl<K, N> registerOrGetTimerService(String name, 
   TimerSerializer<K, N> timerSerializer) {
      InternalTimerServiceImpl<K, N> timerService = 
         (InternalTimerServiceImpl<K, N>)
//先从timerServices中获取创建好的TimerService
   timerServices.get(name);
      // 如果没有获取到就创建新的timerService
      if (timerService == null) {
         timerService = new InternalTimerServiceImpl<>(
            localKeyGroupRange,
            keyContext,
            processingTimeService,
            createTimerPriorityQueue(PROCESSING_TIMER_PREFIX + name, 
               timerSerializer),
            createTimerPriorityQueue(EVENT_TIMER_PREFIX + name, 
               timerSerializer));
         timerServices.put(name, timerService);
      }
      return timerService;
   }