[SPARK-56499][CORE] Deduplicate RDD graph BFS traversal pattern in DAGScheduler #55361

Open
jiangxb1987 wants to merge 2 commits into apache:master from jiangxb1987:SPARK-56499

Conversation

@jiangxb1987
Contributor

What changes were proposed in this pull request?

  • Introduced two private helper methods in DAGScheduler:
    • traverseRDDGraph(rdd)(visitor) — traverses the RDD dependency graph using an explicit stack, calling visitor(rdd, enqueue) for each unvisited RDD.
    • traverseRDDGraphUntil(rdd)(visitor) — like above but supports early termination: the visitor returns false to stop traversal; returns whether traversal completed.
  • Refactored 6 methods to use these helpers, eliminating duplicated visited + waitingForVisit boilerplate:
    • getMissingAncestorShuffleDependencies
    • getShuffleDependenciesAndResourceProfiles
    • traverseParentRDDsWithinStage
    • getMissingParentStages
    • eagerlyComputePartitionsForRddAndAncestors
    • stageDependsOn
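The shape of the two helpers can be sketched outside Spark with a minimal stand-in for the RDD dependency graph. The `Node` class, `GraphTraversal` object, and method bodies below are illustrative only, not the actual DAGScheduler code, which operates on `RDD[_]`:

```scala
import scala.collection.mutable

// Minimal stand-in for an RDD dependency graph node (illustrative only).
case class Node(id: Int, deps: Seq[Node])

object GraphTraversal {
  // Visits every node reachable from `root` exactly once, using an explicit
  // stack instead of recursion; mirrors the shape of traverseRDDGraph.
  def traverseGraph(root: Node)(visitor: (Node, Node => Unit) => Unit): Unit = {
    traverseGraphUntil(root) { (node, enqueue) =>
      visitor(node, enqueue)
      true
    }
    ()
  }

  // Like traverseGraph, but the visitor returns false to stop early.
  // Returns true iff the traversal ran to completion; mirrors the shape of
  // traverseRDDGraphUntil.
  def traverseGraphUntil(root: Node)(visitor: (Node, Node => Unit) => Boolean): Boolean = {
    val visited = mutable.HashSet.empty[Node]
    val waitingForVisit = mutable.Stack(root)
    while (waitingForVisit.nonEmpty) {
      val node = waitingForVisit.pop()
      if (!visited(node)) {
        visited += node
        val enqueue: Node => Unit = n => { waitingForVisit.push(n); () }
        if (!visitor(node, enqueue)) {
          return false
        }
      }
    }
    true
  }
}
```

The visitor decides which parents to enqueue, so each call site keeps its own filtering logic (e.g. skipping available map stages) while the visited-set discipline lives in one place.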

Why are the changes needed?

The same BFS traversal pattern (maintain a visited set and a waitingForVisit stack, remove-and-check-visited in a loop) was duplicated across many methods in DAGScheduler, adding unnecessary boilerplate and making the code harder to read and maintain.
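For reference, the duplicated shape at each call site looked roughly like the self-contained sketch below. The `Node` stand-in and `collectReachableIds` method are illustrative (the real code operates on `RDD[_]` and runs per-method logic at the marked line):

```scala
import scala.collection.mutable

// Stand-in for an RDD dependency graph node (illustrative only).
case class Node(id: Int, deps: Seq[Node])

object Boilerplate {
  // The pattern repeated across the six methods: an explicit visited set plus
  // a waitingForVisit stack, with pop-and-check-visited in a loop.
  def collectReachableIds(root: Node): Set[Int] = {
    val visited = mutable.HashSet.empty[Node]
    val waitingForVisit = mutable.Stack.empty[Node]
    waitingForVisit.push(root)
    val ids = mutable.Set.empty[Int]
    while (waitingForVisit.nonEmpty) {
      val toVisit = waitingForVisit.pop()
      if (!visited(toVisit)) {
        visited += toVisit
        ids += toVisit.id                           // per-method logic went here...
        toVisit.deps.foreach(waitingForVisit.push)  // ...and enqueued parent nodes
      }
    }
    ids.toSet
  }
}
```

Only the two commented lines differed between call sites; everything else was copied verbatim, which is the boilerplate the helpers factor out.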

Does this PR introduce any user-facing change?

No.

How was this patch tested?

Covered by existing DAGSchedulerSuite tests. No behavior changes were made.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude (claude-sonnet-4-6)

Member

@Ngone51 Ngone51 left a comment

Nice cleanup! LGTM except one minor comment.

val ancestors = new ListBuffer[ShuffleDependency[_, _, _]]
/**
* Traverses the RDD dependency graph using a manually maintained stack to prevent
* StackOverflowError caused by recursive traversal. For each unvisited RDD, calls
Member

Maybe we should move the comment "Traverses the RDD dependency graph using a manually maintained stack to prevent StackOverflowError caused by recursive traversal." to traverseRDDGraphUntil().

Contributor Author

updated

true
}
if (rddHasUncachedPartitions) {
for (dep <- rdd.dependencies) {

It will loop over all the dependencies, regardless of whether the path has been visited before.

Contributor Author

This matches the original behavior, I don't think we should change it in this PR.

Contributor

@cloud-fan cloud-fan left a comment

Clean refactor: introduces two local helpers (traverseRDDGraph / traverseRDDGraphUntil) that factor out the iterative-stack traversal pattern duplicated across six DAGScheduler methods. Traced each call site — semantics are preserved, including the visited-set discipline and the final return value of traverseParentRDDsWithinStage. stageDependsOn additionally gains early termination (a perf improvement over walking the full reachable set). All changes are private to DAGScheduler; no public-API surface change; existing DAGSchedulerSuite coverage is appropriate.

LGTM with two minor suggestions inline.

true
}

/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
Contributor

The field is shuffleIdToMapStage (line 160). Since this comment is re-added in the diff, worth fixing the stale name while we're here.

Suggested change
/** Find ancestor shuffle dependencies that are not registered in shuffleToMapStage yet */
/** Find ancestor shuffle dependencies that are not registered in shuffleIdToMapStage yet */

def visit(rdd: RDD[_]): Unit = {
if (!visitedRdds(rdd)) {
visitedRdds += rdd
var found = false
Contributor

Minor: traverseRDDGraphUntil already returns false iff the visitor terminated early, so we can drop the var found and return the negation of the helper directly — matching the shape of traverseParentRDDsWithinStage which returns the helper's result directly:

!traverseRDDGraphUntil(stage.rdd) { (rdd, enqueue) =>
  if (rdd == target.rdd) {
    false
  } else {
    for (dep <- rdd.dependencies) {
      dep match {
        case shufDep: ShuffleDependency[_, _, _] =>
          val mapStage = getOrCreateShuffleMapStage(shufDep, stage.firstJobId)
          if (!mapStage.isAvailable) {
            enqueue(mapStage.rdd)
          }  // Otherwise there's no need to follow the dependency back
        case narrowDep: NarrowDependency[_] =>
          enqueue(narrowDep.rdd)
      }
    }
    true
  }
}
