Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
54 changes: 51 additions & 3 deletions datafusion/execution/src/memory_pool/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -20,10 +20,13 @@

use datafusion_common::{Result, internal_datafusion_err};
use std::fmt::Display;
use std::future::Future;
use std::hash::{Hash, Hasher};
use std::pin::Pin;
use std::{cmp::Ordering, sync::Arc, sync::atomic};

mod pool;
mod reclaimer;

#[cfg(feature = "arrow_buffer_pool")]
pub mod arrow;
Expand All @@ -36,6 +39,7 @@ pub use datafusion_common::{
human_readable_count, human_readable_duration, human_readable_size, units,
};
pub use pool::*;
pub use reclaimer::MemoryReclaimer;

/// Tracks and potentially limits memory use across operators during execution.
///
Expand Down Expand Up @@ -209,6 +213,17 @@ pub trait MemoryPool: Send + Sync + std::fmt::Debug + Display {
/// On error the `allocation` will not be increased in size
fn try_grow(&self, reservation: &MemoryReservation, additional: usize) -> Result<()>;

/// Asynchronous counterpart of [`Self::try_grow`].
///
/// The default implementation simply wraps a call to the synchronous
/// [`Self::try_grow`] in a boxed future; the call is made when the future
/// is polled, not when this method returns. Reclaim-aware pools
/// (e.g. [`TrackConsumersPool`]) override this method so that registered
/// [`MemoryReclaimer`]s get invoked on OOM.
fn try_grow_async<'a>(
    &'a self,
    reservation: &'a MemoryReservation,
    additional: usize,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
    let grow = async move { self.try_grow(reservation, additional) };
    Box::pin(grow)
}

/// Return the total amount of memory reserved
fn reserved(&self) -> usize;

