diff --git a/src/core/src/cache/budget.rs b/src/core/src/cache/budget.rs index 921b388f..59271e56 100644 --- a/src/core/src/cache/budget.rs +++ b/src/core/src/cache/budget.rs @@ -58,6 +58,14 @@ impl BudgetAccounting { } } + /// Release previously reserved memory bytes — used when an entry is + /// dropped from the cache without being persisted. + pub(super) fn release_memory(&self, bytes: usize) { + if bytes != 0 { + self.used_memory_bytes.fetch_sub(bytes, Ordering::Relaxed); + } + } + pub fn memory_usage_bytes(&self) -> usize { self.used_memory_bytes.load(Ordering::Relaxed) } diff --git a/src/core/src/cache/core.rs b/src/core/src/cache/core.rs index 7a5d76df..c343d432 100644 --- a/src/core/src/cache/core.rs +++ b/src/core/src/cache/core.rs @@ -302,7 +302,9 @@ impl LiquidCache { } } - /// Insert a batch into the cache, it will run cache replacement policy until the batch is inserted. + /// Insert a batch into the cache. RAM-only: when full, evict the LRU + /// victim by dropping it; never spill to disk. If the new entry doesn't + /// fit even after the cache is empty, silently skip caching it. pub(crate) async fn insert_inner(&self, entry_id: EntryID, mut batch_to_cache: CacheEntry) { loop { let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else { @@ -313,24 +315,28 @@ impl LiquidCache { kind: CachedBatchType::from(¬_inserted), }); - let victims = self.cache_policy.find_victim(8); + // Evict one victim per iteration: dropped data can't come back, so + // over-eviction would silently discard recently-used entries. + let victims = self.cache_policy.find_victim(1); if victims.is_empty() { - // no advice, because the cache is already empty - // this can happen if the entry to be inserted is too large, in that case, - // we write it to disk - let on_disk_batch = self - .write_in_memory_batch_to_disk(entry_id, not_inserted) - .await; - batch_to_cache = on_disk_batch; - continue; + // Nothing left to free; the entry is larger than the budget. + return; } - self.squeeze_victims(victims).await; - + self.drop_victims(&victims); batch_to_cache = not_inserted; crate::utils::yield_now_if_shuttle(); } } + fn drop_victims(&self, victims: &[EntryID]) { + for victim in victims { + self.trace(InternalEvent::SqueezeVictim { entry: *victim }); + if let Some(entry) = self.index.remove(victim) { + self.budget.release_memory(entry.memory_usage_bytes()); + } + } + } + /// Create a new instance of CacheStorage. pub(crate) fn new( batch_size: usize, diff --git a/src/core/src/cache/index.rs b/src/core/src/cache/index.rs index b2a84edd..5561fe9e 100644 --- a/src/core/src/cache/index.rs +++ b/src/core/src/cache/index.rs @@ -51,6 +51,17 @@ impl ArtIndex { } } + /// Remove an entry from the index. Returns the removed entry, or `None` if + /// the key was not present. + pub(crate) fn remove(&self, entry_id: &EntryID) -> Option> { + let guard = self.art.pin(); + let removed = self.art.remove(*entry_id, &guard); + if removed.is_some() { + self.entry_count.fetch_sub(1, Ordering::Relaxed); + } + removed + } + pub(crate) fn reset(&self) { let guard = self.art.pin(); self.art.keys().into_iter().for_each(|k| {