2.2.1 StreamOperator接口实现

接下来我们深入了解StreamOperator接口的定义。如图2-4所示,StreamOperator接口实现的方法主要供Task调用和执行。

图2-4 StreamOperator接口

如图2-4所示,StreamOperator接口主要包括如下核心方法。

·open():定义当前Operator的初始化方法,在数据元素正式接入Operator运算之前,Task会调用StreamOperator.open()方法对该算子进行初始化,具体open()方法的定义由子类实现,常见的用法如调用RichFunction中的open()方法创建相应的状态变量。

·close():当所有的数据元素都添加到当前Operator时,就会调用该方法刷新所有剩余的缓冲数据,保证算子中所有数据被正确处理。

·dispose():算子生命周期结束时会调用此方法,包括算子操作执行成功、失败或者取消时。

·prepareSnapshotPreBarrier():在StreamOperator正式执行checkpoint操作之前会调用该方法,目前仅在MapBundleOperator算子中使用该方法。

·snapshotState():当SubTask执行checkpoint操作时会调用该方法,用于触发该Operator中状态数据的快照操作。

·initializeState():当算子启动或重启时,调用该方法初始化状态数据,当恢复作业任务时,算子会从检查点(checkpoint)持久化的数据中恢复状态数据。

1.AbstractStreamOperator的基本实现

AbstractStreamOperator作为StreamOperator的基本实现类,所有的Operator都会继承和实现该抽象实现类。在AbstractStreamOperator中定义了Operator用到的基础方法和成员信息。如图2-5所示,我们重点梳理AbstractStreamOperator的主要成员变量和方法。

AbstractStreamOperator包含的主要成员变量如下。

·ChainingStrategy chainingStrategy:用于指定Operator的上下游算子链接策略,其中ChainStrategy可以是ALWAYS、NEVER或HEAD类型,该参数实际上就是转换过程中配置的链接策略。

·StreamTask<?, ?> container:表示当前Operator所属的StreamTask,最终会通过StreamTask中的invoke()方法执行当前StreamTask中的所有Operator。

图2-5 AbstractStreamOperator UML关系图

·StreamConfig config:存储了该StreamOperator的配置信息,实际上是对Configuration参数进行了封装。

·Output<StreamRecord<OUT>> output:定义了当前StreamOperator的输出操作,执行完该算子的所有转换操作后,会通过Output组件将数据推送到下游算子继续执行。

·StreamingRuntimeContext runtimeContext:主要定义了UDF执行过程中的上下文信息,例如获取累加器、状态数据。

·KeySelector<?, ?> stateKeySelector1:只有DataStream经过keyBy()转换操作生成KeyedStream后,才会设定该算子的stateKeySelector1变量信息。

·KeySelector<?, ?> stateKeySelector2:只在执行两个KeyedStream关联操作时使用,例如Join操作,在AbstractStreamOperator中会保存stateKeySelector2的信息。

·AbstractKeyedStateBackend<?> keyedStateBackend:用于存储KeyedState的状态管理后端,默认为HeapKeyedStateBackend。如果配置RocksDB作为状态存储后端,则此处为RocksDBKeyedStateBackend。

·DefaultKeyedStateStore keyedStateStore:主要提供KeyedState的状态存储服务,实际上是对KeyedStateBackend进行封装并提供了不同类型的KeyedState获取方法,例如通过getReducingState(ReducingStateDescriptor stateProperties)方法获取ReducingState。

·OperatorStateBackend operatorStateBackend:和keyedStateBackend相似,主要提供OperatorState对应的状态后端存储,默认OperatorStateBackend只有DefaultOperatorStateBackend实现。

·OperatorMetricGroup metrics:用于记录当前算子层面的监控指标,包括numRecordsIn、numRecordsOut、numRecordsInRate、numRecordsOutRate等。

·LatencyStats latencyStats:用于采集和汇报当前Operator的延时状况。

·ProcessingTimeService processingTimeService:基于ProcessingTime的时间服务,实现ProcessingTime时间域操作,例如获取当前ProcessingTime,然后创建定时器回调等。

·InternalTimeServiceManager<?>timeServiceManager:Flink内部时间服务,和processingTimeService相似,但支持基于事件时间的时间域处理数据,还可以同时注册基于事件时间和处理时间的定时器,例如在窗口、CEP等高级类型的算子中,会在ProcessFunction中通过timeServiceManager注册Timer定时器,当事件时间或处理时间到达指定时间后执行Timer定时器,以实现复杂的函数计算。

