[SPARK-56499][CORE] Deduplicate RDD graph BFS traversal pattern in DAGScheduler#55361
[SPARK-56499][CORE] Deduplicate RDD graph BFS traversal pattern in DAGScheduler#55361jiangxb1987 wants to merge 2 commits intoapache:masterfrom
Conversation
Ngone51
left a comment
There was a problem hiding this comment.
Nice cleanup! LGTM except one minior comment.
| val ancestors = new ListBuffer[ShuffleDependency[_, _, _]] | ||
| /** | ||
| * Traverses the RDD dependency graph using a manually maintained stack to prevent | ||
| * StackOverflowError caused by recursive traversal. For each unvisited RDD, calls |
There was a problem hiding this comment.
Maybe we should move the comment "Traverses the RDD dependency graph using a manually maintained stack to prevent StackOverflowError caused by recursive traversal." to traverseRDDGraphUntil().
| true | ||
| } | ||
| if (rddHasUncachedPartitions) { | ||
| for (dep <- rdd.dependencies) { |
There was a problem hiding this comment.
it will loop over all the dependencies, regardless the path has been visited before
There was a problem hiding this comment.
This matches the original behavior, I don't think we should change it in this PR.
cloud-fan
left a comment
There was a problem hiding this comment.
Clean refactor: introduces two local helpers (traverseRDDGraph / traverseRDDGraphUntil) that factor out the iterative-stack traversal pattern duplicated across six DAGScheduler methods. Traced each call site — semantics are preserved, including the visited-set discipline and the final return value of traverseParentRDDsWithinStage. stageDependsOn additionally gains early termination (a perf improvement over walking the full reachable set). All changes are private to DAGScheduler; no public-API surface change; existing DAGSchedulerSuite coverage is appropriate.
LGTM with two minor suggestions inline.
| true | ||
| } | ||
|
|
||
| /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ |
There was a problem hiding this comment.
The field is shuffleIdToMapStage (line 160). Since this comment is re-added in the diff, worth fixing the stale name while we're here.
| /** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */ | |
| /** Find ancestor shuffle dependencies that are not registered in shuffleIdToMapStage yet */ |
| def visit(rdd: RDD[_]): Unit = { | ||
| if (!visitedRdds(rdd)) { | ||
| visitedRdds += rdd | ||
| var found = false |
There was a problem hiding this comment.
Minor: traverseRDDGraphUntil already returns false iff the visitor terminated early, so we can drop the var found and return the negation of the helper directly — matching the shape of traverseParentRDDsWithinStage which returns the helper's result directly:
!traverseRDDGraphUntil(stage.rdd) { (rdd, enqueue) =>
if (rdd == target.rdd) {
false
} else {
for (dep <- rdd.dependencies) {
dep match {
case shufDep: ShuffleDependency[_, _, _] =>
val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
if (!mapStage.isAvailable) {
enqueue(mapStage.rdd)
} // Otherwise there's no need to follow the dependency back
case narrowDep: NarrowDependency[_] =>
enqueue(narrowDep.rdd)
}
}
true
}
}
What changes were proposed in this pull request?
DAGScheduler:traverseRDDGraph(rdd)(visitor)— traverses the RDD dependency graph using an explicit stack, callingvisitor(rdd, enqueue)for each unvisited RDD.traverseRDDGraphUntil(rdd)(visitor)— like above but supports early termination: the visitor returnsfalseto stop traversal; returns whether traversal completed.visited + waitingForVisitboilerplate:getMissingAncestorShuffleDependenciesgetShuffleDependenciesAndResourceProfilestraverseParentRDDsWithinStagegetMissingParentStageseagerlyComputePartitionsForRddAndAncestorsstageDependsOnWhy are the changes needed?
The same BFS traversal pattern (maintain a
visitedset and awaitingForVisitstack, remove-and-check-visited in a loop) was duplicated across many methods inDAGScheduler, adding unnecessary boilerplate and making the code harder to read and maintain.Does this PR introduce any user-facing change?
No.
How was this patch tested?
Covered by existing
DAGSchedulerSuitetests. No behavior changes were made.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude (claude-sonnet-4-6)