# refactor: introduce CometPlanner to replace CometScanRule and CometExecRule #4201
Draft — mbutrovich wants to merge 5 commits into apache:main from
## Which issue does this PR close?
Closes #.
## Rationale for this change
`CometScanRule` and `CometExecRule` together do nine implicit phases coordinated through plan-tree wrappers (`CometScanWrapper`, `CometSinkPlaceHolder`), tag strings scattered across companion objects, retry-on-self recursion for broadcast convertibility, and a `revertRedundantColumnarShuffle` post-pass that exists only because the main rule decides convertibility before knowing whether parents and children will follow. The split also means scan validation lives in one rule, conversion in another, and a `CometScanExec` wrapper carries state between them. Hard to reason about, harder to extend.

This PR replaces both rules with a single three-phase rule (`CometPlanner`) and moves coordination state from wrapper plan nodes onto `TreeNodeTag`s.

### Why three phases solves the redundant-transition problem
The old rule walks bottom-up and commits per node based on local information. When it sees a `ShuffleExchangeExec`, it asks "is the shuffle serde happy?". If yes, it converts to `CometShuffleExchangeExec` immediately. Later, when it visits the parent `HashAggregate`, it might decide the aggregate cannot convert. The plan now reads `Spark HashAgg → CometShuffle → Spark HashAgg`: a columnar shuffle producing batches that get converted back to rows at both ends. PR #4010 added `revertRedundantColumnarShuffle` as a post-pass to detect that pattern and revert it. That approach is whack-a-mole: every speculative conversion that does not pan out needs its own revert pass, and the broadcast path already had similar retry-on-self recursion to guess whether a parent BHJ would convert before deciding to convert the broadcast.

The three phases remove the speculation:
A node with `selfLikely=true, parentLikely=false, childLikely=false` resolves to `Passthrough` rather than `Convert`: the conversion never happens, so there is nothing to revert.

The same machinery catches the broadcast-without-Comet-consumer case (now a `BroadcastConsumerIndex` built once during Phase 1) and the spark-to-columnar-without-consumer case (Phase 2's S2C rule gates on parent `LIKELY_COMET`). Three patterns that previously required either a post-pass or a recursive pre-check now live in one place.

### Why this lets us delete the placeholder / wrapper classes
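The tag-based coordination that replaces the wrapper plan nodes can be sketched as follows. This is a minimal stand-in for Spark's `TreeNodeTag` mechanism so it runs without a Spark dependency; only the `NATIVE_OP` name comes from this PR, and everything else (the `Tag`/`Node` types, the `emit` helper) is hypothetical:

```scala
import scala.collection.mutable

// Hypothetical stand-in for Spark's TreeNodeTag: a typed key into a
// per-node mutable tag map.
final case class Tag[T](name: String)
final class Node(val name: String) {
  private val tags = mutable.Map.empty[String, Any]
  def setTag[T](tag: Tag[T], value: T): Unit = tags(tag.name) = value
  def getTag[T](tag: Tag[T]): Option[T] = tags.get(tag.name).map(_.asInstanceOf[T])
}

val NATIVE_OP: Tag[Array[Byte]] = Tag("NATIVE_OP")

// Old flow: serde wraps a JVM-orchestrated operator in a placeholder that
// carries the protobuf, and a later pass strips the placeholder.
// Sketched new flow: write the protobuf onto the node's tag at emit time
// and keep the operator itself in the tree -- no wrapper, no strip pass.
def emit(node: Node, protobuf: Array[Byte]): Node = {
  node.setTag(NATIVE_OP, protobuf)
  node
}

val shuffle = emit(new Node("CometShuffleExchangeExec"), Array[Byte](1, 2, 3))
// A parent (or a check like isCometPlan) recognizes the native child via the tag.
val seenAsNativeChild = shuffle.getTag(NATIVE_OP).isDefined
```

Because the protobuf rides on the operator itself, no separate strip pass is needed once the parent has read the tag.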
The placeholder classes existed because the old rule needed in-tree carriers for state during iteration:

- `CometScanExec`: inter-rule carrier. `CometScanRule` decides which kind of native scan and stamps `scanImpl`; `CometExecRule` reads that and produces `CometNativeScanExec`. After `SCAN_NATIVE_ICEBERG_COMPAT` removal, `CometScanExec` has zero execution-time consumers.
- `CometBatchScanExec`: same idea for V2 scans. CometPlanner stashes Iceberg metadata on the `ICEBERG_METADATA` tag and Phase 3 builds the final `CometIcebergNativeScanExec` directly from the raw `BatchScanExec`.
- `CometSinkPlaceHolder` and `CometScanWrapper`: serde `createExec` returns these when the operator is JVM-orchestrated (`CometShuffleExchangeExec`, `CometBroadcastExchangeExec`, `CometCollectLimitExec`, S2C wrappers). The placeholder carries the protobuf so a parent can wire it as a child, then a separate post-pass strips the placeholder. CometPlanner's `runSerde` unwraps inline at emit time and writes the protobuf to the `NATIVE_OP` tag. No placeholder, no strip pass.

## What changes are included in this PR?
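Stepping back to the mechanism in the rationale: the three-flag resolution at the core of Phase 2 can be sketched as below. Names (`Decision`, `decide`, the flag triple) are hypothetical, and the rule shown is one resolution consistent with the `Passthrough` example given earlier, not necessarily the exact one implemented:

```scala
// Hypothetical sketch of Phase 2's per-node resolution. Phase 1 is assumed
// to have stamped a LIKELY_COMET-style flag on every node; Phase 2 then
// decides per node from its own flag plus its neighbors' flags, so no
// speculative conversion (and no revert pass) is needed.
sealed trait Decision
case object Convert extends Decision
case object Passthrough extends Decision

def decide(selfLikely: Boolean, parentLikely: Boolean, childLikely: Boolean): Decision =
  // A convertible node with no likely-Comet neighbor would only add
  // row<->batch transitions at both ends, so it stays on the Spark path.
  if (selfLikely && (parentLikely || childLikely)) Convert
  else Passthrough

// The case from the rationale: convertible in isolation, but neither parent
// nor child is likely to convert -> never converted, nothing to revert.
val lonely = decide(selfLikely = true, parentLikely = false, childLikely = false)
```

Here `lonely` resolves to `Passthrough`, which is exactly the redundant-transition case the old `revertRedundantColumnarShuffle` post-pass had to clean up after the fact.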
New `org.apache.comet.planner` package:

- `CometPlanner` (the rule). Pipeline: `prePass` (NaN/zero normalization, RewriteJoin) → Phase 1 → Phase 2 → Phase 3 → `revertOrphanedBroadcasts` → `cleanupLogicalLinks` → `convertBlocks` → `postPass` (subquery broadcast rewrite).
- `phases/`: the five passes above.
- `gates/`: `V1ScanGate`, `V2ScanClassifier`, `S2CGate`. Per-node classifiers returning ADTs.
- `tags/CometTags.scala`: formal vocabulary for the cross-phase tags (`LIKELY_COMET`, `DECISION`, `COMET_CONVERTED`, `NATIVE_OP`, `ICEBERG_METADATA`).
- `PlanningContext.scala`: state threaded through phases plus `BroadcastConsumerIndex`.

Integration:
- `spark.comet.planner.enabled` (default `true`) selects which one runs. Each rule asserts on the flag at entry to surface drift.
- `CometNativeScanExec` no longer holds a `@transient CometScanExec` field. File partitions come from `originalPlan.inputRDD.asInstanceOf[FileScanRDD].filePartitions`. The `CometNativeScan` serde now operates on `FileSourceScanExec` directly.
- `CometShuffleExchangeExec.isCometPlan` also recognizes the `NATIVE_OP` tag, so JVM-orchestrated Comet ops without `CometPlan` membership still register as native children.
- `CometPlanAdaptiveDynamicPruningFilters` updated to convert `originalPlan.partitionFilters` instead of the removed `scan.partitionFilters` (same dual-filter pattern, different source).

Two non-obvious mechanics worth flagging during review:
- `cleanupLogicalLinks`: explicitly unsets `LOGICAL_PLAN_TAG` and `LOGICAL_PLAN_INHERITED_TAG` on Comet ops whose original Spark op had no logical link. Without this, `setLogicalLink` propagation leaves stale inherited links on exchanges, AQE's `replaceWithQueryStagesInLogicalPlan` picks the wrong physical match, and a previously-emitted `CometSortMergeJoin` survives a re-plan that should have replaced it. Mirrors the legacy "Set up logical links" pass.
- `convertBlocks` on the skip path: when `COMET_EXEC_ENABLED=false` or the root is already `COMET_CONVERTED`, the planner still runs `convertBlocks`. AQE re-planning can graft a previously-emitted `CometNativeExec` interior node (with no `SerializedPlan`) into a freshly Spark-planned outer plan, and that node needs a serialized plan to execute as the new top of its block.

Out of scope, follow-up PRs in dependency order:
1. Remove `CometScanRule` + `CometExecRule` after a stabilization period.
2. Remove `native_iceberg_compat` + `SCAN_AUTO` + `COMET_NATIVE_SCAN_IMPL`.
3. Remove `CometScanExec` and `CometBatchScanExec` (dead after step 2).
4. Remove `CometScanWrapper` / `CometSinkPlaceHolder` (no producer left after step 1).

## How are these changes tested?
Existing tests.
Rollback is `--conf spark.comet.planner.enabled=false`, which routes to the original `CometScanRule` + `CometExecRule` path unchanged.
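For reference, a rollback at submit time might look like the following; only the `spark.comet.planner.enabled` flag comes from this PR, and the application class/jar are placeholders:

```shell
# Route planning back to the legacy CometScanRule + CometExecRule path.
# --class and the jar name are illustrative placeholders.
spark-submit \
  --class com.example.MyApp \
  --conf spark.comet.planner.enabled=false \
  my-app.jar
```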