Flink JAR Upload and Run Logic


Notes

1. Goal: walk through the Flink client code for uploading and running a JAR
2. Source version: 1.6.1
3. Deployment mode: Standalone
4. Related topics: Netty, CompletableFuture

Registering the handlers at startup

From DispatcherRestEndpoint.java:

```java
protected List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> initializeHandlers(CompletableFuture<String> restAddressFuture) {
    List<Tuple2<RestHandlerSpecification, ChannelInboundHandler>> handlers = super.initializeHandlers(restAddressFuture);
    ...
    JobSubmitHandler jobSubmitHandler = new JobSubmitHandler(
        restAddressFuture,
        leaderRetriever,
        timeout,
        responseHeaders,
        executor,
        clusterConfiguration);

    if (clusterConfiguration.getBoolean(WebOptions.SUBMIT_ENABLE)) {
        try {
            // this is where the JAR Upload and Run handlers are registered
            webSubmissionExtension = WebMonitorUtils.loadWebSubmissionExtension(
                leaderRetriever,
                restAddressFuture,
                timeout,
                responseHeaders,
                uploadDir,
                executor,
                clusterConfiguration);

            // register extension handlers
            handlers.addAll(webSubmissionExtension.getHandlers());
        } catch (FlinkException e) {
            ...
        }
    } else {
        log.info("Web-based job submission is not enabled.");
    }
    ...
    return handlers;
}
```

In WebSubmissionExtension you can see that handlers are defined for Upload, Run, List, Delete and Plan.
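The whole registration block above is gated on WebOptions.SUBMIT_ENABLE. As a minimal sketch (assuming the standard WebOptions class from flink-core and its web.submit.enable key; not part of the walkthrough itself), web-based submission can be switched off either in flink-conf.yaml or programmatically:

```java
import org.apache.flink.configuration.Configuration;
import org.apache.flink.configuration.WebOptions;

public class DisableWebSubmission {
    public static void main(String[] args) {
        // Equivalent to "web.submit.enable: false" in flink-conf.yaml;
        // with this flag off, the upload/run handlers above are never registered.
        Configuration conf = new Configuration();
        conf.setBoolean(WebOptions.SUBMIT_ENABLE, false);

        System.out.println("web submission enabled: " + conf.getBoolean(WebOptions.SUBMIT_ENABLE));
    }
}
```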

Upload JAR handling

The upload logic lives in JarUploadHandler.handleRequest. The JAR is stored at:

```java
jarDir.resolve(UUID.randomUUID() + "_" + fileUpload.getFileName());
```

The method itself is simple; the non-obvious part is the value of jarDir. Tracing the assignment backwards:

1. jarDir is set in the JarUploadHandler constructor;
2. JarUploadHandler is constructed by WebSubmissionExtension via WebMonitorUtils.loadWebSubmissionExtension, and jarDir comes from the uploadDir variable of the parent RestServerEndpoint;
3. uploadDir in RestServerEndpoint is initialized from RestServerEndpointConfiguration.getUploadDir();
4. the origin is found in RestServerEndpointConfiguration:

```java
final Path uploadDir = Paths.get(
    config.getString(WebOptions.UPLOAD_DIR,
        config.getString(WebOptions.TMP_DIR)),
    "flink-web-upload");
```

In most setups nobody overrides WebOptions.UPLOAD_DIR (the web.upload.dir configuration key), so the JAR ends up under "<web tmp dir>/flink-web-upload". How WebOptions.TMP_DIR gets its value is also somewhat hidden: judging from the configuration file alone it is the /tmp directory, but ClusterEntrypoint.generateClusterConfiguration actually rewrites it:

```java
final String webTmpDir = configuration.getString(WebOptions.TMP_DIR);
final File uniqueWebTmpDir = new File(webTmpDir, "flink-web-" + UUID.randomUUID());
configuration.setString(WebOptions.TMP_DIR, uniqueWebTmpDir.getAbsolutePath());
```

The net effect is that uploaded JARs land in "/tmp/flink-web-<UUID>/flink-web-upload". Keeping them under /tmp is risky: the files may be deleted once they expire.

Run JAR

Same approach as above; the part to focus on is JarRunHandler.handleRequest:

```java
@Override
protected CompletableFuture<JarRunResponseBody> handleRequest(
        @Nonnull final HandlerRequest request,
        @Nonnull final DispatcherGateway gateway) throws RestHandlerException {
    ...
    // build the JobGraph
    final CompletableFuture<JobGraph> jobGraphFuture = getJobGraphAsync(
        jarFile,
        entryClass,
        programArgs,
        savepointRestoreSettings,
        parallelism);

    CompletableFuture<Integer> blobServerPortFuture = gateway.getBlobServerPort(timeout);

    // upload the JobGraph, the user JAR and the user artifacts to the BlobServer
    CompletableFuture<JobGraph> jarUploadFuture = jobGraphFuture.thenCombine(blobServerPortFuture, (jobGraph, blobServerPort) -> {
        final InetSocketAddress address = new InetSocketAddress(gateway.getHostname(), blobServerPort);
        try {
            ClientUtils.extractAndUploadJobGraphFiles(jobGraph, () -> new BlobClient(address, configuration));
        } catch (FlinkException e) {
            throw new CompletionException(e);
        }
        return jobGraph;
    });

    CompletableFuture<Acknowledge> jobSubmissionFuture = jarUploadFuture.thenCompose(jobGraph -> {
        // we have to enable queued scheduling because slots will be allocated lazily
        jobGraph.setAllowQueuedScheduling(true);
        // submit the job
        return gateway.submitJob(jobGraph, timeout);
    });

    return jobSubmissionFuture
        .thenCombine(jarUploadFuture, (ack, jobGraph) -> new JarRunResponseBody(jobGraph.getJobID()))
        .exceptionally(throwable -> {
            throw new CompletionException(new RestHandlerException(
                throwable.getMessage(),
                HttpResponseStatus.INTERNAL_SERVER_ERROR,
                throwable));
        });
}
```
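For completeness, here is a rough client-side sketch of what hitting these two handlers looks like over REST. Everything environment-specific is an assumption: the REST address localhost:8081, the JAR path, the entry class, the multipart field name jarfile (as used in the Flink documentation's curl example) and the query parameter names; treat it as an illustration, not a reference client.

```java
import java.io.ByteArrayOutputStream;
import java.io.File;
import java.io.InputStream;
import java.io.OutputStream;
import java.net.HttpURLConnection;
import java.net.URL;
import java.net.URLEncoder;
import java.nio.file.Files;

public class JarRestClientSketch {

    public static void main(String[] args) throws Exception {
        String restBase = "http://localhost:8081";          // assumed REST address of the standalone cluster
        File jar = new File("/path/to/my-flink-job.jar");   // hypothetical user JAR

        // 1) POST the JAR to /jars/upload as multipart/form-data (field name "jarfile")
        String boundary = "----flink-jar-upload-" + System.nanoTime();
        HttpURLConnection upload = (HttpURLConnection) new URL(restBase + "/jars/upload").openConnection();
        upload.setRequestMethod("POST");
        upload.setDoOutput(true);
        upload.setRequestProperty("Content-Type", "multipart/form-data; boundary=" + boundary);
        try (OutputStream out = upload.getOutputStream()) {
            out.write(("--" + boundary + "\r\n"
                    + "Content-Disposition: form-data; name=\"jarfile\"; filename=\"" + jar.getName() + "\"\r\n"
                    + "Content-Type: application/x-java-archive\r\n\r\n").getBytes("UTF-8"));
            Files.copy(jar.toPath(), out);
            out.write(("\r\n--" + boundary + "--\r\n").getBytes("UTF-8"));
        }
        // the response names the server-side file "<uuid>_<original-name>.jar" inside the
        // upload directory traced above; that name is the jar id used by the other /jars endpoints
        System.out.println("upload -> HTTP " + upload.getResponseCode() + ": " + readBody(upload));

        // 2) POST /jars/<jarId>/run to execute it (jar id should be parsed from the upload response)
        String jarId = "<uuid>_" + jar.getName();            // placeholder only
        String query = "?entry-class=" + URLEncoder.encode("com.example.MyJob", "UTF-8") + "&parallelism=1";
        HttpURLConnection run = (HttpURLConnection) new URL(restBase + "/jars/" + jarId + "/run" + query).openConnection();
        run.setRequestMethod("POST");
        // on success the JSON response carries the new job id, mirroring JarRunResponseBody
        System.out.println("run -> HTTP " + run.getResponseCode() + ": " + readBody(run));
    }

    private static String readBody(HttpURLConnection conn) throws Exception {
        try (InputStream in = conn.getInputStream();
             ByteArrayOutputStream buf = new ByteArrayOutputStream()) {
            byte[] chunk = new byte[4096];
            int n;
            while ((n = in.read(chunk)) != -1) {
                buf.write(chunk, 0, n);
            }
            return buf.toString("UTF-8");
        }
    }
}
```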

Generating the JobGraph

In JarRunHandler.getJobGraphAsync a PackagedProgram is constructed and handed to PackagedProgramUtils:

```java
final PackagedProgram packagedProgram = new PackagedProgram(
    jarFile.toFile(),
    entryClass,
    programArgs.toArray(new String[programArgs.size()]));

jobGraph = PackagedProgramUtils.createJobGraph(packagedProgram, configuration, parallelism);
```

```java
/* From PackagedProgramUtils.java */
public static JobGraph createJobGraph(
        PackagedProgram packagedProgram,
        Configuration configuration,
        int defaultParallelism) throws ProgramInvocationException {
    ...
    if (packagedProgram.isUsingProgramEntryPoint()) {
        ...
    } else if (packagedProgram.isUsingInteractiveMode()) {
        /* Streaming programs submitted this way usually take this branch. The decision is made when the
           PackagedProgram is built, with an isAssignableFrom check on whether the user's main class
           implements the Program interface; a class that only has a main() method ends up in
           interactive mode. */
        final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer);
        optimizerPlanEnvironment.setParallelism(defaultParallelism);

        // this triggers the call of the user's main() method
        flinkPlan = optimizerPlanEnvironment.getOptimizedPlan(packagedProgram);
    } else {
        throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode.");
    }

    if (flinkPlan instanceof StreamingPlan) {
        // obtain the JobGraph
        jobGraph = ((StreamingPlan) flinkPlan).getJobGraph();
        jobGraph.setSavepointRestoreSettings(packagedProgram.getSavepointSettings());
    } else {
        ...
    }
    ...
    return jobGraph;
}
```
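The same two steps can also be driven outside the handler, which is handy for inspecting the resulting JobGraph. A minimal sketch, assuming flink-clients and flink-runtime 1.6.x on the classpath; the JAR path and entry class are hypothetical:

```java
import java.io.File;

import org.apache.flink.client.program.PackagedProgram;
import org.apache.flink.client.program.PackagedProgramUtils;
import org.apache.flink.configuration.Configuration;
import org.apache.flink.runtime.jobgraph.JobGraph;

public class CreateJobGraphExample {

    public static void main(String[] args) throws Exception {
        // Hypothetical user program; replace with a real jar and entry class.
        File jarFile = new File("/path/to/my-flink-job.jar");
        String entryClass = "com.example.MyJob";
        String[] programArgs = new String[] { "--input", "/tmp/in" };

        // Same two steps the JarRunHandler performs: wrap the jar, then compile it into a JobGraph.
        PackagedProgram program = new PackagedProgram(jarFile, entryClass, programArgs);
        JobGraph jobGraph = PackagedProgramUtils.createJobGraph(program, new Configuration(), 1);

        System.out.println("JobID assigned on the client side: " + jobGraph.getJobID());
    }
}
```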

Invoking the user program's main() method

```java
/* From OptimizerPlanEnvironment.java */
public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException {
    ...
    /* install this OptimizerPlanEnvironment as the environment produced by the ContextEnvironmentFactory */
    setAsContext();
    try {
        /* call the user program's main() method */
        prog.invokeInteractiveModeForExecution();
    }
    ...
}
```
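The setAsContext() idea is easier to see without the framework around it. Below is a toy sketch in which every name is invented (none of it is Flink API): the "framework" installs a factory before calling the user's main(), and the user code simply asks that factory for its environment.

```java
// Toy illustration of the setAsContext() pattern; all names are made up.
public class ContextFactoryToy {

    interface Environment { void execute(String plan); }

    // stands in for the context environment factory that setAsContext() installs
    static class EnvironmentFactory {
        private static Environment current = plan -> System.out.println("default env runs: " + plan);

        static void setAsContext(Environment env) { current = env; }
        static Environment getExecutionEnvironment() { return current; }
    }

    // stands in for the user program's main(): it only asks the factory for "the" environment
    static void userMain() {
        Environment env = EnvironmentFactory.getExecutionEnvironment();
        env.execute("source -> map -> sink");
    }

    public static void main(String[] args) {
        // plain call: the default environment is used
        userMain();

        // framework-style call: install a plan-capturing environment first, then invoke main()
        EnvironmentFactory.setAsContext(plan -> System.out.println("captured instead of running: " + plan));
        userMain();
    }
}
```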

Executing the user program's main() method

```java
// a typical main() structure
public static void main(String[] args) throws Exception {
    /* what is obtained here is the OptimizerPlanEnvironment installed by setAsContext() in the previous step */
    StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();
    ...
    /* this ends up running OptimizerPlanEnvironment's execute() */
    env.execute();
}
```
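For reference, here is a complete, runnable version of that skeleton (a trivial job; it assumes flink-streaming-java on the classpath, and the class and job name are invented). The point relevant to this walkthrough is only that everything before env.execute() merely builds the dataflow, and execute() is where the environment installed above takes over:

```java
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;

public class MinimalStreamingJob {

    public static void main(String[] args) throws Exception {
        // resolves against the context environment installed by the framework, if any
        StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

        // builds the dataflow graph only; nothing runs yet
        env.fromElements("flink", "jar", "run")
           .map(String::toUpperCase)
           .print();

        // hands the graph to the environment's execute()
        env.execute("minimal-streaming-job");
    }
}
```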

Executing execute() (very similar to a concept you may know from testing: stubbing)

```java
public JobExecutionResult execute(String jobName) throws Exception {
    /* hand back the compiled FlinkPlan */
    Plan plan = createProgramPlan(jobName);
    this.optimizerPlan = compiler.compile(plan);

    // nothing after execute() in the user program should run
    // do not go on with anything now!
    throw new ProgramAbortException();
}
```
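The trick is easy to reproduce in isolation. Below is a framework-free toy (all names invented for illustration) that mimics it: execute() records whatever "plan" the program built and then aborts, so nothing after execute() in the user's main() ever runs.

```java
// Toy illustration of the "capture the plan, then abort" trick; all names are made up.
public class PlanCaptureToy {

    static class AbortException extends RuntimeException {}

    static class CapturingEnvironment {
        String capturedPlan;

        String execute(String plan) {
            capturedPlan = plan;          // keep the "compiled plan"
            throw new AbortException();   // do not go on with anything now!
        }
    }

    public static void main(String[] args) {
        CapturingEnvironment env = new CapturingEnvironment();
        try {
            // stands in for invoking the user's main(): it builds a plan and calls execute()
            env.execute("source -> map -> sink");
            System.out.println("never reached");
        } catch (AbortException expected) {
            System.out.println("captured plan: " + env.capturedPlan);
        }
    }
}
```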

Submitting the JobGraph

OK, now that we have the JobGraph, let's look more closely at how it is submitted.

```java
/* From Dispatcher.java */
public CompletableFuture<Acknowledge> submitJob(JobGraph jobGraph, Time timeout) {
    ...
    if (jobSchedulingStatus == RunningJobsRegistry.JobSchedulingStatus.DONE || jobManagerRunners.containsKey(jobId)) {
        return FutureUtils.completedExceptionally(
            new JobSubmissionException(jobId, String.format("Job has already been submitted and is in state %s.", jobSchedulingStatus)));
    } else {
        // the part to focus on is persistAndRunJob
        final CompletableFuture<Acknowledge> persistAndRunFuture = waitForTerminatingJobManager(jobId, jobGraph, this::persistAndRunJob)
            .thenApply(ignored -> Acknowledge.get());

        return persistAndRunFuture.exceptionally(
            (Throwable throwable) -> {
                final Throwable strippedThrowable = ExceptionUtils.stripCompletionException(throwable);
                log.error("Failed to submit job {}.", jobId, strippedThrowable);
                throw new CompletionException(
                    new JobSubmissionException(jobId, "Failed to submit job.", strippedThrowable));
            });
    }
}
```

Skipping over some intermediate calls, the call order is:

1. Dispatcher.persistAndRunJob
2. Dispatcher.runJob
3. Dispatcher.createJobManagerRunner, which creates the JobMaster
4. JobMaster.createAndRestoreExecutionGraph

At last we reach the ExecutionGraph.

The ExecutionGraph deploy process

The call chain between the methods:

1. continuing from the JobManagerRunner created above
2. Dispatcher.startJobManagerRunner
3. JobManagerRunner.start
4. leaderElectionService.start (leadership is granted immediately in standalone mode)
5. JobManagerRunner.grantLeadership
6. JobManagerRunner.verifyJobSchedulingStatusAndStartJobManager
7. JobMaster.start
8. JobMaster.startJobExecution
9. JobMaster.resetAndScheduleExecutionGraph
10. JobMaster.scheduleExecutionGraph
11. ExecutionGraph.scheduleForExecution
12. ExecutionGraph.scheduleEager
13. Execution.deploy
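One closing note on the plumbing: from JarRunHandler.handleRequest down to Dispatcher.submitJob, the steps above are stitched together with plain CompletableFuture composition. A tiny, self-contained illustration of the three operators used throughout (thenCombine, thenCompose, exceptionally), with made-up stand-ins for the futures involved:

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;

public class FutureCompositionToy {

    public static void main(String[] args) {
        // stand-ins for jobGraphFuture and blobServerPortFuture
        CompletableFuture<String> jobGraph = CompletableFuture.supplyAsync(() -> "jobGraph");
        CompletableFuture<Integer> blobPort = CompletableFuture.supplyAsync(() -> 6124);

        // combine two independent results, like uploading the JobGraph once the port is known
        CompletableFuture<String> uploaded = jobGraph.thenCombine(blobPort,
                (graph, port) -> graph + " uploaded via port " + port);

        // chain a dependent asynchronous step, like gateway.submitJob(...)
        CompletableFuture<String> submitted = uploaded.thenCompose(
                graph -> CompletableFuture.supplyAsync(() -> "ack for " + graph));

        // mirror of the handler's exceptionally: wrap and rethrow failures
        String result = submitted
                .exceptionally(t -> { throw new CompletionException(t); })
                .join();

        System.out.println(result);
    }
}
```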
