mem: Cleanup resources of done streams immediately #22064
EmilyMatt wants to merge 6 commits into apache:main from
| // No data at all — go directly to Done | ||
| self.left_exhausted = true; |
You forgot to update this as well.
No, because above this point the flag can still be set back to false.
I only placed the cleanup there because if the flag is true at that point, the input is actually depleted.
| // If the left stream is fully exhausted, release its resources so the | ||
| // upstream pipeline can be torn down before we move on to probing. | ||
| if self.left_exhausted { | ||
| active.left_stream = None; | ||
| } |
Maybe there are cases where left_exhausted is true and left_stream is not None? Otherwise, what is the reason to have both an exhausted flag and an Option?
| // Release the input pipeline's resources. | ||
| this.inner = | ||
| Box::pin(EmptyRecordBatchStream::new(Arc::clone(&this.schema))); |
To avoid bugs, I think the replacement for inner should keep inner's own schema rather than use this.schema (even when the two are the same schema), because the only goal here is to release the stream.
Yeah fair point, applied
Applied
| // Release the right input pipeline's resources. | ||
| let right_schema = self.right_stream.schema(); | ||
| self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema)); |
nit:
extract as a helper function
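One possible shape for such a helper, as a minimal sketch rather than the code in this PR. To stay self-contained it uses stand-ins instead of DataFusion types: `BatchStream`, `EmptyStream`, `BufferedStream`, `release_stream`, and `release_demo` are all hypothetical names, and the `Arc<()>` field only imitates a resource such as a memory reservation. It also follows the earlier suggestion of reusing the stream's own schema.

```rust
use std::sync::Arc;

/// Hypothetical stand-in for a schema handle (the real type would be Arrow's `SchemaRef`).
type SchemaRef = Arc<Vec<String>>;

/// Hypothetical stand-in for a record batch stream: it yields batches and knows its schema.
trait BatchStream {
    fn schema(&self) -> SchemaRef;
    fn next_batch(&mut self) -> Option<Vec<i64>>;
}

/// Stand-in for an empty stream that only remembers a schema.
struct EmptyStream {
    schema: SchemaRef,
}

impl BatchStream for EmptyStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
    fn next_batch(&mut self) -> Option<Vec<i64>> {
        None
    }
}

/// Stand-in for a real input that owns buffered batches plus some other resource.
struct BufferedStream {
    schema: SchemaRef,
    batches: Vec<Vec<i64>>,
    _reservation: Arc<()>, // imitates a resource that is freed when the stream is dropped
}

impl BatchStream for BufferedStream {
    fn schema(&self) -> SchemaRef {
        Arc::clone(&self.schema)
    }
    fn next_batch(&mut self) -> Option<Vec<i64>> {
        self.batches.pop()
    }
}

/// Hypothetical helper: swap a finished stream for an empty one that reports
/// the same schema. The assignment drops the old stream and whatever it owns.
fn release_stream(stream: &mut Box<dyn BatchStream>) {
    let schema = stream.schema();
    *stream = Box::new(EmptyStream { schema });
}

/// Returns (resource held before release, resource held after release, schema unchanged).
fn release_demo() -> (bool, bool, bool) {
    let schema: SchemaRef = Arc::new(vec!["a".to_string()]);
    let reservation = Arc::new(());
    let tracker = Arc::downgrade(&reservation);
    let mut stream: Box<dyn BatchStream> = Box::new(BufferedStream {
        schema: Arc::clone(&schema),
        batches: vec![vec![1, 2, 3]],
        _reservation: reservation,
    });
    while stream.next_batch().is_some() {} // drain the input
    let held_before = tracker.upgrade().is_some();
    release_stream(&mut stream);
    let held_after = tracker.upgrade().is_some();
    let same_schema = Arc::ptr_eq(&stream.schema(), &schema);
    (held_before, held_after, same_schema)
}
```

Taking `&mut Box<dyn BatchStream>` lets both call sites share one function, and reading the schema from the stream being replaced avoids depending on a second schema field staying in sync.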
run benchmarks
🤖 Benchmark running (GKE): comparing resource-cleanup (e2e4372) to 3b634aa (merge-base) using tpcds
🤖 Benchmark running (GKE): comparing resource-cleanup (e2e4372) to 3b634aa (merge-base) using clickbench_partitioned
🤖 Benchmark running (GKE): comparing resource-cleanup (e2e4372) to 3b634aa (merge-base) using tpch
🤖 Benchmark completed (GKE): tpch, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): tpcds, base (merge-base) vs. branch
🤖 Benchmark completed (GKE): clickbench_partitioned, base (merge-base) vs. branch
Which issue does this PR close?
Rationale for this change
Reduces memory pressure, cleans up resources eagerly, and makes pools aware that operators are done by dropping their MemoryReservation and MemoryConsumers.
What changes are included in this PR?
Whenever a stream is polled and returns None (i.e., it is depleted), the operator drops that stream or replaces it with an EmptyRecordBatchStream.
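The pattern can be illustrated with a minimal, synchronous sketch. It is not the code from this PR: `Iterator` stands in for an async stream (a `None` from `next()` plays the role of a depleted stream), and `Upstream`, `EagerRelease`, and `drain_and_observe` are hypothetical names introduced only for illustration.

```rust
use std::cell::Cell;
use std::rc::Rc;

/// Hypothetical stand-in for an upstream pipeline: yields `remaining` items
/// and bumps a shared counter when dropped, so the release is observable.
struct Upstream {
    remaining: u32,
    released: Rc<Cell<u32>>,
}

impl Iterator for Upstream {
    type Item = u32;
    fn next(&mut self) -> Option<u32> {
        if self.remaining == 0 {
            return None;
        }
        self.remaining -= 1;
        Some(self.remaining)
    }
}

impl Drop for Upstream {
    fn drop(&mut self) {
        self.released.set(self.released.get() + 1);
    }
}

/// Hypothetical operator that holds its input in an `Option` and drops it
/// as soon as the input reports exhaustion.
struct EagerRelease<I> {
    input: Option<I>,
}

impl<I: Iterator> Iterator for EagerRelease<I> {
    type Item = I::Item;
    fn next(&mut self) -> Option<I::Item> {
        // If the input was already released, stay exhausted.
        let item = self.input.as_mut()?.next();
        if item.is_none() {
            // Input is depleted: drop it now rather than when the operator is dropped.
            self.input = None;
        }
        item
    }
}

/// Drains the operator and reports how many upstreams had been released
/// while the operator itself was still alive.
fn drain_and_observe() -> (Vec<u32>, u32) {
    let released = Rc::new(Cell::new(0));
    let upstream = Upstream { remaining: 3, released: Rc::clone(&released) };
    let mut op = EagerRelease { input: Some(upstream) };
    let items: Vec<u32> = op.by_ref().collect();
    let released_while_alive = released.get();
    drop(op);
    (items, released_while_alive)
}
```

In this sketch the upstream is already dropped by the time the operator finishes draining it, which is the behavior the change aims for; without the `self.input = None` line the release would only happen at `drop(op)`.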
Are these changes tested?
This should have no effect on logic, as the streams are already depleted.
Are there any user-facing changes?
No. Users implementing their own memory pool can expect to see the consumer count drop whenever a stream is released, but that is within expected behavior and I don't think it counts as a user-facing change.
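For a rough picture of what a custom pool might observe, here is a minimal sketch under stated assumptions. `CountingPool`, `Reservation`, `JoinLike`, and `consumer_counts` are hypothetical and do not mirror DataFusion's `MemoryPool` trait; the sketch only shows an RAII handle that unregisters itself when dropped.

```rust
use std::sync::atomic::{AtomicUsize, Ordering};
use std::sync::Arc;

/// Hypothetical pool that only counts registered consumers.
#[derive(Default)]
struct CountingPool {
    consumers: AtomicUsize,
}

impl CountingPool {
    fn consumer_count(&self) -> usize {
        self.consumers.load(Ordering::SeqCst)
    }
}

/// Hypothetical RAII handle: registers on creation, unregisters on drop.
struct Reservation {
    pool: Arc<CountingPool>,
}

impl Reservation {
    fn register(pool: &Arc<CountingPool>) -> Self {
        pool.consumers.fetch_add(1, Ordering::SeqCst);
        Reservation { pool: Arc::clone(pool) }
    }
}

impl Drop for Reservation {
    fn drop(&mut self) {
        self.pool.consumers.fetch_sub(1, Ordering::SeqCst);
    }
}

/// Stand-in for an operator with two inputs, each owning a reservation.
struct JoinLike {
    left: Option<Reservation>,
    right: Option<Reservation>,
}

/// Records the pool's consumer count at each step of the operator's life.
fn consumer_counts() -> Vec<usize> {
    let pool = Arc::new(CountingPool::default());
    let mut op = JoinLike {
        left: Some(Reservation::register(&pool)),
        right: Some(Reservation::register(&pool)),
    };
    let mut counts = vec![pool.consumer_count()];
    drop(op.left.take()); // left input depleted: release it immediately
    counts.push(pool.consumer_count());
    drop(op.right.take()); // right input depleted
    counts.push(pool.consumer_count());
    drop(op); // nothing left to release when the operator itself goes away
    counts.push(pool.consumer_count());
    counts
}
```

The count falls step by step as each input finishes instead of all at once when the operator is dropped, which is the only difference a pool implementation would notice.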