2.5.2 WindowedStream的设计与实现

和其他DataStream操作相比,WindowedStream转换操作相对复杂一些,在本节我们结合前面学习的内容,继续了解WindowedStream转换操作的实现。

我们知道,如果将DataStream根据Key进行分组,生成KeyedStream数据集,然后在KeyedStream上执行window()转换操作,就会生成WindowedStream数据集。如果直接调用DataStream.windowAll()方法进行转换,就会生成AllWindowedStream数据集。WindowedStream和AllWindowedStream的主要区别在于是否按照Key进行分区处理,这里我们以WindowedStream为例讲解窗口转换操作的具体实现。

1.WindowAssigner设计与实现

如代码清单2-39所示,当用户调用KeyedStream.window()方法时,会创建WindowedStream转换操作。通过window()方法可以看出,此时需要传递WindowAssigner作为窗口数据元素的分配器,通过WindowAssigner组件,可以根据指定的窗口类型将数据元素分配到指定的窗口中。

代码清单2-39 KeyedStream.window()方法定义


public <W extends Window> WindowedStream<T, KEY, W> window(WindowAssigner<? 
   super T, W> assigner) {
   return new WindowedStream<>(this, assigner);
}

接下来我们看WindowAssigner的具体实现。如图2-17所示,WindowAssigner作为抽象类,其子类实现是非常多的,例如基于事件时间实现的SlidingEventTimeWindows、基于处理时间实现的TumblingProcessingTimeWindows等。这些WindowAssigner根据窗口类型进行区分,且属于DataStream API中内置的窗口分配器,用户可以直接调用它们创建不同类型的窗口转换。

从图2-17中可以看出,SessionWindow类型的窗口比较特殊,在WindowAssigner的基础上又实现了MergingWindowAssigner抽象类,在MergingWindowAssigner抽象类中定义了MergeCallback接口。这样做的原因是SessionWindow的窗口长度不固定,SessionWindow窗口的长度取决于指定时间范围内是否有数据元素接入,然后动态地将接入数据切分成独立的窗口,最后完成窗口计算。此时涉及对窗口中的元素进行动态Merge操作,这里主要借助MergingWindowAssigner提供的mergeWindows()方法来实现。

图2-17 WindowAssigner UML关系图

在WindowAssigner中通过提供WindowAssignerContext上下文获取CurrentProcessingTime等时间信息。在WindowAssigner抽象类中提供了以下方法供子类选择。

·assignWindows():定义将数据元素分配到对应窗口的逻辑。

·getDefaultTrigger():获取默认的Trigger,也就是默认窗口触发器,例如EventTimeTrigger。

·getWindowSerializer():获取WindowSerializer实现,默认为TimeWindow.Serializer()。

·isEventTime():判断是否为基于EventTime时间类型实现的窗口。

如代码清单2-40所示,我们以SlidingEventTimeWindows为例进行说明。

代码清单2-40 SlidingEventTimeWindows.assignWindows()方法定义


public Collection<TimeWindow> assignWindows(Object element, long timestamp, 
   WindowAssignerContext context) {
   if (timestamp > Long.MIN_VALUE) {
      List<TimeWindow> windows = new ArrayList<>((int) (size / slide));
      long lastStart = TimeWindow.getWindowStartWithOffset(timestamp, offset, 
         slide);
      for (long start = lastStart;
         start > timestamp - size;
         start -= slide) {
         windows.add(new TimeWindow(start, start + size));
      }
      return windows;
   } else {
      throw new RuntimeException("Record has Long.MIN_VALUE timestamp (= no 
         timestamp marker). " +
         "Is the time characteristic set to 'ProcessingTime', or did you 
            forget to call " +"'DataStream.assignTimestampsAndWatermarks(...)'?");
   }
}

在SlidingEventTimeWindows.assignWindows()方法中可以看出,assignWindows()方法的参数包含了当前数据元素element、timestamp和WindowAssignerContext的上下文信息,且方法主要包含如下逻辑。

·判断timestamp是否有效,然后根据窗口长度和滑动时间计算数据元素所属窗口的数量,再根据窗口数量创建窗口列表。

·调用TimeWindow.getWindowStartWithOffset()方法,确定窗口列表中最晚的窗口对应的WindowStart时间,并赋值给lastStart变量;然后从lastStart开始遍历,每次向前移动固定的slide长度;最后向windows窗口列表中添加创建的TimeWindow,在TimeWindow中需要指定窗口的起始时间和结束时间。

