Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import org.apache.hadoop.conf.Configuration
import org.apache.hadoop.fs.{FileSystem, Path}
import org.json4s.jackson.JsonMethods._
import org.scalatest.BeforeAndAfter
import org.scalatest.concurrent.Eventually._
import org.scalatest.time.SpanSugar._

import org.apache.spark._
Expand Down Expand Up @@ -1119,8 +1120,19 @@ class SQLAppStatusListenerMemoryLeakSuite extends SparkFunSuite {
val statusStore = spark.sharedState.statusStore
assert(statusStore.executionsCount() <= 50)
assert(statusStore.planGraphCount() <= 50)
// No live data should be left behind after all executions end.
assert(statusStore.listener.get.noLiveData())
// No live data should be left behind after all executions end. A SQL execution's live
// entries are removed only once its end-event count reaches jobs.size + 1 (the
// SparkListenerJobEnd(s) plus SparkListenerSQLExecutionEnd). For a failed job the
// DAGScheduler notifies the job waiter -- unblocking the failing action on this thread --
// *before* it posts SparkListenerJobEnd to the listener bus (see
// DAGScheduler.failJobAndIndependentStages). That trailing JobEnd can therefore still be in
// flight when this thread calls waitUntilEmpty() above; if it is enqueued just after the bus
// is drained, the failed execution never reaches the cleanup threshold and lingers in
// liveExecutions. Poll with eventually() so the trailing end event is delivered and the live
// entries drain, rather than asserting once immediately.
eventually(timeout(5.seconds), interval(10.milliseconds)) {
assert(statusStore.listener.get.noLiveData())
}
}
}
}
Expand Down