2023年7月9日发(作者:)
Seatunnel源码解析(3)-加载插件Seatunnel源码解析(3)-加载插件需求公司在使⽤Seatunnel的过程中,规划将Seatunnel集成在平台中,提供可视化操作。因此⽬前有如下⼏个相关的需求:1. 可以通过Web接⼝,传递参数,启动⼀个Seatunnel应⽤2. 可以⾃定义⽇志,收集相关指标,⽬前想到的包括:应⽤的⼊流量、出流量;启动时间、结束时间等3. 在任务结束后,可以⽤applicationId⾃动从yarn上收集⽇志(⼀是⼿动收集太⿇烦,⼆是时间稍长⽇志就没了)材料1. Seatunnel:2.0.52. Spark:2.4.83. Hadoop:2.7任意门导读本章将从源码⾓度,解读Seatunnel如何根据配置,加载指定的插件代码如何加载插件entryPointpublic class Seatunnel { ... private static void entryPoint(String configFile, Engine engine) throws Exception { //
根据.conf配置⽂件的路径,加载解析配置,并封装成ConfigBuilder
ConfigBuilder configBuilder = new ConfigBuilder(configFile, engine); //
通过ConfigBuilder,加载配置⽂件中指定的Source、Transform、Sink插件 List
通过ConfigBuilder,创建对应执⾏引擎(Spark/Flink)和执⾏模式(Batch/Flink)的执⾏器Execution Execution execution = Execution(); //
调⽤插件⾃定义的检查配置的逻辑 baseCheckConfig(sources, transforms, sinks); //
调⽤插件⾃定义的插件执⾏的前置初始化逻辑 prepare((), sources, transforms, sinks); //
打印应⽤启动LOGO showAsciiLogo(); // Execution提交Spark/Flink应⽤ (sources, transforms, sinks); }}在上⼀节中,介绍了Seatunnel将配置⽂件内容,封装到ConfigBuilder中,顺带创建了Spark/Flink的上下⽂环境。从entryPoint⽅法中,看出通过Plugins()函数创建Source、Transform、Sink插件Plugins()以构建Source插件为例,我们在第⼀节《启动⼀个应⽤》的配置⽂件中,指定Source插件为Fake1. ⾸先从配置中,拿到所有source的配置2. 遍历每个source插件的配置,⽣成对应的插件实例,并添加到List中public class ConfigBuilder{ ... public
以Source为例,则这⾥的type即为 List
拿到conf⽂件⾥,所有source的配置 List extends Config> configList = figList(e()); h(plugin -> { //
遍历每个source的配置 try { //
这个t就是要构建的插件,即BaseSource的⼦类 T t = createPluginInstanceIgnoreCase(ing(PLUGIN_NAME_KEY), type); //
将这个插件的配置传递到插件中,⽅便在⾃定义插件的其他函数中使⽤ fig(plugin); //
保存已经⽣成好的插件 (t); } catch (Exception e) { throw new RuntimeException(e); } }); return basePluginList; } ...}PluginInstanceIgnoreCase()在createPluginInstanceIgnoreCase中,创建⼀个具体的插件参数是name:配置中的插件名称(Fake),pluginType:插件的类型(Source)判断是否为⽤户上传的⾃定义插件,判断标准是:插件名是否被.分割1. ⽤户⾃定义上传的插件,则配置中的插件名称为全类名,直接通过反射创建,返回创建好的插件2. 官⽅包⾃带插件,则配置中的插件名为插件类的类名,需要通过官⽅的包命名规则,拼接出全类名3. 根据传⼊的PluginType,通过反射构建出所有的BaseSource(以Source为例)实现类的实例4. 遍历所有实现类,找到与通过配置⽂件插件名拼接的全类名相等的插件实例,并返回创建好的插件5. 找不到则抛出异常public class ConfigBuilder{ ... private
函数参数即Fake,Source if ((".").length != 1) { //
这⾥判断是⽤户⾃定义的插件(⾮官⽅包⾃带插件) return (T) e(name).newInstance(); } //
官⽅包⾃带插件⽣成⽅式 String packageName; ServiceLoader
根据前⾯解析load()中的代码,得知此处packageName是: // packageName = rcePackage(); ("eName:" + packageName); // eSourceClass()= // arkSource // T: BaseSource Class
这⾥循环遍历每⼀个被加载出来的插件 //
通过判断插件类名与配置中的插件名是否相等,判断是否为要被创建的插件,并返回,因此 T plugin = (); Class> serviceClass = ss(); String serviceClassName = e(); String clsNameToLower = rCase(); ("eClassName:" + serviceClassName); if ((rCase())) { return plugin; } } catch (ServiceConfigurationError e) { // () may throw ServiceConfigurationError, // but maybe caused by a not used plugin in this job ("Error when load plugin: [{}]", canonicalName, e); } } throw new ClassNotFoundException("Plugin class not found by name :[" + canonicalName + "]"); } ...}Source、Transform、Sink插件都是相同的构建过程
发布者:admin,转转请注明出处:http://www.yc00.com/web/1688906674a182254.html
评论列表(0条)