2.2.2 OneInputStreamOperator与TwoInputStreamOperator

StreamOperator根据输入流的数量分为两种类型,即支持单输入流的OneInputStreamOperator以及支持双输入流的TwoInputStreamOperator,我们可以将其称为一元输入算子和二元输入算子。下面介绍OneInputStreamOperator和TwoInputStreamOperator的区别。

1.OneInputStreamOperator的实现

OneInputStreamOperator定义了单输入流的StreamOperator,常见的实现类有StreamMap、StreamFilter等算子。OneInputStreamOperator接口主要包含以下方法,专门用于处理接入的单输入数据流,如代码清单2-8所示。

代码清单2-8 OneInputStreamOperator接口定义的主要方法


// 处理输入数据元素的方法
void processElement(StreamRecord<IN> element) throws Exception;
// 处理Watermark的方法
void processWatermark(Watermark mark) throws Exception;
// 处理延时标记的方法
void processLatencyMarker(LatencyMarker latencyMarker) throws Exception;

我们以StreamFilter算子为例,介绍OneInputStreamOperator的实现,如代码清单2-9所示。

·StreamFilter算子在继承AbstractUdfStreamOperator的同时,实现了OneInputStreamOperator接口。

·在StreamFilter算子构造器中,内部的Function类型为FilterFunction,并设定上下游算子的链接策略为ChainingStrategy.ALWAYS,也就是该类型的Operator通常都会与上下游的Operator连接在一起,形成OperatorChain。

·在StreamFilter中实现了OneInputStreamOperator的processElement()方法,通过该方法定义了具体的数据元素处理逻辑。实际上就是使用定义的filterFunction对接入的数据进行筛选,然后通过output.collect(element)方法将符合的条件输出到下游算子中。

代码清单2-9 StreamFilter Class的定义和实现


public class StreamFilter<IN> extends AbstractUdfStreamOperator<IN, 
   FilterFunction<IN>> implements OneInputStreamOperator<IN, IN> {
   private static final long serialVersionUID = 1L;
   // 初始化FilterFunction并设定ChainingStrategy.ALWAYS
   public StreamFilter(FilterFunction<IN> filterFunction) {
      super(filterFunction);
      chainingStrategy = ChainingStrategy.ALWAYS;
   }
   @Override
   public void processElement(StreamRecord<IN> element) throws Exception {
      // 执行userFunction.filter()方法
      if (userFunction.filter(element.getValue())) {
         output.collect(element);
      }
   }
}

2.TwoInputStreamOperator的实现

TwoInputStreamOperator定义了双输入流类型的StreamOperator接口实现,常见的实现类有CoStreamMap、HashJoinOperator等算子。代码清单2-10是TwoInputStreamOperator接口定义的主要方法,在实现对两个数据流转换操作的同时,还定义了两条数据流中Watermark和LatencyMarker的处理逻辑。

代码清单2-10 TwoInputStreamOperator接口定义的主要方法


// 处理输入源1的数据元素方法
void processElement1(StreamRecord<IN1> element) throws Exception;
// 处理输入源2的数据元素方法
void processElement2(StreamRecord<IN2> element) throws Exception;
// 处理输入源1的Watermark方法
void processWatermark1(Watermark mark) throws Exception;
// 处理输入源2的Watermark方法
void processWatermark2(Watermark mark) throws Exception;
// 处理输入源1的LatencyMarker方法
void processLatencyMarker1(LatencyMarker latencyMarker) throws Exception;
// 处理输入源2的LatencyMarker方法
void processLatencyMarker2(LatencyMarker latencyMarker) throws Exception;

如代码清单2-11所示,我们以CoStreamMap为例,介绍TwoInputStreamOperator算子的具体实现。从CoStreamMap算子定义中可以看出,CoStreamMap继承AbstractUdfStreamOperator的同时,实现了TwoInputStreamOperator接口。其中在processElement1()和processElement2()两个方法的实现中,分别调用了用户定义的CoMapFunction的map1()和map2()方法对输入的数据元素Input1和Input2进行处理。经过函数处理后的结果会通过output.collect()接口推送到下游的Operator中。

代码清单2-11 CoStreamMap Class定义和实现


public class CoStreamMap<IN1, IN2, OUT>
      extends AbstractUdfStreamOperator<OUT, CoMapFunction<IN1, IN2, OUT>>
      implements TwoInputStreamOperator<IN1, IN2, OUT> {
   private static final long serialVersionUID = 1L;
   public CoStreamMap(CoMapFunction<IN1, IN2, OUT> mapper) {
      super(mapper);
   }
   @Override
   public void processElement1(StreamRecord<IN1> element) throws Exception {
      output.collect(element.replace(userFunction.map1(element.getValue())));
   }
   @Override
   public void processElement2(StreamRecord<IN2> element) throws Exception {
      output.collect(element.replace(userFunction.map2(element.getValue())));
   }
}