
feat: memory-budget-aware SortMergeJoin to ShuffledHashJoin rewrite #4186

Draft
andygrove wants to merge 1 commit into apache:main from andygrove:rewrite-join-heuristic

Which issue does this PR close?

Closes #.

Rationale for this change

Forcing every SortMergeJoinExec to be rewritten as ShuffledHashJoinExec
(via spark.comet.exec.replaceSortMergeJoin=true) can OOM on large joins
because Comet's native HashJoinExec cannot spill its hash table. The rule
previously had no size-based safety net, so enabling it on queries with
multi-GB build sides (e.g. TPC-H q9's lineitem joins) aborted the stage.

What changes are included in this PR?

  • RewriteJoin consults a per-join-side build-size budget before replacing
    a SortMergeJoinExec with a ShuffledHashJoinExec. Joins whose build
    side stats.sizeInBytes exceeds the budget are kept as SMJ.
  • The budget is either explicit (maxBuildSize) or derived from Spark conf:
    (spark.memory.offHeap.size / spark.executor.cores) * memoryFraction /
    hashTableOverhead.
  • Three new configs under spark.comet.exec.replaceSortMergeJoin.*:
    maxBuildSize (absolute cap, 0 = auto-derive, -1 = disable),
    memoryFraction (default 0.25), and hashTableOverhead (default 3.0).
  • Rule rejections emit a withInfo message naming the sizes and configs so
    users can see why a join was not rewritten.
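
The budget arithmetic described above can be sketched as follows. This is a
minimal illustration in Python, not the code in this PR: the function names
derive_build_budget and should_rewrite are hypothetical, sizes are assumed to
be in bytes, and treating every negative maxBuildSize as "disabled" is an
assumption (the description only names -1).

```python
from typing import Optional


def derive_build_budget(
    off_heap_bytes: int,
    executor_cores: int,
    memory_fraction: float = 0.25,
    hash_table_overhead: float = 3.0,
    max_build_size: int = 0,
) -> Optional[int]:
    """Return the per-join build-side budget in bytes, or None if the check is disabled."""
    if max_build_size < 0:
        # -1 disables the size check entirely (assumption: any negative value does).
        return None
    if max_build_size > 0:
        # An explicit absolute cap takes precedence over the derived value.
        return max_build_size
    # 0 means auto-derive: per-task off-heap share, scaled by the fraction
    # allowed for one build, divided by the estimated hash-table overhead.
    per_task_share = off_heap_bytes / executor_cores
    return int(per_task_share * memory_fraction / hash_table_overhead)


def should_rewrite(build_size_bytes: int, budget: Optional[int]) -> bool:
    """Rewrite SMJ to hash join only if the build side fits the budget."""
    return budget is None or build_size_bytes <= budget


# Worked example: 16 GiB off-heap, 8 cores, default fraction and overhead.
budget = derive_build_budget(16 * 1024**3, 8)
print(budget)                                # 178956970 bytes, about 170.7 MiB
print(should_rewrite(100 * 1024**2, budget)) # True: a 100 MiB build side fits
print(should_rewrite(2 * 1024**3, budget))   # False: a 2 GiB build side stays SMJ
```

With the defaults, each task gets 2 GiB of off-heap memory, a quarter of that
(512 MiB) is allowed for one build, and dividing by the 3.0 overhead leaves
roughly 170 MiB of raw build-side data as the cutoff.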

How are these changes tested?

  • New RewriteJoinSuite covers the budget computation: explicit-value,
    auto-derive, disable-check, and scaling with memoryFraction /
    hashTableOverhead.
  • End-to-end rewrite correctness is already covered by CometJoinSuite and
    CometExecSuite, which were rerun locally with replaceSortMergeJoin=true
    and the new defaults, all passing.
  • RewriteJoinSuite added to the exec suite lists in the Linux and macOS
    PR build workflows.

The RewriteJoin rule now gates the SMJ-to-ShuffledHashJoin rewrite on a
per-join-side build-size budget derived from
  spark.memory.offHeap.size / spark.executor.cores
scaled by a memoryFraction (default 0.25) and a hashTableOverhead (default
3.0). Joins whose build side stats.sizeInBytes exceeds the budget are left
as SortMergeJoin instead of being rewritten, avoiding the non-spillable
HashJoinExec OOM that was previously reachable on large joins (e.g. TPC-H
q9's lineitem joins).

Three new configs:
- spark.comet.exec.replaceSortMergeJoin.maxBuildSize: absolute cap, or 0
  for auto-derive (default), or -1 to disable the check entirely.
- spark.comet.exec.replaceSortMergeJoin.memoryFraction: fraction of the
  per-task off-heap share allowed for one hash-join build (default 0.25).
- spark.comet.exec.replaceSortMergeJoin.hashTableOverhead: multiplier
  over raw build bytes to estimate hash-table memory (default 3.0).
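
As a rough illustration of how these settings might be passed to a job, the
sketch below builds the conf entries and renders them as spark-submit
arguments. The config keys and defaults come from this description; the helper
to_spark_submit_args is hypothetical, and the values are written as plain
strings because the accepted value format (for example byte suffixes) is not
stated here.

```python
PREFIX = "spark.comet.exec.replaceSortMergeJoin"

# Enable the rewrite and spell out the three new settings at their defaults.
conf = {
    PREFIX: "true",
    PREFIX + ".maxBuildSize": "0",         # 0 = auto-derive, -1 = disable the check
    PREFIX + ".memoryFraction": "0.25",    # share of per-task off-heap for one build
    PREFIX + ".hashTableOverhead": "3.0",  # multiplier over raw build bytes
}


def to_spark_submit_args(settings):
    """Render a conf dict as a flat list of --conf key=value arguments."""
    args = []
    for key, value in sorted(settings.items()):
        args += ["--conf", key + "=" + value]
    return args


print(" ".join(to_spark_submit_args(conf)))
```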

Rule rejections emit a withInfo explanation naming the sizes and the
configs so users can debug why a join wasn't rewritten.