[SPARK-56498][SS] Implement decoupled state store maintenance#56793
Open
liviazhu wants to merge 5 commits into
Open
[SPARK-56498][SS] Implement decoupled state store maintenance#56793liviazhu wants to merge 5 commits into
liviazhu wants to merge 5 commits into
Conversation
… operations Prep work for decoupled state store maintenance: - Introduce a MaintenanceOpRequest enum (All/Snapshot/Cleanup) used to tag entries in the unloadedProvidersToClose queue, which is widened from a (id, provider) pair to a (id, provider, opRequest) triple. - Split doMaintenance into doSnapshotMaintenance and doCleanupMaintenance in the StateStoreProvider trait, HDFSBackedStateStoreProvider, RocksDBStateStoreProvider, and RocksDB, so snapshot upload and old-file cleanup can be invoked independently. doMaintenance now calls both, preserving existing behavior. Ported from databricks-eng/runtime#200656 (SC-221356). Co-authored-by: Isaac
…anup tasks Core of decoupled state store maintenance, building on the split maintenance operations: - Submit separate snapshot and cleanup tasks per provider, tracked by two independent partition sets (snapshotPartitions/cleanupPartitions) so the two operation types never block each other. - Route the unloadedProvidersToClose queue by MaintenanceOpRequest: All submits one op as FromTaskThread which re-queues the other op; Snapshot/Cleanup submit that op as FromUnloadedProvidersQueue and close the provider afterward. - Rewrite the pool task body around the three source paths, replacing the await/timeout (awaitProcessThisPartition) and single maintenancePartitions set with tryClaimPartition/tryClaimAndSubmit. - Have the query thread queue providers for close instead of submitting directly. - Add an `unloaded` flag (setUnloaded) so maintenance skips providers being torn down; split removeFromLoadedProvidersAndClose into closeProvider + remove-by-key. Ported from databricks-eng/runtime#201011 (SC-225784). Co-authored-by: Isaac
…tate store maintenance Builds on the decoupled maintenance scheduler: - Add a per-provider fair ReentrantReadWriteLock (maintenanceLock). Maintenance ops hold the read lock; close holds the write lock, so close waits for in-flight maintenance and a maintenance op skips (tryLock with zero timeout) when a close is in progress. - Split the single maintenance thread pool into separate snapshot (high-priority) and cleanup (low-priority) pools, sized via snapshotToCleanupThreadRatio (getPoolSizes), so one operation type cannot starve the other. numStateStoreMaintenanceThreads is now a minimum of 2. - Add MaintenanceTask.triggerNow (at-most-one pending, processUnloadedOnly) and have the query thread trigger an immediate scheduler cycle when it queues a provider for unload, instead of waiting for the next periodic tick. Move the decoupled-maintenance tests out of StateStoreSuite into a new StateStoreDecoupledMaintenanceSuite and add tests for the RW lock, dual pools, and triggerNow. Ported from databricks-eng/runtime#201749 (SC-225785). Co-authored-by: Isaac
… post-op behavior The FromTaskThread maintenance source is no longer needed now that the query thread queues providers instead of submitting directly. Replace it with an explicit nextOp on the submitted task: - Remove MaintenanceTaskType.FromTaskThread. - Add nextOp: Option[MaintenanceOpRequest] to submitMaintenanceWorkForProvider/tryClaimAndSubmit (only meaningful for FromUnloadedProvidersQueue). For an All request, the first op is submitted as FromUnloadedProvidersQueue with nextOp = Some(otherOp); on completion the pool thread enqueues the remaining op with nextOp = None, which closes the provider after it runs. Ported from databricks-eng/runtime#209771 (SC-226849). Co-authored-by: Isaac
…ource design Match the comment wording/wrapping and the StateStoreConf docstrings to the original design, and factor the duplicated otherMaintenanceOpRequest(opType) call in the FromLoadedProviders unload path into a local `remaining` val. No behavior change. Co-authored-by: Isaac
uros-b
reviewed
Jun 26, 2026
| val STATE_STORE_MAINTENANCE_SNAPSHOT_THREAD_RATIO = | ||
| buildConf("spark.sql.streaming.stateStore.snapshotToCleanupThreadRatio") | ||
| .internal() | ||
| .doc("Ratio of total maintenance threads allocated to the snapshot " + |
This file contains hidden or bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Sign up for free
to join this conversation on GitHub.
Already have an account?
Sign in to comment
Add this suggestion to a batch that can be applied as a single commit.This suggestion is invalid because no changes were made to the code.Suggestions cannot be applied while the pull request is closed.Suggestions cannot be applied while viewing a subset of changes.Only one suggestion per line can be applied in a batch.Add this suggestion to a batch that can be applied as a single commit.Applying suggestions on deleted lines is not supported.You must change the existing code in this line in order to create a valid suggestion.Outdated suggestions cannot be applied.This suggestion has been applied or marked resolved.Suggestions cannot be applied from pending reviews.Suggestions cannot be applied on multi-line comments.Suggestions cannot be applied while the pull request is queued to merge.Suggestion cannot be applied right now. Please check back later.
What changes were proposed in this pull request?
This redesigns state store maintenance so that snapshotting and cleanup run as independent operations that cannot starve each other, and so that closing a provider cannot race in-flight maintenance.
Maintenance is split into two operations on
StateStoreProvider—doSnapshotMaintenance()anddoCleanupMaintenance()(withdoMaintenance()retained as the combination of the two) — implemented for the HDFS and RocksDB providers. The maintenance scheduler submits these as separate tasks per provider, each tracked by its own partition set (snapshotPartitions/cleanupPartitions) so that concurrency is bounded per provider per operation type. The two operation types run on separate thread pools — a high-priority snapshot pool and a low-priority cleanup pool — sized from the total maintenance thread count by a newsnapshotToCleanupThreadRatioconfig, so a saturated pool of one type cannot block the other.Each provider carries a fair
ReentrantReadWriteLock. Maintenance operations hold the read lock and close holds the write lock, so close waits for any in-flight maintenance to finish, and a maintenance task skips itself (without blocking) when a close is already in progress.Providers that need to be unloaded are placed on a queue (
unloadedProvidersToClose). Each queue entry is tagged with the maintenance still required before close, and the query thread triggers an immediate scheduler cycle (MaintenanceTask.triggerNow) when it queues a provider, so unload happens promptly instead of waiting for the next periodic maintenance tick.Why are the changes needed?
The previous maintenance model ran snapshotting and cleanup as a single operation on a shared thread pool, so a slow or blocked operation of one kind could starve the other, and close could race with in-flight maintenance on the same provider. Decoupling the two operations, bounding concurrency per provider, isolating them on separate pools, and guarding close with a read/write lock addresses these issues.
Does this PR introduce any user-facing change?
Yes, two internal configurations change:
spark.sql.streaming.stateStore.snapshotToCleanupThreadRatio(default0.5): the fraction of maintenance threads allocated to the snapshot pool; the remainder go to the cleanup pool.spark.sql.streaming.stateStore.numStateStoreMaintenanceThreadsis now the total split across both pools, with a minimum of 2 (previously 1); its default changes frommax(cores / 4, 1)tomax(cores / 4, 2).Both are internal configuration knobs; there is no change to query results.
How was this patch tested?
StateStoreDecoupledMaintenanceSuite(HDFS and RocksDB variants) covering the split operations, the decoupled scheduler, the RW lock, the dual pools,triggerNow, and the full queue-to-close lifecycle.StateStoreSuite,StateStoreInstanceMetricSuite, andRocksDBSuite.sql/corecompiles and the suites above pass locally.Was this patch authored or co-authored using generative AI tooling?
Generated-by: Claude Code (Opus 4.8)
This pull request and its description were written by Isaac.