3.2.2 Creating and Initializing WebMonitorEndpoint

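WebMonitorEndpoint implements a RESTful service backend on top of the Netty communication framework. Its REST interface serves every REST request, including those issued by the Flink Web UI, for example requests for cluster monitoring metrics. As Figure 3-5 shows, creating a WebMonitorEndpoint is fairly simple: RestEndpointFactory has two factory implementations, SessionRestEndpointFactory for Session clusters and JobRestEndpointFactory for JobCluster clusters.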

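Figure 3-5 also shows that WebMonitorEndpoint extends the base class RestServerEndpoint. RestServerEndpoint implements the REST service backend on Netty and declares the abstract methods through which subclasses initialize and supply their own Handlers. Subclasses such as WebMonitorEndpoint and DispatcherRestEndpoint extend it with the Handlers for the REST interfaces of their own business logic. The other WebMonitorEndpoint implementation, MiniDispatcherRestEndpoint, is a mini version of DispatcherRestEndpoint intended mainly for local execution. The difference is that MiniDispatcherRestEndpoint does not load the Handlers used for JobGraph submission, because it does not support submitting a JobGraph through the REST API. When a job is run in IDEA, what actually gets created is a MiniCluster, and the WebMonitorEndpoint implementation used in a MiniCluster is MiniDispatcherRestEndpoint.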

Figure 3-5 WebMonitorEndpoint UML relationship diagram
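
The inheritance relationship described above follows the template-method pattern: the base class fixes the start-up skeleton, and each subclass contributes its own Handlers. The following is a minimal sketch of that idea only. All `Sketch*` classes are hypothetical stand-ins rather than Flink's real classes, and a Handler is reduced to the URL it serves.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical, simplified stand-in for RestServerEndpoint (not Flink's class).
abstract class SketchRestServerEndpoint {
    // Each subclass decides which handlers (represented by their URL) it provides.
    protected abstract List<String> initializeHandlers();

    // Fixed start-up skeleton shared by all endpoints: collect the handlers.
    final List<String> start() {
        return new ArrayList<>(initializeHandlers());
    }
}

// Provides the monitoring handlers.
class SketchWebMonitorEndpoint extends SketchRestServerEndpoint {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = new ArrayList<>();
        handlers.add("/overview");
        handlers.add("/taskmanagers");
        return handlers;
    }
}

// Adds a job submission handler on top of the monitoring handlers.
class SketchDispatcherRestEndpoint extends SketchWebMonitorEndpoint {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = super.initializeHandlers();
        handlers.add("/jobs");
        return handlers;
    }
}

// Keeps only the inherited monitoring handlers; no job submission.
class SketchMiniDispatcherRestEndpoint extends SketchWebMonitorEndpoint {
}
```

In this sketch, `new SketchDispatcherRestEndpoint().start()` returns the monitoring URLs plus `/jobs`, while the mini variant returns only the monitoring URLs.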

1. Creating DispatcherRestEndpoint

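As shown in Listing 3-13, we again use a StandaloneSession cluster as the example. DispatcherRestEndpoint is created mainly through SessionRestEndpointFactory, and the creation method takes the following parameters.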

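·configuration: the cluster configuration parameters.

·dispatcherGatewayRetriever: the retriever for the DispatcherGateway service address, used to obtain the address of the currently active DispatcherGateway. The DispatcherGateway enables RPC communication with the Dispatcher; a submitted JobGraph is ultimately sent to the Dispatcher component through it.

·resourceManagerGatewayRetriever: the retriever for the ResourceManagerGateway service address, used to obtain the address of the currently active ResourceManagerGateway. The ResourceManagerGateway enables RPC communication with the ResourceManager component. For example, TaskManagersHandler calls the ResourceManagerGateway to obtain monitoring information about the TaskManagers in the cluster.

·transientBlobService: a storage service for temporary binary objects. After BlobServer receives the data, the object data in the cache is cleaned up promptly.

·executor: the thread pool service that processes WebMonitorEndpoint requests.

·metricFetcher: used to pull Metric monitoring data from the JobManager and the TaskManagers.

·leaderElectionService: used in a high-availability cluster to start the service and elect its Leader node. For example, the WebMonitorEndpoint RPC service is started through leaderElectionService and the Leader node is then registered in ZooKeeper, which makes the WebMonitorEndpoint service highly available.

·fatalErrorHandler: the error handler. When WebMonitorEndpoint encounters an exception, the handling interface of fatalErrorHandler is called.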

Listing 3-13 SessionRestEndpointFactory.createRestEndpoint() method


public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
      Configuration configuration,
      LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
      LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
      TransientBlobService transientBlobService,
      ExecutorService executor,
      MetricFetcher metricFetcher,
      LeaderElectionService leaderElectionService,
      FatalErrorHandler fatalErrorHandler) throws Exception {
   // Derive the RestHandlerConfiguration from the Configuration
   final RestHandlerConfiguration restHandlerConfiguration =
      RestHandlerConfiguration.fromConfiguration(configuration);
   // Create the DispatcherRestEndpoint
   return new DispatcherRestEndpoint(
      RestServerEndpointConfiguration.fromConfiguration(configuration),
      dispatcherGatewayRetriever,
      configuration,
      restHandlerConfiguration,
      resourceManagerGatewayRetriever,
      transientBlobService,
      executor,
      metricFetcher,
      leaderElectionService,
      fatalErrorHandler);
}
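
The two retriever parameters share one idea: callers do not hold a fixed gateway reference but ask a retriever for the gateway of whichever node is currently the leader. The following minimal sketch illustrates that idea with a `CompletableFuture`. `SketchGateway`, `SketchLeaderGatewayRetriever`, and the method names are hypothetical and greatly simplified; they are not Flink's LeaderGatewayRetriever API.

```java
import java.util.Optional;
import java.util.concurrent.CompletableFuture;

// Hypothetical stand-in for an RPC gateway; only exposes the leader's address.
interface SketchGateway {
    String address();
}

// Hands out the gateway of the current leader (a sketch, not Flink's class).
final class SketchLeaderGatewayRetriever {
    // Pending until the first leader is announced.
    private CompletableFuture<SketchGateway> current = new CompletableFuture<>();

    // Called whenever a (new) leader address becomes known.
    synchronized void notifyNewLeader(String leaderAddress) {
        SketchGateway gateway = () -> leaderAddress;
        if (!current.complete(gateway)) {
            // A leader was already known: replace the completed future.
            current = CompletableFuture.completedFuture(gateway);
        }
    }

    // Future that completes once a leader gateway is available.
    synchronized CompletableFuture<SketchGateway> getFuture() {
        return current;
    }

    // Non-blocking lookup: empty if no leader has been announced yet.
    synchronized Optional<SketchGateway> getNow() {
        return Optional.ofNullable(current.getNow(null));
    }
}
```

A handler written against such a retriever can call `getFuture()` and chain its RPC call onto the result, so requests issued before a leader is known complete once the leader is announced.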

2. Starting RestServerEndpoint

Next we look closely at the implementation of RestServerEndpoint.start(), shown in Listing 3-14. The method contains the following logic.

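·Check that RestServerEndpoint.state is CREATED.

·Start the REST endpoint and create the Router, the routing class that finds the matching Handlers for a request address.

·Call initializeHandlers() to initialize the Handlers registered by the subclass, for example the Handlers implemented in WebMonitorEndpoint.

·Call registerHandler() to register the loaded Handlers.

·Create the ChannelInitializer, which initializes the Netty Channel; its initChannel() method sets up the Handlers used in the SocketChannel's Pipeline. When Netty starts a server or a client through ServerBootstrap or Bootstrap, it creates a separate Pipeline for each Channel connection, and the custom Handlers must be added to that Pipeline. Here the loaded Handlers are in effect added to the created Pipeline. Basic Handlers such as HttpServerCodec, FileUploadHandler, and ChunkedWriteHandler are also appended to the tail of the Pipeline in order.

·Create two NioEventLoopGroup instances, bossGroup and workerGroup, which can be thought of as two thread pools. bossGroup is configured with a single thread that handles connection requests and establishes connections, while workerGroup handles I/O requests after a connection is established.

·Create the ServerBootstrap and bind bossGroup, workerGroup, and the initializer to it.

·To avoid port conflicts, extract the candidate ports from restBindPortRange and try bootstrap.bind(chosenPort) on them in order. Once a bind succeeds, the ServerBootstrap service is started and the Web port (8081 by default) becomes accessible.

·Set the state of RestServerEndpoint to RUNNING and call WebMonitorEndpoint.startInternal() to start the RPC high-availability service.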
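
The routing step in the list above depends on the order in which Handlers are registered. One plausible reason for sorting them first is that a parameterized URL such as `/jobs/:jobid` could otherwise shadow a static URL such as `/jobs/overview`. The sketch below illustrates that reasoning with a first-match router. `SketchRouter` and its comparator are hypothetical simplifications, not Flink's Router or RestHandlerUrlComparator.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical first-match router over URL patterns (a sketch only).
final class SketchRouter {
    // Compares patterns segment by segment; a literal segment sorts
    // before a ":param" segment, shorter patterns before longer ones.
    static final Comparator<String> STATIC_FIRST = (a, b) -> {
        String[] x = a.split("/");
        String[] y = b.split("/");
        int n = Math.min(x.length, y.length);
        for (int i = 0; i < n; i++) {
            boolean px = x[i].startsWith(":");
            boolean py = y[i].startsWith(":");
            if (px != py) {
                return px ? 1 : -1;
            }
            int c = x[i].compareTo(y[i]);
            if (c != 0) {
                return c;
            }
        }
        return Integer.compare(x.length, y.length);
    };

    private final List<String> patterns = new ArrayList<>();

    // Builds a router whose patterns are registered in sorted order.
    static SketchRouter of(List<String> urls) {
        List<String> sorted = new ArrayList<>(urls);
        sorted.sort(STATIC_FIRST);
        SketchRouter router = new SketchRouter();
        router.patterns.addAll(sorted);
        return router;
    }

    // Returns the first registered pattern that matches the path, or null.
    String route(String path) {
        String[] p = path.split("/");
        for (String pattern : patterns) {
            String[] s = pattern.split("/");
            if (s.length != p.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < s.length; i++) {
                if (!s[i].startsWith(":") && !s[i].equals(p[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return pattern;
            }
        }
        return null;
    }
}
```

Without the sort, registering `/jobs/:jobid` before `/jobs/overview` would make a request for `/jobs/overview` match the parameterized pattern first.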

Listing 3-14 RestServerEndpoint.start() method


public final void start() throws Exception {
   synchronized (lock) {
      // Check that RestServerEndpoint.state is CREATED
      Preconditions.checkState(state == State.CREATED,
         "The RestServerEndpoint cannot be restarted.");
      // Start the REST endpoint
      log.info("Starting rest endpoint.");
      final Router router = new Router();
      final CompletableFuture<String> restAddressFuture =
         new CompletableFuture<>();
      // Let the subclass initialize its Handlers
      handlers = initializeHandlers(restAddressFuture);
      // Sort the handlers
      Collections.sort(
         handlers,
         RestHandlerUrlComparator.INSTANCE);
      // Register every handler with the router through registerHandler()
      handlers.forEach(handler -> {
         registerHandler(router, handler, log);
      });
      // Create the ChannelInitializer, which initializes each Channel
      ChannelInitializer<SocketChannel> initializer =
         new ChannelInitializer<SocketChannel>() {
         @Override
         protected void initChannel(SocketChannel ch) {
            // Create the RouterHandler, which intercepts and dispatches business requests
            RouterHandler handler = new RouterHandler(router,
               responseHeaders);
            // Put the SSL handler first in the pipeline
            if (isHttpsEnabled()) {
               ch.pipeline().addLast("ssl",
                  new RedirectingSslHandler(restAddress, restAddressFuture,
                     sslHandlerFactory));
            }
            ch.pipeline()
               .addLast(new HttpServerCodec())
               .addLast(new FileUploadHandler(uploadDir))
               .addLast(new FlinkHttpObjectAggregator(maxContentLength,
                  responseHeaders))
               .addLast(new ChunkedWriteHandler())
               .addLast(handler.getName(), handler)
               .addLast(new PipelineErrorHandler(log, responseHeaders));
         }
      };
      // Create bossGroup and workerGroup
      NioEventLoopGroup bossGroup = new NioEventLoopGroup(1,
         new ExecutorThreadFactory("flink-rest-server-netty-boss"));
      NioEventLoopGroup workerGroup = new NioEventLoopGroup(0,
         new ExecutorThreadFactory("flink-rest-server-netty-worker"));
      // Create the ServerBootstrap
      bootstrap = new ServerBootstrap();
      // Bind bossGroup, workerGroup, and the initializer
      bootstrap
         .group(bossGroup, workerGroup)
         .channel(NioServerSocketChannel.class)
         .childHandler(initializer);
      // Obtain the candidate ports from restBindPortRange
      Iterator<Integer> portsIterator;
      try {
         portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
      } catch (IllegalConfigurationException e) {
         throw e;
      } catch (Exception e) {
         throw new IllegalArgumentException("Invalid port range definition: "
            + restBindPortRange);
      }
      // Pick the first port in portsIterator that is not in use as the bootstrap port
      int chosenPort = 0;
      while (portsIterator.hasNext()) {
         try {
            chosenPort = portsIterator.next();
            final ChannelFuture channel;
            if (restBindAddress == null) {
               channel = bootstrap.bind(chosenPort);
            } else {
               channel = bootstrap.bind(restBindAddress, chosenPort);
            }
            serverChannel = channel.syncUninterruptibly().channel();
            break;
         } catch (final Exception e) {
            // Only a bind failure moves on to the next port; anything else is rethrown
            if (!(e instanceof org.jboss.netty.channel.ChannelException
               || e instanceof java.net.BindException)) {
               throw e;
            }
         }
      }
      // ServerBootstrap started successfully; log restBindAddress and chosenPort
      log.debug("Binding rest endpoint to {}:{}.", restBindAddress,
         chosenPort);
      final InetSocketAddress bindAddress =
         (InetSocketAddress) serverChannel.localAddress();
      final String advertisedAddress;
      if (bindAddress.getAddress().isAnyLocalAddress()) {
         advertisedAddress = this.restAddress;
      } else {
         advertisedAddress = bindAddress.getAddress().getHostAddress();
      }
      final int port = bindAddress.getPort();
      log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
      restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "")
         .toString();
      restAddressFuture.complete(restBaseUrl);
      // Set the state to RUNNING
      state = State.RUNNING;
      // Call the internal start method to start the RestEndpoint service
      startInternal();
   }
}
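
The port selection loop in the listing above can be isolated into a small, Netty-free sketch: walk the configured range in order, skip ports whose bind fails with a BindException, and stop at the first success. `SketchBinder`, `SketchPortBinder`, and the simplified range syntax (`"8081"` or `"8081-8090"`) are assumptions made for illustration; Flink's NetUtils.getPortRangeFromString() is not reproduced here.

```java
import java.io.IOException;
import java.net.BindException;
import java.util.Iterator;
import java.util.stream.IntStream;

// Hypothetical callback that tries to bind one port; not part of Flink or Netty.
interface SketchBinder<T> {
    T bind(int port) throws IOException;
}

final class SketchPortBinder {
    // Parses "8081" or "8081-8090" into an iterator over the candidate ports.
    static Iterator<Integer> parseRange(String range) {
        String[] parts = range.split("-");
        int from = Integer.parseInt(parts[0].trim());
        int to = parts.length > 1 ? Integer.parseInt(parts[1].trim()) : from;
        return IntStream.rangeClosed(from, to).iterator();
    }

    // Tries the ports in order and returns the result of the first successful bind.
    static <T> T bindFirstFree(Iterator<Integer> ports, SketchBinder<T> binder)
            throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            try {
                return binder.bind(port);
            } catch (BindException e) {
                // Port already in use: fall through and try the next candidate.
            }
        }
        // Every candidate failed, so report the failure instead of continuing.
        throw new BindException("Could not bind to any port in the given range");
    }
}
```

With a real socket the binder could be a constructor reference, for example `SketchPortBinder.bindFirstFree(SketchPortBinder.parseRange("8081-8090"), java.net.ServerSocket::new)`. Unlike the listing, this sketch throws when every port in the range is taken, so the caller never continues with an unbound server.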

3. Loading and Registering Handlers

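Next we look at how the Handlers are initialized and loaded. As shown in Listing 3-15, DispatcherRestEndpoint initializes its Handlers, including JobSubmitHandler, in the following steps.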

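·Call WebMonitorEndpoint.initializeHandlers() to load the Handlers that WebMonitorEndpoint uses to expose monitoring metrics.

·Create the JobSubmitHandler used for job submission. Its leaderRetriever parameter is used to obtain the Leader address of the DispatcherGateway.

·If the cluster allows JobGraph submission through the Web UI, load the Handlers related to Web submission through WebSubmissionExtension. WebSubmissionExtension contains all the Handlers for submitting jobs through the Web UI, such as JarUploadHandler for uploading JAR files and JarRunHandler for running a job. For Per-Job clusters, JarUploadHandler is not loaded by default, so new jobs cannot be submitted or run.

·Inside loadWebSubmissionExtension(), WebSubmissionExtension is actually constructed through reflection, and its Handlers are then retrieved.

·Add jobSubmitHandler to handlers and return the handlers collection.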

Listing 3-15 DispatcherRestEndpoint.initializeHandlers() method


protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>>
   initializeHandlers(final CompletableFuture<String> localAddressFuture) {
   // Call WebMonitorEndpoint.initializeHandlers()
   List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
      super.initializeHandlers(localAddressFuture);
   final Time timeout = restConfiguration.getTimeout();
   // Create the JobSubmitHandler used for job submission
   JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
      leaderRetriever,
      timeout,
      responseHeaders,
      executor,
      clusterConfiguration);
   // If JobGraph submission through the Web UI is allowed, load the
   // related Handlers through WebSubmissionExtension
   if (restConfiguration.isWebSubmitEnabled()) {
      try {
         webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
            leaderRetriever,
            timeout,
            responseHeaders,
            localAddressFuture,
            uploadDir,
            executor,
            clusterConfiguration);
         // Add the Handlers loaded through webSubmissionExtension to handlers
         handlers.addAll(webSubmissionExtension.getHandlers());
      } catch (FlinkException e) {
         if (log.isDebugEnabled()) {
            log.debug("Failed to load web based job submission extension.", e);
         } else {
            log.info("Failed to load web based job submission extension. " +
               "Probable reason: flink-runtime-web is not in the classpath.");
         }
      }
   } else {
      log.info("Web-based job submission is not enabled.");
   }
   // Add jobSubmitHandler to handlers
   handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(),
      jobSubmitHandler));
   return handlers;
}
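
The text above notes that WebSubmissionExtension is constructed through reflection, which lets the endpoint start even when the optional module is missing from the classpath. The sketch below shows that general technique in isolation. `SketchExtension`, `SketchJarExtension`, and `SketchExtensionLoader` are hypothetical names, and the real loadWebSubmissionExtension() passes constructor arguments and reports failure with a FlinkException rather than returning an empty list.

```java
import java.util.Arrays;
import java.util.Collections;
import java.util.List;

// Hypothetical extension contract; a handler is reduced to its URL.
interface SketchExtension {
    List<String> getHandlers();
}

// Hypothetical optional extension that contributes JAR-related handlers.
class SketchJarExtension implements SketchExtension {
    public SketchJarExtension() {
    }

    @Override
    public List<String> getHandlers() {
        return Arrays.asList("/jars/upload", "/jars/:jarid/run");
    }
}

final class SketchExtensionLoader {
    // Instantiates the named class reflectively and returns its handlers;
    // returns an empty list if the class cannot be loaded or constructed.
    static List<String> loadHandlers(String className) {
        try {
            Class<?> clazz = Class.forName(className);
            SketchExtension extension =
                (SketchExtension) clazz.getDeclaredConstructor().newInstance();
            return extension.getHandlers();
        } catch (ReflectiveOperationException e) {
            // Optional module not available: continue without its handlers.
            return Collections.emptyList();
        }
    }
}
```

Because the loader only refers to the extension by name, the calling code has no compile-time dependency on it, which mirrors the "flink-runtime-web is not in the classpath" case logged in the listing.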