博客
关于我
强烈建议你试试无所不能的chatGPT,快点击我
深入理解spark-DAGscheduler源码分析(下)
阅读量:4626 次
发布时间:2019-06-09

本文共 8560 字,大约阅读时间需要 28 分钟。

 

上篇中已经分析了DAGscheduler的监听机制,以及job的划分,这次我们再来看一看stage是如何划分以及stage的最终提交;

 

当jobsubmit 加入到DAGscheduler的event队列中的时候,

就会将job的stage划分为resultstage 和 shufflestage,其中一个job只会有一个resultstage;

 

DAGScheduler#handleJobSubmitted

 stage的划分上,首先从最后一个stage开始,最先创建一个resultstage,然后依次向前递归实现stage的划分。

private[scheduler] def handleJobSubmitted(jobId: Int,      finalRDD: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      callSite: CallSite,      listener: JobListener,      properties: Properties) {    var finalStage: ResultStage = null    try {      // New stage creation may throw an exception if, for example, jobs are run on a      // HadoopRDD whose underlying HDFS files have been deleted.      // Stage划分过程是从最后一个Stage开始往前执行的,最后一个Stage的类型是ResultStage      finalStage = newResultStage(finalRDD, func, partitions, jobId, callSite)    } catch {      case e: Exception =>        logWarning("Creating new stage failed due to exception - job: " + jobId, e)        listener.jobFailed(e)        return    }    //为该Job生成一个ActiveJob对象,并准备计算这个finalStage    val job = new ActiveJob(jobId, finalStage, callSite, listener, properties)    clearCacheLocs()    logInfo("Got job %s (%s) with %d output partitions".format(      job.jobId, callSite.shortForm, partitions.length))    logInfo("Final stage: " + finalStage + " (" + finalStage.name + ")")    logInfo("Parents of final stage: " + finalStage.parents)    logInfo("Missing parents: " + getMissingParentStages(finalStage))    val jobSubmissionTime = clock.getTimeMillis()    jobIdToActiveJob(jobId) = job // 该job进入active状态    activeJobs += job    finalStage.setActiveJob(job)     val stageIds = jobIdToStageIds(jobId).toArray    val stageInfos = stageIds.flatMap(id => stageIdToStage.get(id).map(_.latestInfo))    listenerBus.post( // 向LiveListenerBus发送Job提交事件      SparkListenerJobStart(job.jobId, jobSubmissionTime, stageInfos, properties))    submitStage(finalStage)   //提交当前Stage    submitWaitingStages()  }

  

 

DAGScheduler#newResultStage

在划分中,根据创建的resultstage,去获取result的parentstage进行递归调用;

private def newResultStage(      rdd: RDD[_],      func: (TaskContext, Iterator[_]) => _,      partitions: Array[Int],      jobId: Int,      callSite: CallSite): ResultStage = {    // 获取当前Stage的parent Stage,这个方法是划分Stage的核心实现 (递归调用实现)    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, jobId)    val stage = new ResultStage(id, rdd, func, partitions, parentStages, jobId, callSite)// 创建当前最后的ResultStage    stageIdToStage(id) = stage // 将ResultStage与stageId相关联    updateJobIdStageIdMaps(jobId, stage) // 更新该job中包含的stage    stage  }

 

DAGScheduler#getParentStagesAndId

递归调用的终点,获取parentstage 和 stageid 的结果返回,由于这个是由后向前的递归调用(使用广度优先策略),那么最先执行的stageid 则是最小的0

private def getParentStagesAndId(rdd: RDD[_], firstJobId: Int): (List[Stage], Int) = {    val parentStages = getParentStages(rdd, firstJobId) // 传入rdd和jobId,生成parentStage    // 生成当前stage的stageId。同一Application中Stage初始编号为0    val id = nextStageId.getAndIncrement()     (parentStages, id)  }

  

DAGScheduler#getParentStages

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {    val parents = new HashSet[Stage] // 存储当前stage的所有parent stage    val visited = new HashSet[RDD[_]] // 存储访问过的rdd    // We are manually maintaining a stack here to prevent StackOverflowError    // caused by recursively visiting    val waitingForVisit = new Stack[RDD[_]] // 一个栈,保存未访问过的rdd,先进后出    def visit(r: RDD[_]) {      if (!visited(r)) { // 如果栈中弹出的rdd被未访问过        visited += r // 首先将其标记为已访问        // Kind of ugly: need to register RDDs with the cache here since        // we can't do it in its constructor because # of partitions is unknown        for (dep <- r.dependencies) { // 读取当然rdd的依赖          dep match {            case shufDep: ShuffleDependency[_, _, _] => // 如果是宽依赖,则获取依赖rdd所在的ShuffleMapStage              parents += getShuffleMapStage(shufDep, firstJobId)            case _ =>              // 如果是窄依赖,将依赖的rdd也压入栈中,下次循环时会探索该rdd的依赖情况,直到找到款依赖划分新的stage为止              waitingForVisit.push(dep.rdd)           }        }      }    }    waitingForVisit.push(rdd) // 将当前rdd压入栈中    while (waitingForVisit.nonEmpty) { // 如果栈中有未被访问的rdd      visit(waitingForVisit.pop()) //     }    parents.toList  }

 

 

DAGScheduler#getParentStages

根据广度优先遍历该rdd来判断是否生成新的parentstage, 如果窄依赖,则压入当前waitingforvisit 的栈里 后进先出去执行,等待执行,如果是宽依赖,则调用 shufflemapstage加入parent 里面,

建立依赖关系;

private def getParentStages(rdd: RDD[_], firstJobId: Int): List[Stage] = {    val parents = new HashSet[Stage] // 存储当前stage的所有parent stage    val visited = new HashSet[RDD[_]] // 存储访问过的rdd    // We are manually maintaining a stack here to prevent StackOverflowError    // caused by recursively visiting    val waitingForVisit = new Stack[RDD[_]] // 一个栈,保存未访问过的rdd,先进后出    def visit(r: RDD[_]) {      if (!visited(r)) { // 如果栈中弹出的rdd被未访问过        visited += r // 首先将其标记为已访问        // Kind of ugly: need to register RDDs with the cache here since        // we can't do it in its constructor because # of partitions is unknown        for (dep <- r.dependencies) { // 读取当然rdd的依赖          dep match {            case shufDep: ShuffleDependency[_, _, _] => // 如果是宽依赖,则获取依赖rdd所在的ShuffleMapStage              parents += getShuffleMapStage(shufDep, firstJobId)            case _ =>              // 如果是窄依赖,将依赖的rdd也压入栈中,下次循环时会探索该rdd的依赖情况,直到找到款依赖划分新的stage为止              waitingForVisit.push(dep.rdd)           }        }      }    }    waitingForVisit.push(rdd) // 将当前rdd压入栈中    while (waitingForVisit.nonEmpty) { // 如果栈中有未被访问的rdd      visit(waitingForVisit.pop()) //     }    parents.toList  }

 

 

DAGScheduler#getShuffleMapStage

private def getShuffleMapStage(      shuffleDep: ShuffleDependency[_, _, _],      firstJobId: Int): ShuffleMapStage = {    shuffleToMapStage.get(shuffleDep.shuffleId) match { // 从Shuffle和Stage映射中取出当前Shuffle对应的Stage      case Some(stage) => stage // 如果该shuffle已经生成过stage,则直接返回      case None => // 否则为当前shuffle生成新的stage        // We are going to register ancestor shuffle dependencies        getAncestorShuffleDependencies(shuffleDep.rdd).foreach { dep =>           // 为当前shuffle的父shuffle都生成一个ShuffleMapStage          shuffleToMapStage(dep.shuffleId) = newOrUsedShuffleStage(dep, firstJobId)        }        // Then register current shuffleDep        val stage = newOrUsedShuffleStage(shuffleDep, firstJobId) // 为当前shuffle生成一个ShuffleMapStage        shuffleToMapStage(shuffleDep.shuffleId) = stage // 更新Shuffle和Stage的映射关系        stage    }  }

 

 

DAGScheduler.newOrUsedShuffleStage

private def newOrUsedShuffleStage(      shuffleDep: ShuffleDependency[_, _, _],      firstJobId: Int): ShuffleMapStage = {    val rdd = shuffleDep.rdd    val numTasks = rdd.partitions.length // 根据当前rdd的paritions个数,计算出当前Stage的task个数。    // 为当前rdd生成ShuffleMapStage    val stage = newShuffleMapStage(rdd, numTasks, shuffleDep, firstJobId, rdd.creationSite)     if (mapOutputTracker.containsShuffle(shuffleDep.shuffleId)) {       // 如果当前shuffle已经在MapOutputTracker中注册过      val serLocs = mapOutputTracker.getSerializedMapOutputStatuses(shuffleDep.shuffleId)      val locs = MapOutputTracker.deserializeMapStatuses(serLocs)      (0 until locs.length).foreach { i => // 更新Shuffle的Shuffle Write路径        if (locs(i) ne null) {          // locs(i) will be null if missing          stage.addOutputLoc(i, locs(i))        }      }    } else { // 如果当前Shuffle没有在MapOutputTracker中注册过      // Kind of ugly: need to register RDDs with the cache and map output tracker here      // since we can't do it in the RDD constructor because # of partitions is unknown      logInfo("Registering RDD " + rdd.id + " (" + rdd.getCreationSite + ")")      mapOutputTracker.registerShuffle(shuffleDep.shuffleId, rdd.partitions.length) // 注册    }    stage  }

 

DAGScheduler#newShuffleMapStage

最后生成shufflemapstage

private def newShuffleMapStage(      rdd: RDD[_],      numTasks: Int,      shuffleDep: ShuffleDependency[_, _, _],      firstJobId: Int,      callSite: CallSite): ShuffleMapStage = {    // 获取当前rdd的父Stage和stageId    val (parentStages: List[Stage], id: Int) = getParentStagesAndId(rdd, firstJobId)     // 生成新的ShuffleMapStage    val stage: ShuffleMapStage = new ShuffleMapStage(id, rdd, numTasks, parentStages,       firstJobId, callSite, shuffleDep)    stageIdToStage(id) = stage // 将ShuffleMapStage与stageId相关联    updateJobIdStageIdMaps(firstJobId, stage) // 更新该job中包含的stage    stage  }

 

 这样就构成一个DAG的图。再将stage进行提交。

stage的提交顺序由DAG生成的stage依赖决定,同时在stage下划分的task是由rdd 的 partitions 来决定的。

 

下次在分析一下taskscheduler是如何分配任务的。

 

参考资料:https://blog.csdn.net/dabokele/article/details/51902617

转载于:https://www.cnblogs.com/yankang/p/9771778.html

你可能感兴趣的文章
【Sort】希尔排序
查看>>
机器人关节数学模型
查看>>
解决无法wifi上网的问题
查看>>
uvalive 5731 Qin Shi Huang’s National Road System
查看>>
SULLEY安装与使用
查看>>
洛谷 1144 最短路计数 bfs
查看>>
C++ 单例模式
查看>>
C++ 我想这样用(四)
查看>>
T-2-java面向对象
查看>>
URL重定向及跳转漏洞
查看>>
springboot使用fastjson中文乱码解决方法 【转载】
查看>>
第一次项目上Linux服务器(四:CentOS6下Mysql数据库的安装与配置(转))
查看>>
Java基础——网络编程(二)
查看>>
读书笔记-1 《人月神话》
查看>>
Scrum冲刺阶段6
查看>>
OpenStack neutron删除网络设备出错解决办法
查看>>
[源码和文档分享]基于JSP同城校友网的设计与实现
查看>>
导弹拦截
查看>>
【模板】树状数组 1
查看>>
idea配置の隐藏参数
查看>>