·long combinedWatermark:在双输入类型的算子中,如果基于事件时间处理乱序事件,会在AbstractStreamOperator中合并输入的Watermark,选择最小的Watermark作为合并后的指标,并存储在combinedWatermark变量中。

·long input1Watermark:二元输入算子中input1对应的Watermark大小。

·long input2Watermark:二元输入算子中input2对应的Watermark大小。

AbstractStreamOperator除了定义主要的成员变量之外,还定义了子类实现的基本抽象方法。

·processLatencyMarker():用于处理在SourceOperator中产生的LatencyMarker信息。在当前Operator中会计算事件和LatencyMarker之间的差值,用于评估当前算子的延时程度。

·processWatermark():用于处理接入的Watermark时间戳信息,并用最新的Watermark更新当前算子内部的时钟。

·getInternalTimerService():提供子类获取InternalTimerService的方法,以实现不同类型的Timer注册操作。

2.AbstractUdfStreamOperator基本实现

当StreamOperator涉及自定义用户函数数据转换处理时,对应的Operator会继承AbstractUdfStreamOperator抽象实现类,常见的有StreamMap、CoProcessOperator等算子。当然,并不是所有的Operator都继承自AbstractUdfStreamOperator。在Flink Table API模块实现的算子中,都会直接继承和实现AbstractStreamOperator抽象实现类。另外,有状态查询的AbstractQueryableStateOperator也不需要使用用户自定义函数处理数据。

AbstractUdfStreamOperator继承自AbstractStreamOperator抽象类,对于AbstractUdfStreamOperator抽象类来讲,最重要的拓展就是增加了成员变量userFunction,且提供了userFunction初始化以及状态持久化的抽象方法。下面我们简单介绍AbstractUdfStreamOperator提供的主要方法。

如代码清单2-4所示,在AbstractUdfStreamOperator.setup()方法中会调用FunctionUtils为userFunction设定RuntimeContext变量。此时userFunction能够获取RuntimeContext变量,然后实现获取状态数据等操作。

代码清单2-4 AbstractUdfStreamOperator.setup()方法定义


public void setup(StreamTask<?, ?> containingTask, StreamConfig config, 
   Output<StreamRecord<OUT>> output) {
   super.setup(containingTask, config, output);
   FunctionUtils.setFunctionRuntimeContext(userFunction, getRuntimeContext());
}

如代码清单2-5所示,在AbstractUdfStreamOperator.snapshotState()方法中调用了StreamingFunctionUtils.snapshotFunctionState()方法,以实现对userFunction中的状态进行快照操作。

代码清单2-5 AbstractUdfStreamOperator.snapshotState()方法


public void snapshotState(StateSnapshotContext context) throws Exception {
   super.snapshotState(context);
   StreamingFunctionUtils.snapshotFunctionState(context, 
      getOperatorStateBackend(), userFunction);
}

如代码清单2-6所示,在initializeState()方法中调用StreamingFunctionUtils.restoreFunctionState()方法初始化userFunction的状态值。

代码清单2-6 AbstractUdfStreamOperator.initializeState()方法定义


public void initializeState(StateInitializationContext context) throws Exception {
   super.initializeState(context);
   StreamingFunctionUtils.restoreFunctionState(context, userFunction);
}

如代码清单2-7所示,在AbstractUdfStreamOperator.open()方法中调用了FunctionUtils.openFunction()方法。当用户自定义并实现RichFunction时,FunctionUtils.openFunction()方法会调用RichFunction.open()方法,完成用户自定义状态的创建和初始化。

代码清单2-7 AbstractUdfStreamOperator.open()方法定义


public void open() throws Exception {
   super.open();
   FunctionUtils.openFunction(userFunction, new Configuration());
}

可以看出,当用户自定义实现Function时,在AbstractUdfStreamOperator抽象类中提供了对这些Function的初始化操作,也就实现了Operator和Function之间的关联。Operator也是Function的载体,具体数据处理操作借助Operator中的Function进行。StreamOperator提供了执行Function的环境,包括状态数据管理和处理Watermark、LatencyMarker等信息。