·返回创建的窗口列表windows,也就是当前数据元素所属的窗口列表。

创建的WindowAssigner实例会在WindowOperator中使用,输入一条数据元素时会调用WindowAssigner.assignWindows()方法为接入的数据元素分配窗口,WindowOperator会根据元素所属的窗口分别对数据元素进行处理。

当然还有其他类型的WindowAssigner实现,基本功能都是一样的,主要是根据输入的元素确定和分配窗口。对于SlidingWindow类型的窗口来讲,同一个数据元素可能属于多个窗口,主要取决于窗口大小和滑动时间长度;而对于TumpleWindow类型来讲,每个数据元素仅属于一个窗口。

2.Window Trigger的核心实现

Window Trigger决定了窗口触发WindowFunction计算的时机,当接入的数据元素通过WindowAssigner分配到不同的窗口后,数据元素会被不断地累积在窗口状态中。当满足窗口触发条件时,会取出当前窗口中的所有数据元素,基于指定的WindowFunction对窗口中的数据元素进行运算,最后产生窗口计算结果并发送到下游的算子中。

如图2-18所示,在DataStream API中,所有定义的Window Trigger继承自Trigger基本实现类。每种窗口的触发策略不同,相应的Trigger触发器也有所不同。例如TumblingProcessingTimeWindows对应的默认Trigger为ProcessingTimeTrigger,而SlidingEventTimeWindows默认对应的是EventTimeTrigger。

数据元素接入WindowOperator后,调用窗口触发器的onElement()方法,判断窗口是否满足触发条件。如果满足,则触发窗口计算操作。我们以EventTimeTrigger为例介绍Trigger的核心实现,如代码清单2-41所示。

图2-18 Trigger UML关系图

·当数据元素接入后,根据窗口中maxTimestamp是否大于当前算子中的Watermark决定是否触发窗口计算。如果符合触发条件,则返回TriggerResult.FIRE事件,这里的maxTimestamp实际上是窗口的结束时间减1,属于该窗口的最大时间戳。

·如果不满足以上条件,就会继续向TriggerContext中注册Timer定时器,等待指定的时间再通过定时器触发窗口计算,此时方法会返回TriggerResult.CONTINUE消息给WindowOperator,表示此时窗口不会触发计算,继续等待新的数据接入。

·当数据元素不断接入WindowOperator,不断更新Watermark时,只要Watermark大于窗口的右边界就会触发相应的窗口计算。

代码清单2-41 EventTimeTrigger.onElement()方法定义


public TriggerResult onElement(Object element, long timestamp, TimeWindow 
   window, TriggerContext ctx) throws Exception {
   if (window.maxTimestamp() <= ctx.getCurrentWatermark()) {
      // 如果Watermark超过窗口最大时间戳,则立即执行
      return TriggerResult.FIRE;
   } else {
      ctx.registerEventTimeTimer(window.maxTimestamp());
      return TriggerResult.CONTINUE;
   }
}

在EventTimeTrigger.onElement()方法定义中我们可以看到,当窗口不满足触发条件时,会向TriggerContext中注册EventTimeTimer定时器,指定的触发时间为窗口中的最大时间戳。算子中的Watermark到达该时间戳时,会自动触发窗口计算,不需要等待新的数据元素接入。这里TriggerContext使用到的TimerService实际上就是我们在2.4.2节介绍过的InternalTimerService,EventTimeTimer会基于InternalTimerService的实现类进行存储和管理。

当Timer定时器到达maxTimestamp时就会调用EventTimeTrigger.onEventTime()方法。如代码清单2-42所示,在EventTimeTrigger.onEventTime()方法中,实际上会判断传入的事件时间和窗口的maxTimestamp是否相等,如果相等则返回TriggerResult.FIRE并触发窗口的统计计算。

代码清单2-42 EventTimeTrigger.onEventTime()方法定义


public TriggerResult onEventTime(long time, TimeWindow window, TriggerContext 
   ctx) {
   return time == window.maxTimestamp() ?
      TriggerResult.FIRE :
      TriggerResult.CONTINUE;
}

对于其他类型的窗口触发器,在原理上和EventTimeTrigger基本相同,感兴趣的读者可以阅读相关代码实现。

3.WindowFunction的设计与实现

