feat: memory-budget-aware SortMergeJoin to ShuffledHashJoin rewrite #4186
Draft
andygrove wants to merge 1 commit into apache:main from
The RewriteJoin rule now gates the SMJ-to-ShuffledHashJoin rewrite on a per-join-side build-size budget derived from spark.memory.offHeap.size / spark.executor.cores, scaled by a memoryFraction (default 0.25) and a hashTableOverhead (default 3.0). Joins whose build side stats.sizeInBytes exceeds the budget are left as SortMergeJoin instead of being rewritten, avoiding the non-spillable HashJoinExec OOM that was previously reachable on large joins (e.g. TPC-H q9's lineitem joins).

Three new configs:
- spark.comet.exec.replaceSortMergeJoin.maxBuildSize: absolute cap, or 0 for auto-derive (default), or -1 to disable the check entirely.
- spark.comet.exec.replaceSortMergeJoin.memoryFraction: fraction of the per-task off-heap share allowed for one hash-join build (default 0.25).
- spark.comet.exec.replaceSortMergeJoin.hashTableOverhead: multiplier over raw build bytes to estimate hash-table memory (default 3.0).

Rule rejections emit a withInfo explanation naming the sizes and the configs so users can debug why a join wasn't rewritten.
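As a rough illustration of how these settings might be passed to a job, the sketch below renders them as `--conf key=value` arguments for spark-submit. The config names and defaults come from the description above; the off-heap size, the core count, and the helper `to_submit_args` are illustrative assumptions, not part of the PR.

```python
# Prefix shared by the rewrite switch and the three new configs.
PREFIX = "spark.comet.exec.replaceSortMergeJoin"

conf = {
    # Illustrative cluster sizing (assumed values, not from the PR).
    "spark.memory.offHeap.size": "16g",
    "spark.executor.cores": "8",
    # Enable the SMJ-to-ShuffledHashJoin rewrite.
    PREFIX: "true",
    # 0 = auto-derive the budget (default); -1 would disable the check.
    f"{PREFIX}.maxBuildSize": "0",
    # Defaults stated in the PR description.
    f"{PREFIX}.memoryFraction": "0.25",
    f"{PREFIX}.hashTableOverhead": "3.0",
}


def to_submit_args(settings):
    """Hypothetical helper: flatten a conf dict into spark-submit arguments."""
    args = []
    for key, value in settings.items():
        args += ["--conf", f"{key}={value}"]
    return args


submit_args = to_submit_args(conf)
print(" ".join(submit_args))
```

Setting maxBuildSize to a positive value instead would replace the derived budget with an absolute cap.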
Which issue does this PR close?
Closes #.
Rationale for this change
Forcing every SortMergeJoinExec to be rewritten as ShuffledHashJoinExec (via spark.comet.exec.replaceSortMergeJoin=true) can OOM on large joins because Comet's native HashJoinExec cannot spill its hash table. The rule previously had no size-based safety net, so enabling it on queries with multi-GB build sides (e.g. TPC-H q9's lineitem joins) aborted the stage.
What changes are included in this PR?
- RewriteJoin consults a per-join-side build-size budget before replacing a SortMergeJoinExec with a ShuffledHashJoinExec. Joins whose build side stats.sizeInBytes exceeds the budget are kept as SMJ.
- The budget is either set explicitly (maxBuildSize) or derived from Spark conf: offHeap.size / executor.cores * memoryFraction / hashTableOverhead.
- Three new configs under spark.comet.exec.replaceSortMergeJoin.*: maxBuildSize (absolute cap, 0 = auto-derive, -1 = disable), memoryFraction (default 0.25), and hashTableOverhead (default 3.0).
- Rule rejections emit a withInfo message naming the sizes and configs so users can see why a join was not rewritten.
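The budget arithmetic described above can be sketched in a few lines. This is a minimal Python model of the formula and the sentinel values, not the Scala code in RewriteJoin; the function names `build_size_budget` and `should_rewrite` are hypothetical, and the sketch assumes a positive off-heap size and core count.

```python
from typing import Optional

GIB = 1024 ** 3


def build_size_budget(
    off_heap_bytes: int,
    executor_cores: int,
    max_build_size: int = 0,
    memory_fraction: float = 0.25,
    hash_table_overhead: float = 3.0,
) -> Optional[int]:
    """Per-join-side build budget in bytes, or None when the check is disabled."""
    if max_build_size == -1:
        return None  # -1 disables the size check entirely
    if max_build_size > 0:
        return max_build_size  # explicit absolute cap
    # 0 = auto-derive: per-task off-heap share, scaled down by the fraction
    # allowed for one build and by the estimated hash-table overhead.
    per_task_share = off_heap_bytes / executor_cores
    return int(per_task_share * memory_fraction / hash_table_overhead)


def should_rewrite(build_side_bytes: int, budget: Optional[int]) -> bool:
    """Rewrite to a hash join unless the build side exceeds the budget."""
    return budget is None or build_side_bytes <= budget


# Worked example with assumed sizing: 16 GiB off-heap shared by 8 cores.
budget = build_size_budget(off_heap_bytes=16 * GIB, executor_cores=8)
print(budget)  # 178956970 bytes, roughly 170.7 MiB
```

With these assumed numbers, each task's share is 2 GiB, a quarter of which (512 MiB) is allowed for one build, and dividing by the 3.0 overhead leaves about 170.7 MiB of raw build-side bytes. A 1 GiB build side would therefore stay as SMJ under the defaults.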
How are these changes tested?
- RewriteJoinSuite covers the budget computation: explicit-value, auto-derive, disable-check, and scaling with memoryFraction / hashTableOverhead.
- Existing join behavior is exercised by CometJoinSuite and CometExecSuite, which were rerun locally with replaceSortMergeJoin=true and the new defaults, all passing.
- RewriteJoinSuite added to the exec suite lists in the Linux and macOS PR build workflows.