From a9c876a9560dc888a9a935cc67cdb7a68c2d2a32 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Mon, 6 Apr 2026 13:09:08 +0530 Subject: [PATCH 1/2] Bind stable vector handles during planning --- src/planner.rs | 62 ++++++++++++++++++++++++++----------------------- src/registry.rs | 37 ++++++++++++++++------------- src/udtf.rs | 4 ++-- 3 files changed, 56 insertions(+), 47 deletions(-) diff --git a/src/planner.rs b/src/planner.rs index 9cf5dc6..36b6fa5 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -140,18 +140,8 @@ impl ExtensionPlanner for USearchExecPlanner { None => return Ok(None), }; - // Async: ensure the index is loaded (downloads on cache miss). - self.registry.ensure_loaded(&node.table_name).await?; - - let registered = match self.registry.resolve(&node.table_name) { - Some(r) => r, - None => { - return Err(DataFusionError::Execution(format!( - "USearchExecPlanner: table '{}' not available after ensure_loaded", - node.table_name - ))); - } - }; + // Async: bind a query-stable registry entry during planning. + let registered = self.registry.prepare(&node.table_name).await?; let exec_props = session_state.execution_props(); @@ -223,17 +213,22 @@ impl ExtensionPlanner for USearchExecPlanner { None }; + let schema = registered.schema.clone(); + let key_col = registered.key_col.clone(); + let scalar_kind = registered.scalar_kind; + let brute_force_threshold = registered.config.brute_force_selectivity_threshold; + Ok(Some(Arc::new(USearchExec::new(SearchParams { table_name: node.table_name.clone(), - registry: self.registry.clone(), + registered, query_vec: node.query_vec_f64(), k: node.k, distance_type: node.distance_type.clone(), has_filters: !node.filters.is_empty(), - schema: registered.schema.clone(), - key_col: registered.key_col.clone(), - scalar_kind: registered.scalar_kind, - brute_force_threshold: registered.config.brute_force_selectivity_threshold, + schema, + key_col, + scalar_kind, + brute_force_threshold, provider_scan, })))) } @@ -242,10 +237,10 @@ impl ExtensionPlanner for USearchExecPlanner { // ── Search parameters ───────────────────────────────────────────────────────── /// All parameters needed to run a USearch query, cloned cheaply into execute(). -#[derive(Debug, Clone)] +#[derive(Clone)] struct SearchParams { table_name: String, - registry: Arc, + registered: Arc, query_vec: Vec, k: usize, distance_type: DistanceType, @@ -261,6 +256,24 @@ struct SearchParams { provider_scan: Option>, } +impl fmt::Debug for SearchParams { + fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result { + f.debug_struct("SearchParams") + .field("table_name", &self.table_name) + .field("k", &self.k) + .field("has_filters", &self.has_filters) + .field("schema", &self.schema) + .field("key_col", &self.key_col) + .field("scalar_kind", &self.scalar_kind) + .field("brute_force_threshold", &self.brute_force_threshold) + .field( + "provider_scan", + &self.provider_scan.as_ref().map(|_| "Some(..)"), + ) + .finish_non_exhaustive() + } +} + // ── Physical execution node ─────────────────────────────────────────────────── /// Leaf execution plan that defers all I/O to execute() time. @@ -361,16 +374,7 @@ async fn usearch_execute( params: SearchParams, task_ctx: Arc, ) -> Result> { - // Re-fetch at execute time and reload on cache miss so eviction between - // plan and execute is handled correctly for async-backed resolvers. - params.registry.ensure_loaded(¶ms.table_name).await?; - - let registered = params.registry.resolve(¶ms.table_name).ok_or_else(|| { - DataFusionError::Execution(format!( - "USearchExec: table '{}' not available after ensure_loaded at execute time", - params.table_name - )) - })?; + let registered = params.registered.clone(); if !params.has_filters { // ── Unfiltered path ─────────────────────────────────────────────── diff --git a/src/registry.rs b/src/registry.rs index 21157a1..928aecd 100644 --- a/src/registry.rs +++ b/src/registry.rs @@ -230,14 +230,16 @@ pub struct VectorIndexMeta { /// Trait for resolving vector index entries by name. /// -/// Split into two operations to accommodate DataFusion's mixed sync/async pipeline: +/// DataFusion's optimizer is synchronous but physical planning is async, so the +/// resolver exposes two levels: /// - `peek()` — sync, cheap: used by the optimizer rule to decide if ANN rewrite applies -/// - `resolve()` — sync: returns fully loaded entry from cache (for executor hot path) -/// - `ensure_loaded()` — async: loads the index on cache miss (for planner/executor) +/// - `resolve()` — sync, cache-only: used by synchronous callers such as the UDTF +/// - `prepare()` — async: returns a query-stable loaded entry for planning/execution /// -/// The built-in [`USearchRegistry`] implements all three as direct hashmap lookups -/// (always cached). Production systems can implement catalog-backed loading in -/// `ensure_loaded`. +/// The important contract is that `prepare()` returns the exact loaded +/// [`RegisteredTable`] the query should execute against. This avoids a second +/// lookup during execution and removes the planner/execute handoff gap where a +/// newer generation could replace the registry entry mid-query. #[async_trait::async_trait] pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug { /// Sync, cheap: check if a vector index exists and return its metadata. @@ -245,15 +247,17 @@ pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug { /// Must NOT do I/O — only check local cache or lightweight state. fn peek(&self, name: &str) -> Option; - /// Sync: return a fully loaded entry from cache. - /// Returns `None` on cache miss — the caller should call `ensure_loaded` first. + /// Sync, cache-only: return a fully loaded entry from local cache. + /// Returns `None` on cache miss. fn resolve(&self, name: &str) -> Option>; - /// Async: ensure the index is loaded and available in cache. - /// On cache miss or stale entry, downloads files and loads the index. - /// On cache hit with current generation, returns immediately. - /// Called from the async physical planner before building the execution plan. - async fn ensure_loaded(&self, name: &str) -> datafusion::common::Result<()>; + /// Async: return a query-stable loaded entry. + /// + /// On cache miss or stale entry, implementations may download files and + /// load the current generation. On success, the returned [`Arc`] must stay + /// valid for the lifetime of the query even if a newer generation is loaded + /// concurrently afterwards. + async fn prepare(&self, name: &str) -> datafusion::common::Result>; } // ── USearchRegistry ─────────────────────────────────────────────────────────── @@ -438,9 +442,10 @@ impl VectorIndexResolver for USearchRegistry { self.get(name) } - async fn ensure_loaded(&self, _name: &str) -> datafusion::common::Result<()> { - // USearchRegistry is always fully cached — nothing to load. - Ok(()) + async fn prepare(&self, name: &str) -> datafusion::common::Result> { + self.get(name).ok_or_else(|| { + DataFusionError::Execution(format!("USearchRegistry: table '{name}' is not loaded")) + }) } } diff --git a/src/udtf.rs b/src/udtf.rs index 6ee686f..fd4da2b 100644 --- a/src/udtf.rs +++ b/src/udtf.rs @@ -44,8 +44,8 @@ use crate::registry::VectorIndexResolver; /// /// This entry point is synchronous. For async-backed [`VectorIndexResolver`] /// implementations, it only works when the target index is already loaded in -/// the local cache. `vector_usearch()` does not call `ensure_loaded()` and -/// cannot trigger async index loads. +/// the local cache. `vector_usearch()` does not call `prepare()` and cannot +/// trigger async index loads. pub struct USearchUDTF { registry: Arc, } From 85a83a667fa54f9158158c302fba7c691a9f6fa6 Mon Sep 17 00:00:00 2001 From: Anoop Narang Date: Mon, 6 Apr 2026 13:24:17 +0530 Subject: [PATCH 2/2] refactor(planner): remove redundant flat fields from SearchParams Access schema, key_col, scalar_kind, and brute_force_threshold through params.registered.* instead of duplicating them as flat fields on SearchParams. --- src/planner.rs | 44 +++++++++++++++++--------------------------- 1 file changed, 17 insertions(+), 27 deletions(-) diff --git a/src/planner.rs b/src/planner.rs index 36b6fa5..b863e53 100644 --- a/src/planner.rs +++ b/src/planner.rs @@ -213,11 +213,6 @@ impl ExtensionPlanner for USearchExecPlanner { None }; - let schema = registered.schema.clone(); - let key_col = registered.key_col.clone(); - let scalar_kind = registered.scalar_kind; - let brute_force_threshold = registered.config.brute_force_selectivity_threshold; - Ok(Some(Arc::new(USearchExec::new(SearchParams { table_name: node.table_name.clone(), registered, @@ -225,10 +220,6 @@ impl ExtensionPlanner for USearchExecPlanner { k: node.k, distance_type: node.distance_type.clone(), has_filters: !node.filters.is_empty(), - schema, - key_col, - scalar_kind, - brute_force_threshold, provider_scan, })))) } @@ -247,10 +238,6 @@ struct SearchParams { /// Whether the query has WHERE-clause filters. Used to choose between the /// unfiltered HNSW path and the adaptive filtered path. has_filters: bool, - schema: SchemaRef, - key_col: String, - scalar_kind: ScalarKind, - brute_force_threshold: f64, /// Pre-planned provider scan for the filtered path (_key + filter cols only). /// Used for selectivity estimation. None for the unfiltered path. provider_scan: Option>, @@ -262,10 +249,13 @@ impl fmt::Debug for SearchParams { .field("table_name", &self.table_name) .field("k", &self.k) .field("has_filters", &self.has_filters) - .field("schema", &self.schema) - .field("key_col", &self.key_col) - .field("scalar_kind", &self.scalar_kind) - .field("brute_force_threshold", &self.brute_force_threshold) + .field("schema", &self.registered.schema) + .field("key_col", &self.registered.key_col) + .field("scalar_kind", &self.registered.scalar_kind) + .field( + "brute_force_threshold", + &self.registered.config.brute_force_selectivity_threshold, + ) .field( "provider_scan", &self.provider_scan.as_ref().map(|_| "Some(..)"), @@ -286,7 +276,7 @@ pub struct USearchExec { impl USearchExec { fn new(params: SearchParams) -> Self { let properties = PlanProperties::new( - EquivalenceProperties::new(params.schema.clone()), + EquivalenceProperties::new(params.registered.schema.clone()), Partitioning::UnknownPartitioning(1), EmissionType::Incremental, Boundedness::Bounded, @@ -353,7 +343,7 @@ impl ExecutionPlan for USearchExec { }); Ok(Box::pin(RecordBatchStreamAdapter::new( - self.params.schema.clone(), + self.params.registered.schema.clone(), stream, ))) } @@ -389,7 +379,7 @@ async fn usearch_execute( ®istered.index, ¶ms.query_vec, params.k, - params.scalar_kind, + registered.scalar_kind, )? }; @@ -408,7 +398,7 @@ async fn usearch_execute( let data_batches = async { registered .lookup_provider - .fetch_by_keys(&matches.keys, ¶ms.key_col, None) + .fetch_by_keys(&matches.keys, ®istered.key_col, None) .await } .instrument(tracing::info_span!( @@ -419,7 +409,7 @@ async fn usearch_execute( let key_col_idx = provider_key_col_idx(®istered)?; let _span = tracing::info_span!("usearch_attach_distances").entered(); - attach_distances(data_batches, key_col_idx, &key_to_dist, ¶ms.schema) + attach_distances(data_batches, key_col_idx, &key_to_dist, ®istered.schema) } else { // ── Adaptive filtered path ──────────────────────────────────────── let scan = params.provider_scan.clone().ok_or_else(|| { @@ -493,7 +483,7 @@ async fn adaptive_filtered_execute( let total = registered.index.size(); let selectivity = valid_keys.len() as f64 / total.max(1) as f64; - let threshold = params.brute_force_threshold; + let threshold = registered.config.brute_force_selectivity_threshold; let path = if selectivity <= threshold { "index-get" @@ -539,7 +529,7 @@ async fn adaptive_filtered_execute( let data_batches = async { registered .lookup_provider - .fetch_by_keys(&fetch_keys, ¶ms.key_col, None) + .fetch_by_keys(&fetch_keys, ®istered.key_col, None) .await } .instrument(tracing::info_span!( @@ -554,7 +544,7 @@ async fn adaptive_filtered_execute( data_batches, lookup_key_col_idx, &key_to_dist, - ¶ms.schema, + ®istered.schema, )? }; @@ -595,7 +585,7 @@ async fn adaptive_filtered_execute( let data_batches = async { registered .lookup_provider - .fetch_by_keys(&matches.keys, ¶ms.key_col, None) + .fetch_by_keys(&matches.keys, ®istered.key_col, None) .await } .instrument(tracing::info_span!( @@ -610,7 +600,7 @@ async fn adaptive_filtered_execute( data_batches, lookup_key_col_idx, &key_to_dist, - ¶ms.schema, + ®istered.schema, )? };