2023年7月3日发(作者:)
FlinkJAR包上传和运⾏逻辑⽂章⽬录说明1. ⽬标:⾛读Flink Clint中Upload jar、Run jar相关代码2. 源码版本:1.6.13. 部属模式:Standalone4. 相关知识点:Netty、 CompletedFuture启动注册Handler代码From otected List
此处注册了JAR Upload和Run的处理⽅法 webSubmissionExtension = bSubmissionExtension( leaderRetriever, restAddressFuture, timeout, responseHeaders, uploadDir, executor, clusterConfiguration); // register extension handlers (dlers()); } catch (FlinkException e) { ... } } else { ("Web-based job submission is not enabled."); } ... return handlers;}在WebSubmissionExtension中,可以看到定义了Upload、Run、List、Delete、Plan的HandlerUpload JAR处理代码在JarUploadHandler的handleRequest⽅法中。Jar包存放路径:e(UUID() + "_" + eName());⽅法本⾝逻辑简单,⽐较隐蔽的是jarDir的值。通过倒推寻找该值的赋值过程。1. JarUploadHandler 构造时赋值属性jarDir;2. JarUploadHandler由WebSubmissionExtension通过bSubmissionExtension构造,jarDir源⾃⽗类RestServerEndpoint中的变量uploadDir;3. RestServerEndpoint中uploadDir通过oadDir()初始化4. 在RestServerEndpointConfiguration中找到了源头:final Path uploadDir = ( ing(_DIR, ing(_DIR)), "flink-web-upload");⼀般情况下,⼤家都不会改写配置项_DIR(对应配置项“”),所以JAR包存放到了"$_DIR/flink-web-upload"_DIR的赋值⽐较隐蔽,只从配置⽂件看,是在/tmp⽬录。但是在ClusterEntrypoint的generateClusterConfiguration中,其实对该值进⾏了改写:final String webTmpDir = ing(_DIR);final File uniqueWebTmpDir = new File(webTmpDir, "flink-web-" + UUID());ing(_DIR, olutePath());最终的效果JAR包存放⽬录是"/tmp/flink-web-UUID/flink-web-upload"存放在tmp⽬录⾥⾯是有风险的,过期后会被删除。Run Jar同上,重点关注JarRunHandler的handleRequest@Overrideprotected CompletableFuture
在JarRunHandler的getJobGraphAsync中构造了PackagedProgram */final PackagedProgram packagedProgram = new PackagedProgram( (), entryClass, y(new String[()])); jobGraph = JobGraph(packagedProgram, configuration, parallelism);/* From */public static JobGraph createJobGraph( PackagedProgram packagedProgram, Configuration configuration, int defaultParallelism) throws ProgramInvocationException { .... if (gProgramEntryPoint()) { ... } else if (gInteractiveMode()) { /*
⼀般提交的流程序会⾛这个分⽀,判断原则是⽤户程序的main Class是否isAssignableFrom ProgramDescription */ final OptimizerPlanEnvironment optimizerPlanEnvironment = new OptimizerPlanEnvironment(optimizer); allelism(defaultParallelism); //
会触发main函数调⽤ flinkPlan = imizedPlan(packagedProgram); } else { throw new ProgramInvocationException("PackagedProgram does not have a valid invocation mode."); } if (flinkPlan instanceof StreamingPlan) { //
获取JobGraph jobGraph = ((StreamingPlan) flinkPlan).getJobGraph(); epointRestoreSettings(epointSettings()); } else { ... } ... return jobGraph;}调⽤⽤户程序main⽅法/* From */public FlinkPlan getOptimizedPlan(PackagedProgram prog) throws ProgramInvocationException { ...
/*
设置ContextEnviormentFacoty对应的env为OptimizerPlanEnvironment */ setAsContext(); try { /*
调⽤⽤户程序main⽅法 */ InteractiveModeForExecution(); } ...}执⾏⽤户程序main⽅法//
⼀个常见的main
结构public static void main(String[] args) throws Exception { /*
此处获取的是上⼀步setAsContext中设置的OptimizerPlanEnvironment */ StreamExecutionEnvironment env = cutionEnvironment(); ... /*
对应的是执⾏OptimizerPlanEnvironment的execute */ e();}执⾏execute (和接触过⼀个概念很类似-打桩测试)public JobExecutionResult execute(String jobName) throws Exception { /*
反馈Compile后的FlinkPlan */ Plan plan = createProgramPlan(jobName); zerPlan = e(plan); // execute后不要带其他的⽤户程序 // do not go on with anything now! throw new ProgramAbortException();}提交JobGraphOK,已经得到了JobGraph,再细看提交JobGraph的过程/* From */public CompletableFuture
发布者:admin,转转请注明出处:http://www.yc00.com/news/1688382768a129707.html
评论列表(0条)