2.3.2 SourceFunction与SinkFunction

在DataStream API中,除了有MapFunction、FlatMapFunction等转换函数之外,还有两种比较特殊的Function接口:SourceFunction和SinkFunction。SourceFunction没有具体的数据元素输入,而是通过在SourceFunction实现中与具体数据源建立连接,并读取指定数据源中的数据,然后转换成StreamRecord数据结构发送到下游的Operator中。SinkFunction接口的主要作用是将上游的数据元素输出到外部数据源中。两种函数都具有比较独立的实现逻辑,下面我们分别介绍SourceFunction和SinkFunction的设计和实现。

1.SourceFunction具体实现

如图2-9所示,SourceFunction接口继承了Function接口,并在内部定义了数据读取使用的run()方法和SourceContext内部类,其中SourceContext定义了数据接入过程用到的上下文信息。在默认情况下,SourceFunction不支持并行读取数据,因此SourceFunction被ParallelSourceFunction接口继承,以支持对外部数据源中数据的并行读取操作,比较典型的ParallelSourceFunction实例就是FlinkKafkaConsumer。

图2-9 SourceFunction UML关系图

从图2-9中也可以看出,在SourceFunction的基础上拓展了RichParallelSourceFunction和RichSourceFunction抽象实现类,这使得SourceFunction可以在数据接入的过程中获取RuntimeContext信息,从而实现更加复杂的操作,例如使用OperatorState保存Kafka中数据消费的偏移量,从而实现端到端当且仅被处理一次的语义保障。

如图2-10所示,SourceContext主要用于收集SourceFunction中的上下文信息,SourceContext包含如下方法。

·collect()方法:用于收集从外部数据源读取的数据并下发到下游算子中。

·collectWithTimestamp()方法:支持直接收集数据元素以及EventTime时间戳。

·emitWatermark()方法:用于在SourceFunction中生成Watermark并发送到下游算子进行处理。

·getCheckpointLock()方法:用于获取检查点锁(Checkpoint Lock),例如使用KafkaConsumer读取数据时,可以使用检查点锁,确保记录发出的原子性和偏移状态更新。

从图2-10中可以看出,SourceContext主要有两种类型的实现子类,分别为NonTimestampContext和WatermarkContext。顾名思义,WatermarkContext支持事件时间抽取和生成Watermark,最终用于处理乱序事件;而NonTimestampContext不支持基于事件时间的操作,仅实现了从外部数据源中读取数据并处理的逻辑,主要对应TimeCharacteristic为ProcessingTime的情况。可以看出,用户设定不同的TimeCharacteristic,就会创建不同类型的SourceContext,这里我们梳理SourceContext类型与TimeCharacteristic的对应关系如表2-1所示。

图2-10 SourceContext UML关系图

表2-1 SourceContext类型与TimeCharacteristic的对应关系

其中AutomaticWatermarkContext和ManualWatermarkContext都继承自WatermarkContext抽象类,分别对应接入时间和事件时间。由此也可以看出,接入时间对应的Timestamp和Watermark都是通过Source算子自动生成的。事件时间的实现则相对复杂,需要用户自定义SourceContext.emitWatermark()方法来实现。

同时,SourceFunction接口的实现类主要通过run()方法完成与外部数据源的交互,以实现外部数据的读取,并将读取到的数据通过SourceContext提供的collect()方法发送给DataStream后续的算子进行处理。常见的实现类有ContinuousFileMonitoringFunction、FlinkKafkaConsumer等,这里我们以EventsGeneratorSource为例,简单介绍SourceFunction接口的定义。

如代码清单2-15所示,EventsGeneratorSource通过SourceFunction.run()方法实现了事件的创建和采集,具体创建过程主要通过EventsGenerator完成。实际上,在run()方法中会启动while循环,不断调用EventsGenerator创建新的Event数据,最终通过sourceContext.collect()方法对数据元素进行收集和下发,此时下游算子可以接收到Event数据并进行处理。

代码清单2-15 EventsGeneratorSource.run()方法定义


public void run(SourceContext<Event> sourceContext) throws Exception {
   final EventsGenerator generator = new EventsGenerator(errorProbability);
   final int range = Integer.MAX_VALUE / getRuntimeContext().
      getNumberOfParallelSubtasks();
   final int min = range * getRuntimeContext().getIndexOfThisSubtask();
   final int max = min + range;
   while (running) {
      sourceContext.collect(generator.next(min, max));
      if (delayPerRecordMillis > 0) {
         Thread.sleep(delayPerRecordMillis);
      }
   }
}

SourceFunction定义完毕后,会被封装在StreamSource算子中,前面我们已经知道StreamSource继承自AbstractUdfStreamOperator。在StreamSource算子中提供了run()方法实现SourceStreamTask实例的调用和执行,SourceStreamTask实际上是针对Source类型算子实现的StreamTask实现类。

如代码清单2-16所示,StreamSource.run()方法主要包含如下逻辑。

·从OperatorConfig中获取TimeCharacteristic,并从Task的环境信息Environment中获取Configuration配置信息。

·创建LatencyMarksEmitter实例,主要用于在SourceFunction中输出Latency标记,也就是周期性地生成时间戳,当下游算子接收到SourceOperator发送的LatencyMark后,会使用当前的时间减去LatencyMark中的时间戳,以此确认该算子数据处理的延迟情况,最后算子会将LatencyMark监控指标以Metric的形式发送到外部的监控系统中。

·创建SourceContext,这里调用的是StreamSourceContexts.getSourceContext()方法,在该方法中根据TimeCharacteristic参数创建对应类型的SourceContext。