经过以上几个步骤,基本上就能够确认窗口的类型及相应的触发时机了。窗口符合触发条件之后,就会对窗口中已经积蓄的数据元素进行统计计算,以得到最终的统计结果。对窗口元素的计算逻辑定义则主要通过窗口函数来实现。

在WindowStream的计算中,将窗口函数分为两种类型:用户指定的聚合函数AggregateFunction和专门用于窗口计算的WindowFunction。对于大部分用户来讲,基本都是基于窗口做聚合类型的统计运算,因此只需要在WindowStream中指定相应的聚合函数,如ReduceFunction和AggregateFunction。而在WindowStream的计算过程中,实际上会通过WindowFunction完成更加复杂的窗口计算。

如图2-19所示,WindowFunction继承了Function接口,同时又被不同类型的聚合函数实现,例如实现窗口关联计算的CoGroupWindowFunction、在窗口中对元素进行Reduce操作的ReduceApplyWindowFunction。这些函数同时继承自WrappingFunction,WrappingFunction对WindowFunction进行了一层封装,主要通过继承AbstractRichFunction抽象类,拓展和实现了RichFunction提供的能力。

图2-19 WindowFunction UML类图

总而言之,窗口中的函数会将用户定义的聚合函数和WindowFunction进行整合,形成统一的RichWindowFunction,然后基于RichWindowFunction进行后续的操作。

如代码清单2-43所示,用户创建WindowStream后,将ReduceFunction传递给WindowStream.reduce()方法。在WindowStream.reduce()方法中可以看出,还需要将WindowFunction作为参数,但这里的WindowFunction会在WindowStream中创建PassThroughWindowFunction默认实现类。

代码清单2-43 WindowStream.reduce()方法定义


public <R> SingleOutputStreamOperator<R> reduce(
      ReduceFunction<T> reduceFunction,
      WindowFunction<T, R, K, W> function,
      TypeInformation<R> resultType) {
   if (reduceFunction instanceof RichFunction) {
      throw new UnsupportedOperationException("ReduceFunction of reduce can 
         not be a RichFunction.");
   }
   //清理函数闭包
   function = input.getExecutionEnvironment().clean(function);
   reduceFunction = input.getExecutionEnvironment().clean(reduceFunction);
   final String opName = generateOperatorName(windowAssigner, trigger, 
      evictor, reduceFunction, function);
   KeySelector<T, K> keySel = input.getKeySelector();
   OneInputStreamOperator<T, R> operator;
   if (evictor != null) {
      @SuppressWarnings({"unchecked", "rawtypes"})
      TypeSerializer<StreamRecord<T>> streamRecordSerializer =
         (TypeSerializer<StreamRecord<T>>) new StreamElementSerializer(input.
            getType().createSerializer(getExecutionEnvironment().getConfig()));
      ListStateDescriptor<StreamRecord<T>> stateDesc =
         new ListStateDescriptor<>("window-contents", streamRecordSerializer);
      operator =
         new EvictingWindowOperator<>(windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().
               getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().
               getConfig()),
            stateDesc,
            new InternalIterableWindowFunction<>(new ReduceApplyWindowFunction
               <>(reduceFunction, function)),
            trigger,
            evictor,
            allowedLateness,
            lateDataOutputTag);
   } else {
      ReducingStateDescriptor<T> stateDesc = new ReducingStateDescriptor
         <>("window-contents",
         reduceFunction,
         input.getType().createSerializer(getExecutionEnvironment().
            getConfig()));
      operator =
         new WindowOperator<>(windowAssigner,
            windowAssigner.getWindowSerializer(getExecutionEnvironment().
               getConfig()),
            keySel,
            input.getKeyType().createSerializer(getExecutionEnvironment().
               getConfig()),
            stateDesc,
            new InternalSingleValueWindowFunction<>(function),
            trigger,
            allowedLateness,
            lateDataOutputTag);
   }
   return input.transform(opName, resultType, operator);
}

最后实际上就是创建OneInputStreamOperator实例,StreamOperator会根据evictor数据剔除器是否为空,选择创建EvictingWindowOperator还是WindowOperator。在创建EvictingWindowOperator时,通过调用new ReduceApplyWindowFunction <?> (reduceFunction, function)合并ReduceFunction和WindowFunction,然后转换为InternalIterableWindowFunction函数供WindowOperator使用。接下来调用input.transform()方法将创建好的EvictingWindowOperator或WindowOperator实例添加到OneInputTransformation转换操作中。其他的窗口计算函数和Reduce聚合函数基本一致,这里不再赘述。