Skip to content

[SPARK-57711][SQL][TEST] Deflake SQLAppStatusListenerMemoryLeakSuite "no memory leak"#56790

Open
HyukjinKwon wants to merge 2 commits into
apache:masterfrom
HyukjinKwon:ci-fix/agent5-sqlappstatus-leak
Open

[SPARK-57711][SQL][TEST] Deflake SQLAppStatusListenerMemoryLeakSuite "no memory leak"#56790
HyukjinKwon wants to merge 2 commits into
apache:masterfrom
HyukjinKwon:ci-fix/agent5-sqlappstatus-leak

Conversation

@HyukjinKwon

@HyukjinKwon HyukjinKwon commented Jun 25, 2026

Copy link
Copy Markdown
Member

What changes were proposed in this pull request?

Wrap the final noLiveData() assertion in SQLAppStatusListenerMemoryLeakSuite
("no memory leak") in an eventually(...) block so it polls for the trailing
cleanup instead of asserting once immediately.

Why are the changes needed?

The test is flaky on the SBT sql - other tests job, failing with
noLiveData() was false:

assert(statusStore.listener.get.noLiveData())

SQLAppStatusListener removes an execution's entries from liveExecutions /
stageMetrics only once its end-event count reaches jobs.size + 1, i.e. once
all of its SparkListenerJobEnd events and its SparkListenerSQLExecutionEnd
have been processed.

The test runs 100 failing jobs (df.foreach { throw ... }). For a failed job,
DAGScheduler.failJobAndIndependentStages notifies the job waiter -- which
unblocks the failing action on the test thread -- before it posts
SparkListenerJobEnd to the listener bus:

cleanupStateForJobAndIndependentStages(job)
job.listener.jobFailed(error)                                  // unblocks the action
listenerBus.post(SparkListenerJobEnd(job.jobId, ..., JobFailed(error)))  // posted afterwards

So the trailing JobEnd for the last failed execution can still be in flight on
the DAGScheduler event-loop thread when the test thread races ahead, exits the
loop, and calls sc.listenerBus.waitUntilEmpty(). If that JobEnd is enqueued
just after the bus is drained, the failed execution never reaches the cleanup
threshold and lingers in liveExecutions, so asserting noLiveData()
immediately intermittently observes leftover live data.

Note this is unrelated to the kvstore.doAsync metrics aggregation in
onExecutionEnd: the test sets ASYNC_TRACKING_ENABLED=false, so that callback
runs inline on the listener thread (sameThreadExecutorService) and is already
covered by waitUntilEmpty(). The residual race is purely in the delivery of
the end events.

Polling with eventually lets the trailing event get delivered and the live
entries drain without weakening the assertion. The timeout is kept modest
(5.seconds) since the trailing event lands within milliseconds in practice.

Does this PR introduce any user-facing change?

No, test only.

How was this patch tested?

Existing test, run repeatedly. This reproduces under SBT, so the standard fork
"Build" (SBT) exercises it.

CI evidence

Was this patch authored or co-authored using generative AI tooling?

Yes, drafted with assistance from Isaac.

… "no memory leak"

### What changes were proposed in this pull request?
Wrap the final `noLiveData()` assertion in `SQLAppStatusListenerMemoryLeakSuite`
in an `eventually(...)` block.

### Why are the changes needed?
The test fails intermittently on the SBT `sql - other tests` job (e.g. master
runs 28000645341, 28004073554) with `noLiveData() was false`. The cleanup that
removes entries from `liveExecutions`/`stageMetrics` is finalized by the metrics
aggregation triggered on `SparkListenerSQLExecutionEnd`; asserting immediately
after `waitUntilEmpty()` is racy. Polling with `eventually` removes the race
without weakening the assertion.

### Does this PR introduce any user-facing change?
No, test only.

### How was this patch tested?
Existing test, repeated.

Co-authored-by: Isaac
@HyukjinKwon HyukjinKwon changed the title [DO-NOT-MERGE][SQL][TEST] Deflake SQLAppStatusListenerMemoryLeakSuite "no memory leak" [SPARK-57711][SQL][TEST] Deflake SQLAppStatusListenerMemoryLeakSuite "no memory leak" Jun 26, 2026
@HyukjinKwon HyukjinKwon marked this pull request as ready for review June 26, 2026 05:57
assert(statusStore.planGraphCount() <= 50)
// No live data should be left behind after all executions end.
assert(statusStore.listener.get.noLiveData())
// No live data should be left behind after all executions end. The cleanup of live

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

The added comment / PR body pin the flake on the kvstore.doAsync cleanup completing after the bus drains, but the test sets ASYNC_TRACKING_ENABLED=false, which makes doAsync run synchronously on the listener thread (sameThreadExecutor); so waitUntilEmpty() should already cover that path. The stated root cause likely doesn't hold under this config; the real residual (e.g. stageMetrics cleanup) isn't identified. The fix is still safe, but please clarify the actual race (or confirm the CI env doesn't apply the config) so this isn't masking an undiagnosed condition. Tighten the inline comment accordingly.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Good catch, you're right. With ASYNC_TRACKING_ENABLED=false the kvstore.doAsync block in onExecutionEnd runs inline (ElementTrackingStore uses sameThreadExecutorService), so it's already covered by waitUntilEmpty() and isn't the cause.

The actual race is in end-event delivery. The test runs failing jobs (df.foreach { throw ... }), and for a failed job DAGScheduler.failJobAndIndependentStages notifies the job waiter -- unblocking the failing action on the test thread -- before it posts SparkListenerJobEnd:

cleanupStateForJobAndIndependentStages(job)
job.listener.jobFailed(error)                                          // unblocks the action
listenerBus.post(SparkListenerJobEnd(job.jobId, ..., JobFailed(error))) // posted afterwards

So the test thread can race ahead, exit the loop, and call waitUntilEmpty() before the trailing JobEnd for the last failed execution is enqueued. If it lands just after the bus drains, that execution never reaches the jobs.size + 1 cleanup threshold and lingers in liveExecutions, so noLiveData() is intermittently false. I've rewritten the inline comment and PR description accordingly.

// executions/stage metrics is finalized when the metrics aggregation triggered by the
// SQLExecutionEnd event completes, so wait for the listener to drain rather than asserting
// immediately to avoid a timing race.
eventually(timeout(10.seconds), interval(10.milliseconds)) {

Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Note: stacking a 10s eventually on top of the existing 10s waitUntilEmpty() doubles worst-case hang to ~20s on a genuinely broken system.

Copy link
Copy Markdown
Member Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Reduced the eventually timeout to 5.seconds. The trailing end event lands within milliseconds in practice, so the worst-case additive hang is now ~15s rather than ~20s.

…meout

Address review feedback: the doAsync metrics-aggregation explanation was wrong
under ASYNC_TRACKING_ENABLED=false (it runs inline). The real race is a trailing
SparkListenerJobEnd for a failed job posted after the job waiter is notified, so
it can arrive after waitUntilEmpty() drains the bus. Reduce the eventually
timeout from 10s to 5s to avoid doubling the worst-case hang.

Co-authored-by: Isaac
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

3 participants