3.1.2 集群的启动与初始化

Flink的集群模式主要分为Per-Job和Session两种,其中Per-Job集群模式为每一个提交的Job单独创建一套完整的运行时集群环境,该Job独享运行时集群使用的计算资源以及组件服务。与Per-Job集群相比,Session集群能够运行多个Flink作业,且这些作业可以共享运行时中的Dispatcher、ResourceManager等组件服务。两种集群运行模式各有特点和使用范围,从资源利用的角度看,Session集群资源的使用率相对高一些,Per-Job集群任务之间的资源隔离性会好一些。

不管是哪种类型的集群,集群运行时环境中涉及的核心组件都是一样的,主要的区别集中在作业的提交和运行过程中。这里我们以Session类型集群为例,继续介绍集群运行时,对于Per-Job类型集群单独的实现部分,我们会在介绍过程中进行特殊说明。

1.ClusterEntrypoint详解

当用户指定Session Cli命令启动集群时,首先会在Flink集群启动脚本中调用ClusterEntrypoint抽象实现类中提供的main()方法,以启动和运行相应类型的集群环境。ClusterEntrypoint是整个集群运行时的启动入口类,且内部带有main()方法。运行时管理节点中,所有服务都通过ClusterEntrypoint进行触发和启动,进而完成核心组件的创建和初始化。

如图3-2所示,ClusterEntrypoint会根据集群运行模式,将ClusterEntrypoint分为SessionClusterEntrypoint和JobClusterEntrypoint两种基本实现类。顾名思义,SessionClusterEntrypoint是Session类型集群的入口启动类,JobClusterEntrypoint是Per-Job类型集群的入口启动类。在集群运行模式基本类的基础上,衍生出了集群资源管理器对应的ClusterEntrypoint实现类,例如YarnJobClusterEntrypoint、StandaloneJobClusterEntrypoint等。

从图3-2中可以看出,SessionClusterEntrypoint的实现类有YarnSessionClusterEntrypoint、StandaloneSessionClusterEntrypoint以及KubernetesSessionClusterEntrypoint等。JobClusterEntrypoint的实现类主要有YarnJobClusterEntrypoint、StandaloneJobClusterEntrypoint和MesosJobClusterEntrypoint。用户创建和启动的集群类型不同,最终通过不同的ClusterEntrypoint实现类启动对应类型的集群运行环境。

2.通过ClusterEntrypoint启动集群服务

如图3-3所示,我们以StandaloneSessionClusterEntrypoint为例说明StandaloneSession集群的启动过程,并介绍其主要核心组件的创建和初始化方法。关于其他类型的集群创建,我们将在第4章重点讲解。

从图3-3中我们可以看出,集群初始化过程主要包含如下步骤。

·用户运行start-cluster.sh命令启动StandaloneSession集群,此时在启动脚本中会启动StandaloneSessionClusterEntrypoint入口类。

·在StandaloneSessionClusterEntrypoint.main方法中创建StandaloneSessionClusterEntrypoint实例,然后将该实例传递至抽象实现类ClusterEntrypoint.runClusterEntrypoint(entrypoint)方法继续后续流程。

·在ClusterEntrypoint中调用clusterEntrypoint.startCluster()方法启动指定的ClusterEntrypoint实现类。

·调用基本实现类ClusterEntrypoint.runCluster()私有方法启动集群服务和组件。

·调用ClusterEntrypoint.initializeServices(configuration)内部方法,初始化运行时集群需要创建的基础组件服务,如HAServices、CommonRPCService等。

·调用ClusterEntrypoint.createDispatcherResourceManagerComponentFactory()子类实现方法,创建DispatcherResourceManagerComponentFactory对象,在本实例中会调用StandaloneSessionClusterEntrypoint实现的方法,其他类型的集群环境会根据不同实现,创建不同类型的DispatcherResourceManager工厂类。

·在StandaloneSessionClusterEntrypoint.createDispatcherResourceManagerComponentFacto-ry()方法中最终调用DefaultDispatcherResourceManagerComponentFactory.createSessionComponentFactory()方法,创建基于Session模式的DefaultDispatcherResourceMana-gerComponentFactory。

