Published June 26, 2023 (author:)
Spark 2.0.2 Source Code Analysis — Driver Registration and Startup

This walkthrough uses standalone-cluster mode as the example, picking up after mainMethod has executed.

一. Creating and registering the RPC environment

Refer to the earlier write-up on Spark RPC for this part.

二. Registering the Driver

Eventually the client endpoint's onStart() runs: it wraps the Driver's information in a DriverDescription and sends it asynchronously to the Master. In the Master class, the Driver is created and a populated DriverInfo is returned. Next, let's look at the schedule method.
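To make the shape of this round trip concrete, here is a minimal, self-contained sketch. These are illustrative stand-ins, not Spark's real classes: the field names, the toy createDriver, and the Future-based "async send" are all assumptions made for the example.

```scala
import scala.concurrent.{Await, Future}
import scala.concurrent.ExecutionContext.Implicits.global
import scala.concurrent.duration._

// Illustrative stand-ins only -- not Spark's actual DriverDescription/DriverInfo
case class DriverDescription(jarUrl: String, mem: Int, cores: Int, supervise: Boolean)
case class DriverInfo(id: String, desc: DriverDescription)

// Toy "Master": assigns an id and wraps the description, in the spirit of createDriver
var nextDriverNumber = 0
def createDriver(desc: DriverDescription): DriverInfo = {
  nextDriverNumber += 1
  DriverInfo(f"driver-$nextDriverNumber%04d", desc)
}

// The client wraps the driver's info in a description...
val desc = DriverDescription("hdfs://host/path/app.jar", mem = 1024, cores = 1, supervise = true)
// ...and submits it asynchronously (a Future standing in for the RPC send to the Master)
val reply: Future[DriverInfo] = Future { createDriver(desc) }
val info = Await.result(reply, 5.seconds)
println(info.id) // driver-0001
```

The key point the sketch preserves is that the client does not block the endpoint thread: submission is fire-and-forget, and the Master replies with the wrapped DriverInfo.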
The schedule method does the following:

1. Check the current Master's state.
2. Shuffle the workers currently in the ALIVE state, randomly reordering the workers collection.
3. Iterate over waitingDrivers. For each driver, pick a worker from the shuffled alive workers; if the chosen worker's free resources cover what the driver needs, call launchDriver to start the driver on that worker, remove the driver from the waiting queue, and mark the driver as launched.
4. Finally, call startExecutorsOnWorkers.

```scala
/**
 * Schedule the currently available resources among waiting apps.
 * This method will be called every time a new app joins or resource availability changes.
 */
private def schedule(): Unit = {
  // First check the current Master's state; if it is not ALIVE, do nothing
  if (state != RecoveryState.ALIVE) {
    return
  }
  // Drivers take strict precedence over executors.
  // Shuffle the ALIVE workers, randomly reordering the workers collection
  val shuffledAliveWorkers = Random.shuffle(workers.toSeq.filter(_.state == WorkerState.ALIVE))
  // Number of workers currently ALIVE
  val numWorkersAlive = shuffledAliveWorkers.size
  var curPos = 0
  // Iterate over a copy of the waitingDrivers collection, assigning a worker
  // to each waiting driver. For each driver, start from the worker that was
  // assigned a driver last, and continue until all alive workers are explored
  for (driver <- waitingDrivers.toList) {
    var launched = false
    var numWorkersVisited = 0
    // Keep scanning while not all alive workers have been visited and the driver is not launched
    while (numWorkersVisited < numWorkersAlive && !launched) {
      // Pick one worker from the shuffled alive workers
      val worker = shuffledAliveWorkers(curPos)
      numWorkersVisited += 1
      // If the chosen worker's free resources cover the driver's needs, launch the driver there
      if (worker.memoryFree >= driver.desc.mem && worker.coresFree >= driver.desc.cores) {
        // Call the launchDriver method
        launchDriver(worker, driver)
        // Remove the driver from the waitingDrivers queue; note that
        // waitingDrivers is an ArrayBuffer of DriverInfo
        waitingDrivers -= driver
        // Mark the driver as launched
        launched = true
      }
      curPos = (curPos + 1) % numWorkersAlive
    }
  }
  // Call the startExecutorsOnWorkers method
  startExecutorsOnWorkers()
}
```

Next, the launchDriver method. Its steps are:

1. Add the driver's information to the WorkerInfo.
2. Add the worker's information to the DriverInfo.
3. Send a LaunchDriver message to the Worker via its RpcEndpointRef, telling the worker to start the driver.
4. Update the driver's state to RUNNING.

```scala
// Launch the driver
private def launchDriver(worker: WorkerInfo, driver: DriverInfo) {
  // Log which worker the driver is being launched on
  logInfo("Launching driver " + driver.id + " on worker " + worker.id)
  // Add the driver's information to the WorkerInfo
  worker.addDriver(driver)
  // Add the worker's information to the DriverInfo
  driver.worker = Some(worker)
  // Send the LaunchDriver message to the Worker via its RpcEndpointRef;
  // note the parameters of the message
  worker.endpoint.send(LaunchDriver(driver.id, driver.desc))
  // Update the driver's state to RUNNING
  driver.state = DriverState.RUNNING
}
```

At this point Driver registration on the Master side is complete. Execution moves to the Worker, which receives the LaunchDriver message.
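The shuffled round-robin placement in schedule can be exercised outside of Spark. The sketch below uses hypothetical Worker/Driver stand-ins (not Spark's WorkerInfo/DriverInfo) to reproduce the scan: a driver lands on the first visited worker whose free memory and cores cover its needs, and a driver that fits nowhere stays waiting.

```scala
import scala.util.Random

// Hypothetical, simplified stand-ins for Spark's WorkerInfo / DriverInfo
case class Worker(id: String, var coresFree: Int, var memFree: Int)
case class Driver(id: String, cores: Int, mem: Int)

// Mirrors the loop in Master.schedule: shuffle the alive workers, then walk
// them round-robin until each waiting driver finds a worker with enough
// free memory and cores. Returns driverId -> workerId placements.
def placeDrivers(workers: Seq[Worker], waiting: Seq[Driver]): Map[String, String] = {
  val shuffled = Random.shuffle(workers)
  val n = shuffled.size
  var curPos = 0
  var placements = Map.empty[String, String]
  for (driver <- waiting) {
    var launched = false
    var visited = 0
    while (visited < n && !launched) {
      val w = shuffled(curPos)
      visited += 1
      if (w.memFree >= driver.mem && w.coresFree >= driver.cores) {
        // "launch": reserve the worker's resources for this driver
        w.coresFree -= driver.cores
        w.memFree -= driver.mem
        placements += (driver.id -> w.id)
        launched = true
      }
      curPos = (curPos + 1) % n
    }
  }
  placements
}

val workers = Seq(Worker("w1", 4, 4096), Worker("w2", 2, 1024))
val drivers = Seq(Driver("d1", 2, 2048), Driver("d2", 4, 8192))
val result = placeDrivers(workers, drivers)
println(result) // d1 fits only on w1; d2 fits nowhere and would stay waiting
```

With this sample data the outcome is independent of the shuffle order: d1 cannot fit on w2 (too little memory), and d2 exceeds every worker, so it would remain in waitingDrivers until resources change and schedule runs again.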
After the Worker receives the LaunchDriver message, it:

1. Wraps the driver information in a DriverRunner object.
2. Stores the new DriverRunner in the drivers HashMap.
3. Calls the DriverRunner's start method.
4. Records the cores and memory now in use.

The pattern match in the Worker class:

```scala
case LaunchDriver(driverId, driverDesc) =>
  logInfo(s"Asked to launch driver $driverId")
  // Wrap the driver information in a DriverRunner object
  val driver = new DriverRunner(
    conf,
    driverId,
    workDir,
    sparkHome,
    driverDesc.copy(command = Worker.maybeUpdateSSLSettings(driverDesc.command, conf)),
    self,
    workerUri,
    securityMgr)
  // Store the new DriverRunner in the drivers HashMap
  drivers(driverId) = driver
  // Call the start method
  driver.start()

  // Record the coresUsed and memoryUsed figures
  coresUsed += driverDesc.cores
  memoryUsed += driverDesc.mem
```

The Driver is now registered successfully.

三. Starting the Driver
DriverRunner.start() eventually reaches its launchDriver method, which redirects the process's InputStream and ErrorStream to the designated stdout and stderr files.

To recap the startup chain: when the Worker receives the LaunchDriver command, it creates and starts a DriverRunner; the DriverRunner runs DriverWrapper; DriverWrapper, launched on the Worker, also starts a WorkerWatcher, whose main purpose is to monitor whether the Worker node is healthy and to exit immediately if an abnormality appears; DriverWrapper then runs the submitted application's main method, and the SparkContext is initialized. The Driver log looks like this:

2019-02-10 17:52:19 WARN NativeCodeLoader:62 - Unable to load native-hadoop library for your platform... using builtin-java classes where applicable
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing view acls to: root
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing modify acls to: root
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing view acls groups to:
2019-02-10 17:52:20 INFO SecurityManager:54 - Changing modify acls groups to:
2019-02-10 17:52:20 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
2019-02-10 17:52:21 INFO Utils:54 - Successfully started service 'Driver' on port 43632.
2019-02-10 17:52:21 INFO DriverWrapper:54 - Driver address: 192.168.0.102:43632
2019-02-10 17:52:21 INFO WorkerWatcher:54 - Connecting to worker spark://Worker@192.168.0.102:34096
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing view acls to: root
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing modify acls to: root
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing view acls groups to:
2019-02-10 17:52:21 INFO SecurityManager:54 - Changing modify acls groups to:
2019-02-10 17:52:21 INFO SecurityManager:54 - SecurityManager: authentication disabled; ui acls disabled; users with view permissions: Set(root); groups with view permissions: Set(); users with modify permissions: Set(root); groups with modify permissions: Set()
2019-02-10 17:52:21 INFO TransportClientFactory:267 - Successfully created connection to /192.168.0.102:34096 after 163 ms (0 ms spent in bootstraps)
2019-02-10 17:52:21 INFO WorkerWatcher:54 - Successfully connected to spark://Worker@192.168.0.102:34096
2019-02-10 17:52:24 INFO SparkContext:54 - Running Spark version 2.4.0
2019-02-10 17:52:24 INFO SparkContext:54 - Submitted application: Spark Pi

What comes next is the initialization of the SparkContext. I have only described that in words here; I did not read the DriverRunner source closely and will fill it in when I have time.
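The stream redirection described in section 三 can be sketched with a plain ProcessBuilder. This is an illustration only, not Spark's actual DriverRunner code (which copies the streams itself via CommandUtils.redirectStream), and it assumes a Unix-like system where the echo command exists.

```scala
import java.io.File
import java.nio.file.Files
import scala.io.Source

// A work directory for the "driver", like the per-driver directory under the
// Worker's workDir
val workDir = Files.createTempDirectory("driver-demo").toFile
val stdout = new File(workDir, "stdout")
val stderr = new File(workDir, "stderr")

// Launch a child process and redirect its output streams to the files,
// analogous to redirecting the process's InputStream / ErrorStream
val builder = new ProcessBuilder("echo", "hello from the driver process")
builder.redirectOutput(stdout)
builder.redirectError(stderr)
val process = builder.start()
val exitCode = process.waitFor()

println(s"exit=$exitCode stdout=${Source.fromFile(stdout).mkString.trim}")
```

After the process exits, the stdout file holds the echoed line and the stderr file is empty, which is exactly the property the real DriverRunner relies on when it later serves these files through the Worker's web UI.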