Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
80 changes: 37 additions & 43 deletions src/planner.rs
Original file line number Diff line number Diff line change
Expand Up @@ -140,18 +140,8 @@ impl ExtensionPlanner for USearchExecPlanner {
None => return Ok(None),
};

// Async: ensure the index is loaded (downloads on cache miss).
self.registry.ensure_loaded(&node.table_name).await?;

let registered = match self.registry.resolve(&node.table_name) {
Some(r) => r,
None => {
return Err(DataFusionError::Execution(format!(
"USearchExecPlanner: table '{}' not available after ensure_loaded",
node.table_name
)));
}
};
// Async: bind a query-stable registry entry during planning.
let registered = self.registry.prepare(&node.table_name).await?;

let exec_props = session_state.execution_props();

Expand Down Expand Up @@ -225,15 +215,11 @@ impl ExtensionPlanner for USearchExecPlanner {

Ok(Some(Arc::new(USearchExec::new(SearchParams {
table_name: node.table_name.clone(),
registry: self.registry.clone(),
registered,
query_vec: node.query_vec_f64(),
k: node.k,
distance_type: node.distance_type.clone(),
has_filters: !node.filters.is_empty(),
schema: registered.schema.clone(),
key_col: registered.key_col.clone(),
scalar_kind: registered.scalar_kind,
brute_force_threshold: registered.config.brute_force_selectivity_threshold,
provider_scan,
}))))
}
Expand All @@ -242,25 +228,42 @@ impl ExtensionPlanner for USearchExecPlanner {
// ── Search parameters ─────────────────────────────────────────────────────────

/// All parameters needed to run a USearch query, cloned cheaply into execute().
#[derive(Debug, Clone)]
#[derive(Clone)]
struct SearchParams {
table_name: String,
registry: Arc<dyn VectorIndexResolver>,
registered: Arc<crate::registry::RegisteredTable>,
query_vec: Vec<f64>,
k: usize,
distance_type: DistanceType,
/// Whether the query has WHERE-clause filters. Used to choose between the
/// unfiltered HNSW path and the adaptive filtered path.
has_filters: bool,
schema: SchemaRef,
key_col: String,
scalar_kind: ScalarKind,
brute_force_threshold: f64,
/// Pre-planned provider scan for the filtered path (_key + filter cols only).
/// Used for selectivity estimation. None for the unfiltered path.
provider_scan: Option<Arc<dyn ExecutionPlan>>,
}
Comment on lines 232 to 244
Copy link
Copy Markdown

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Now that `registered` is stored directly in `SearchParams`, the four fields `schema`, `key_col`, `scalar_kind`, and `brute_force_threshold` are redundant copies of `registered.schema`, `registered.key_col`, `registered.scalar_kind`, and `registered.config.brute_force_selectivity_threshold`. Since `RegisteredTable` is immutable after construction, there's no correctness risk, but it doubles the storage for these values and means future readers have to wonder whether they might diverge.

Suggestion: remove the four flat fields and access them via `params.registered.*` at the call sites. The only call sites that need `schema` independently of `registered` are `USearchExec::new` (for `PlanProperties`) and `RecordBatchStreamAdapter::new` — both can use `params.registered.schema.clone()` directly.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Addressed in 85a83a6 — removed the four redundant flat fields from SearchParams and access them via params.registered.* at all call sites.


// Hand-written `Debug` for `SearchParams`.
//
// Emits the table name, `k`, the filter flag, and the settings read from the
// bound `registered` entry (schema, key column, scalar kind, brute-force
// selectivity threshold). `provider_scan` is shown as a presence marker
// instead of formatting the plan itself; fields not listed here are elided
// via `finish_non_exhaustive`.
impl fmt::Debug for SearchParams {
    fn fmt(&self, f: &mut fmt::Formatter<'_>) -> fmt::Result {
        let table = &self.registered;
        // Only record whether a pre-planned scan exists.
        let scan_marker = self.provider_scan.as_ref().map(|_| "Some(..)");

        let mut out = f.debug_struct("SearchParams");
        out.field("table_name", &self.table_name);
        out.field("k", &self.k);
        out.field("has_filters", &self.has_filters);
        out.field("schema", &table.schema);
        out.field("key_col", &table.key_col);
        out.field("scalar_kind", &table.scalar_kind);
        out.field(
            "brute_force_threshold",
            &table.config.brute_force_selectivity_threshold,
        );
        out.field("provider_scan", &scan_marker);
        out.finish_non_exhaustive()
    }
}

// ── Physical execution node ───────────────────────────────────────────────────

/// Leaf execution plan that defers all I/O to execute() time.
Expand All @@ -273,7 +276,7 @@ pub struct USearchExec {
impl USearchExec {
fn new(params: SearchParams) -> Self {
let properties = PlanProperties::new(
EquivalenceProperties::new(params.schema.clone()),
EquivalenceProperties::new(params.registered.schema.clone()),
Partitioning::UnknownPartitioning(1),
EmissionType::Incremental,
Boundedness::Bounded,
Expand Down Expand Up @@ -340,7 +343,7 @@ impl ExecutionPlan for USearchExec {
});

Ok(Box::pin(RecordBatchStreamAdapter::new(
self.params.schema.clone(),
self.params.registered.schema.clone(),
stream,
)))
}
Expand All @@ -361,16 +364,7 @@ async fn usearch_execute(
params: SearchParams,
task_ctx: Arc<TaskContext>,
) -> Result<Vec<RecordBatch>> {
// Re-fetch at execute time and reload on cache miss so eviction between
// plan and execute is handled correctly for async-backed resolvers.
params.registry.ensure_loaded(&params.table_name).await?;

let registered = params.registry.resolve(&params.table_name).ok_or_else(|| {
DataFusionError::Execution(format!(
"USearchExec: table '{}' not available after ensure_loaded at execute time",
params.table_name
))
})?;
let registered = params.registered.clone();

if !params.has_filters {
// ── Unfiltered path ───────────────────────────────────────────────
Expand All @@ -385,7 +379,7 @@ async fn usearch_execute(
&registered.index,
&params.query_vec,
params.k,
params.scalar_kind,
registered.scalar_kind,
)?
};

Expand All @@ -404,7 +398,7 @@ async fn usearch_execute(
let data_batches = async {
registered
.lookup_provider
.fetch_by_keys(&matches.keys, &params.key_col, None)
.fetch_by_keys(&matches.keys, &registered.key_col, None)
.await
}
.instrument(tracing::info_span!(
Expand All @@ -415,7 +409,7 @@ async fn usearch_execute(

let key_col_idx = provider_key_col_idx(&registered)?;
let _span = tracing::info_span!("usearch_attach_distances").entered();
attach_distances(data_batches, key_col_idx, &key_to_dist, &params.schema)
attach_distances(data_batches, key_col_idx, &key_to_dist, &registered.schema)
} else {
// ── Adaptive filtered path ────────────────────────────────────────
let scan = params.provider_scan.clone().ok_or_else(|| {
Expand Down Expand Up @@ -489,7 +483,7 @@ async fn adaptive_filtered_execute(

let total = registered.index.size();
let selectivity = valid_keys.len() as f64 / total.max(1) as f64;
let threshold = params.brute_force_threshold;
let threshold = registered.config.brute_force_selectivity_threshold;

let path = if selectivity <= threshold {
"index-get"
Expand Down Expand Up @@ -535,7 +529,7 @@ async fn adaptive_filtered_execute(
let data_batches = async {
registered
.lookup_provider
.fetch_by_keys(&fetch_keys, &params.key_col, None)
.fetch_by_keys(&fetch_keys, &registered.key_col, None)
.await
}
.instrument(tracing::info_span!(
Expand All @@ -550,7 +544,7 @@ async fn adaptive_filtered_execute(
data_batches,
lookup_key_col_idx,
&key_to_dist,
&params.schema,
&registered.schema,
)?
};

Expand Down Expand Up @@ -591,7 +585,7 @@ async fn adaptive_filtered_execute(
let data_batches = async {
registered
.lookup_provider
.fetch_by_keys(&matches.keys, &params.key_col, None)
.fetch_by_keys(&matches.keys, &registered.key_col, None)
.await
}
.instrument(tracing::info_span!(
Expand All @@ -606,7 +600,7 @@ async fn adaptive_filtered_execute(
data_batches,
lookup_key_col_idx,
&key_to_dist,
&params.schema,
&registered.schema,
)?
};

Expand Down
37 changes: 21 additions & 16 deletions src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -230,30 +230,34 @@ pub struct VectorIndexMeta {

/// Trait for resolving vector index entries by name.
///
/// Split into two operations to accommodate DataFusion's mixed sync/async pipeline:
/// DataFusion's optimizer is synchronous but physical planning is async, so the
/// resolver exposes two levels:
/// - `peek()` — sync, cheap: used by the optimizer rule to decide if ANN rewrite applies
/// - `resolve()` — sync: returns fully loaded entry from cache (for executor hot path)
/// - `ensure_loaded()` — async: loads the index on cache miss (for planner/executor)
/// - `resolve()` — sync, cache-only: used by synchronous callers such as the UDTF
/// - `prepare()` — async: returns a query-stable loaded entry for planning/execution
///
/// The built-in [`USearchRegistry`] implements all three as direct hashmap lookups
/// (always cached). Production systems can implement catalog-backed loading in
/// `ensure_loaded`.
/// The important contract is that `prepare()` returns the exact loaded
/// [`RegisteredTable`] the query should execute against. This avoids a second
/// lookup during execution and removes the planner/execute handoff gap where a
/// newer generation could replace the registry entry mid-query.
#[async_trait::async_trait]
pub trait VectorIndexResolver: Send + Sync + std::fmt::Debug {
/// Sync, cheap: check if a vector index exists and return its metadata.
/// Used by the optimizer rule to decide if the ANN rewrite should apply.
/// Must NOT do I/O — only check local cache or lightweight state.
fn peek(&self, name: &str) -> Option<VectorIndexMeta>;

/// Sync: return a fully loaded entry from cache.
/// Returns `None` on cache miss — the caller should call `ensure_loaded` first.
/// Sync, cache-only: return a fully loaded entry from local cache.
/// Returns `None` on cache miss.
fn resolve(&self, name: &str) -> Option<Arc<RegisteredTable>>;

/// Async: ensure the index is loaded and available in cache.
/// On cache miss or stale entry, downloads files and loads the index.
/// On cache hit with current generation, returns immediately.
/// Called from the async physical planner before building the execution plan.
async fn ensure_loaded(&self, name: &str) -> datafusion::common::Result<()>;
/// Async: return a query-stable loaded entry.
///
/// On cache miss or stale entry, implementations may download files and
/// load the current generation. On success, the returned [`Arc`] must stay
/// valid for the lifetime of the query even if a newer generation is loaded
/// concurrently afterwards.
async fn prepare(&self, name: &str) -> datafusion::common::Result<Arc<RegisteredTable>>;
}

// ── USearchRegistry ───────────────────────────────────────────────────────────
Expand Down Expand Up @@ -438,9 +442,10 @@ impl VectorIndexResolver for USearchRegistry {
self.get(name)
}

async fn ensure_loaded(&self, _name: &str) -> datafusion::common::Result<()> {
// USearchRegistry is always fully cached — nothing to load.
Ok(())
// Cache-only lookup via `self.get`: return the entry if present, otherwise
// an execution error naming the missing table.
async fn prepare(&self, name: &str) -> datafusion::common::Result<Arc<RegisteredTable>> {
    match self.get(name) {
        Some(table) => Ok(table),
        None => Err(DataFusionError::Execution(format!(
            "USearchRegistry: table '{name}' is not loaded"
        ))),
    }
}
}

Expand Down
4 changes: 2 additions & 2 deletions src/udtf.rs
Original file line number Diff line number Diff line change
Expand Up @@ -44,8 +44,8 @@ use crate::registry::VectorIndexResolver;
///
/// This entry point is synchronous. For async-backed [`VectorIndexResolver`]
/// implementations, it only works when the target index is already loaded in
/// the local cache. `vector_usearch()` does not call `ensure_loaded()` and
/// cannot trigger async index loads.
/// the local cache. `vector_usearch()` does not call `prepare()` and cannot
/// trigger async index loads.
pub struct USearchUDTF {
registry: Arc<dyn VectorIndexResolver>,
}
Expand Down
Loading