Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -366,6 +366,7 @@ public enum LogKeys implements LogKey {
LOG_TYPE,
LOSSES,
LOWER_BOUND,
MAINTENANCE_TASK_TYPE,
MALFORMATTED_STRING,
MAP_ID,
MASTER_URL,
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2900,13 +2900,27 @@ object SQLConf {
// Total maintenance thread budget for stateful streaming queries. The budget is shared by the
// snapshot pool and the cleanup pool, which is why values below 2 are rejected.
val NUM_STATE_STORE_MAINTENANCE_THREADS =
  buildConf("spark.sql.streaming.stateStore.numStateStoreMaintenanceThreads")
    .internal()
    .doc("Total number of threads split between the snapshot and cleanup " +
      "maintenance pools for stateful streaming queries. Each pool needs at least " +
      "1 thread, so the minimum is 2. The default value is the number of " +
      "cores * 0.25 so that the pools don't take too many resources away from the " +
      "query and affect performance. Use snapshotToCleanupThreadRatio to " +
      "configure the split between snapshot and cleanup pools.")
    .intConf
    // One thread per pool is the floor, matching the documented minimum of 2.
    .checkValue(_ > 1, "Must be greater than 1")
    // A quarter of the available cores, floored at the minimum accepted value.
    .createWithDefault(Math.max(Runtime.getRuntime.availableProcessors() / 4, 2))

// Fraction of the maintenance thread budget given to the snapshot pool; the rest goes to the
// cleanup pool.
// TODO(review): a reviewer asked for a config `.version(...)` on this entry. The release this
// config first ships in is not visible here, so it is left to be filled in rather than guessed.
val STATE_STORE_MAINTENANCE_SNAPSHOT_THREAD_RATIO =
  buildConf("spark.sql.streaming.stateStore.snapshotToCleanupThreadRatio")
    .internal()
    .doc("Ratio of total maintenance threads allocated to the snapshot " +
      "pool. The remainder goes to the cleanup pool. The snapshot " +
      "count is rounded to the nearest integer and clamped so each " +
      "pool gets at least 1 thread and the total is never exceeded.")
    .withBindingPolicy(ConfigBindingPolicy.NOT_APPLICABLE)
    .doubleConf
    // Open interval: 0 or 1 would leave one of the two pools without any share.
    .checkValue(v => v > 0 && v < 1, "Must be between 0 and 1 (exclusive)")
    .createWithDefault(0.5)

val STATE_STORE_MAINTENANCE_SHUTDOWN_TIMEOUT =
buildConf("spark.sql.streaming.stateStore.maintenanceShutdownTimeout")
Expand Down Expand Up @@ -7788,6 +7802,9 @@ class SQLConf extends Serializable with Logging with SqlApiConf {

/** Returns the configured value of `NUM_STATE_STORE_MAINTENANCE_THREADS`. */
def numStateStoreMaintenanceThreads: Int = {
  getConf(NUM_STATE_STORE_MAINTENANCE_THREADS)
}

/** Returns the configured value of `STATE_STORE_MAINTENANCE_SNAPSHOT_THREAD_RATIO`. */
def snapshotToCleanupThreadRatio: Double = {
  getConf(STATE_STORE_MAINTENANCE_SNAPSHOT_THREAD_RATIO)
}

/** Returns the configured value of `STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT`. */
def numStateStoreInstanceMetricsToReport: Int = {
  getConf(STATE_STORE_INSTANCE_METRICS_REPORT_LIMIT)
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -478,12 +478,27 @@ private[sql] class HDFSBackedStateStoreProvider extends StateStoreProvider with

/**
 * Performs maintenance on the backing data files in two steps: the snapshot step runs first,
 * followed by the cleanup of old files.
 */
override def doMaintenance(): Unit = {
  doSnapshotMaintenance()
  doCleanupMaintenance()
}

/**
 * Snapshot half of maintenance. Non-fatal failures are logged as a warning and not rethrown;
 * fatal errors still propagate.
 */
override def doSnapshotMaintenance(): Unit =
  try doSnapshot("maintenance")
  catch {
    case NonFatal(error) =>
      logWarning(log"Error performing snapshot maintenance", error)
  }

/**
 * Run only the cleanup portion of maintenance.
 *
 * Non-fatal failures from `cleanup()` are logged once, together with the exception, and are
 * not rethrown; fatal errors still propagate.
 */
override def doCleanupMaintenance(): Unit = {
  try {
    cleanup()
  } catch {
    case NonFatal(e) =>
      // Single warning that names the cleanup step and carries the cause.
      logWarning(log"Error performing cleanup maintenance", e)
  }
}

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2045,7 +2045,8 @@ class RocksDB(
logInfo(log"Rolled back to ${MDC(LogKeys.VERSION_NUM, loadedVersion)}")
}

def doMaintenance(): Unit = {
/** Run only the snapshot upload portion of maintenance. */
def doSnapshotMaintenance(): Unit = {
if (enableChangelogCheckpointing) {

var mostRecentSnapshot: Option[RocksDBSnapshot] = None
Expand Down Expand Up @@ -2076,6 +2077,10 @@ class RocksDB(
uploadSnapshot(snapshotToUpload)
}
}
}

/** Run only the cleanup portion of maintenance. */
def doCleanupMaintenance(): Unit = {
val cleanupTime = timeTakenMs {
fileManager.deleteOldVersions(
numVersionsToRetain = conf.minVersionsToRetain,
Expand All @@ -2085,6 +2090,11 @@ class RocksDB(
logInfo(log"Cleaned old data, time taken: ${MDC(LogKeys.TIME_UNITS, cleanupTime)} ms")
}

/**
 * Runs both maintenance steps back to back: the snapshot step first, then the cleanup step.
 */
def doMaintenance(): Unit = {
  doSnapshotMaintenance()
  doCleanupMaintenance()
}

/**
* This replaces stale reused files in the snapshot with new ones to be uploaded.
* Stale means they are potential candidates for deletion by another
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1047,15 +1047,32 @@ private[sql] class RocksDBStateStoreProvider
}

/** Full maintenance pass: the snapshot step followed by the cleanup step. */
override def doMaintenance(): Unit = {
  doSnapshotMaintenance()
  doCleanupMaintenance()
}

/** Snapshot half of maintenance, routed through the shared maintenance wrapper. */
override def doSnapshotMaintenance(): Unit =
  doMaintenanceOp(op = rocksDB.doSnapshotMaintenance(), opName = "snapshot maintenance")

/** Cleanup half of maintenance, routed through the shared maintenance wrapper. */
override def doCleanupMaintenance(): Unit =
  doMaintenanceOp(op = rocksDB.doCleanupMaintenance(), opName = "cleanup maintenance")

/**
 * Common wrapper for maintenance operations: verifies the state machine and swallows non-fatal
 * exceptions (SPARK-46547) to avoid deadlock between the maintenance thread and the streaming
 * aggregation operator.
 *
 * @param op the maintenance action to run; passed by name, so it is evaluated inside the `try`
 * @param opName human-readable name of the action, included in the warning logged on failure
 */
private def doMaintenanceOp(op: => Unit, opName: String): Unit = {
  // Runs outside the try, so a verification failure propagates to the caller.
  stateMachine.verifyForMaintenance()
  try {
    // Only the requested operation runs here, so snapshot and cleanup stay independent.
    op
  } catch {
    case NonFatal(ex) =>
      logWarning(s"Ignoring error while performing $opName with exception=", ex)
  }
}

Expand Down
Loading