Expand Down Expand Up @@ -249,6 +264,9 @@ pub struct MemoryConsumer {
name: String,
can_spill: bool,
id: usize,
/// Reclaimer collected by reclaim-aware pools at register time. Not
/// part of consumer identity (excluded from `Eq`/`Hash`).
reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

impl PartialEq for MemoryConsumer {
Expand Down Expand Up @@ -287,20 +305,39 @@ impl MemoryConsumer {
name: name.into(),
can_spill: false,
id: Self::new_unique_id(),
reclaimer: None,
}
}

/// Returns a copy of this [`MemoryConsumer`] under a new unique id.
///
/// The copy can be registered with a [`MemoryPool`] and is separate from
/// the original consumer.
///
/// Any attached reclaimer is dropped rather than copied: it is bound to the
/// original operator's state and would target the wrong owner under a new
/// id (and bypass the id-keyed requestor-self-skip in `try_grow_async`).
pub fn clone_with_new_id(&self) -> Self {
    let Self {
        name, can_spill, ..
    } = self;
    Self {
        name: name.clone(),
        can_spill: *can_spill,
        id: Self::new_unique_id(),
        // Deliberately not inherited; see the method documentation.
        reclaimer: None,
    }
}

/// Attaches `reclaimer` to this consumer and flags the consumer as able to
/// spill.
///
/// A previously attached reclaimer, if any, is replaced. Pools without
/// reclaim support ignore the reclaimer.
pub fn with_reclaimer(mut self, reclaimer: Arc<dyn MemoryReclaimer>) -> Self {
    self.can_spill = true;
    self.reclaimer = Some(reclaimer);
    self
}

/// Returns the attached [`MemoryReclaimer`], if any.
pub fn reclaimer(&self) -> Option<&Arc<dyn MemoryReclaimer>> {
self.reclaimer.as_ref()
}

/// Return the unique id of this [`MemoryConsumer`]
pub fn id(&self) -> usize {
self.id
Expand Down Expand Up @@ -461,6 +498,17 @@ impl MemoryReservation {
Ok(())
}

/// Asynchronous counterpart of [`Self::try_grow`].
///
/// Forwards the request to the pool's `try_grow_async`; on a reclaim-aware
/// pool this triggers registered [`MemoryReclaimer`]s before OOM is
/// surfaced. The reservation's size is increased by `capacity` only after
/// the pool has accepted the request; on error it is left unchanged.
pub async fn try_grow_async(&self, capacity: usize) -> Result<()> {
    let pool = &self.registration.pool;
    pool.try_grow_async(self, capacity).await?;
    self.size.fetch_add(capacity, atomic::Ordering::Relaxed);
    Ok(())
}

/// Splits off `capacity` bytes from this [`MemoryReservation`]
/// into a new [`MemoryReservation`] with the same
/// [`MemoryConsumer`].
Expand Down
71 changes: 70 additions & 1 deletion datafusion/execution/src/memory_pool/pool.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,13 +16,17 @@
// under the License.

use crate::memory_pool::{
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReservation, human_readable_size,
MemoryConsumer, MemoryLimit, MemoryPool, MemoryReclaimer, MemoryReservation,
human_readable_size,
};
use datafusion_common::HashMap;
use datafusion_common::{DataFusionError, Result, resources_datafusion_err};
use log::debug;
use parking_lot::Mutex;
use std::fmt::{Display, Formatter};
use std::future::Future;
use std::pin::Pin;
use std::sync::Arc;
use std::{
num::NonZeroUsize,
sync::atomic::{AtomicUsize, Ordering},
Expand Down Expand Up @@ -324,6 +328,7 @@ struct TrackedConsumer {
can_spill: bool,
reserved: AtomicUsize,
peak: AtomicUsize,
reclaimer: Option<Arc<dyn MemoryReclaimer>>,
}

impl TrackedConsumer {
Expand Down Expand Up @@ -533,6 +538,7 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
can_spill: consumer.can_spill(),
reserved: Default::default(),
peak: Default::default(),
reclaimer: consumer.reclaimer().cloned(),
},
);

Expand Down Expand Up @@ -593,6 +599,69 @@ impl<I: MemoryPool> MemoryPool for TrackConsumersPool<I> {
Ok(())
}

/// Reclaim-aware variant of `try_grow`.
///
/// The returned future proceeds as follows:
///
/// 1. Attempt a plain `try_grow`; finish immediately if it succeeds.
/// 2. Snapshot the reclaimers of every *other* tracked consumer that
///    currently holds a non-zero reservation.
/// 3. Ask each reclaimer to free `additional` bytes, highest priority
///    first and, among equal priorities, largest reservation first.
///    `try_grow` is retried after every reclaim that did not error.
/// 4. If the request is still unsatisfied, defer to the inner pool's
///    `try_grow_async` and, on success, record the growth for the
///    requesting consumer.
///
/// # Errors
///
/// When the request cannot be satisfied, the error produced by the
/// *first* `try_grow` attempt is returned. Errors returned by individual
/// reclaimers are logged at debug level and otherwise ignored.
fn try_grow_async<'a>(
    &'a self,
    reservation: &'a MemoryReservation,
    additional: usize,
) -> Pin<Box<dyn Future<Output = Result<()>> + Send + 'a>> {
    Box::pin(async move {
        // Fast path.
        let initial_err = match self.try_grow(reservation, additional) {
            Ok(()) => return Ok(()),
            Err(e) => e,
        };

        // Snapshot reclaimers. Skip the requestor (self-reclaim risks
        // deadlock) and zero-byte consumers. The lock guard lives only
        // inside this block so it is never held across an `.await`.
        let requestor_id = reservation.consumer().id();
        let mut candidates = {
            let guard = self.tracked_consumers.lock();
            guard
                .iter()
                .filter_map(|(cid, tc)| {
                    if *cid == requestor_id {
                        return None;
                    }
                    let reclaimer = tc.reclaimer.as_ref()?;
                    // Load the counter once so the zero check and the
                    // recorded snapshot value cannot disagree.
                    let reserved = tc.reserved();
                    (reserved > 0).then(|| (reserved, Arc::clone(reclaimer)))
                })
                .collect::<Vec<_>>()
        };

        // Order: priority desc, then reservation size desc. The key is
        // computed at most once per candidate, so a reclaimer whose
        // priority changes between calls cannot present the sort with an
        // inconsistent ordering.
        candidates.sort_by_cached_key(|(reserved, reclaimer)| {
            std::cmp::Reverse((reclaimer.priority(), *reserved))
        });

        // Reclaim, retry after each, exit on success.
        for (_, reclaimer) in candidates {
            if let Err(e) = reclaimer.reclaim(additional).await {
                debug!("memory reclaimer returned error: {e}");
                continue;
            }
            if self.try_grow(reservation, additional).is_ok() {
                return Ok(());
            }
        }

        // Fall through to the inner pool's own reclaim path, if any.
        // Default impl just re-runs try_grow — cheap.
        self.inner
            .try_grow_async(reservation, additional)
            .await
            .map_err(|_| initial_err)?;
        // The inner pool accepted the request: mirror the growth in the
        // per-consumer bookkeeping.
        self.tracked_consumers
            .lock()
            .entry(reservation.consumer().id())
            .and_modify(|tracked_consumer| {
                tracked_consumer.grow(additional);
            });
        Ok(())
    })
}