图3-2 ClusterEntrypoint UML关系图

图3-3 StandaloneSession集群启动流程

·基于前面创建的基础服务,调用DispatcherResourceManagerComponentFactory.create()方法创建集群核心组件封装类DispatcherResourceManagerComponent,可以看出核心组件实际上包括了Dispatcher和ResourceManager。

·在DispatcherResourceManagerComponentFactory.create()方法中,首先创建和启动WebMonitorEndpoint对象,作为Dispatcher对应的Rest endpoint,通过Rest API将JobGraph提交到Dispatcher上,同时,WebMonitorEndpoint也会提供Web UI需要的Rest API接口实现。

·调用ResourceManagerFactory.createResourceManager()方法创建ResourceManager组件并启动。

·调用DispatcherRunnerFactory.createDispatcherRunner()方法创建DispatcherRunner组件后,启动DispatcherRunner服务。

·将创建好的WebMonitorEndpoint、ResourceManager和DispatcherRunner封装到DispatcherResourceManagerComponent中,其中还包括DispatcherRunner和ResourceManager对应的高可用管理服务dispatcherLeaderRetrievalService和resourceManagerRetrievalService。

以下我们从源码层面介绍ClusterEntrypoint涉及的重点步骤。

如代码清单3-1所示,ClusterEntrypoint提供了实现子类的runClusterEntrypoint()静态方法。例如在StandaloneSessionClusterEntrypoint中,main()方法会调用runClusterEntrypoint()方法,触发集群启动的进程。

代码清单3-1 ClusterEntrypoint.runClusterEntrypoint()方法


public static void runClusterEntrypoint(ClusterEntrypoint clusterEntrypoint) {
   final String clusterEntrypointName = clusterEntrypoint.getClass().
      getSimpleName();
   try {
      // 调用clusterEntrypoint.startCluster()方法
      clusterEntrypoint.startCluster();
   } catch (ClusterEntrypointException e) {
      LOG.error(String.format("Could not start cluster entrypoint %s.", 
         clusterEntrypointName), e);
      System.exit(STARTUP_FAILURE_RETURN_CODE);
   }
       // 此处省略部分代码
}

接着在clusterEntrypoint.startCluster()方法中调用ClusterEntrypoint.runCluster()的内部方法创建集群组件。如代码清单3-2所示,ClusterEntrypoint.runCluster()方法主要包含如下步骤。

·调用ClusterEntrypoint.initializeServices()方法,完成集群需要的基础服务初始化操作。

·将创建和初始化RPC服务地址和端口配置写入configuration,以便在接下来创建的组件中使用。

·调用createDispatcherResourceManagerComponentFactory()抽象方法,创建对应集群的DispatcherResourceManagerComponentFactory。

·通过dispatcherResourceManagerComponentFactory的实现类创建clusterComponent,也就是运行时中使用的组件服务。

·向clusterComponent的ShutDownFuture对象中添加需要在集群停止后执行的异步操作。

代码清单3-2 ClusterEntrypoint.runCluster()方法


private void runCluster(Configuration configuration) throws Exception {
   synchronized (lock) {
      // 通过configuration初始化集群服务
      initializeServices(configuration);
      // 将RPC服务中的端口和地址写入configuration
      configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.
         getAddress());
      configuration.setInteger(JobManagerOptions.PORT, commonRpcService.
         getPort());
      // 创建DispatcherResourceManagerComponentFactory,创建方法由子类实现
      final DispatcherResourceManagerComponentFactory dispatcherResourceMana
         gerComponentFactory = createDispatcherResourceManagerComponentFactory
            (configuration);
      // 通过dispatcherResourceManagerComponentFactory创建clusterComponent
      clusterComponent = dispatcherResourceManagerComponentFactory.create(
         configuration,
         ioExecutor,
         commonRpcService,
         haServices,
         blobServer,
         heartbeatServices,
         metricRegistry,
         archivedExecutionGraphStore,
         new RpcMetricQueryServiceRetriever(metricRegistry.getMetricQueryService
            RpcService()),
         this);
      // 向clusterComponent的ShutDownFuture中添加操作
      clusterComponent.getShutDownFuture().whenComplete(
         (ApplicationStatus applicationStatus, Throwable throwable) -> {
            if (throwable != null) {
               shutDownAsync(
                  ApplicationStatus.UNKNOWN,
                  ExceptionUtils.stringifyException(throwable),
                  false);
            } else {
               shutDownAsync(
                  applicationStatus,
                  null,
                  true);
            }
         });
   }
}

