Spark2.0.2源码分析——Driver的注册和启动


Published: June 26, 2023

Taking standalone cluster mode as the example, we pick up after the application's main method has started.

I. RPC creation and registration

This was covered in an earlier post, so it is not repeated here.

II. Registering the Driver

Ultimately the client endpoint's onStart() runs: it wraps the driver's information in a DriverDescription and sends it asynchronously to the Master. In the Master class, the driver is created and a populated DriverInfo is returned.

Next, look at the schedule method:

1. Check the current Master's state.
2. Shuffle the workers currently in the ALIVE state, randomizing the order of the workers collection to obtain a shuffled worker collection.
3. Iterate over waitingDrivers; for each driver, pick a worker from the shuffled alive workers. If the chosen worker's free resources meet the driver's requirements, call launchDriver to start the driver on that worker, remove the driver from the waiting queue, and mark it as launched.
4. Finally, call startExecutorsOnWorkers.

```scala
/**
 * Schedule the currently available resources among waiting apps.
 * This method will be called every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  // First check the current Master's state; if it is not ALIVE, do nothing
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors
  // Shuffle the ALIVE workers, randomizing the workers collection, to get a shuffled worker collection
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of currently ALIVE workers
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // Iterate over a copy of the waitingDrivers collection, assigning a worker to each waiting driver.
  // For each driver, start from the last worker that was assigned a driver, then continue onwards
  // until all alive workers have been explored.
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    // Keep probing while fewer workers have been visited than are alive and the driver is not launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // Pick a worker from the shuffled alive workers
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If the chosen worker's free resources meet the driver's requirements, launch the driver there
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Call the launchDriver method
        launchDriver(worker, driver)
        // Remove this driver from the waitingDrivers queue;
        // note that waitingDrivers is an ArrayBuffer of DriverInfo
        waitingDrivers -= driver
        // Mark it as launched
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Call the startExecutorsOnWorkers method
  startExecutorsOnWorkers()
}
```

Next, look at the launchDriver method. It executes the following steps:

1. Add the driver info to the WorkerInfo.
2. Record the worker in the DriverInfo.
3. Send a LaunchDriver message to the Worker via the worker's RpcEndpointRef, telling the worker to start the driver.
4. Update the driver's state to RUNNING.

```scala
// Launch the driver
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  // Log which worker the driver is being launched on
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver info to the WorkerInfo
  worker.addDriver(driver)
  // Record the worker in the DriverInfo
  driver.worker = Some(worker)
  // Send a LaunchDriver message to the Worker via its RpcEndpointRef
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  // Update the driver state to RUNNING
  driver.state = DriverState.RUNNING
}
```

At this point driver registration on the Master side is complete; on to the Worker.
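The scheduling loop above is easy to model outside of Spark. Below is a small sketch in Python (for brevity and so it can run standalone) of the same round-robin placement over shuffled alive workers. `WorkerSim`, `DriverSim`, and `schedule_drivers` are illustrative stand-ins of my own, not Spark classes:

```python
import random
from dataclasses import dataclass, field

@dataclass
class WorkerSim:
    """Illustrative stand-in for Spark's WorkerInfo (assumed ALIVE)."""
    wid: str
    cores_free: int
    memory_free: int
    drivers: list = field(default_factory=list)

@dataclass
class DriverSim:
    """Illustrative stand-in for Spark's DriverInfo plus its resource demands."""
    did: str
    cores: int
    mem: int

def schedule_drivers(workers, waiting_drivers, rng=random):
    """Round-robin placement over shuffled alive workers, mirroring Master.schedule()."""
    shuffled = list(workers)
    rng.shuffle(shuffled)                  # randomize the worker collection
    num_alive = len(shuffled)
    cur_pos = 0
    for driver in list(waiting_drivers):   # iterate over a copy, like waitingDrivers.toList
        launched = False
        visited = 0
        while visited < num_alive and not launched:
            worker = shuffled[cur_pos]
            visited += 1
            # Launch only if the worker's free resources meet the driver's demands
            if worker.memory_free >= driver.mem and worker.cores_free >= driver.cores:
                # "launchDriver": record the placement and reserve the resources
                worker.drivers.append(driver.did)
                worker.cores_free -= driver.cores
                worker.memory_free -= driver.mem
                waiting_drivers.remove(driver)
                launched = True
            cur_pos = (cur_pos + 1) % num_alive
    return waiting_drivers                 # drivers that could not be placed stay queued
```

Note the same behavior as the real scheduler: a driver whose demands no alive worker can satisfy simply stays in the waiting queue until resource availability changes and schedule runs again.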

After the Worker receives the LaunchDriver message, it:

1. Wraps the driver information in a DriverRunner object.
2. Stores the new DriverRunner in the drivers HashMap.
3. Calls the DriverRunner's start method.
4. Records the cores and memory usage.

Pattern matching in the Worker:

```scala
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  // Wrap the driver information in a DriverRunner object
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  // Store the new DriverRunner in the drivers HashMap
  drivers(driverId) = driver
  // Call the start method
  driver.start()
  // Record the coresUsed and memoryUsed data
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
```

Driver registered successfully!

III. Starting the Driver: DriverRunner.start()
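The Worker-side bookkeeping in the handler above can be sketched in the same spirit. This is a toy Python model of my own (not Spark code): the new runner is stored under its driver ID, and the worker's used-resource counters grow by the driver's demands. The `driver_finished` method is my addition for symmetry; the excerpt above only shows the launch side.

```python
class WorkerBookkeeping:
    """Toy model of the Worker's driver bookkeeping (drivers map, coresUsed, memoryUsed)."""

    def __init__(self, cores_total, memory_total):
        self.cores_total = cores_total
        self.memory_total = memory_total
        self.cores_used = 0
        self.memory_used = 0
        self.drivers = {}  # driver_id -> launched driver record (stands in for DriverRunner)

    def launch_driver(self, driver_id, cores, mem):
        # Mirrors the LaunchDriver case: store the runner, then bump the usage counters
        self.drivers[driver_id] = {"cores": cores, "mem": mem, "state": "RUNNING"}
        self.cores_used += cores
        self.memory_used += mem

    def driver_finished(self, driver_id):
        # When a driver terminates, the worker releases its resources again
        spec = self.drivers.pop(driver_id)
        self.cores_used -= spec["cores"]
        self.memory_used -= spec["mem"]

    @property
    def cores_free(self):
        return self.cores_total - self.cores_used
```

This accounting is what lets the Master's schedule loop see accurate free-resource figures on the next pass.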

In DriverRunner's launchDriver method, the process's InputStream and ErrorStream are redirected to the designated stdout and stderr files. To recap the flow:

1. When the Worker receives the LaunchDriver command, it creates and starts a DriverRunner.
2. The DriverRunner runs DriverWrapper.
3. When DriverWrapper starts on the Worker, it also starts a WorkerWatcher, whose main purpose is to monitor whether the Worker node is healthy; if an anomaly occurs, it exits immediately.
4. DriverWrapper runs the submitted application's main method, which initializes the SparkContext.
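The stream redirection described above can be illustrated with plain subprocess handling. This is a minimal Python sketch under my own names (`run_with_redirect` is not Spark's implementation): launch a child process and send its stdout/stderr into dedicated files, the way the driver's output ends up in the stdout/stderr files under the worker's work directory.

```python
import subprocess
from pathlib import Path

def run_with_redirect(cmd, work_dir):
    """Run cmd, redirecting its stdout/stderr to files, as DriverRunner does for the driver process."""
    work = Path(work_dir)
    work.mkdir(parents=True, exist_ok=True)
    stdout_path = work / "stdout"
    stderr_path = work / "stderr"
    with open(stdout_path, "w") as out, open(stderr_path, "w") as err:
        # The child's output streams go straight to the files instead of our console
        proc = subprocess.run(cmd, stdout=out, stderr=err)
    return proc.returncode, stdout_path, stderr_path
```

Redirecting at spawn time, rather than reading and re-writing the streams in the parent, means the driver's output survives even if the supervising process dies.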

The driver log looks like this:

```
2019-02-10 17:52:19 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing view acls to: root
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing modify acls to: root
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing view acls groups to:
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing modify acls groups to:
2019-02-10 17:52:20 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
2019-02-10 17:52:21 INFO Utils:54 - Successfully started service 'Driver' on port 43632.
2019-02-10 17:52:21 INFO DriverWrapper:54 - Driver address: 192.168.0.102:43632
2019-02-10 17:52:21 INFO WorkerWatcher:54 - Connecting to worker spark://Worker@192.168.0.102:34096
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing view acls to: root
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing modify acls to: root
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing view acls groups to:
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing modify acls groups to:
2019-02-10 17:52:21 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
2019-02-10 17:52:21 INFO TransportClientFactory:267 - Successfully created connection to /192.168.0.102:34096 after 163 ms (0 ms spent in bootstraps)
2019-02-10 17:52:21 INFO WorkerWatcher:54 - Successfully connected to spark://Worker@192.168.0.102:34096
2019-02-10 17:52:24 INFO SparkContext:54 - Running Spark version 2.4.0
2019-02-10 17:52:24 INFO SparkContext:54 - Submitted application: Spark Pi
```

Next comes the initialization of SparkContext. I'll stop at this textual description for now: I haven't studied the DriverRunner source in detail, and will fill that in when I have time.
