From f2ce0f36b9a56a3346481f35773fbc96297a6bbf Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 7 May 2026 13:57:07 +0300 Subject: [PATCH 1/6] mem: Cleanup resources of done streams immediately --- .../physical-plan/src/aggregates/no_grouping.rs | 4 ++++ .../physical-plan/src/aggregates/row_hash.rs | 10 ++++++++++ .../physical-plan/src/aggregates/topk_stream.rs | 4 ++++ datafusion/physical-plan/src/async_func.rs | 6 +++++- datafusion/physical-plan/src/coalesce_batches.rs | 3 +++ datafusion/physical-plan/src/joins/cross_join.rs | 8 +++++++- .../physical-plan/src/joins/hash_join/stream.rs | 6 ++++++ .../physical-plan/src/joins/nested_loop_join.rs | 6 ++++++ .../joins/piecewise_merge_join/classic_join.rs | 4 ++++ .../src/joins/sort_merge_join/bitwise_stream.rs | 16 +++++++++++++--- .../sort_merge_join/materializing_stream.rs | 14 ++++++++++++++ .../src/joins/symmetric_hash_join.rs | 13 +++++++++++++ .../physical-plan/src/sorts/partial_sort.rs | 7 +++++++ datafusion/physical-plan/src/sorts/sort.rs | 2 ++ .../src/spill/replayable_spill_input.rs | 3 +++ datafusion/physical-plan/src/unnest.rs | 6 ++++++ .../src/windows/bounded_window_agg_exec.rs | 5 +++++ 17 files changed, 112 insertions(+), 5 deletions(-) diff --git a/datafusion/physical-plan/src/aggregates/no_grouping.rs b/datafusion/physical-plan/src/aggregates/no_grouping.rs index a7dd7c9a66cb1..ac7727b459300 100644 --- a/datafusion/physical-plan/src/aggregates/no_grouping.rs +++ b/datafusion/physical-plan/src/aggregates/no_grouping.rs @@ -23,6 +23,7 @@ use crate::aggregates::{ finalize_aggregation, }; use crate::metrics::{BaselineMetrics, RecordOutput}; +use crate::stream::EmptyRecordBatchStream; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::datatypes::SchemaRef; use arrow::record_batch::RecordBatch; @@ -370,6 +371,9 @@ impl AggregateStream { Some(Err(e)) => Err(e), None => { this.finished = true; + // Release the input pipeline's resources before finalization. 
+ let input_schema = this.input.schema(); + this.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); let timer = this.baseline_metrics.elapsed_compute().timer(); let result = finalize_aggregation(&mut this.accumulators, &this.mode) diff --git a/datafusion/physical-plan/src/aggregates/row_hash.rs b/datafusion/physical-plan/src/aggregates/row_hash.rs index 056a7f171a516..30cbd61ce4873 100644 --- a/datafusion/physical-plan/src/aggregates/row_hash.rs +++ b/datafusion/physical-plan/src/aggregates/row_hash.rs @@ -32,6 +32,7 @@ use crate::aggregates::{ use crate::metrics::{BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput}; use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder}; use crate::spill::spill_manager::{GetSlicedSize, SpillManager}; +use crate::stream::EmptyRecordBatchStream; use crate::{PhysicalExpr, aggregates, metrics}; use crate::{RecordBatchStream, SendableRecordBatchStream}; @@ -834,6 +835,10 @@ impl Stream for GroupedHashAggregateStream { self.group_values.len() ))); } + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = + Box::pin(EmptyRecordBatchStream::new(input_schema)); self.exec_state = ExecutionState::Done; } } @@ -1219,6 +1224,11 @@ impl GroupedHashAggregateStream { fn set_input_done_and_produce_output(&mut self) -> Result<()> { self.input_done = true; self.group_ordering.input_done(); + // Release the original input pipeline's resources now that we're done + // reading from it. In the spill branch below, `self.input` is replaced + // again with a stream that merges spill files. 
+ let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); let elapsed_compute = self.baseline_metrics.elapsed_compute().clone(); let timer = elapsed_compute.timer(); self.exec_state = if self.spill_state.spills.is_empty() { diff --git a/datafusion/physical-plan/src/aggregates/topk_stream.rs b/datafusion/physical-plan/src/aggregates/topk_stream.rs index 4aa566ccfcd0a..9128844f1d1ef 100644 --- a/datafusion/physical-plan/src/aggregates/topk_stream.rs +++ b/datafusion/physical-plan/src/aggregates/topk_stream.rs @@ -26,6 +26,7 @@ use crate::aggregates::{ evaluate_many, }; use crate::metrics::BaselineMetrics; +use crate::stream::EmptyRecordBatchStream; use crate::{RecordBatchStream, SendableRecordBatchStream}; use arrow::array::{Array, ArrayRef, RecordBatch}; use arrow::datatypes::SchemaRef; @@ -205,6 +206,9 @@ impl Stream for GroupedTopKAggregateStream { } // inner is done, emit all rows and switch to producing output None => { + // Release the input pipeline's resources before emitting. 
+ let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); if self.priority_map.is_empty() { trace!("partition {} emit None", self.partition); return Poll::Ready(None); diff --git a/datafusion/physical-plan/src/async_func.rs b/datafusion/physical-plan/src/async_func.rs index 76a68bf5708db..8ad4ecb096962 100644 --- a/datafusion/physical-plan/src/async_func.rs +++ b/datafusion/physical-plan/src/async_func.rs @@ -17,7 +17,7 @@ use crate::coalesce::LimitedBatchCoalescer; use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter}; use crate::{ DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties, check_if_same_properties, @@ -290,6 +290,10 @@ impl Stream for CoalesceInputStream { } None => { completed = true; + // Release the input pipeline's resources. + let input_schema = self.input_stream.schema(); + self.input_stream = + Box::pin(EmptyRecordBatchStream::new(input_schema)); if let Err(err) = self.batch_coalescer.finish() { return Poll::Ready(Some(Err(err))); } diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 2bf046f03b6cf..76389ca3af4cd 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics}; use crate::projection::ProjectionExec; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, check_if_same_properties, @@ -343,6 +344,7 @@ impl CoalesceBatchesStream { None => { // Input stream is exhausted, finalize any remaining batches self.completed = true; + self.input 
= Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema())); self.coalescer.finish()?; } Some(Ok(batch)) => { @@ -353,6 +355,7 @@ impl CoalesceBatchesStream { PushBatchStatus::LimitReached => { // limit was reached, so stop early self.completed = true; + self.input = Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema())); self.coalescer.finish()?; } } diff --git a/datafusion/physical-plan/src/joins/cross_join.rs b/datafusion/physical-plan/src/joins/cross_join.rs index 3027fb130f087..ab66955dc6034 100644 --- a/datafusion/physical-plan/src/joins/cross_join.rs +++ b/datafusion/physical-plan/src/joins/cross_join.rs @@ -31,6 +31,7 @@ use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, @@ -645,7 +646,12 @@ impl CrossJoinStream { let right_data = match ready!(self.right.poll_next_unpin(cx)) { Some(Ok(right_data)) => right_data, Some(Err(e)) => return Poll::Ready(Err(e)), - None => return Poll::Ready(Ok(StatefulStreamResult::Ready(None))), + None => { + // Release the right (probe) input pipeline's resources. 
+ let right_schema = self.right.schema(); + self.right = Box::pin(EmptyRecordBatchStream::new(right_schema)); + return Poll::Ready(Ok(StatefulStreamResult::Ready(None))); + } }; self.join_metrics.input_batches.add(1); self.join_metrics.input_rows.add(right_data.num_rows()); diff --git a/datafusion/physical-plan/src/joins/hash_join/stream.rs b/datafusion/physical-plan/src/joins/hash_join/stream.rs index 9885fb5c5c70a..040470c9be12b 100644 --- a/datafusion/physical-plan/src/joins/hash_join/stream.rs +++ b/datafusion/physical-plan/src/joins/hash_join/stream.rs @@ -35,6 +35,7 @@ use crate::joins::hash_join::shared_bounds::{ use crate::joins::utils::{ OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ RecordBatchStream, SendableRecordBatchStream, handle_state, hash_utils::create_hashes, @@ -587,6 +588,11 @@ impl HashJoinStream { ) -> Poll>>> { match ready!(self.right.poll_next_unpin(cx)) { None => { + // Release the probe-side input pipeline's resources. The schema + // is preserved so callers that still query `self.right.schema()` + // (e.g. for unmatched-build emission) keep working. + let right_schema = self.right.schema(); + self.right = Box::pin(EmptyRecordBatchStream::new(right_schema)); self.state = HashJoinStreamState::ExhaustedProbeSide; } Some(Ok(batch)) => { diff --git a/datafusion/physical-plan/src/joins/nested_loop_join.rs b/datafusion/physical-plan/src/joins/nested_loop_join.rs index db8c75b4a578b..feaf344200ac1 100644 --- a/datafusion/physical-plan/src/joins/nested_loop_join.rs +++ b/datafusion/physical-plan/src/joins/nested_loop_join.rs @@ -1611,6 +1611,12 @@ impl NestedLoopJoinStream { } } + // If the left stream is fully exhausted, release its resources so the + // upstream pipeline can be torn down before we move on to probing. 
+ if self.left_exhausted { + active.left_stream = None; + } + if active.pending_batches.is_empty() { // No data at all — go directly to Done self.left_exhausted = true; diff --git a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs index da0d21f046daa..36a043cc7d16b 100644 --- a/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs +++ b/datafusion/physical-plan/src/joins/piecewise_merge_join/classic_join.rs @@ -39,6 +39,7 @@ use crate::joins::piecewise_merge_join::exec::{BufferedSide, BufferedSideReadySt use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final; use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult}; use crate::joins::utils::{JoinKeyComparator, get_final_indices_from_shared_bitmap}; +use crate::stream::EmptyRecordBatchStream; pub(super) enum PiecewiseMergeJoinStreamState { WaitBufferedSide, @@ -212,6 +213,9 @@ impl ClassicPWMJStream { ) -> Poll>>> { match ready!(self.streamed.poll_next_unpin(cx)) { None => { + // Release the streamed input pipeline's resources. + let streamed_schema = self.streamed.schema(); + self.streamed = Box::pin(EmptyRecordBatchStream::new(streamed_schema)); if self .buffered_side .try_as_ready_mut()? 
diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs index 3b409c98b2cf4..ad7312426bd18 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/bitwise_stream.rs @@ -125,12 +125,12 @@ use std::pin::Pin; use std::sync::Arc; use std::task::{Context, Poll}; -use crate::RecordBatchStream; use crate::joins::utils::{JoinFilter, JoinKeyComparator, compare_join_arrays}; use crate::metrics::{ BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder, }; use crate::spill::spill_manager::SpillManager; +use crate::{EmptyRecordBatchStream, RecordBatchStream}; use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch}; use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not}; use arrow::datatypes::SchemaRef; @@ -475,7 +475,12 @@ impl BitwiseSortMergeJoinStream { fn poll_next_outer_batch(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match ready!(self.outer.poll_next_unpin(cx)) { - None => return Poll::Ready(Ok(false)), + None => { + // Release the outer input pipeline's resources. + let outer_schema = self.outer.schema(); + self.outer = Box::pin(EmptyRecordBatchStream::new(outer_schema)); + return Poll::Ready(Ok(false)); + } Some(Err(e)) => return Poll::Ready(Err(e)), Some(Ok(batch)) => { let batch_num_rows = batch.num_rows(); @@ -503,7 +508,12 @@ impl BitwiseSortMergeJoinStream { fn poll_next_inner_batch(&mut self, cx: &mut Context<'_>) -> Poll> { loop { match ready!(self.inner.poll_next_unpin(cx)) { - None => return Poll::Ready(Ok(false)), + None => { + // Release the inner input pipeline's resources. 
+ let inner_schema = self.inner.schema(); + self.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema)); + return Poll::Ready(Ok(false)); + } Some(Err(e)) => return Poll::Ready(Err(e)), Some(Ok(batch)) => { let batch_num_rows = batch.num_rows(); diff --git a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs index 4840b56f55fff..04d450a7013c8 100644 --- a/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs +++ b/datafusion/physical-plan/src/joins/sort_merge_join/materializing_stream.rs @@ -41,6 +41,7 @@ use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics; use crate::joins::utils::{JoinFilter, JoinKeyComparator}; use crate::metrics::RecordOutput; use crate::spill::spill_manager::SpillManager; +use crate::stream::EmptyRecordBatchStream; use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream}; use arrow::array::{types::UInt64Type, *}; @@ -919,6 +920,10 @@ impl MaterializingSortMergeJoinStream { return Poll::Pending; } Poll::Ready(None) => { + // Release the streamed input pipeline's resources. + let streamed_schema = self.streamed.schema(); + self.streamed = + Box::pin(EmptyRecordBatchStream::new(streamed_schema)); self.streamed_state = StreamedState::Exhausted; } Poll::Ready(Some(batch)) => { @@ -1033,6 +1038,10 @@ impl MaterializingSortMergeJoinStream { return Poll::Pending; } Poll::Ready(None) => { + // Release the buffered input pipeline's resources. + let buffered_schema = self.buffered.schema(); + self.buffered = + Box::pin(EmptyRecordBatchStream::new(buffered_schema)); self.buffered_state = BufferedState::Exhausted; return Poll::Ready(None); } @@ -1076,6 +1085,11 @@ impl MaterializingSortMergeJoinStream { return Poll::Pending; } Poll::Ready(None) => { + // Release the buffered input pipeline's resources. 
+ let buffered_schema = self.buffered.schema(); + self.buffered = Box::pin(EmptyRecordBatchStream::new( + buffered_schema, + )); self.buffered_state = BufferedState::Ready; } Poll::Ready(Some(batch)) => { diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index 11e036434ee97..b1857ed00f2bc 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -50,6 +50,7 @@ use crate::projection::{ ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children, physical_to_column_exprs, update_join_filter, update_join_on, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, PlanProperties, RecordBatchStream, SendableRecordBatchStream, @@ -1429,6 +1430,9 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + // Release the right input pipeline's resources. + let right_schema = self.right_stream.schema(); + self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema)); self.set_state(SHJStreamState::RightExhausted); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -1458,6 +1462,9 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + // Release the left input pipeline's resources. + let left_schema = self.left_stream.schema(); + self.left_stream = Box::pin(EmptyRecordBatchStream::new(left_schema)); self.set_state(SHJStreamState::LeftExhausted); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -1487,6 +1494,9 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + // Release the left input pipeline's resources. 
+ let left_schema = self.left_stream.schema(); + self.left_stream = Box::pin(EmptyRecordBatchStream::new(left_schema)); self.set_state(SHJStreamState::BothExhausted { final_result: false, }); @@ -1518,6 +1528,9 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { + // Release the right input pipeline's resources. + let right_schema = self.right_stream.schema(); + self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema)); self.set_state(SHJStreamState::BothExhausted { final_result: false, }); diff --git a/datafusion/physical-plan/src/sorts/partial_sort.rs b/datafusion/physical-plan/src/sorts/partial_sort.rs index 28b8745235918..abd9ebb142a66 100644 --- a/datafusion/physical-plan/src/sorts/partial_sort.rs +++ b/datafusion/physical-plan/src/sorts/partial_sort.rs @@ -58,6 +58,7 @@ use std::task::{Context, Poll}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; use crate::sorts::sort::sort_batch; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties, Partitioning, PlanProperties, SendableRecordBatchStream, Statistics, @@ -403,6 +404,9 @@ impl PartialSortStream { // Check if we've already reached the fetch limit if self.fetch == Some(0) { self.is_closed = true; + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); return Poll::Ready(None); } @@ -436,6 +440,9 @@ impl PartialSortStream { Some(Err(e)) => return Poll::Ready(Some(Err(e))), None => { self.is_closed = true; + // Release the input pipeline's resources before sorting. 
+ let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); // Once input is consumed, sort the rest of the inserted batches let remaining_batch = self.sort_in_mem_batch()?; return if remaining_batch.num_rows() > 0 { diff --git a/datafusion/physical-plan/src/sorts/sort.rs b/datafusion/physical-plan/src/sorts/sort.rs index 6c02af8dec6d3..044580a86f7f3 100644 --- a/datafusion/physical-plan/src/sorts/sort.rs +++ b/datafusion/physical-plan/src/sorts/sort.rs @@ -1240,6 +1240,7 @@ impl ExecutionPlan for SortExec { break; } } + drop(input); topk.emit() }) .try_flatten(), @@ -1264,6 +1265,7 @@ impl ExecutionPlan for SortExec { let batch = batch?; sorter.insert_batch(batch).await?; } + drop(input); sorter.sort().await }) .try_flatten(), diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index ddef15a639183..3c2e52a99647d 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -282,6 +282,9 @@ impl Stream for ReplayableSpillStream { } // The stream is exhausted, give the inner state ownership back to `ReplayableStreamSource` Poll::Ready(None) => { + // Release the input pipeline's resources. 
+ this.inner = + Box::pin(EmptyRecordBatchStream::new(Arc::clone(&this.schema))); if let Some(spill_file) = this.spill_file.as_mut() { match spill_file.finish() { Ok(file) => { diff --git a/datafusion/physical-plan/src/unnest.rs b/datafusion/physical-plan/src/unnest.rs index c774ff09af33c..a01f0e5e306db 100644 --- a/datafusion/physical-plan/src/unnest.rs +++ b/datafusion/physical-plan/src/unnest.rs @@ -26,6 +26,7 @@ use super::metrics::{ MetricsSet, RecordOutput, }; use super::{DisplayAs, ExecutionPlanProperties, PlanProperties}; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayFormatType, Distribution, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream, check_if_same_properties, @@ -392,6 +393,11 @@ impl UnnestStream { self.metrics.baseline_metrics.output_rows(), self.metrics.baseline_metrics.elapsed_compute(), ); + if other.is_none() { + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); + } other } }); diff --git a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs index 14f8ce5e95ffd..f442bcea94be2 100644 --- a/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/bounded_window_agg_exec.rs @@ -28,6 +28,7 @@ use std::task::{Context, Poll}; use super::utils::create_schema; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::stream::EmptyRecordBatchStream; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, window_equivalence_properties, @@ -1083,6 +1084,10 @@ impl BoundedWindowAggStream { let _timer = elapsed_compute.timer(); self.finished = true; + // Release the input pipeline's resources before computing the + // final aggregates. 
+ let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); for (_, partition_batch_state) in self.partition_buffers.iter_mut() { partition_batch_state.is_end = true; } From 33af744393638beee6f769f3d740facf3c1f2942 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 7 May 2026 14:13:27 +0300 Subject: [PATCH 2/6] fmt --- datafusion/physical-plan/src/coalesce_batches.rs | 7 +++++-- 1 file changed, 5 insertions(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/coalesce_batches.rs b/datafusion/physical-plan/src/coalesce_batches.rs index 76389ca3af4cd..34cd770260915 100644 --- a/datafusion/physical-plan/src/coalesce_batches.rs +++ b/datafusion/physical-plan/src/coalesce_batches.rs @@ -344,7 +344,8 @@ impl CoalesceBatchesStream { None => { // Input stream is exhausted, finalize any remaining batches self.completed = true; - self.input = Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema())); + self.input = + Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema())); self.coalescer.finish()?; } Some(Ok(batch)) => { @@ -355,7 +356,9 @@ impl CoalesceBatchesStream { PushBatchStatus::LimitReached => { // limit was reached, so stop early self.completed = true; - self.input = Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema())); + self.input = Box::pin(EmptyRecordBatchStream::new( + self.coalescer.schema(), + )); self.coalescer.finish()?; } } From 5e610d60c989c8e4878a719dbe3847f56d15cee3 Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 7 May 2026 15:03:01 +0300 Subject: [PATCH 3/6] add more areas --- datafusion/physical-plan/src/analyze.rs | 1 + datafusion/physical-plan/src/filter.rs | 8 +++ .../physical-plan/src/repartition/mod.rs | 8 ++- .../src/sorts/partitioned_topk.rs | 2 + datafusion/physical-plan/src/stream.rs | 53 +++++++++++++++++-- .../src/windows/window_agg_exec.rs | 5 ++ 6 files changed, 70 insertions(+), 7 deletions(-) diff --git a/datafusion/physical-plan/src/analyze.rs 
b/datafusion/physical-plan/src/analyze.rs index 491a0872a2f97..ea3abf439e4c1 100644 --- a/datafusion/physical-plan/src/analyze.rs +++ b/datafusion/physical-plan/src/analyze.rs @@ -214,6 +214,7 @@ impl ExecutionPlan for AnalyzeExec { while let Some(batch) = input_stream.next().await.transpose()? { total_rows += batch.num_rows(); } + drop(input_stream); let duration = Instant::now() - start; create_output_batch( diff --git a/datafusion/physical-plan/src/filter.rs b/datafusion/physical-plan/src/filter.rs index 1119d1b240788..c485e181f3826 100644 --- a/datafusion/physical-plan/src/filter.rs +++ b/datafusion/physical-plan/src/filter.rs @@ -42,6 +42,7 @@ use crate::projection::{ EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child, try_embed_projection, update_expr, }; +use crate::stream::EmptyRecordBatchStream; use crate::{ DisplayFormatType, ExecutionPlan, metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics}, @@ -1030,6 +1031,9 @@ impl Stream for FilterExecStream { match ready!(self.input.poll_next_unpin(cx)) { None => { self.batch_coalescer.finish()?; + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); // continue draining the coalescer } Some(Ok(batch)) => { @@ -1070,6 +1074,10 @@ impl Stream for FilterExecStream { PushBatchStatus::LimitReached => { // limit was reached, so stop early self.batch_coalescer.finish()?; + // Release the input pipeline's resources. 
+ let input_schema = self.input.schema(); + self.input = + Box::pin(EmptyRecordBatchStream::new(input_schema)); // continue draining the coalescer } } diff --git a/datafusion/physical-plan/src/repartition/mod.rs b/datafusion/physical-plan/src/repartition/mod.rs index b4af6e2c09a5c..873ba8a5ee487 100644 --- a/datafusion/physical-plan/src/repartition/mod.rs +++ b/datafusion/physical-plan/src/repartition/mod.rs @@ -38,7 +38,7 @@ use crate::projection::{ProjectionExec, all_columns, make_with_child, update_exp use crate::sorts::streaming_merge::StreamingMergeBuilder; use crate::spill::spill_manager::SpillManager; use crate::spill::spill_pool::{self, SpillPoolWriter}; -use crate::stream::RecordBatchStreamAdapter; +use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter}; use crate::{ DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics, check_if_same_properties, @@ -1757,7 +1757,11 @@ impl PerPartitionStream { return Poll::Ready(Some(Err(e))); } Poll::Ready(None) => { - // Spill stream ended, keep draining the memory channel + // Spill stream ended — release its resources before + // we go back to draining the memory channel. + let spill_schema = self.spill_stream.schema(); + self.spill_stream = + Box::pin(EmptyRecordBatchStream::new(spill_schema)); self.state = StreamState::ReadingMemory; } Poll::Pending => { diff --git a/datafusion/physical-plan/src/sorts/partitioned_topk.rs b/datafusion/physical-plan/src/sorts/partitioned_topk.rs index f5d47f503bf9b..f4c2585ea790d 100644 --- a/datafusion/physical-plan/src/sorts/partitioned_topk.rs +++ b/datafusion/physical-plan/src/sorts/partitioned_topk.rs @@ -495,6 +495,8 @@ async fn do_partitioned_topk( topk.insert_batch(sub_batch)?; } } + // Release the input pipeline now that accumulation is complete. + drop(input); // ---------- Emit phase ---------- // Sort partition keys so output is ordered by (partition_keys, order_keys). 
diff --git a/datafusion/physical-plan/src/stream.rs b/datafusion/physical-plan/src/stream.rs index 9139a6dd04799..9d0b964886afd 100644 --- a/datafusion/physical-plan/src/stream.rs +++ b/datafusion/physical-plan/src/stream.rs @@ -411,8 +411,11 @@ pin_project! { pub struct RecordBatchStreamAdapter { schema: SchemaRef, + // Wrapped in Option so we can drop the inner stream as soon as it + // returns `None`, releasing any upstream pipeline resources before the + // adapter itself is dropped. #[pin] - stream: S, + stream: Option, } } @@ -441,7 +444,10 @@ impl RecordBatchStreamAdapter { /// // ... /// ``` pub fn new(schema: SchemaRef, stream: S) -> Self { - Self { schema, stream } + Self { + schema, + stream: Some(stream), + } } } @@ -460,11 +466,29 @@ where type Item = Result; fn poll_next(self: Pin<&mut Self>, cx: &mut Context<'_>) -> Poll> { - self.project().stream.poll_next(cx) + let mut this = self.project(); + let Some(inner) = this.stream.as_mut().as_pin_mut() else { + return Poll::Ready(None); + }; + let item = ready!(inner.poll_next(cx)); + if item.is_none() { + // Drop the inner stream in place to release its resources. + // `Pin::set` overwrites the pinned `Option` where it lives: the + // old value's destructor runs in place and nothing is moved out + // of the pinned location, so no `unsafe` block is needed and the + // pinning contract is enforced by the standard library instead + // of by a hand-written SAFETY argument. The adapter now holds + // `None`, so later polls return `Poll::Ready(None)` immediately. 
+ this.stream.set(None); + } + Poll::Ready(item) } fn size_hint(&self) -> (usize, Option) { - self.stream.size_hint() + match self.stream.as_ref() { + Some(stream) => stream.size_hint(), + None => (0, Some(0)), + } } } @@ -538,6 +562,7 @@ impl ObservedStream { let Some(fetch) = self.fetch else { return poll }; if self.produced >= fetch { + self.release_inner(); return Poll::Ready(None); } @@ -545,12 +570,22 @@ impl ObservedStream { if self.produced + batch.num_rows() > fetch { let batch = batch.slice(0, fetch.saturating_sub(self.produced)); self.produced += batch.num_rows(); + if self.produced >= fetch { + self.release_inner(); + } return Poll::Ready(Some(Ok(batch))); }; self.produced += batch.num_rows() } poll } + + /// Replace the inner stream with an [`EmptyRecordBatchStream`], dropping + /// the original stream so its upstream pipeline can be torn down. + fn release_inner(&mut self) { + let schema = self.inner.schema(); + self.inner = Box::pin(EmptyRecordBatchStream::new(schema)); + } } impl RecordBatchStream for ObservedStream { @@ -678,7 +713,12 @@ impl BatchSplitStream { } } Some(Err(e)) => Poll::Ready(Some(Err(e))), - None => Poll::Ready(None), + None => { + // Release the input pipeline's resources. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); + Poll::Ready(None) + } } } } @@ -751,6 +791,9 @@ impl Stream for ReservationStream { None => { // Stream is done so free the reservation completely self.reservation.free(); + // Release the input pipeline's resources. 
+ let inner_schema = self.inner.schema(); + self.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema)); Poll::Ready(None) } } diff --git a/datafusion/physical-plan/src/windows/window_agg_exec.rs b/datafusion/physical-plan/src/windows/window_agg_exec.rs index 5098c84034062..9e8fc8a6ebb62 100644 --- a/datafusion/physical-plan/src/windows/window_agg_exec.rs +++ b/datafusion/physical-plan/src/windows/window_agg_exec.rs @@ -24,6 +24,7 @@ use std::task::{Context, Poll}; use super::utils::create_schema; use crate::execution_plan::{CardinalityEffect, EmissionType}; use crate::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet}; +use crate::stream::EmptyRecordBatchStream; use crate::windows::{ calc_requirements, get_ordered_partition_by_indices, get_partition_by_sort_exprs, window_equivalence_properties, @@ -444,6 +445,10 @@ impl WindowAggStream { } Some(Err(e)) => Err(e), None => { + // Release the input pipeline's resources before computing + // the final aggregates. + let input_schema = self.input.schema(); + self.input = Box::pin(EmptyRecordBatchStream::new(input_schema)); let Some(result) = self.compute_aggregates()? else { return Poll::Ready(None); }; From 761cf3360d459739f009b8c58005acf86f08f91e Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 7 May 2026 15:04:09 +0300 Subject: [PATCH 4/6] address CR --- datafusion/physical-plan/src/spill/replayable_spill_input.rs | 3 ++- 1 file changed, 2 insertions(+), 1 deletion(-) diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index 3c2e52a99647d..97f4420f8c77e 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -283,8 +283,9 @@ impl Stream for ReplayableSpillStream { // The stream is exhausted, give the inner state ownership back to `ReplayableStreamSource` Poll::Ready(None) => { // Release the input pipeline's resources. 
+ let inner_schema = this.inner.schema(); this.inner = - Box::pin(EmptyRecordBatchStream::new(Arc::clone(&this.schema))); + Box::pin(EmptyRecordBatchStream::new(inner_schema)); if let Some(spill_file) = this.spill_file.as_mut() { match spill_file.finish() { Ok(file) => { From efbe48c85e0b875da63bf4cb92808fc7997addeb Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 7 May 2026 15:04:29 +0300 Subject: [PATCH 5/6] fmt --- datafusion/physical-plan/src/spill/replayable_spill_input.rs | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/datafusion/physical-plan/src/spill/replayable_spill_input.rs b/datafusion/physical-plan/src/spill/replayable_spill_input.rs index 97f4420f8c77e..fea998d268c59 100644 --- a/datafusion/physical-plan/src/spill/replayable_spill_input.rs +++ b/datafusion/physical-plan/src/spill/replayable_spill_input.rs @@ -284,8 +284,7 @@ impl Stream for ReplayableSpillStream { Poll::Ready(None) => { // Release the input pipeline's resources. let inner_schema = this.inner.schema(); - this.inner = - Box::pin(EmptyRecordBatchStream::new(inner_schema)); + this.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema)); if let Some(spill_file) = this.spill_file.as_mut() { match spill_file.finish() { Ok(file) => { From e2e437211a96ba419d79f09a7558f7e82cac538a Mon Sep 17 00:00:00 2001 From: Emily Matheys Date: Thu, 7 May 2026 16:25:26 +0300 Subject: [PATCH 6/6] CR: extract to helper function --- .../src/joins/symmetric_hash_join.rs | 29 +++++++++++-------- 1 file changed, 17 insertions(+), 12 deletions(-) diff --git a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs index b1857ed00f2bc..34af88ea4027b 100644 --- a/datafusion/physical-plan/src/joins/symmetric_hash_join.rs +++ b/datafusion/physical-plan/src/joins/symmetric_hash_join.rs @@ -1407,6 +1407,19 @@ impl SymmetricHashJoinStream { } } } + + /// Release the right input pipeline's resources. 
+ fn cleanup_depleted_right_stream(&mut self) { + let right_schema = self.right_stream.schema(); + self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema)); + } + + /// Release the left input pipeline's resources. + fn cleanup_depleted_left_stream(&mut self) { + let left_schema = self.left_stream.schema(); + self.left_stream = Box::pin(EmptyRecordBatchStream::new(left_schema)); + } + /// Asynchronously pulls the next batch from the right stream. /// /// This default implementation checks for the next value in the right stream. @@ -1430,9 +1443,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { - // Release the right input pipeline's resources. - let right_schema = self.right_stream.schema(); - self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema)); + self.cleanup_depleted_right_stream(); self.set_state(SHJStreamState::RightExhausted); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -1462,9 +1473,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { - // Release the left input pipeline's resources. - let left_schema = self.left_stream.schema(); - self.left_stream = Box::pin(EmptyRecordBatchStream::new(left_schema)); + self.cleanup_depleted_left_stream(); self.set_state(SHJStreamState::LeftExhausted); Poll::Ready(Ok(StatefulStreamResult::Continue)) } @@ -1494,9 +1503,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { - // Release the left input pipeline's resources. - let left_schema = self.left_stream.schema(); - self.left_stream = Box::pin(EmptyRecordBatchStream::new(left_schema)); + self.cleanup_depleted_left_stream(); self.set_state(SHJStreamState::BothExhausted { final_result: false, }); @@ -1528,9 +1535,7 @@ impl SymmetricHashJoinStream { } Some(Err(e)) => Poll::Ready(Err(e)), None => { - // Release the right input pipeline's resources. 
- let right_schema = self.right_stream.schema(); - self.right_stream = Box::pin(EmptyRecordBatchStream::new(right_schema)); + self.cleanup_depleted_right_stream(); self.set_state(SHJStreamState::BothExhausted { final_result: false, });