8 changes: 8 additions & 0 deletions src/core/src/cache/budget.rs
@@ -58,6 +58,14 @@ impl BudgetAccounting {
}
}

/// Release previously reserved memory bytes — used when an entry is
/// dropped from the cache without being persisted.
pub(super) fn release_memory(&self, bytes: usize) {
if bytes != 0 {
self.used_memory_bytes.fetch_sub(bytes, Ordering::Relaxed);
}
}

pub fn memory_usage_bytes(&self) -> usize {
self.used_memory_bytes.load(Ordering::Relaxed)
}
30 changes: 18 additions & 12 deletions src/core/src/cache/core.rs
@@ -302,7 +302,9 @@ impl LiquidCache {
}
}

/// Insert a batch into the cache, it will run cache replacement policy until the batch is inserted.
/// Insert a batch into the cache. RAM-only: when full, evict the LRU
/// victim by dropping it; never spill to disk. If the new entry doesn't
/// fit even after the cache is empty, silently skip caching it.
pub(crate) async fn insert_inner(&self, entry_id: EntryID, mut batch_to_cache: CacheEntry) {
loop {
let Err(not_inserted) = self.try_insert(entry_id, batch_to_cache) else {
@@ -313,24 +315,28 @@ impl LiquidCache {
kind: CachedBatchType::from(&not_inserted),
});

let victims = self.cache_policy.find_victim(8);
// Evict one victim per iteration: dropped data can't come back, so
// over-eviction would silently discard recently-used entries.
let victims = self.cache_policy.find_victim(1);
if victims.is_empty() {
// no advice, because the cache is already empty
// this can happen if the entry to be inserted is too large, in that case,
// we write it to disk
let on_disk_batch = self
.write_in_memory_batch_to_disk(entry_id, not_inserted)
.await;
batch_to_cache = on_disk_batch;
continue;
// Nothing left to free; the entry is larger than the budget.
return;
}
Comment on lines +320 to 324
Three existing tests assume the now-removed squeeze/spill path and will fail under this change:

  • src/core/src/cache/core.rs:982 (test_cache_advice_strategies) does store.index().get(&entry_id1).unwrap() and asserts MemoryLiquid after eviction. Under RAM-only behavior entry_id1 is dropped, so unwrap() panics.
  • src/core/src/cache/tests/policies.rs:26 (default_policies) inserts 5 entries into a 2-entry-capacity cache and reads back all 5 — only the last 2 survive now, the earlier reads will return None and the snapshot will diverge.
  • src/core/src/cache/tests/policies.rs:51 (insert_wont_fit_cache) inserts an oversized entry and unwraps the read result; under the new code the insert is silently skipped (the return here on victims.is_empty()), so the read returns None and the test panics. The corresponding liquid_cache__cache__tests__policies__insert_wont_fit_cache.snap is also stale.

Please update or replace these tests to verify the new RAM-only behavior (entries are dropped on eviction; oversized entries are not cached) and refresh the affected snapshot.
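As a starting point for the replacement tests, here is a self-contained sketch of the RAM-only semantics described above. `RamOnlyCache` and its methods are illustrative stand-ins for the real `LiquidCache`/`BudgetAccounting` types, not the crate's actual API; note that, mirroring the `insert_inner` loop, an oversized insert evicts existing entries one by one before giving up:

```rust
use std::collections::{HashMap, VecDeque};

// Toy RAM-only cache: byte budget, LRU eviction, oversized inserts skipped.
// Illustrative model only — not the crate's actual types.
struct RamOnlyCache {
    budget_bytes: usize,
    used_bytes: usize,
    lru: VecDeque<u32>,           // front = least recently used
    entries: HashMap<u32, usize>, // entry id -> size in bytes
}

impl RamOnlyCache {
    fn new(budget_bytes: usize) -> Self {
        Self { budget_bytes, used_bytes: 0, lru: VecDeque::new(), entries: HashMap::new() }
    }

    /// Mirrors `insert_inner`: evict one LRU victim per iteration; if the
    /// cache is empty and the entry still doesn't fit, skip it silently.
    fn insert(&mut self, id: u32, bytes: usize) {
        while self.used_bytes + bytes > self.budget_bytes {
            let Some(victim) = self.lru.pop_front() else {
                return; // larger than the whole budget: not cached
            };
            let freed = self.entries.remove(&victim).unwrap();
            self.used_bytes -= freed; // release_memory analogue
        }
        self.lru.push_back(id);
        self.entries.insert(id, bytes);
        self.used_bytes += bytes;
    }

    fn get(&self, id: u32) -> Option<usize> {
        self.entries.get(&id).copied()
    }
}

fn main() {
    // Budget fits two 100-byte entries, like the 2-entry policies test.
    let mut cache = RamOnlyCache::new(200);
    for id in 0..5 {
        cache.insert(id, 100);
    }
    // Only the last two survive; earlier reads return None, not a batch.
    assert!(cache.get(0).is_none());
    assert!(cache.get(2).is_none());
    assert_eq!(cache.get(3), Some(100));
    assert_eq!(cache.get(4), Some(100));

    // Oversized entry: the loop drains the cache, then skips the insert.
    cache.insert(99, 1000);
    assert!(cache.get(99).is_none());
    assert!(cache.get(4).is_none());
    println!("ok");
}
```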

self.squeeze_victims(victims).await;

self.drop_victims(&victims);
batch_to_cache = not_inserted;
crate::utils::yield_now_if_shuttle();
}
}

fn drop_victims(&self, victims: &[EntryID]) {
nit: (not blocking) with the new RAM-only insert path, several pieces of code are no longer reachable: squeeze_victims / squeeze_victim_inner (~lines 411–458), write_in_memory_batch_to_disk (~lines 260–303), and the squeeze_policy field on LiquidCache together with LiquidCacheBuilder::with_squeeze_policy. Either delete them, or — if "RAM-only" is meant to be a configurable mode rather than the only mode — wire up an explicit toggle on the builder. As-is, callers can still pass a SqueezePolicy that silently has no effect, which is misleading.
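If RAM-only is meant to be configurable rather than the only mode, one way to wire the toggle is an explicit eviction-mode enum on the builder. This is a hypothetical sketch with illustrative names (`EvictionMode`, `with_squeeze_to`), not the crate's actual builder API:

```rust
// Hypothetical sketch: make RAM-only an explicit builder mode instead of
// silently ignoring a configured squeeze policy. Names are illustrative.
#[derive(Debug, PartialEq)]
enum EvictionMode {
    /// Drop victims outright (the new RAM-only path).
    DropOnly,
    /// Squeeze victims to a compact/on-disk form (the old path).
    Squeeze { spill_dir: String },
}

struct LiquidCacheBuilder {
    batch_size: usize,
    eviction_mode: EvictionMode,
}

impl LiquidCacheBuilder {
    fn new(batch_size: usize) -> Self {
        // RAM-only by default; squeezing must be requested explicitly,
        // so a configured policy can never be silently inert.
        Self { batch_size, eviction_mode: EvictionMode::DropOnly }
    }

    fn with_squeeze_to(mut self, spill_dir: impl Into<String>) -> Self {
        self.eviction_mode = EvictionMode::Squeeze { spill_dir: spill_dir.into() };
        self
    }

    fn build(self) -> (usize, EvictionMode) {
        (self.batch_size, self.eviction_mode)
    }
}

fn main() {
    let (_, mode) = LiquidCacheBuilder::new(8192).build();
    assert_eq!(mode, EvictionMode::DropOnly);

    let (_, mode) = LiquidCacheBuilder::new(8192).with_squeeze_to("/tmp/spill").build();
    assert!(matches!(mode, EvictionMode::Squeeze { .. }));
    println!("ok");
}
```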

for victim in victims {
self.trace(InternalEvent::SqueezeVictim { entry: *victim });
super nit: (not blocking) the trace event is still InternalEvent::SqueezeVictim, but the new path drops the victim rather than squeezing it. Consider renaming the variant (or adding a new DropVictim event) so traces accurately describe what happened.
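A minimal sketch of the suggested rename — a dedicated `DropVictim` variant alongside the existing `SqueezeVictim`, so traces distinguish the two paths. The variant name and `describe` helper are illustrative assumptions, not the crate's actual tracing API:

```rust
// Hypothetical sketch: separate trace events for dropping vs squeezing.
#[derive(Debug)]
enum InternalEvent {
    SqueezeVictim { entry: u64 }, // victim compacted/spilled (old path)
    DropVictim { entry: u64 },    // victim discarded outright (new path)
}

// Illustrative helper showing how a trace line would differ per variant.
fn describe(e: &InternalEvent) -> String {
    match e {
        InternalEvent::SqueezeVictim { entry } => format!("squeeze victim {entry}"),
        InternalEvent::DropVictim { entry } => format!("drop victim {entry}"),
    }
}

fn main() {
    let e = InternalEvent::DropVictim { entry: 42 };
    assert_eq!(describe(&e), "drop victim 42");
    println!("{}", describe(&e));
}
```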

if let Some(entry) = self.index.remove(victim) {
self.budget.release_memory(entry.memory_usage_bytes());
}
}
}

/// Create a new instance of CacheStorage.
pub(crate) fn new(
batch_size: usize,
11 changes: 11 additions & 0 deletions src/core/src/cache/index.rs
@@ -51,6 +51,17 @@ impl ArtIndex {
}
}

/// Remove an entry from the index. Returns the removed entry, or `None` if
/// the key was not present.
pub(crate) fn remove(&self, entry_id: &EntryID) -> Option<Arc<CacheEntry>> {
let guard = self.art.pin();
let removed = self.art.remove(*entry_id, &guard);
if removed.is_some() {
self.entry_count.fetch_sub(1, Ordering::Relaxed);
}
removed
}

pub(crate) fn reset(&self) {
let guard = self.art.pin();
self.art.keys().into_iter().for_each(|k| {