/// Returns the reserved total reported by the wrapped inner pool; this
/// method delegates directly and does no bookkeeping of its own.
fn reserved(&self) -> usize {
self.inner.reserved()
}
Expand Down
61 changes: 61 additions & 0 deletions datafusion/execution/src/memory_pool/reclaimer.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,61 @@
// Licensed to the Apache Software Foundation (ASF) under one
// or more contributor license agreements. See the NOTICE file
// distributed with this work for additional information
// regarding copyright ownership. The ASF licenses this file
// to you under the Apache License, Version 2.0 (the
// "License"); you may not use this file except in compliance
// with the License. You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing,
// software distributed under the License is distributed on an
// "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
// KIND, either express or implied. See the License for the
// specific language governing permissions and limitations
// under the License.

//! Operator hook used by a [`MemoryPool`] to free memory before an
//! allocation fails.
//!
//! [`MemoryPool`]: super::MemoryPool

use datafusion_common::Result;
use std::fmt::Debug;

/// Memory-release callback that an operator attaches to its
/// [`MemoryConsumer`] through [`MemoryConsumer::with_reclaimer`].
///
/// When [`MemoryPool::try_grow_async`] cannot satisfy a request, the pool
/// visits the registered reclaimers from highest to lowest
/// [`Self::priority`] and asks each one to free bytes.
///
/// # Contract
///
/// Every implementation MUST:
///
/// - Spill its data **before** shrinking the reservation, so that the bytes
///   it reports are recoverable.
/// - Give bytes back through [`MemoryReservation::shrink`] or
///   [`MemoryReservation::free`], and return at most `target`.
/// - Never call `try_grow*` on the pool — doing so risks
///   reentrancy/deadlock.
/// - Never capture an `Arc<MemoryReservation>` or `Arc<MemoryConsumer>`:
///   that creates a cycle that blocks `unregister`. Use channels or `Weak`
///   instead.
///
/// [`MemoryConsumer`]: super::MemoryConsumer
/// [`MemoryConsumer::with_reclaimer`]: super::MemoryConsumer::with_reclaimer
/// [`MemoryPool::try_grow_async`]: super::MemoryPool::try_grow_async
/// [`MemoryReservation::shrink`]: super::MemoryReservation::shrink
/// [`MemoryReservation::free`]: super::MemoryReservation::free
#[async_trait::async_trait]
pub trait MemoryReclaimer: Send + Sync + Debug {
    /// Releases up to `target` bytes and returns how many bytes were
    /// actually released.
    ///
    /// Implementations must honour the contract in the trait-level
    /// documentation.
    async fn reclaim(&self, target: usize) -> Result<usize>;

    /// Ordering hint: reclaimers with a higher value are asked first, and a
    /// negative value marks a last-resort reclaimer. Defaults to `0`.
    fn priority(&self) -> i32 {
        0
    }

    /// Upper bound on the number of bytes this reclaimer is able to free,
    /// or `None` (the default) when that amount is unknown.
    fn reclaimable_bytes(&self) -> Option<usize> {
        None
    }
}
Loading
Loading