
refactor: introduce CometPlanner to replace CometScanRule and CometExecRule #4201

Draft
mbutrovich wants to merge 5 commits into apache:main from mbutrovich:cometplanner

Conversation


mbutrovich commented May 4, 2026

Which issue does this PR close?

Closes #.

Rationale for this change

CometScanRule and CometExecRule together do nine implicit phases coordinated through plan-tree wrappers (CometScanWrapper, CometSinkPlaceHolder), tag strings scattered across companion objects, retry-on-self recursion for broadcast convertibility, and a revertRedundantColumnarShuffle post-pass that exists only because the main rule decides convertibility before knowing whether parents and children will follow. The split also means scan validation lives in one rule, conversion in another, and a CometScanExec wrapper carries state between them. Hard to reason about, harder to extend.

This PR replaces both rules with a single three-phase rule (CometPlanner) and moves coordination state from wrapper plan nodes onto TreeNodeTags.

Why three phases solves the redundant-transition problem

The old rule walks bottom-up and commits per node based on local information. When it sees a ShuffleExchangeExec, it asks "is the shuffle serde happy?". If yes, it converts to CometShuffleExchangeExec immediately. Later, when it visits the parent HashAggregate, it might decide the aggregate cannot convert. The plan now reads Spark HashAgg → CometShuffle → Spark HashAgg: a columnar shuffle producing batches that get converted back to rows at both ends. PR #4010 added revertRedundantColumnarShuffle as a post-pass to detect that pattern and revert. That approach is whack-a-mole: every speculative conversion that does not pan out needs its own revert pass, and the broadcast path already had similar retry-on-self recursion to guess whether a parent BHJ would convert before deciding to convert the broadcast.

The three phases remove the speculation:

  • Phase 1 (LIKELY_COMET) asks the per-node question once, in isolation: "would the serde accept this op if we ignored what its parents and children do?". Pure tagging, no commitment.
  • Phase 2 (DECISION) walks with both parent and child predictions in hand. The shuffle rule reads "convert iff parent LIKELY_COMET or any child LIKELY_COMET". A shuffle between two JVM aggregates is selfLikely=true, parentLikely=false, childLikely=false → Passthrough. The conversion never happens, so there is nothing to revert.
  • Phase 3 (Emit) mutates the plan only where Phase 2 said Convert.
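The Phase 2 shuffle rule above can be modeled as a pure predicate over the three predictions. This is an illustrative sketch, not the actual CometPlanner code; the names `Decision`, `Convert`, `Passthrough`, and `shuffleDecision` are assumptions for this example.

```scala
// Illustrative model of the Phase 2 shuffle decision; not the actual
// CometPlanner API. Decision/Convert/Passthrough are stand-in names.
sealed trait Decision
case object Convert extends Decision
case object Passthrough extends Decision

// "convert iff parent LIKELY_COMET or any child LIKELY_COMET"
def shuffleDecision(selfLikely: Boolean,
                    parentLikely: Boolean,
                    childrenLikely: Seq[Boolean]): Decision =
  if (selfLikely && (parentLikely || childrenLikely.contains(true))) Convert
  else Passthrough

// Shuffle between two JVM aggregates: the serde would accept it, but no
// neighbor is LIKELY_COMET, so nothing converts and nothing needs reverting.
assert(shuffleDecision(selfLikely = true, parentLikely = false,
  childrenLikely = Seq(false)) == Passthrough)
```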

Same machinery catches the broadcast-without-Comet-consumer case (now a BroadcastConsumerIndex built once during Phase 1) and the spark-to-columnar-without-consumer case (Phase 2's S2C rule gates on parent LIKELY_COMET). Three patterns that previously required either a post-pass or a recursive pre-check now live in one place.
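The broadcast gate could be pictured as a lookup against the Phase 1 index. A minimal sketch, assuming a hypothetical shape for `BroadcastConsumerIndex` (the real field and method names may differ):

```scala
// Hypothetical shape of the broadcast-without-Comet-consumer gate: the index
// is built once during Phase 1, then consulted by the Phase 2 broadcast rule.
// Identifying broadcasts by Int here is a simplification for illustration.
case class BroadcastConsumerIndex(broadcastsWithCometConsumer: Set[Int]) {
  def hasCometConsumer(broadcastId: Int): Boolean =
    broadcastsWithCometConsumer.contains(broadcastId)
}

val index = BroadcastConsumerIndex(broadcastsWithCometConsumer = Set(1))
assert(index.hasCometConsumer(1))   // a consuming BHJ is LIKELY_COMET: may convert
assert(!index.hasCometConsumer(2))  // no Comet consumer: leave the Spark broadcast
```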

Why this lets us delete the placeholder / wrapper classes

The placeholder classes existed because the old rule needed in-tree carriers for state during iteration:

  • CometScanExec: inter-rule carrier. CometScanRule decides which kind of native scan and stamps scanImpl; CometExecRule reads that and produces CometNativeScanExec. After SCAN_NATIVE_ICEBERG_COMPAT removal, CometScanExec has zero execution-time consumers.
  • CometBatchScanExec: same idea for V2 scans. CometPlanner stashes Iceberg metadata on the ICEBERG_METADATA tag and Phase 3 builds the final CometIcebergNativeScanExec directly from the raw BatchScanExec.
  • CometSinkPlaceHolder and CometScanWrapper: serde createExec returns these when the operator is JVM-orchestrated (CometShuffleExchangeExec, CometBroadcastExchangeExec, CometCollectLimitExec, S2C wrappers). The placeholder carries the protobuf so a parent can wire it as a child, then a separate post-pass strips the placeholder. CometPlanner's runSerde unwraps inline at emit time and writes the protobuf to the NATIVE_OP tag. No placeholder, no strip pass.
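The tag-based replacement for the placeholders can be sketched as a toy contrast: instead of returning a wrapper node that a later pass must strip, the emit step writes the serialized op onto the node itself. `Node` and the string key below stand in for a Spark plan node and the NATIVE_OP TreeNodeTag; none of this is the actual Comet code.

```scala
// Toy contrast with the placeholder approach: no CometSinkPlaceHolder to
// wire through and no strip pass afterwards.
final class Node {
  var tags: Map[String, Any] = Map.empty
}

def runSerdeSketch(node: Node, nativeOpProto: Array[Byte]): Node = {
  node.tags += ("NATIVE_OP" -> nativeOpProto) // unwrap inline at emit time
  node                                        // the node itself carries the protobuf
}

val n = runSerdeSketch(new Node, Array[Byte](1, 2, 3))
assert(n.tags.contains("NATIVE_OP"))
```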

What changes are included in this PR?

New org.apache.comet.planner package:

  • CometPlanner (the rule). Pipeline: prePass (NaN/zero normalization, RewriteJoin) → Phase 1 → Phase 2 → Phase 3 → revertOrphanedBroadcasts → cleanupLogicalLinks → convertBlocks → postPass (subquery broadcast rewrite).
  • phases/: the five passes above.
  • gates/: V1ScanGate, V2ScanClassifier, S2CGate. Per-node classifiers returning ADTs.
  • tags/CometTags.scala: formal vocabulary for the cross-phase tags (LIKELY_COMET, DECISION, COMET_CONVERTED, NATIVE_OP, ICEBERG_METADATA).
  • PlanningContext.scala: state threaded through phases plus BroadcastConsumerIndex.

Integration:

  • Both rule paths are registered; spark.comet.planner.enabled (default true) selects which one runs. Each rule asserts on the flag at entry to surface drift.
  • CometNativeScanExec no longer holds a @transient CometScanExec field. File partitions come from originalPlan.inputRDD.asInstanceOf[FileScanRDD].filePartitions. The CometNativeScan serde now operates on FileSourceScanExec directly.
  • CometShuffleExchangeExec.isCometPlan also recognizes the NATIVE_OP tag, so JVM-orchestrated Comet ops without CometPlan membership still register as native children.
  • CometPlanAdaptiveDynamicPruningFilters updated to convert originalPlan.partitionFilters instead of the removed scan.partitionFilters (same dual-filter pattern, different source).

Two non-obvious mechanics worth flagging during review:

  • cleanupLogicalLinks: explicitly unsets LOGICAL_PLAN_TAG and LOGICAL_PLAN_INHERITED_TAG on Comet ops whose original Spark op had no logical link. Without this, setLogicalLink propagation leaves stale inherited links on exchanges, AQE's replaceWithQueryStagesInLogicalPlan picks the wrong physical match, and a previously-emitted CometSortMergeJoin survives a re-plan that should have replaced it. Mirrors the legacy "Set up logical links" pass.
  • convertBlocks on the skip path: when COMET_EXEC_ENABLED=false or the root is already COMET_CONVERTED, the planner still runs convertBlocks. AQE re-planning can graft a previously-emitted CometNativeExec interior node (with no SerializedPlan) into a freshly Spark-planned outer plan, and that node needs a serialized plan to execute as the new top of its block.
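The cleanupLogicalLinks invariant can be modeled in a few lines. This is a stand-in sketch: `Op` and the string keys below play the role of a Spark plan node and the LOGICAL_PLAN_TAG / LOGICAL_PLAN_INHERITED_TAG TreeNodeTags, not the real types.

```scala
// Sketch of the cleanupLogicalLinks invariant: a Comet op whose original
// Spark op had no logical link must not keep inherited link tags, or AQE's
// replaceWithQueryStagesInLogicalPlan can match the wrong physical plan.
final class Op(var tags: Map[String, Any])

def cleanupLogicalLinks(cometOp: Op, originalHadLogicalLink: Boolean): Op = {
  if (!originalHadLogicalLink) {
    cometOp.tags -= "LOGICAL_PLAN_TAG"
    cometOp.tags -= "LOGICAL_PLAN_INHERITED_TAG"
  }
  cometOp
}

val op = new Op(Map("LOGICAL_PLAN_INHERITED_TAG" -> "stale link"))
assert(cleanupLogicalLinks(op, originalHadLogicalLink = false).tags.isEmpty)
```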

Out of scope, follow-up PRs in dependency order:

  1. Delete CometScanRule + CometExecRule after a stabilization period.
  2. Remove native_iceberg_compat + SCAN_AUTO + COMET_NATIVE_SCAN_IMPL.
  3. Delete CometScanExec and CometBatchScanExec (dead after step 2).
  4. Remove CometScanWrapper / CometSinkPlaceHolder (no producer left after step 1).

How are these changes tested?

Existing tests.

Rollback is --conf spark.comet.planner.enabled=false, which routes to the original CometScanRule + CometExecRule path unchanged.
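For reviewers trying the rollback locally, an invocation might look like the following (the application jar and any other options are placeholders, not part of this PR):

```shell
# Route planning back to the legacy CometScanRule + CometExecRule path.
spark-submit \
  --conf spark.comet.planner.enabled=false \
  your-app.jar
```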