·将SourceContext实例应用在自定义的SourceFunction中,此时SourceFunction能够直接操作SourceContext,例如收集数据元素、输出Watermark事件等。

·调用userFunction.run(ctx)方法,调用和执行SourceFunction实例。

代码清单2-16 StreamSource.run()方法定义


public void run(final Object lockingObject,
      final StreamStatusMaintainer streamStatusMaintainer,
      final Output<StreamRecord<OUT>> collector,
      final OperatorChain<?, ?> operatorChain) throws Exception {
   // 获取TimeCharacteristic
   final TimeCharacteristic timeCharacteristic = getOperatorConfig().
      getTimeCharacteristic();
   // 获取Configuration
   final Configuration configuration = this.getContainingTask().getEnvironment().
      getTaskManagerInfo().getConfiguration();
   final long latencyTrackingInterval = getExecutionConfig().
      isLatencyTrackingConfigured()
      ? getExecutionConfig().getLatencyTrackingInterval()
      : configuration.getLong(MetricOptions.LATENCY_INTERVAL);
   // 创建LatencyMarksEmitter
   LatencyMarksEmitter<OUT> latencyEmitter = null;
   if (latencyTrackingInterval > 0) {
      latencyEmitter = new LatencyMarksEmitter<>(
         getProcessingTimeService(),
         collector,
         latencyTrackingInterval,
         this.getOperatorID(),
         getRuntimeContext().getIndexOfThisSubtask());
   }
   // 获取SourceContext
   final long watermarkInterval = getRuntimeContext().getExecutionConfig().
      getAutoWatermarkInterval();
   this.ctx = StreamSourceContexts.getSourceContext(
      timeCharacteristic,
      getProcessingTimeService(),
      lockingObject,
      streamStatusMaintainer,
      collector,
      watermarkInterval,
      -1);
   // 运行SourceFunction
   try {
      userFunction.run(ctx);
      if (!isCanceledOrStopped()) {
         synchronized (lockingObject) {
            operatorChain.endHeadOperatorInput(1);
         }
      }
   } finally {
      if (latencyEmitter != null) {
         latencyEmitter.close();
      }
   }
}

需要注意的是,由于未来社区会基于DataStream API实现流批一体,因此SourceFunction后期的变化会比较大,笔者也会持续关注Flink社区的最新动向,并及时跟进相关的设计和实现。

2.SinkFunction具体实现

相比于SourceFunction,SinkFunction的实现相对简单。在SinkFunction中同样需要关注和外部介质的交互,尤其对于支持两阶段提交的数据源来讲,此时需要使用TwoPhaseCommitSinkFunction实现端到端的数据一致性。在SinkFunction中也会通过SinkContext获取与Sink操作相关的上下文信息。

如图2-11所示,SinkFunction继承自Function接口,且SinkFunciton分为WriteSink-Function和RichSinkFunction两种类型的子类,其中WriteSinkFunction实现类已经被废弃,大部分情况下使用的都是RichSinkFunction实现类。常见的RichSinkFunction实现类有SocketClientSink和StreamingFileSink,对于支持两阶段提交的TwoPhaseCommitSinkFunction,实现类主要有FlinkKafkaProducer。

图2-11 SinkFunction UML关系图

从图2-11中也可以看出,和SourceFunction中的SourceContext一样,在SinkFuntion中也会创建和使用SinkContext,以获取Sink操作过程需要的上下文信息。但相比于SourceContext,SinkFuntion中的SinkContext仅包含一些基本方法,例如获取currentProcessingTime、currentWatermark以及Timestamp等变量。

如代码清单2-17所示,在StreamSink Operator中提供了默认SinkContext实现,通过SimpleContext可以从ProcessingTimeservice中获取当前的处理时间、当前最大的Watermark和事件中的Timestamp等信息。

代码清单2-17 SinkContext默认定义SimpleContext实现


private class SimpleContext<IN> implements SinkFunction.Context<IN> {
   // 处理数据
   private StreamRecord<IN> element;
   // 时间服务
   private final ProcessingTimeService processingTimeService;
   public SimpleContext(ProcessingTimeService processingTimeService) {
      this.processingTimeService = processingTimeService;
   }
   // 获取当前的处理时间
   @Override
   public long currentProcessingTime() {
      return processingTimeService.getCurrentProcessingTime();
   }
   // 获取当前的Watermark
   @Override
   public long currentWatermark() {
      return currentWatermark;
   }
   // 获取数据中的Timestamp
   @Override
   public Long timestamp() {
      if (element.hasTimestamp()) {
         return element.getTimestamp();
      }
      return null;
   }
}

如代码清单2-18所示,在StreamSink.processElement()方法中,通过调用userFunction.invoke()方法触发Function计算,并将sinkContext作为参数传递到userFunction中使用,此时SinkFunction就能通过SinkContext提供的方法获取相应的时间信息并进行数据处理,实现将数据发送至外部系统的功能。

代码清单2-18 StreamSink.processElement()方法定义


public void processElement(StreamRecord<IN> element) throws Exception {
   sinkContext.element = element;
   userFunction.invoke(element.getValue(), sinkContext);
}

TwoPhaseCommitSinkFunction主要用于需要严格保证数据当且仅被输出一条的语义保障的场景。在TwoPhaseCommitSinkFunction中实现了和外围数据交互过程的Transaction逻辑,也就是只有当数据真正下发到外围存储介质时,才会认为Sink中的数据输出成功,其他任何因素导致写入过程失败,都会对输出操作进行回退并重新发送数据。目前所有Connector中支持TwoPhaseCommitSinkFunction的只有Kafka消息中间件,且要求Kafka的版本在0.11以上。