Skip to content

[SPARK-56498][SS] Implement decoupled state store maintenance#56793

Open
liviazhu wants to merge 5 commits into
apache:masterfrom
liviazhu:liviazhu-db/state-store-decoupled-maintenance
Open

[SPARK-56498][SS] Implement decoupled state store maintenance#56793
liviazhu wants to merge 5 commits into
apache:masterfrom
liviazhu:liviazhu-db/state-store-decoupled-maintenance

Conversation

@liviazhu

@liviazhu liviazhu commented Jun 25, 2026

Copy link
Copy Markdown
Contributor

What changes were proposed in this pull request?

This redesigns state store maintenance so that snapshotting and cleanup run as independent operations that cannot starve each other, and so that closing a provider cannot race in-flight maintenance.

Maintenance is split into two operations on StateStoreProviderdoSnapshotMaintenance() and doCleanupMaintenance() (with doMaintenance() retained as the combination of the two) — implemented for the HDFS and RocksDB providers. The maintenance scheduler submits these as separate tasks per provider, each tracked by its own partition set (snapshotPartitions / cleanupPartitions) so that concurrency is bounded per provider per operation type. The two operation types run on separate thread pools — a high-priority snapshot pool and a low-priority cleanup pool — sized from the total maintenance thread count by a new snapshotToCleanupThreadRatio config, so a saturated pool of one type cannot block the other.

Each provider carries a fair ReentrantReadWriteLock. Maintenance operations hold the read lock and close holds the write lock, so close waits for any in-flight maintenance to finish, and a maintenance task skips itself (without blocking) when a close is already in progress.

Providers that need to be unloaded are placed on a queue (unloadedProvidersToClose). Each queue entry is tagged with the maintenance still required before close, and the query thread triggers an immediate scheduler cycle (MaintenanceTask.triggerNow) when it queues a provider, so unload happens promptly instead of waiting for the next periodic maintenance tick.

Why are the changes needed?

The previous maintenance model ran snapshotting and cleanup as a single operation on a shared thread pool, so a slow or blocked operation of one kind could starve the other, and close could race with in-flight maintenance on the same provider. Decoupling the two operations, bounding concurrency per provider, isolating them on separate pools, and guarding close with a read/write lock addresses these issues.

Does this PR introduce any user-facing change?

Yes, two internal configurations change:

  • New spark.sql.streaming.stateStore.snapshotToCleanupThreadRatio (default 0.5): the fraction of maintenance threads allocated to the snapshot pool; the remainder go to the cleanup pool.
  • spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads is now the total split across both pools, with a minimum of 2 (previously 1); its default changes from max(cores / 4, 1) to max(cores / 4, 2).

Both are internal configuration knobs; there is no change to query results.

How was this patch tested?

  • New StateStoreDecoupledMaintenanceSuite (HDFS and RocksDB variants) covering the split operations, the decoupled scheduler, the RW lock, the dual pools, triggerNow, and the full queue-to-close lifecycle.
  • Updated StateStoreSuite, StateStoreInstanceMetricSuite, and RocksDBSuite.
  • sql/core compiles and the suites above pass locally.

Was this patch authored or co-authored using generative AI tooling?

Generated-by: Claude Code (Opus 4.8)

This pull request and its description were written by Isaac.

liviazhu added 5 commits June 25, 2026 18:04
… operations

Prep work for decoupled state store maintenance:
- Introduce a MaintenanceOpRequest enum (All/Snapshot/Cleanup) used to tag
  entries in the unloadedProvidersToClose queue, which is widened from a
  (id, provider) pair to a (id, provider, opRequest) triple.
- Split doMaintenance into doSnapshotMaintenance and doCleanupMaintenance in the
  StateStoreProvider trait, HDFSBackedStateStoreProvider, RocksDBStateStoreProvider,
  and RocksDB, so snapshot upload and old-file cleanup can be invoked independently.
  doMaintenance now calls both, preserving existing behavior.

Ported from databricks-eng/runtime#200656 (SC-221356).

Co-authored-by: Isaac
…anup tasks

Core of decoupled state store maintenance, building on the split maintenance
operations:
- Submit separate snapshot and cleanup tasks per provider, tracked by two
  independent partition sets (snapshotPartitions/cleanupPartitions) so the two
  operation types never block each other.
- Route the unloadedProvidersToClose queue by MaintenanceOpRequest: All submits
  one op as FromTaskThread which re-queues the other op; Snapshot/Cleanup submit
  that op as FromUnloadedProvidersQueue and close the provider afterward.
- Rewrite the pool task body around the three source paths, replacing the
  await/timeout (awaitProcessThisPartition) and single maintenancePartitions set
  with tryClaimPartition/tryClaimAndSubmit.
- Have the query thread queue providers for close instead of submitting directly.
- Add an `unloaded` flag (setUnloaded) so maintenance skips providers being torn
  down; split removeFromLoadedProvidersAndClose into closeProvider + remove-by-key.

Ported from databricks-eng/runtime#201011 (SC-225784).

Co-authored-by: Isaac
…tate store maintenance

Builds on the decoupled maintenance scheduler:
- Add a per-provider fair ReentrantReadWriteLock (maintenanceLock). Maintenance ops hold the
  read lock; close holds the write lock, so close waits for in-flight maintenance and a
  maintenance op skips (tryLock with zero timeout) when a close is in progress.
- Split the single maintenance thread pool into separate snapshot (high-priority) and cleanup
  (low-priority) pools, sized via snapshotToCleanupThreadRatio (getPoolSizes), so one operation
  type cannot starve the other. numStateStoreMaintenanceThreads is now a minimum of 2.
- Add MaintenanceTask.triggerNow (at-most-one pending, processUnloadedOnly) and have the query
  thread trigger an immediate scheduler cycle when it queues a provider for unload, instead of
  waiting for the next periodic tick.

Move the decoupled-maintenance tests out of StateStoreSuite into a new
StateStoreDecoupledMaintenanceSuite and add tests for the RW lock, dual pools, and triggerNow.

Ported from databricks-eng/runtime#201749 (SC-225785).

Co-authored-by: Isaac
… post-op behavior

The FromTaskThread maintenance source is no longer needed now that the query thread queues
providers instead of submitting directly. Replace it with an explicit nextOp on the submitted
task:
- Remove MaintenanceTaskType.FromTaskThread.
- Add nextOp: Option[MaintenanceOpRequest] to submitMaintenanceWorkForProvider/tryClaimAndSubmit
  (only meaningful for FromUnloadedProvidersQueue). For an All request, the first op is submitted
  as FromUnloadedProvidersQueue with nextOp = Some(otherOp); on completion the pool thread
  enqueues the remaining op with nextOp = None, which closes the provider after it runs.

Ported from databricks-eng/runtime#209771 (SC-226849).

Co-authored-by: Isaac
…ource design

Match the comment wording/wrapping and the StateStoreConf docstrings to the original design, and
factor the duplicated otherMaintenanceOpRequest(opType) call in the FromLoadedProviders unload
path into a local `remaining` val. No behavior change.

Co-authored-by: Isaac
val STATE_STORE_MAINTENANCE_SNAPSHOT_THREAD_RATIO =
buildConf("spark.sql.streaming.stateStore.snapshotToCleanupThreadRatio")
.internal()
.doc("Ratio of total maintenance threads allocated to the snapshot " +

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Please add config .version

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants