- Flink Design and Implementation: Core Principles and Source Code Analysis (Flink设计与实现:核心原理与源码解析)
- Zhang Libing (张利兵)
- 2021-08-13 17:26:52
3.2.2 Creating and Initializing WebMonitorEndpoint
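WebMonitorEndpoint implements a RESTful service backend on top of the Netty communication framework. It exposes the REST interface that serves every REST request, including those issued by the Flink Web UI, such as requests for cluster monitoring metrics. As Figure 3-5 shows, creating a WebMonitorEndpoint is fairly simple, and RestEndpointFactory has two factory implementations, SessionRestEndpointFactory and JobRestEndpointFactory, for Session clusters and JobCluster clusters respectively.
Figure 3-5 also shows that WebMonitorEndpoint extends the base implementation class RestServerEndpoint. RestServerEndpoint implements the REST service backend on Netty and provides abstract methods for initializing and implementing custom Handlers. Subclasses such as WebMonitorEndpoint and DispatcherRestEndpoint extend it with the Handlers that serve the REST interfaces of their own business logic. MiniDispatcherRestEndpoint, the other implementation of WebMonitorEndpoint, is mainly a mini version of DispatcherRestEndpoint for local execution. The difference is that MiniDispatcherRestEndpoint does not load the Handlers used for JobGraph submission, because it does not support submitting a JobGraph through the REST API. Running a job in IDEA actually creates a MiniCluster, and the WebMonitorEndpoint implementation used in MiniCluster is MiniDispatcherRestEndpoint.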
Figure 3-5 UML class diagram of WebMonitorEndpoint
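The hierarchy in Figure 3-5 follows a template-method pattern: the base class owns the server lifecycle and each subclass contributes its own Handlers. The sketch below illustrates only that idea. The class names prefixed with Sketch, the use of plain URL strings in place of real Handler objects, and the specific paths are all assumptions made for illustration and are not Flink's actual API.

```java
import java.util.ArrayList;
import java.util.List;

// Hypothetical stand-in for RestServerEndpoint: declares the extension point.
abstract class SketchRestServerEndpoint {
    // Subclasses return the URL paths they serve (real Flink returns Handler objects).
    protected abstract List<String> initializeHandlers();
}

// Hypothetical stand-in for WebMonitorEndpoint: contributes monitoring paths.
class SketchWebMonitorEndpoint extends SketchRestServerEndpoint {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = new ArrayList<>();
        handlers.add("/overview");      // illustrative monitoring path
        handlers.add("/taskmanagers");  // illustrative monitoring path
        return handlers;
    }
}

// Hypothetical stand-in for DispatcherRestEndpoint: adds job submission on top.
class SketchDispatcherRestEndpoint extends SketchWebMonitorEndpoint {
    @Override
    protected List<String> initializeHandlers() {
        List<String> handlers = super.initializeHandlers();
        handlers.add("/jobs");          // illustrative submission path
        return handlers;
    }
}

// Hypothetical stand-in for MiniDispatcherRestEndpoint: inherits the
// monitoring paths and deliberately adds no submission path.
class SketchMiniDispatcherRestEndpoint extends SketchWebMonitorEndpoint {
}
```

With this layout, the mini variant differs from the dispatcher variant only in what it chooses not to register, which mirrors the difference described in the text.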
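1. Creating DispatcherRestEndpoint
As Listing 3-13 shows, we again use the StandaloneSession cluster as the example. DispatcherRestEndpoint is created mainly through SessionRestEndpointFactory, and the creation method takes the following parameters.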
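·configuration: the cluster configuration.
·dispatcherGatewayRetriever: the retriever for the DispatcherGateway service address, used to obtain the address of the currently active DispatcherGateway. The DispatcherGateway enables RPC communication with the Dispatcher, and a submitted JobGraph is ultimately sent to the Dispatcher component through it.
·resourceManagerGatewayRetriever: the retriever for the ResourceManagerGateway service address, used to obtain the address of the currently active ResourceManagerGateway. The ResourceManagerGateway enables RPC communication with the ResourceManager component; for example, TaskManagersHandler calls the ResourceManagerGateway to fetch monitoring information about the TaskManagers in the cluster.
·transientBlobService: the storage service for transient binary objects. After the BlobServer receives the data, the object data in the cache is cleaned up promptly.
·executor: the thread pool that processes WebMonitorEndpoint requests.
·metricFetcher: used to pull Metric monitoring data from the JobManager and the TaskManagers.
·leaderElectionService: used in a high-availability cluster to start the service and elect its Leader node. For example, the WebMonitorEndpoint RPC service is started through leaderElectionService and the Leader node is then registered in ZooKeeper, which makes the WebMonitorEndpoint service highly available.
·fatalErrorHandler: the exception handler. When WebMonitorEndpoint encounters an exception, the handling interface of fatalErrorHandler is called.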
Listing 3-13 The SessionRestEndpointFactory.createRestEndpoint() method
public WebMonitorEndpoint<DispatcherGateway> createRestEndpoint(
        Configuration configuration,
        LeaderGatewayRetriever<DispatcherGateway> dispatcherGatewayRetriever,
        LeaderGatewayRetriever<ResourceManagerGateway> resourceManagerGatewayRetriever,
        TransientBlobService transientBlobService,
        ExecutorService executor,
        MetricFetcher metricFetcher,
        LeaderElectionService leaderElectionService,
        FatalErrorHandler fatalErrorHandler) throws Exception {
    // Derive the RestHandlerConfiguration from the Configuration
    final RestHandlerConfiguration restHandlerConfiguration =
        RestHandlerConfiguration.fromConfiguration(configuration);
    // Create the DispatcherRestEndpoint
    return new DispatcherRestEndpoint(
        RestServerEndpointConfiguration.fromConfiguration(configuration),
        dispatcherGatewayRetriever,
        configuration,
        restHandlerConfiguration,
        resourceManagerGatewayRetriever,
        transientBlobService,
        executor,
        metricFetcher,
        leaderElectionService,
        fatalErrorHandler);
}
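The two retriever parameters share one idea: a caller asks for the gateway of whichever instance is currently the leader, and the answer may change after a new election. The following is a minimal sketch of that idea using only the JDK. SketchGatewayRetriever and its two methods are hypothetical names chosen for illustration; they are not Flink's LeaderGatewayRetriever implementation.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicReference;

// Hypothetical, simplified retriever: hands out a future for the current leader's gateway.
class SketchGatewayRetriever<T> {
    // Starts with a pending future because no leader is known yet.
    private final AtomicReference<CompletableFuture<T>> current =
        new AtomicReference<>(new CompletableFuture<>());

    // Called whenever a new leader is announced.
    void notifyNewLeader(T gateway) {
        CompletableFuture<T> previous =
            current.getAndSet(CompletableFuture.completedFuture(gateway));
        // Wake up callers that were still waiting for the first leader;
        // this is a no-op if the previous future was already completed.
        previous.complete(gateway);
    }

    // Returns a future that resolves to the currently known leader gateway.
    CompletableFuture<T> getFuture() {
        return current.get();
    }
}
```

A Handler written against such a retriever never caches the gateway itself. It asks the retriever on every request, so a leader change becomes visible on the next call.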
2. Starting RestServerEndpoint
Next we take a closer look at the implementation of RestServerEndpoint.start(). As Listing 3-14 shows, the method contains the following logic.
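·Check that RestServerEndpoint.state is CREATED.
·Start the REST endpoint and create the Router, the routing class that looks up the Handlers matching a request address.
·Call initializeHandlers() to initialize the Handlers registered by the subclass, for example the Handlers implemented in WebMonitorEndpoint.
·Call registerHandler() to register the Handlers that have been loaded.
·Create the ChannelInitializer that initializes the Netty Channel. Its initChannel() method sets up the handlers used in the Pipeline of the SocketChannel. When Netty starts a server or a client with ServerBootstrap or Bootstrap, it creates a separate Pipeline for every Channel connection, and the custom Handlers must be added to that Pipeline. Here the loaded Handlers are in effect added to the newly created Pipeline. Basic Handlers such as HttpServerCodec, FileUploadHandler, and ChunkedWriteHandler are also appended to the tail of the Pipeline in order.
·Create two NioEventLoopGroup instances, bossGroup and workerGroup, which can be thought of as two thread pools. bossGroup is configured with a single thread that accepts connection requests and establishes connections; workerGroup handles I/O requests once a connection is established.
·Create the ServerBootstrap and bind bossGroup, workerGroup, and the initializer to it.
·To avoid failing on an occupied port, obtain the candidate ports from restBindPortRange and try bootstrap.bind(chosenPort) on each of them in order. The first bind that succeeds starts the ServerBootstrap service, and the Web port (8081 by default) becomes reachable.
·Set the state of RestServerEndpoint to RUNNING and call WebMonitorEndpoint.startInternal() to start the high-availability RPC service.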
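Before the Handlers are registered with the Router, start() sorts them by URL. One plausible reason, sketched below, is that a first-match router must see a literal path such as /jobs/overview before a parameterized path such as /jobs/:jobid, otherwise the parameter would capture the word "overview". SketchRouter, its comparator, and its matching rule are simplified assumptions for illustration; they are not the Router or RestHandlerUrlComparator classes used by Flink.

```java
import java.util.ArrayList;
import java.util.Comparator;
import java.util.List;

// Hypothetical first-match router over URL patterns such as "/jobs/:jobid".
class SketchRouter {
    // Orders patterns segment by segment; literal segments sort before ":param" segments.
    static final Comparator<String> URL_ORDER = (a, b) -> {
        String[] x = a.split("/");
        String[] y = b.split("/");
        for (int i = 0; i < Math.min(x.length, y.length); i++) {
            boolean xIsParam = x[i].startsWith(":");
            boolean yIsParam = y[i].startsWith(":");
            if (xIsParam != yIsParam) {
                return xIsParam ? 1 : -1;
            }
            int c = x[i].compareTo(y[i]);
            if (c != 0) {
                return c;
            }
        }
        // Shorter patterns come first when one is a prefix of the other.
        return Integer.compare(x.length, y.length);
    };

    private final List<String> patterns = new ArrayList<>();

    // Patterns are tried in registration order.
    void register(String pattern) {
        patterns.add(pattern);
    }

    // Returns the first registered pattern that matches the path, or null.
    String route(String path) {
        String[] p = path.split("/");
        for (String pattern : patterns) {
            String[] q = pattern.split("/");
            if (q.length != p.length) {
                continue;
            }
            boolean matches = true;
            for (int i = 0; i < q.length; i++) {
                if (!q[i].startsWith(":") && !q[i].equals(p[i])) {
                    matches = false;
                    break;
                }
            }
            if (matches) {
                return pattern;
            }
        }
        return null;
    }
}
```

If the patterns were registered unsorted with /jobs/:jobid first, a request for /jobs/overview would be routed to the parameterized pattern. Sorting with URL_ORDER before registering avoids that.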
Listing 3-14 The RestServerEndpoint.start() method
public final void start() throws Exception {
    synchronized (lock) {
        // Check that RestServerEndpoint.state is CREATED
        Preconditions.checkState(state == State.CREATED,
            "The RestServerEndpoint cannot be restarted.");
        // Start the REST endpoint
        log.info("Starting rest endpoint.");
        final Router router = new Router();
        final CompletableFuture<String> restAddressFuture = new CompletableFuture<>();
        // Let the subclass initialize its Handlers
        handlers = initializeHandlers(restAddressFuture);
        // Sort the handlers by URL
        Collections.sort(handlers, RestHandlerUrlComparator.INSTANCE);
        // Register every handler with the router
        handlers.forEach(handler -> {
            registerHandler(router, handler, log);
        });
        // Create the ChannelInitializer that initializes each Channel
        ChannelInitializer<SocketChannel> initializer = new ChannelInitializer<SocketChannel>() {
            @Override
            protected void initChannel(SocketChannel ch) {
                // Create the RouterHandler that dispatches business requests
                RouterHandler handler = new RouterHandler(router, responseHeaders);
                // SSL must be the first Handler in the pipeline
                if (isHttpsEnabled()) {
                    ch.pipeline().addLast("ssl",
                        new RedirectingSslHandler(restAddress, restAddressFuture, sslHandlerFactory));
                }
                ch.pipeline()
                    .addLast(new HttpServerCodec())
                    .addLast(new FileUploadHandler(uploadDir))
                    .addLast(new FlinkHttpObjectAggregator(maxContentLength, responseHeaders))
                    .addLast(new ChunkedWriteHandler())
                    .addLast(handler.getName(), handler)
                    .addLast(new PipelineErrorHandler(log, responseHeaders));
            }
        };
        // Create bossGroup and workerGroup
        NioEventLoopGroup bossGroup = new NioEventLoopGroup(1,
            new ExecutorThreadFactory("flink-rest-server-netty-boss"));
        NioEventLoopGroup workerGroup = new NioEventLoopGroup(0,
            new ExecutorThreadFactory("flink-rest-server-netty-worker"));
        // Create the ServerBootstrap
        bootstrap = new ServerBootstrap();
        // Bind bossGroup, workerGroup, and the initializer
        bootstrap
            .group(bossGroup, workerGroup)
            .channel(NioServerSocketChannel.class)
            .childHandler(initializer);
        // Parse the candidate ports from restBindPortRange
        Iterator<Integer> portsIterator;
        try {
            portsIterator = NetUtils.getPortRangeFromString(restBindPortRange);
        } catch (IllegalConfigurationException e) {
            throw e;
        } catch (Exception e) {
            throw new IllegalArgumentException("Invalid port range definition: " + restBindPortRange);
        }
        // Pick the first port in portsIterator that is not in use and bind the bootstrap to it
        int chosenPort = 0;
        while (portsIterator.hasNext()) {
            try {
                chosenPort = portsIterator.next();
                final ChannelFuture channel;
                if (restBindAddress == null) {
                    channel = bootstrap.bind(chosenPort);
                } else {
                    channel = bootstrap.bind(restBindAddress, chosenPort);
                }
                serverChannel = channel.syncUninterruptibly().channel();
                break;
            } catch (final Exception e) {
                // A bind failure means the port is taken: try the next one
                if (!(e instanceof org.jboss.netty.channel.ChannelException
                        || e instanceof java.net.BindException)) {
                    throw e;
                }
            }
        }
        // The ServerBootstrap is up: log restBindAddress and chosenPort
        log.debug("Binding rest endpoint to {}:{}.", restBindAddress, chosenPort);
        final InetSocketAddress bindAddress = (InetSocketAddress) serverChannel.localAddress();
        final String advertisedAddress;
        if (bindAddress.getAddress().isAnyLocalAddress()) {
            advertisedAddress = this.restAddress;
        } else {
            advertisedAddress = bindAddress.getAddress().getHostAddress();
        }
        final int port = bindAddress.getPort();
        log.info("Rest endpoint listening at {}:{}", advertisedAddress, port);
        restBaseUrl = new URL(determineProtocol(), advertisedAddress, port, "").toString();
        restAddressFuture.complete(restBaseUrl);
        // Set the state to RUNNING
        state = State.RUNNING;
        // Call the internal start method to start the RestEndpoint service
        startInternal();
    }
}
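The port-selection loop in start() can be tried out without Netty. The sketch below reproduces only that loop with a plain java.net.ServerSocket: it walks an iterator of candidate ports and keeps the first one that binds. PortRangeBindSketch and bindFirstFree are hypothetical names, and binding to the loopback address is an assumption made to keep the example local.

```java
import java.io.IOException;
import java.net.BindException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
import java.util.Iterator;

// Hypothetical helper that mirrors the "try each port in order" loop.
class PortRangeBindSketch {
    // Returns a socket bound to the first free port produced by the iterator.
    static ServerSocket bindFirstFree(Iterator<Integer> ports) throws IOException {
        while (ports.hasNext()) {
            int port = ports.next();
            ServerSocket socket = new ServerSocket();
            try {
                socket.bind(new InetSocketAddress(InetAddress.getLoopbackAddress(), port));
                return socket;
            } catch (BindException e) {
                // The port is already in use: release the socket and try the next candidate.
                socket.close();
            }
        }
        throw new BindException("No free port in the given range");
    }
}
```

A caller could pass IntStream.rangeClosed(8081, 8090).iterator() as the candidate range and read the chosen port from getLocalPort() on the returned socket. Unlike this sketch, the listing above leaves the loop silently when every candidate fails, so a check that a channel was actually bound would still be needed afterwards.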
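3. Loading and Registering Handlers
Next we look at how Handlers are initialized and loaded. As Listing 3-15 shows, initializing JobSubmitHandler involves the following main steps.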
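·Call WebMonitorEndpoint.initializeHandlers() to load the Handlers that WebMonitorEndpoint uses to expose monitoring metrics.
·Create JobSubmitHandler for job submission. Its leaderRetriever parameter is used to obtain the address of the DispatcherGateway Leader.
·If the cluster allows JobGraph submission through the Web, load the Handlers for Web-based job submission through WebSubmissionExtension. WebSubmissionExtension contains all Handlers needed for this kind of submission, such as JarUploadHandler for uploading JAR files and JarRunHandler for running jobs. On a Per-Job cluster, JarUploadHandler is not loaded by default, so new jobs cannot be submitted or run.
·The loadWebSubmissionExtension() method builds the WebSubmissionExtension through reflection and then obtains the Handlers it contains.
·Add jobSubmitHandler to handlers and return the handlers collection.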
Listing 3-15 The DispatcherRestEndpoint.initializeHandlers() method
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(
        final CompletableFuture<String> localAddressFuture) {
    // Call WebMonitorEndpoint.initializeHandlers()
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers =
        super.initializeHandlers(localAddressFuture);
    final Time timeout = restConfiguration.getTimeout();
    // Create the JobSubmitHandler used for job submission
    JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
        leaderRetriever,
        timeout,
        responseHeaders,
        executor,
        clusterConfiguration);
    // If JobGraph submission through the Web is allowed, load the Handlers
    // for Web-based submission through WebSubmissionExtension
    if (restConfiguration.isWebSubmitEnabled()) {
        try {
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                timeout,
                responseHeaders,
                localAddressFuture,
                uploadDir,
                executor,
                clusterConfiguration);
            // Add the Handlers loaded by webSubmissionExtension to handlers
            handlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            if (log.isDebugEnabled()) {
                log.debug("Failed to load web based job submission extension.", e);
            } else {
                log.info("Failed to load web based job submission extension. "
                    + "Probable reason: flink-runtime-web is not in the classpath.");
            }
        }
    } else {
        log.info("Web-based job submission is not enabled.");
    }
    // Add jobSubmitHandler to handlers
    handlers.add(Tuple2.of(jobSubmitHandler.getMessageHeaders(), jobSubmitHandler));
    return handlers;
}
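Loading the extension through reflection keeps the endpoint free of a compile-time dependency on an optional module, so a missing class only disables the feature. The sketch below shows that pattern with JDK reflection alone. OptionalExtensionLoader and its load method are hypothetical names, and it assumes the extension has a no-argument constructor, which is a simplification: the listing above passes several arguments to loadWebSubmissionExtension().

```java
import java.util.Optional;

// Hypothetical loader for an optional extension that may be absent from the classpath.
class OptionalExtensionLoader {
    // Returns the instantiated extension, or Optional.empty() if the class is
    // missing, cannot be instantiated, or is not of the expected type.
    static <T> Optional<T> load(String className, Class<T> expectedType) {
        try {
            Class<?> clazz = Class.forName(className);
            Object instance = clazz.getDeclaredConstructor().newInstance();
            return Optional.of(expectedType.cast(instance));
        } catch (ReflectiveOperationException | ClassCastException e) {
            // ClassNotFoundException lands here when the optional module is absent.
            return Optional.empty();
        }
    }
}
```

A caller can then add the extension's Handlers when the Optional is present and log an informational message otherwise, which matches the fallback branch in the listing.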