Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/aggregates/no_grouping.rs
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@ use crate::aggregates::{
finalize_aggregation,
};
use crate::metrics::{BaselineMetrics, RecordOutput};
use crate::stream::EmptyRecordBatchStream;
use crate::{RecordBatchStream, SendableRecordBatchStream};
use arrow::datatypes::SchemaRef;
use arrow::record_batch::RecordBatch;
Expand Down Expand Up @@ -370,6 +371,9 @@ impl AggregateStream {
Some(Err(e)) => Err(e),
None => {
this.finished = true;
// Release the input pipeline's resources before finalization.
let input_schema = this.input.schema();
this.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
let timer = this.baseline_metrics.elapsed_compute().timer();
let result =
finalize_aggregation(&mut this.accumulators, &this.mode)
Expand Down
10 changes: 10 additions & 0 deletions datafusion/physical-plan/src/aggregates/row_hash.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ use crate::aggregates::{
use crate::metrics::{BaselineMetrics, MetricBuilder, MetricCategory, RecordOutput};
use crate::sorts::streaming_merge::{SortedSpillFile, StreamingMergeBuilder};
use crate::spill::spill_manager::{GetSlicedSize, SpillManager};
use crate::stream::EmptyRecordBatchStream;
use crate::{PhysicalExpr, aggregates, metrics};
use crate::{RecordBatchStream, SendableRecordBatchStream};

Expand Down Expand Up @@ -834,6 +835,10 @@ impl Stream for GroupedHashAggregateStream {
self.group_values.len()
)));
}
// Release the input pipeline's resources.
let input_schema = self.input.schema();
self.input =
Box::pin(EmptyRecordBatchStream::new(input_schema));
self.exec_state = ExecutionState::Done;
}
}
Expand Down Expand Up @@ -1219,6 +1224,11 @@ impl GroupedHashAggregateStream {
fn set_input_done_and_produce_output(&mut self) -> Result<()> {
self.input_done = true;
self.group_ordering.input_done();
// Release the original input pipeline's resources now that we're done
// reading from it. In the spill branch below, `self.input` is replaced
// again with a stream that merges spill files.
let input_schema = self.input.schema();
self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
let elapsed_compute = self.baseline_metrics.elapsed_compute().clone();
let timer = elapsed_compute.timer();
self.exec_state = if self.spill_state.spills.is_empty() {
Expand Down
4 changes: 4 additions & 0 deletions datafusion/physical-plan/src/aggregates/topk_stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -26,6 +26,7 @@ use crate::aggregates::{
evaluate_many,
};
use crate::metrics::BaselineMetrics;
use crate::stream::EmptyRecordBatchStream;
use crate::{RecordBatchStream, SendableRecordBatchStream};
use arrow::array::{Array, ArrayRef, RecordBatch};
use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -205,6 +206,9 @@ impl Stream for GroupedTopKAggregateStream {
}
// inner is done, emit all rows and switch to producing output
None => {
// Release the input pipeline's resources before emitting.
let input_schema = self.input.schema();
self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
if self.priority_map.is_empty() {
trace!("partition {} emit None", self.partition);
return Poll::Ready(None);
Expand Down
1 change: 1 addition & 0 deletions datafusion/physical-plan/src/analyze.rs
Original file line number Diff line number Diff line change
Expand Up @@ -214,6 +214,7 @@ impl ExecutionPlan for AnalyzeExec {
while let Some(batch) = input_stream.next().await.transpose()? {
total_rows += batch.num_rows();
}
drop(input_stream);

let duration = Instant::now() - start;
create_output_batch(
Expand Down
6 changes: 5 additions & 1 deletion datafusion/physical-plan/src/async_func.rs
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@

use crate::coalesce::LimitedBatchCoalescer;
use crate::metrics::{ExecutionPlanMetricsSet, MetricsSet};
use crate::stream::RecordBatchStreamAdapter;
use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter};
use crate::{
DisplayAs, DisplayFormatType, ExecutionPlan, ExecutionPlanProperties, PlanProperties,
check_if_same_properties,
Expand Down Expand Up @@ -290,6 +290,10 @@ impl Stream for CoalesceInputStream {
}
None => {
completed = true;
// Release the input pipeline's resources.
let input_schema = self.input_stream.schema();
self.input_stream =
Box::pin(EmptyRecordBatchStream::new(input_schema));
if let Err(err) = self.batch_coalescer.finish() {
return Poll::Ready(Some(Err(err)));
}
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/coalesce_batches.rs
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ use std::task::{Context, Poll};
use super::metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet};
use super::{DisplayAs, ExecutionPlanProperties, PlanProperties, Statistics};
use crate::projection::ProjectionExec;
use crate::stream::EmptyRecordBatchStream;
use crate::{
DisplayFormatType, ExecutionPlan, RecordBatchStream, SendableRecordBatchStream,
check_if_same_properties,
Expand Down Expand Up @@ -343,6 +344,8 @@ impl CoalesceBatchesStream {
None => {
// Input stream is exhausted, finalize any remaining batches
self.completed = true;
self.input =
Box::pin(EmptyRecordBatchStream::new(self.coalescer.schema()));
self.coalescer.finish()?;
}
Some(Ok(batch)) => {
Expand All @@ -353,6 +356,9 @@ impl CoalesceBatchesStream {
PushBatchStatus::LimitReached => {
// limit was reached, so stop early
self.completed = true;
self.input = Box::pin(EmptyRecordBatchStream::new(
self.coalescer.schema(),
));
self.coalescer.finish()?;
}
}
Expand Down
8 changes: 8 additions & 0 deletions datafusion/physical-plan/src/filter.rs
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ use crate::projection::{
EmbeddedProjection, ProjectionExec, ProjectionExpr, make_with_child,
try_embed_projection, update_expr,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
DisplayFormatType, ExecutionPlan,
metrics::{BaselineMetrics, ExecutionPlanMetricsSet, MetricsSet, RatioMetrics},
Expand Down Expand Up @@ -1030,6 +1031,9 @@ impl Stream for FilterExecStream {
match ready!(self.input.poll_next_unpin(cx)) {
None => {
self.batch_coalescer.finish()?;
// Release the input pipeline's resources.
let input_schema = self.input.schema();
self.input = Box::pin(EmptyRecordBatchStream::new(input_schema));
// continue draining the coalescer
}
Some(Ok(batch)) => {
Expand Down Expand Up @@ -1070,6 +1074,10 @@ impl Stream for FilterExecStream {
PushBatchStatus::LimitReached => {
// limit was reached, so stop early
self.batch_coalescer.finish()?;
// Release the input pipeline's resources.
let input_schema = self.input.schema();
self.input =
Box::pin(EmptyRecordBatchStream::new(input_schema));
// continue draining the coalescer
}
}
Expand Down
8 changes: 7 additions & 1 deletion datafusion/physical-plan/src/joins/cross_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -31,6 +31,7 @@ use crate::projection::{
ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children,
physical_to_column_exprs,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
ColumnStatistics, DisplayAs, DisplayFormatType, Distribution, ExecutionPlan,
ExecutionPlanProperties, PlanProperties, RecordBatchStream,
Expand Down Expand Up @@ -645,7 +646,12 @@ impl<T: BatchTransformer> CrossJoinStream<T> {
let right_data = match ready!(self.right.poll_next_unpin(cx)) {
Some(Ok(right_data)) => right_data,
Some(Err(e)) => return Poll::Ready(Err(e)),
None => return Poll::Ready(Ok(StatefulStreamResult::Ready(None))),
None => {
// Release the right (probe) input pipeline's resources.
let right_schema = self.right.schema();
self.right = Box::pin(EmptyRecordBatchStream::new(right_schema));
return Poll::Ready(Ok(StatefulStreamResult::Ready(None)));
}
};
self.join_metrics.input_batches.add(1);
self.join_metrics.input_rows.add(right_data.num_rows());
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/joins/hash_join/stream.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,6 +35,7 @@ use crate::joins::hash_join::shared_bounds::{
use crate::joins::utils::{
OnceFut, equal_rows_arr, get_final_indices_from_shared_bitmap,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
RecordBatchStream, SendableRecordBatchStream, handle_state,
hash_utils::create_hashes,
Expand Down Expand Up @@ -587,6 +588,11 @@ impl HashJoinStream {
) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
match ready!(self.right.poll_next_unpin(cx)) {
None => {
// Release the probe-side input pipeline's resources. The schema
// is preserved so callers that still query `self.right.schema()`
// (e.g. for unmatched-build emission) keep working.
let right_schema = self.right.schema();
self.right = Box::pin(EmptyRecordBatchStream::new(right_schema));
self.state = HashJoinStreamState::ExhaustedProbeSide;
}
Some(Ok(batch)) => {
Expand Down
6 changes: 6 additions & 0 deletions datafusion/physical-plan/src/joins/nested_loop_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1611,6 +1611,12 @@ impl NestedLoopJoinStream {
}
}

// If the left stream is fully exhausted, release its resources so the
// upstream pipeline can be torn down before we move on to probing.
if self.left_exhausted {
active.left_stream = None;
}
Comment on lines +1614 to +1618
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Maybe there are cases where `left_exhausted` is true and `left_stream` is not `None`? Otherwise, what is the reason to have both an exhausted flag and an `Option`?

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes replied above


if active.pending_batches.is_empty() {
// No data at all — go directly to Done
self.left_exhausted = true;
Comment on lines 1621 to 1622
Copy link
Copy Markdown
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

you forgot to update this as well

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

No, because above it can be converted back to false.
I only placed it there because, if it is set to true at the point where I placed it, the input is actually depleted.

Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -39,6 +39,7 @@ use crate::joins::piecewise_merge_join::exec::{BufferedSide, BufferedSideReadySt
use crate::joins::piecewise_merge_join::utils::need_produce_result_in_final;
use crate::joins::utils::{BuildProbeJoinMetrics, StatefulStreamResult};
use crate::joins::utils::{JoinKeyComparator, get_final_indices_from_shared_bitmap};
use crate::stream::EmptyRecordBatchStream;

pub(super) enum PiecewiseMergeJoinStreamState {
WaitBufferedSide,
Expand Down Expand Up @@ -212,6 +213,9 @@ impl ClassicPWMJStream {
) -> Poll<Result<StatefulStreamResult<Option<RecordBatch>>>> {
match ready!(self.streamed.poll_next_unpin(cx)) {
None => {
// Release the streamed input pipeline's resources.
let streamed_schema = self.streamed.schema();
self.streamed = Box::pin(EmptyRecordBatchStream::new(streamed_schema));
if self
.buffered_side
.try_as_ready_mut()?
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -125,12 +125,12 @@ use std::pin::Pin;
use std::sync::Arc;
use std::task::{Context, Poll};

use crate::RecordBatchStream;
use crate::joins::utils::{JoinFilter, JoinKeyComparator, compare_join_arrays};
use crate::metrics::{
BaselineMetrics, Count, ExecutionPlanMetricsSet, Gauge, MetricBuilder,
};
use crate::spill::spill_manager::SpillManager;
use crate::{EmptyRecordBatchStream, RecordBatchStream};
use arrow::array::{Array, ArrayRef, BooleanArray, BooleanBufferBuilder, RecordBatch};
use arrow::compute::{BatchCoalescer, SortOptions, filter_record_batch, not};
use arrow::datatypes::SchemaRef;
Expand Down Expand Up @@ -475,7 +475,12 @@ impl BitwiseSortMergeJoinStream {
fn poll_next_outer_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
loop {
match ready!(self.outer.poll_next_unpin(cx)) {
None => return Poll::Ready(Ok(false)),
None => {
// Release the outer input pipeline's resources.
let outer_schema = self.outer.schema();
self.outer = Box::pin(EmptyRecordBatchStream::new(outer_schema));
return Poll::Ready(Ok(false));
}
Some(Err(e)) => return Poll::Ready(Err(e)),
Some(Ok(batch)) => {
let batch_num_rows = batch.num_rows();
Expand Down Expand Up @@ -503,7 +508,12 @@ impl BitwiseSortMergeJoinStream {
fn poll_next_inner_batch(&mut self, cx: &mut Context<'_>) -> Poll<Result<bool>> {
loop {
match ready!(self.inner.poll_next_unpin(cx)) {
None => return Poll::Ready(Ok(false)),
None => {
// Release the inner input pipeline's resources.
let inner_schema = self.inner.schema();
self.inner = Box::pin(EmptyRecordBatchStream::new(inner_schema));
return Poll::Ready(Ok(false));
}
Some(Err(e)) => return Poll::Ready(Err(e)),
Some(Ok(batch)) => {
let batch_num_rows = batch.num_rows();
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,7 @@ use crate::joins::sort_merge_join::metrics::SortMergeJoinMetrics;
use crate::joins::utils::{JoinFilter, JoinKeyComparator};
use crate::metrics::RecordOutput;
use crate::spill::spill_manager::SpillManager;
use crate::stream::EmptyRecordBatchStream;
use crate::{PhysicalExpr, RecordBatchStream, SendableRecordBatchStream};

use arrow::array::{types::UInt64Type, *};
Expand Down Expand Up @@ -919,6 +920,10 @@ impl MaterializingSortMergeJoinStream {
return Poll::Pending;
}
Poll::Ready(None) => {
// Release the streamed input pipeline's resources.
let streamed_schema = self.streamed.schema();
self.streamed =
Box::pin(EmptyRecordBatchStream::new(streamed_schema));
self.streamed_state = StreamedState::Exhausted;
}
Poll::Ready(Some(batch)) => {
Expand Down Expand Up @@ -1033,6 +1038,10 @@ impl MaterializingSortMergeJoinStream {
return Poll::Pending;
}
Poll::Ready(None) => {
// Release the buffered input pipeline's resources.
let buffered_schema = self.buffered.schema();
self.buffered =
Box::pin(EmptyRecordBatchStream::new(buffered_schema));
self.buffered_state = BufferedState::Exhausted;
return Poll::Ready(None);
}
Expand Down Expand Up @@ -1076,6 +1085,11 @@ impl MaterializingSortMergeJoinStream {
return Poll::Pending;
}
Poll::Ready(None) => {
// Release the buffered input pipeline's resources.
let buffered_schema = self.buffered.schema();
self.buffered = Box::pin(EmptyRecordBatchStream::new(
buffered_schema,
));
self.buffered_state = BufferedState::Ready;
}
Poll::Ready(Some(batch)) => {
Expand Down
18 changes: 18 additions & 0 deletions datafusion/physical-plan/src/joins/symmetric_hash_join.rs
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,7 @@ use crate::projection::{
ProjectionExec, join_allows_pushdown, join_table_borders, new_join_children,
physical_to_column_exprs, update_join_filter, update_join_on,
};
use crate::stream::EmptyRecordBatchStream;
use crate::{
DisplayAs, DisplayFormatType, Distribution, ExecutionPlan, ExecutionPlanProperties,
PlanProperties, RecordBatchStream, SendableRecordBatchStream,
Expand Down Expand Up @@ -1406,6 +1407,19 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
}
}
}

/// Swaps the right input for an empty stream that reports the same schema.
///
/// Assigning the placeholder drops the previous right stream, so whatever it
/// was holding is released. Called from the `None` arms of the right-side
/// polling paths, i.e. once the right input has signalled end-of-stream.
fn cleanup_depleted_right_stream(&mut self) {
    // Capture the schema first so the placeholder stays schema-compatible.
    let placeholder = EmptyRecordBatchStream::new(self.right_stream.schema());
    self.right_stream = Box::pin(placeholder);
}

/// Swaps the left input for an empty stream that reports the same schema.
///
/// Assigning the placeholder drops the previous left stream, so whatever it
/// was holding is released. Called from the `None` arms of the left-side
/// polling paths, i.e. once the left input has signalled end-of-stream.
fn cleanup_depleted_left_stream(&mut self) {
    // Capture the schema first so the placeholder stays schema-compatible.
    let placeholder = EmptyRecordBatchStream::new(self.left_stream.schema());
    self.left_stream = Box::pin(placeholder);
}

/// Asynchronously pulls the next batch from the right stream.
///
/// This default implementation checks for the next value in the right stream.
Expand All @@ -1429,6 +1443,7 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
}
Some(Err(e)) => Poll::Ready(Err(e)),
None => {
self.cleanup_depleted_right_stream();
self.set_state(SHJStreamState::RightExhausted);
Poll::Ready(Ok(StatefulStreamResult::Continue))
}
Expand Down Expand Up @@ -1458,6 +1473,7 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
}
Some(Err(e)) => Poll::Ready(Err(e)),
None => {
self.cleanup_depleted_left_stream();
self.set_state(SHJStreamState::LeftExhausted);
Poll::Ready(Ok(StatefulStreamResult::Continue))
}
Expand Down Expand Up @@ -1487,6 +1503,7 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
}
Some(Err(e)) => Poll::Ready(Err(e)),
None => {
self.cleanup_depleted_left_stream();
self.set_state(SHJStreamState::BothExhausted {
final_result: false,
});
Expand Down Expand Up @@ -1518,6 +1535,7 @@ impl<T: BatchTransformer> SymmetricHashJoinStream<T> {
}
Some(Err(e)) => Poll::Ready(Err(e)),
None => {
self.cleanup_depleted_right_stream();
self.set_state(SHJStreamState::BothExhausted {
final_result: false,
});
Expand Down
8 changes: 6 additions & 2 deletions datafusion/physical-plan/src/repartition/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -38,7 +38,7 @@ use crate::projection::{ProjectionExec, all_columns, make_with_child, update_exp
use crate::sorts::streaming_merge::StreamingMergeBuilder;
use crate::spill::spill_manager::SpillManager;
use crate::spill::spill_pool::{self, SpillPoolWriter};
use crate::stream::RecordBatchStreamAdapter;
use crate::stream::{EmptyRecordBatchStream, RecordBatchStreamAdapter};
use crate::{
DisplayFormatType, ExecutionPlan, Partitioning, PlanProperties, Statistics,
check_if_same_properties,
Expand Down Expand Up @@ -1757,7 +1757,11 @@ impl PerPartitionStream {
return Poll::Ready(Some(Err(e)));
}
Poll::Ready(None) => {
// Spill stream ended, keep draining the memory channel
// Spill stream ended — release its resources before
// we go back to draining the memory channel.
let spill_schema = self.spill_stream.schema();
self.spill_stream =
Box::pin(EmptyRecordBatchStream::new(spill_schema));
self.state = StreamState::ReadingMemory;
}
Poll::Pending => {
Expand Down
Loading
Loading