如代码清单3-3所示,在ClusterEntrypoint.initializeServices()方法中初始化了集群组件需要的多种服务,方法包括如下步骤。

·从configuration中获取配置的RPC地址和portRange参数,根据配置地址和端口信息创建集群所需的公用commonRpcService服务。更新configuration中的address和port配置,用于支持集群组件高可用服务。

·创建ioExecutor线程池,用于集群组件的I/O操作,如本地文件数据读取和输出等。

·创建并启动haService,向集群组件提供高可用支持,集群中的组件都会通过haService创建高可用服务。

·创建并启动blobServer,存储集群需要的Blob对象数据,blobServer中存储的数据能够被JobManager以及TaskManager访问,例如JobGraph中的JAR包等数据。

·创建heartbeatServices,主要用于创建集群组件之间的心跳检测,例如ResourceManager与JobManager之间的心跳服务。

·创建metricRegistry服务,用于注册集群监控指标收集。

·创建archivedExecutionGraphStore服务,用于压缩并存储集群中的ExecutionGraph,主要有FileArchivedExecutionGraphStore和MemoryArchivedExecutionGraphStore两种实现类型。

代码清单3-3 ClusterEntrypoint.initializeServices()方法


protected void initializeServices(Configuration configuration) throws Exception {
      // 初始化集群服务
   LOG.info("Initializing cluster services.");
      // 加锁处理
   synchronized (lock) {
      //获取RPC地址和端口
      final String bindAddress = configuration.getString(JobManagerOptions.
         ADDRESS);
      final String portRange = getRPCPortRange(configuration);
      // 根据配置的地址和端口创建公用的RpcService
      commonRpcService = createRpcService(configuration, bindAddress, 
         portRange);
      // 根据创建好的commonRpcService,更新configuration中的address和port配置,
         用于创建高可用服务
      configuration.setString(JobManagerOptions.ADDRESS, commonRpcService.
         getAddress());
      configuration.setInteger(JobManagerOptions.PORT, commonRpcService.
         getPort());
      // 创建ioExecutor线程池,提供集群中的I/O操作
      ioExecutor = Executors.newFixedThreadPool(
         Hardware.getNumberCPUCores(),
         new ExecutorThreadFactory("cluster-io"));
      // 创建高可用服务,提供集群高可用支持
      haServices = createHaServices(configuration, ioExecutor);
      // 创建并启动BlobServer,用于存储对象数据,例如JobGraph中的JAR包等
      blobServer = new BlobServer(configuration, haServices.
         createBlobStore());
      blobServer.start();
      // 创建heartbeatServices,用于在组件和组件之间进行心跳检测
      heartbeatServices = createHeartbeatServices(configuration);
      // 创建metricRegistry
      metricRegistry = createMetricRegistry(configuration);
      final RpcService metricQueryServiceRpcService = MetricUtils.startMetrics
         RpcService(configuration, bindAddress);
      metricRegistry.startQueryService(metricQueryServiceRpcService, null);
      final String hostname = RpcUtils.getHostname(commonRpcService);
      // 创建processMetricGroup,用于监控集群系统指标
      processMetricGroup = MetricUtils.instantiateProcessMetricGroup(
         metricRegistry,
         hostname,
         ConfigurationUtils.getSystemResourceMetricsProbingInterval(configuration));
      // 创建archivedExecutionGraphStore,不同的集群类型创建不同的ExecutionGraphStore
      archivedExecutionGraphStore = createSerializableExecutionGraphStore
         (configuration, commonRpcService.getScheduledExecutor());
   }
}