- Flink设计与实现:核心原理与源码解析
- 张利兵
- 2307字
- 2021-08-13 17:26:49
2.3.2 SourceFunction与SinkFunction
在DataStream API中,除了有MapFunction、FlatMapFunction等转换函数之外,还有两种比较特殊的Function接口:SourceFunction和SinkFunction。SourceFunction没有具体的数据元素输入,而是通过在SourceFunction实现中与具体数据源建立连接,并读取指定数据源中的数据,然后转换成StreamRecord数据结构发送到下游的Operator中。SinkFunction接口的主要作用是将上游的数据元素输出到外部数据源中。两种函数都具有比较独立的实现逻辑,下面我们分别介绍SourceFunction和SinkFunction的设计和实现。
1.SourceFunction具体实现
如图2-9所示,SourceFunction接口继承了Function接口,并在内部定义了数据读取使用的run()方法和SourceContext内部类,其中SourceContext定义了数据接入过程用到的上下文信息。在默认情况下,SourceFunction不支持并行读取数据,因此SourceFunction被ParallelSourceFunction接口继承,以支持对外部数据源中数据的并行读取操作,比较典型的ParallelSourceFunction实例就是FlinkKafkaConsumer。
图2-9 SourceFunction UML关系图
从图2-9中也可以看出,在SourceFunction的基础上拓展了RichParallelSourceFunction和RichSourceFunction抽象实现类,这使得SourceFunction可以在数据接入的过程中获取RuntimeContext信息,从而实现更加复杂的操作,例如使用OperatorState保存Kafka中数据消费的偏移量,从而实现端到端当且仅被处理一次的语义保障。
如图2-10所示,SourceContext主要用于收集SourceFunction中的上下文信息,SourceContext包含如下方法。
·collect()方法:用于收集从外部数据源读取的数据并下发到下游算子中。
·collectWithTimestamp()方法:支持直接收集数据元素以及EventTime时间戳。
·emitWatermark()方法:用于在SourceFunction中生成Watermark并发送到下游算子进行处理。
·getCheckpointLock()方法:用于获取检查点锁(Checkpoint Lock),例如使用KafkaConsumer读取数据时,可以使用检查点锁,确保记录发出的原子性和偏移状态更新。
从图2-10中可以看出,SourceContext主要有两种类型的实现子类,分别为NonTimestampContext和WatermarkContext。顾名思义,WatermarkContext支持事件时间抽取和生成Watermark,最终用于处理乱序事件;而NonTimestampContext不支持基于事件时间的操作,仅实现了从外部数据源中读取数据并处理的逻辑,主要对应TimeCharacteristic为ProcessingTime的情况。可以看出,用户设定不同的TimeCharacteristic,就会创建不同类型的SourceContext,这里我们梳理SourceContext类型与TimeCharacteristic的对应关系如表2-1所示。
图2-10 SourceContext UML关系图
表2-1 SourceContext类型与TimeCharacteristic的对应关系
其中AutomaticWatermarkContext和ManualWatermarkContext都继承自WatermarkContext抽象类,分别对应接入时间和事件时间。由此也可以看出,接入时间对应的Timestamp和Watermark都是通过Source算子自动生成的。事件时间的实现则相对复杂,需要用户自定义SourceContext.emitWatermark()方法来实现。
同时,SourceFunction接口的实现类主要通过run()方法完成与外部数据源的交互,以实现外部数据的读取,并将读取到的数据通过SourceContext提供的collect()方法发送给DataStream后续的算子进行处理。常见的实现类有ContinuousFileMonitoringFunction、FlinkKafkaConsumer等,这里我们以EventsGeneratorSource为例,简单介绍SourceFunction接口的定义。
如代码清单2-15所示,EventsGeneratorSource通过SourceFunction.run()方法实现了事件的创建和采集,具体创建过程主要通过EventsGenerator完成。实际上,在run()方法中会启动while循环,不断调用EventsGenerator创建新的Event数据,最终通过sourceContext.collect()方法对数据元素进行收集和下发,此时下游算子可以接收到Event数据并进行处理。
代码清单2-15 EventsGeneratorSource.run()方法定义
public void run(SourceContext<Event> sourceContext) throws Exception { final EventsGenerator generator = new EventsGenerator(errorProbability); final int range = Integer.MAX_VALUE / getRuntimeContext(). getNumberOfParallelSubtasks(); final int min = range * getRuntimeContext().getIndexOfThisSubtask(); final int max = min + range; while (running) { sourceContext.collect(generator.next(min, max)); if (delayPerRecordMillis > 0) { Thread.sleep(delayPerRecordMillis); } } }
SourceFunction定义完毕后,会被封装在StreamSource算子中,前面我们已经知道StreamSource继承自AbstractUdfStreamOperator。在StreamSource算子中提供了run()方法实现SourceStreamTask实例的调用和执行,SourceStreamTask实际上是针对Source类型算子实现的StreamTask实现类。
如代码清单2-16所示,StreamSource.run()方法主要包含如下逻辑。
·从OperatorConfig中获取TimeCharacteristic,并从Task的环境信息Environment中获取Configuration配置信息。
·创建LatencyMarksEmitter实例,主要用于在SourceFunction中输出Latency标记,也就是周期性地生成时间戳,当下游算子接收到SourceOperator发送的LatencyMark后,会使用当前的时间减去LatencyMark中的时间戳,以此确认该算子数据处理的延迟情况,最后算子会将LatencyMark监控指标以Metric的形式发送到外部的监控系统中。
·创建SourceContext,这里调用的是StreamSourceContexts.getSourceContext()方法,在该方法中根据TimeCharacteristic参数创建对应类型的SourceContext。
·将SourceContext实例应用在自定义的SourceFunction中,此时SourceFunction能够直接操作SourceContext,例如收集数据元素、输出Watermark事件等。
·调用userFunction.run(ctx)方法,调用和执行SourceFunction实例。
代码清单2-16 StreamSource.run()方法定义
public void run(final Object lockingObject, final StreamStatusMaintainer streamStatusMaintainer, final Output<StreamRecord<OUT>> collector, final OperatorChain<?, ?> operatorChain) throws Exception { // 获取TimeCharacteristic final TimeCharacteristic timeCharacteristic = getOperatorConfig(). getTimeCharacteristic(); // 获取Configuration final Configuration configuration = this.getContainingTask().getEnvironment(). getTaskManagerInfo().getConfiguration(); final long latencyTrackingInterval = getExecutionConfig(). isLatencyTrackingConfigured() ? getExecutionConfig().getLatencyTrackingInterval() : configuration.getLong(MetricOptions.LATENCY_INTERVAL); // 创建LatencyMarksEmitter LatencyMarksEmitter<OUT> latencyEmitter = null; if (latencyTrackingInterval > 0) { latencyEmitter = new LatencyMarksEmitter<>( getProcessingTimeService(), collector, latencyTrackingInterval, this.getOperatorID(), getRuntimeContext().getIndexOfThisSubtask()); } // 获取SourceContext final long watermarkInterval = getRuntimeContext().getExecutionConfig(). getAutoWatermarkInterval(); this.ctx = StreamSourceContexts.getSourceContext( timeCharacteristic, getProcessingTimeService(), lockingObject, streamStatusMaintainer, collector, watermarkInterval, -1); // 运行SourceFunction try { userFunction.run(ctx); if (!isCanceledOrStopped()) { synchronized (lockingObject) { operatorChain.endHeadOperatorInput(1); } } } finally { if (latencyEmitter != null) { latencyEmitter.close(); } } }
需要注意的是,由于未来社区会基于DataStream API实现流批一体,因此SourceFunction后期的变化会比较大,笔者也会持续关注Flink社区的最新动向,并及时跟进相关的设计和实现。
2.SinkFunction具体实现
相比于SourceFunction,SinkFunction的实现相对简单。在SinkFunction中同样需要关注和外部介质的交互,尤其对于支持两阶段提交的数据源来讲,此时需要使用TwoPhaseCommitSinkFunction实现端到端的数据一致性。在SinkFunction中也会通过SinkContext获取与Sink操作相关的上下文信息。
如图2-11所示,SinkFunction继承自Function接口,且SinkFunciton分为WriteSink-Function和RichSinkFunction两种类型的子类,其中WriteSinkFunction实现类已经被废弃,大部分情况下使用的都是RichSinkFunction实现类。常见的RichSinkFunction实现类有SocketClientSink和StreamingFileSink,对于支持两阶段提交的TwoPhaseCommitSinkFunction,实现类主要有FlinkKafkaProducer。
图2-11 SinkFunction UML关系图
从图2-11中也可以看出,和SourceFunction中的SourceContext一样,在SinkFuntion中也会创建和使用SinkContext,以获取Sink操作过程需要的上下文信息。但相比于SourceContext,SinkFuntion中的SinkContext仅包含一些基本方法,例如获取currentProcessingTime、currentWatermark以及Timestamp等变量。
如代码清单2-17所示,在StreamSink Operator中提供了默认SinkContext实现,通过SimpleContext可以从ProcessingTimeservice中获取当前的处理时间、当前最大的Watermark和事件中的Timestamp等信息。
代码清单2-17 SinkContext默认定义SimpleContext实现
private class SimpleContext<IN> implements SinkFunction.Context<IN> { // 处理数据 private StreamRecord<IN> element; // 时间服务 private final ProcessingTimeService processingTimeService; public SimpleContext(ProcessingTimeService processingTimeService) { this.processingTimeService = processingTimeService; } // 获取当前的处理时间 @Override public long currentProcessingTime() { return processingTimeService.getCurrentProcessingTime(); } // 获取当前的Watermark @Override public long currentWatermark() { return currentWatermark; } // 获取数据中的Timestamp @Override public Long timestamp() { if (element.hasTimestamp()) { return element.getTimestamp(); } return null; } }
如代码清单2-18所示,在StreamSink.processElement()方法中,通过调用userFunction.invoke()方法触发Function计算,并将sinkContext作为参数传递到userFunction中使用,此时SinkFunction就能通过SinkContext提供的方法获取相应的时间信息并进行数据处理,实现将数据发送至外部系统的功能。
代码清单2-18 StreamSink.processElement()方法定义
public void processElement(StreamRecord<IN> element) throws Exception { sinkContext.element = element; userFunction.invoke(element.getValue(), sinkContext); }
TwoPhaseCommitSinkFunction主要用于需要严格保证数据当且仅被输出一条的语义保障的场景。在TwoPhaseCommitSinkFunction中实现了和外围数据交互过程的Transaction逻辑,也就是只有当数据真正下发到外围存储介质时,才会认为Sink中的数据输出成功,其他任何因素导致写入过程失败,都会对输出操作进行回退并重新发送数据。目前所有Connector中支持TwoPhaseCommitSinkFunction的只有Kafka消息中间件,且要求Kafka的版本在0.11以上。