Adds async QQL execution and programmatic gRPC transport support.#50

Open
srimon12 wants to merge 3 commits into pavanjava:main from srimon12:feat/grpc-async

Conversation

@srimon12
Collaborator

@srimon12 srimon12 commented May 30, 2026

Summary

Adds async QQL execution and programmatic gRPC transport support.

What Changed

  • Added AsyncConnection
  • Added AsyncExecutor
  • Added prefer_grpc / grpc_port support for sync, async, and run_query
  • Moved shared executor/helper logic into utils.py
  • Removed duplicate hybrid prefetch setup across flat/grouped and sync/async search
  • Added async/gRPC tests and docs
  • Added benchmark/qql_transport_benchmark.py

Benchmark

Command:

uv run python benchmark/qql_transport_benchmark.py

Environment:

  • Local Qdrant: http://localhost:6333
  • gRPC port: 6334
  • 50 measured SEARCH queries
  • 5 warmup queries
  • Local FastEmbed dense embedding enabled and warmed before timing

Single-flight latency

| Mode | Total ms | Avg ms/op | Ops/sec |
| --- | --- | --- | --- |
| sync REST | 976.93 | 19.54 | 51.18 |
| async REST | 1,201.62 | 24.03 | 41.61 |
| sync gRPC | 703.23 | 14.06 | 71.10 |
| async gRPC | 763.53 | 15.27 | 65.49 |

Async concurrent throughput

| Mode | Total ms | Avg ms/op | Ops/sec |
| --- | --- | --- | --- |
| async REST x10 | 775.90 | 15.52 | 64.44 |
| async gRPC x10 | 612.11 | 12.24 | 81.68 |

Benchmark Notes

Async does not improve single-query latency in this benchmark. The single-flight async path is slower because QQL still performs local FastEmbed embedding before the Qdrant request, and async adds coroutine/client overhead.

The value of async here is async application support and concurrent throughput. With x10 concurrency, async gRPC reached 81.68 ops/sec, compared to 71.10 ops/sec for single-flight sync gRPC.
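
The difference between the single-flight and x10 modes can be illustrated with a minimal sketch. It does not touch QQL or Qdrant: `fake_query` is a hypothetical stand-in that sleeps to simulate I/O wait, and the limit of 10 in-flight calls only mirrors the x10 mode reported above.

```python
import asyncio
import time


async def fake_query(delay: float = 0.01) -> str:
    # Hypothetical stand-in for an async query; sleeping simulates network wait.
    await asyncio.sleep(delay)
    return "ok"


async def run_single_flight(n: int) -> float:
    # One query at a time: each must finish before the next starts.
    start = time.perf_counter()
    for _ in range(n):
        await fake_query()
    return time.perf_counter() - start


async def run_concurrent(n: int, limit: int = 10) -> float:
    # Keep at most `limit` queries in flight at once.
    sem = asyncio.Semaphore(limit)

    async def bounded() -> str:
        async with sem:
            return await fake_query()

    start = time.perf_counter()
    await asyncio.gather(*(bounded() for _ in range(n)))
    return time.perf_counter() - start


async def main() -> tuple[float, float]:
    single = await run_single_flight(50)
    concurrent = await run_concurrent(50)
    return single, concurrent


single_s, concurrent_s = asyncio.run(main())
print(f"single-flight:  {50 / single_s:.1f} ops/sec")
print(f"x10 concurrent: {50 / concurrent_s:.1f} ops/sec")
```

In this sketch the work is pure waiting, so concurrency helps a lot. In the real benchmark the local embedding step is CPU-bound and runs before each request, which would limit how much of the wait can overlap.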

Validation

uv run pytest
python3 -m compileall src/qql
git diff --check

Result:

633 passed

Summary by CodeRabbit

  • New Features

    • Non-blocking async client and executor for QQL queries; run_query and connections can opt to prefer gRPC and set a gRPC port.
    • New benchmark script to measure sync/async/gRPC query latency and throughput.
  • Documentation

    • Programmatic guides updated with async examples, gRPC transport options, and README badge/count refreshed (549 → 633).
  • Tests

    • New async and executor tests covering lifecycle, concurrency, topology, and error handling.

- Moved various utility functions from `executor.py` to a new `utils.py` file for better organization and maintainability.
- Simplified the `_resolve_topology` method by utilizing the new `collection_topology_kwargs` function.
- Updated the `_build_dense_point_vector` and `_extract_point_id_and_payload` methods to use the new utility functions.
- Enhanced error handling in the `_ensure_collection` method to catch specific exceptions.
- Improved the handling of hybrid queries and prefetching by introducing `build_hybrid_prefetches` and `build_dense_query` functions.
- Added tests for the `AsyncConnection` class to ensure proper initialization, query execution, and lifecycle management.
- Updated existing tests to cover new functionality and ensure backward compatibility.
Repository owner deleted a comment from coderabbitai Bot May 30, 2026

@coderabbitai coderabbitai Bot left a comment

Actionable comments posted: 2

🧹 Nitpick comments (1)
src/qql/connection.py (1)

80-84: ⚡ Quick win

Annotate client_kwargs to keep strict type-checkers happy

client_kwargs is built from url/api_key/verify so its value type is inferred as str | bool | None; when prefer_grpc is enabled, grpc_port: int is added, which can trip mypy/pyright in strict mode. CI only runs pytest (no mypy/pyright step/config found), so this may not surface automatically.

♻️ Proposed annotation
-        client_kwargs = {"url": url, "api_key": secret, "verify": verify}
+        client_kwargs: dict[str, object] = {"url": url, "api_key": secret, "verify": verify}
         if prefer_grpc:
             client_kwargs["prefer_grpc"] = True
             client_kwargs["grpc_port"] = grpc_port
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@src/qql/connection.py` around lines 80 - 84, client_kwargs is inferred as a
mixed str|bool|None and later extended with an int (grpc_port), which can break
strict type-checkers; annotate client_kwargs explicitly (e.g., as Dict[str, Any]
or Dict[str, object]) before populating it in the block that builds
client_kwargs so adding grpc_port when prefer_grpc is True type-checks cleanly;
update the local variable declaration near the use of client_kwargs in the
QdrantClient construction (the block that references prefer_grpc and grpc_port)
and import typing.Any/Dict as needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

Inline comments:
In `@src/qql/executor.py`:
- Around line 241-248: Non-NOT_FOUND exceptions from _fetch_collection_info()
are being re-raised raw for some transports, causing inconsistent behavior for
_resolve_topology()/SHOW COLLECTION; change the except branches in
_fetch_collection_info() so that when a caught ValueError or generic Exception
is NOT recognized as a "not found" (i.e., not matching f"Collection {name} not
found" and not passing is_grpc_not_found_error(e)), you wrap and raise a
QQLRuntimeError (preserving the original exception as the cause) instead of
re-raising the raw exception; also normalize UnexpectedResponse cases by
converting them to QQLRuntimeError so all transports surface non-NOT_FOUND
errors consistently to callers like _resolve_topology().

In `@tests/test_async_connection.py`:
- Around line 76-105: The tests are asserting AsyncMock invocation but not that
the coroutine was awaited; update the assertions in the AsyncConnection tests to
use awaited checks: replace mock_executor.execute.assert_called_once() with
mock_executor.execute.assert_awaited_once() in
test_run_query_calls_executor_execute, and replace
executor_cls.assert_called_once() / assert mock_executor.execute.call_count == 3
with executor_cls.assert_called_once() (keep as-is for construction) and assert
mock_executor.execute.await_count == 3 (or use
mock_executor.execute.assert_awaited_count(3) if available) in
test_executor_instance_reused_across_queries so you verify the coroutine was
actually awaited across runs of AsyncConnection.run_query that use
AsyncExecutor.
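
The wrapping requested for `_fetch_collection_info()` in the first inline comment could look roughly like the sketch below. It is an illustration under stated assumptions, not the PR's code: `QQLRuntimeError` is redefined locally as a stand-in, `is_not_found` checks only the message form quoted in the comment, and returning `None` for a missing collection is an assumed convention.

```python
class QQLRuntimeError(Exception):
    """Local stand-in for the library's error type."""


def is_not_found(exc: Exception, name: str) -> bool:
    # Simplified: only the message form quoted in the review comment.
    return f"Collection {name} not found" in str(exc)


def fetch_collection_info(get_collection, name: str):
    """Return collection info, None if missing, or raise QQLRuntimeError."""
    try:
        return get_collection(name)
    except Exception as e:
        if is_not_found(e, name):
            return None
        # Wrap everything else so callers see one error type on every transport.
        # `from e` keeps the original exception available as __cause__.
        raise QQLRuntimeError(
            f"Qdrant error fetching collection '{name}': {e}"
        ) from e
```

Callers such as `_resolve_topology()` then only need to handle one exception type, and the original transport error stays reachable through `__cause__` for debugging.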


📥 Commits

Reviewing files that changed from the base of the PR and between 9172127 and 7a21721.

📒 Files selected for processing (11)
  • README.md
  • benchmark/qql_transport_benchmark.py
  • docs/programmatic.md
  • src/qql/__init__.py
  • src/qql/async_connection.py
  • src/qql/async_executor.py
  • src/qql/connection.py
  • src/qql/executor.py
  • src/qql/utils.py
  • tests/test_async_connection.py
  • tests/test_connection.py

Comment thread src/qql/executor.py Outdated
Comment thread tests/test_async_connection.py Outdated
@srimon12 srimon12 requested a review from pavanjava May 31, 2026 02:01
@srimon12 srimon12 added the enhancement New feature or request label May 31, 2026

@coderabbitai coderabbitai Bot left a comment

Caution

Some comments are outside the diff and can’t be posted inline due to platform limitations.

⚠️ Outside diff range comments (2)
tests/test_async_connection.py (2)

138-138: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Incomplete fix: use assert_awaited_once() for AsyncMock.close checks.

Lines 86 and 105 were correctly updated to use await-specific assertions, but these three assertions on mock_client.close (an AsyncMock method) still use assert_called_once(). These must be assert_awaited_once() to verify the coroutine is actually awaited—otherwise the tests could pass even if AsyncConnection.close() or the context manager stops awaiting the client's close().

🐛 Proposed fix
     async def test_close_calls_client_close(self, mocker):
         mock_client = AsyncMock()
         mocker.patch("qql.async_connection.AsyncQdrantClient", return_value=mock_client)
         conn = AsyncConnection("http://localhost:6333")
         await conn.close()
-        mock_client.close.assert_called_once()
+        mock_client.close.assert_awaited_once()

     async def test_context_manager_closes_on_exit(self, mocker):
         mock_client = AsyncMock()
         mocker.patch("qql.async_connection.AsyncQdrantClient", return_value=mock_client)

         async with AsyncConnection("http://localhost:6333") as conn:
             assert conn._client is mock_client

-        mock_client.close.assert_called_once()
+        mock_client.close.assert_awaited_once()

     async def test_context_manager_closes_on_exception(self, mocker):
         mock_client = AsyncMock()
         mocker.patch("qql.async_connection.AsyncQdrantClient", return_value=mock_client)

         with pytest.raises(RuntimeError, match="oops"):
             async with AsyncConnection("http://localhost:6333"):
                 raise RuntimeError("oops")

-        mock_client.close.assert_called_once()
+        mock_client.close.assert_awaited_once()

Also applies to: 147-147, 157-157
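
Why the two assertions differ can be shown with a small standalone sketch that uses only the standard library. The two `close_*` helpers are hypothetical stand-ins for a correct and a buggy `close()` implementation; they are not taken from the PR.

```python
import asyncio
from unittest.mock import AsyncMock


async def close_without_await(client) -> None:
    # Bug: the coroutine is created but never awaited.
    coro = client.close()
    coro.close()  # discard it explicitly to avoid a "never awaited" warning


async def close_with_await(client) -> None:
    await client.close()


def awaited_check_passes(closer) -> bool:
    client = AsyncMock()
    asyncio.run(closer(client))
    # Passes for both helpers: the call itself is recorded immediately.
    client.close.assert_called_once()
    try:
        # Passes only if the returned coroutine was actually awaited.
        client.close.assert_awaited_once()
    except AssertionError:
        return False
    return True


buggy_passes = awaited_check_passes(close_without_await)
correct_passes = awaited_check_passes(close_with_await)
```

Here `assert_called_once()` accepts both helpers, while `assert_awaited_once()` rejects the buggy one, which is the gap the proposed fix closes.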

🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_async_connection.py` at line 138, The test uses mock_client.close
(an AsyncMock) with assert_called_once(), which doesn't verify the coroutine was
awaited; change those assertions to assert_awaited_once() for each occurrence
(e.g., the assertion at mock_client.close.assert_called_once() and the similar
occurrences noted at lines 147 and 157) to ensure AsyncConnection.close() or the
async context manager actually awaits mock_client.close; search for
mock_client.close in tests/test_async_connection.py and replace
assert_called_once() with assert_awaited_once() where the mock is an AsyncMock.

301-301: ⚠️ Potential issue | 🟠 Major | ⚡ Quick win

Incomplete fix: use assert_awaited_once() for AsyncMock on collection creation.

_create_collection_and_wait is patched with new_callable=AsyncMock (line 209), but the assertion on line 301 still uses assert_called_once(). This must be assert_awaited_once() to verify the async method is actually awaited. Otherwise the test could pass if the executor invokes but never awaits the creation coroutine, missing concurrency bugs.

🐛 Proposed fix
-        executor._create_collection_and_wait.assert_called_once()
+        executor._create_collection_and_wait.assert_awaited_once()
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_async_connection.py` at line 301, The test currently asserts on
the AsyncMock `_create_collection_and_wait` with `assert_called_once()`, which
doesn't verify it was awaited; replace that call with `assert_awaited_once()` so
the test verifies the async method was actually awaited (keep using the existing
AsyncMock patch of `_create_collection_and_wait` to ensure the awaited assertion
is available).
🧹 Nitpick comments (1)
tests/test_executor.py (1)

89-101: 💤 Low value

Consider clearer test method names.

The double-negative phrasing in test_non_not_found_value_error_is_wrapped makes the intent harder to parse. Consider renaming to something like test_value_error_wrapped_as_qql_runtime_error and test_runtime_error_wrapped_as_qql_runtime_error for better clarity.

♻️ Suggested rename
-    def test_non_not_found_value_error_is_wrapped(self, executor, mock_client):
+    def test_value_error_wrapped_as_qql_runtime_error(self, executor, mock_client):
         mock_client.get_collection.side_effect = ValueError("transport failed")

         with pytest.raises(QQLRuntimeError, match="Qdrant error fetching collection"):
             executor._fetch_collection_info("docs")

-    def test_non_not_found_exception_is_wrapped(self, executor, mock_client):
+    def test_runtime_error_wrapped_as_qql_runtime_error(self, executor, mock_client):
         mock_client.get_collection.side_effect = RuntimeError("transport failed")

         with pytest.raises(QQLRuntimeError, match="Qdrant error fetching collection"):
             executor._fetch_collection_info("docs")
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.

In `@tests/test_executor.py` around lines 89 - 101, Rename the confusing
double-negative test method names in TestFetchCollectionInfo for clarity: change
test_non_not_found_value_error_is_wrapped to
test_value_error_wrapped_as_qql_runtime_error and
test_non_not_found_exception_is_wrapped to
test_runtime_error_wrapped_as_qql_runtime_error; update the method names where
they are defined in the class TestFetchCollectionInfo and any references to them
(e.g., test discovery or calls), leaving the test bodies and assertions
(executor._fetch_collection_info, mock_client.get_collection and the
pytest.raises checks) unchanged.


📥 Commits

Reviewing files that changed from the base of the PR and between 7a21721 and 712b0bf.

📒 Files selected for processing (4)
  • src/qql/connection.py
  • src/qql/executor.py
  • tests/test_async_connection.py
  • tests/test_executor.py
🚧 Files skipped from review as they are similar to previous changes (2)
  • src/qql/connection.py
  • src/qql/executor.py

Repository owner deleted a comment from coderabbitai Bot May 31, 2026
@pavanjava
Owner

The lock-based collection creation guard in AsyncExecutor is a good idea for a single-process async app. But QQL is often used as a library inside FastAPI/uvicorn which runs multiple worker processes, and the lock is not shared across processes. A concurrent insert from two different workers will still race. This is a real limitation and the docs say nothing about it. At minimum, the docstring should say "this serializes concurrent creation within a single process; cross-process races are not prevented."
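
A minimal sketch of what such a guard and docstring might look like, assuming a lock-plus-re-check design. `CollectionCreationGuard` and its `create` callable are hypothetical names for illustration; the actual AsyncExecutor code in the PR is not shown on this page.

```python
import asyncio


class CollectionCreationGuard:
    """Serializes concurrent creation within a single process.

    Cross-process races are not prevented: the lock lives in this process's
    memory, so another worker process holds an unrelated lock.
    """

    def __init__(self, create):
        self._create = create  # hypothetical async callable: name -> None
        self._lock = asyncio.Lock()
        self._created: set[str] = set()

    async def ensure(self, name: str) -> None:
        if name in self._created:
            return
        async with self._lock:
            # Re-check after acquiring: another task may have created it.
            if name in self._created:
                return
            await self._create(name)
            self._created.add(name)


async def demo() -> int:
    calls: list[str] = []

    async def create(name: str) -> None:
        await asyncio.sleep(0.01)  # simulate the round trip to the server
        calls.append(name)

    guard = CollectionCreationGuard(create)  # built inside the running loop
    await asyncio.gather(*(guard.ensure("docs") for _ in range(10)))
    return len(calls)


creations = asyncio.run(demo())
```

Ten concurrent tasks in one event loop produce a single creation here. Two uvicorn workers would each own a separate guard, so the server still has to tolerate a duplicate create request.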

@pavanjava
Owner

Benchmark measures FastEmbed inference time, not Qdrant transport latency
DOCS[8] = "Benchmark results should separate setup cost from measured query latency"
The benchmark is literally documenting its own flaw. The FastEmbed model runs locally and for a 384-dim model on CPU can take 5–20ms. The gRPC vs REST delta at localhost is 1–3ms. The model inference noise swamps the signal you're trying to measure. The benchmark should either:

  • Pre-compute and cache embeddings before the timing loop (call Embedder().embed() once, store the vector, inject it directly)
  • Or make clear in the output that it's measuring end-to-end latency including embedding

The PR's own benchmark results in the description (14.06 ms/op for sync gRPC vs 19.54 ms/op for sync REST) need to be read with this in mind.
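
The first option can be sketched as follows. Both `embed` and `search` are hypothetical stand-ins that sleep to simulate cost; they are not the QQL Embedder or a Qdrant call, and the sleep durations are arbitrary.

```python
import time


def embed(text: str) -> list[float]:
    # Hypothetical stand-in for local model inference (the slow, noisy part).
    time.sleep(0.005)
    return [0.0] * 384


def search(vector: list[float]) -> int:
    # Hypothetical stand-in for the transport call being measured.
    time.sleep(0.001)
    return len(vector)


def bench(n: int = 20) -> tuple[float, float]:
    # End-to-end: embedding runs inside the timed loop.
    start = time.perf_counter()
    for _ in range(n):
        search(embed("neurology"))
    end_to_end = (time.perf_counter() - start) / n

    # Transport only: embed once up front, reuse the vector in the timed loop.
    vector = embed("neurology")
    start = time.perf_counter()
    for _ in range(n):
        search(vector)
    transport_only = (time.perf_counter() - start) / n
    return end_to_end, transport_only


end_to_end_s, transport_only_s = bench()
print(f"end-to-end:     {end_to_end_s * 1000:.2f} ms/op")
print(f"transport only: {transport_only_s * 1000:.2f} ms/op")
```

Reporting both numbers side by side would also cover the second option, since the output then states which figure includes embedding.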

@pavanjava
Owner

anyio test dependency is likely undeclared
test_async_connection.py uses @pytest.mark.anyio and an anyio_backend fixture. Neither is part of pytest itself: both come from the pytest plugin that ships inside the anyio package, so anyio has to be installed in the test environment. If it isn't declared in pyproject.toml's [project.optional-dependencies] test group, CI will silently skip or error on all async tests. The PR description reports "633 passed", which suggests it was installed locally, but it needs to be in the lockfile.

@pavanjava
Owner

SearchUsingOptions and SearchGroupByOptions belong in ast_nodes.py
These are frozen dataclasses representing parsed structure from a query. They are AST-adjacent and would fit naturally in ast_nodes.py (or a parser_types.py). Putting them in utils.py fragments type definitions across the codebase and makes it harder to understand the AST boundary.

Owner

@pavanjava pavanjava left a comment

Needs the review comments to be addressed before merging.

Comment thread src/qql/utils.py
from dataclasses import dataclass
from typing import Any

from grpc import RpcError, StatusCode
Owner

utils.py is imported unconditionally by both executor.py and async_executor.py. That means every QQL user — even those who only use REST and never touch gRPC — must now have the grpc package installed. This was previously optional (a Qdrant client extra). This is a silent breaking dependency change. Fix: guard it with a try/except ImportError or use a lazy import inside is_grpc_not_found_error(). Better yet, keep this helper local to async_executor.py where it's actually needed.
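
A lazy import along these lines is one way to do it. This is a sketch, not the PR's helper: the function name comes from the review, but its body and the `getattr` fallback are assumptions. `RpcError`, `StatusCode.NOT_FOUND`, and the `code()` accessor are the parts of the grpc package it relies on.

```python
def is_grpc_not_found_error(exc: BaseException) -> bool:
    """Return True only for a gRPC NOT_FOUND error; never require grpc."""
    try:
        # Imported lazily so REST-only installs do not need the grpc package.
        from grpc import RpcError, StatusCode
    except ImportError:
        return False  # without grpc installed, no error can be a gRPC error

    if not isinstance(exc, RpcError):
        return False
    # Errors raised by client calls expose code(); guard in case one does not.
    code = getattr(exc, "code", None)
    return callable(code) and code() == StatusCode.NOT_FOUND
```

With this shape, importing the module has no grpc dependency at all; the import cost is only paid when an exception is actually being classified.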

Comment thread src/qql/utils.py
Owner

Issue #42 (previously merged) introduced _topology_from_collection_info(info) → CollectionTopology on the Executor class specifically to make topology parsing the single, canonical path — one method, one place, tested. This PR introduces collection_topology_kwargs(vectors, sparse_vectors) → dict in utils.py that does the exact same parsing and returns a raw dict. Now the topology logic lives in two places:

  • executor.py → _topology_from_collection_info() → returns CollectionTopology
  • utils.py → collection_topology_kwargs() → returns dict
These will diverge the moment someone fixes a bug in one and not the other (e.g., a future change to how dense_sizes is handled). AsyncExecutor should call _topology_from_collection_info() from the base class, not a parallel util function.
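
If a dict-returning helper has to stay for some callers, one way to avoid two parsers is to derive the dict from the canonical result. The sketch below is illustrative only: the `CollectionTopology` fields and the `topology_from_vectors` function are hypothetical, since the real definitions are not shown on this page.

```python
from dataclasses import asdict, dataclass, field


@dataclass(frozen=True)
class CollectionTopology:
    # Hypothetical fields; the real class in QQL may differ.
    dense_sizes: dict[str, int] = field(default_factory=dict)
    sparse_names: tuple[str, ...] = ()


def topology_from_vectors(
    vectors: dict[str, int], sparse_vectors: list[str]
) -> CollectionTopology:
    # The single place where raw vector config is parsed.
    return CollectionTopology(
        dense_sizes=dict(vectors), sparse_names=tuple(sparse_vectors)
    )


def collection_topology_kwargs(
    vectors: dict[str, int], sparse_vectors: list[str]
) -> dict:
    # Thin adapter: converts the canonical result instead of re-parsing.
    return asdict(topology_from_vectors(vectors, sparse_vectors))


kwargs = collection_topology_kwargs({"dense": 384}, ["sparse"])
```

A bug fix in the canonical parser then reaches both return shapes automatically.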

Comment thread src/qql/utils.py
)


def parse_search_lookup(parser: Any) -> tuple[str, str | None] | None:
Owner

parse_search_using, parse_search_lookup, parse_search_with, parse_search_group_by, _parse_hybrid_using, _parse_sparse_using are all parser-layer code that calls private parser methods (_peek(), _advance(), _expect(), _parse_identifier(), _parse_field_path()). They were moved to utils.py to avoid a circular import, but the solution is worse than the problem. They belong in parser.py. The circular import can be avoided by putting the shared dataclasses (SearchUsingOptions, SearchGroupByOptions) in ast_nodes.py or a small parser_types.py that both modules import from. Using Any to paper over a circular import is a code smell that will bite you when the parser's internal API changes.

Comment thread src/qql/async_executor.py
_COLLECTION_VISIBILITY_TIMEOUT_SECONDS = 5.0
_COLLECTION_VISIBILITY_POLL_SECONDS = 0.05


Owner

AsyncExecutor inheriting from sync Executor is the wrong model
If AsyncExecutor(Executor) actually inherits from the sync class, then:

  • The base Executor.__init__ stores a sync QdrantClient; AsyncExecutor.__init__ stores an AsyncQdrantClient, so the self._client type contracts are incompatible
  • Any sync method accidentally called through super() will hand a sync method an async client, and the failure will be a runtime error deep inside Qdrant's SDK, not a clear "you called a sync method" error
  • It creates confusion about which methods are overridden vs. inherited and still sync
A cleaner design is a shared abstract base (BaseExecutor) that holds config and common logic like _build_filter, _topology_from_collection_info, with no client coupling, and two concrete subclasses (Executor, AsyncExecutor) that each own their own client.
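
A rough sketch of that layout, assuming simplified signatures. The class names follow the comment, but the `default_limit` config value, the `_build_filter` arguments, and the `search` method on the client are hypothetical placeholders rather than QQL's or Qdrant's real API.

```python
from abc import ABC, abstractmethod


class BaseExecutor(ABC):
    """Holds config and client-free logic shared by both executors."""

    def __init__(self, default_limit: int = 10):
        self.default_limit = default_limit  # hypothetical config value

    def _build_filter(self, field: str, value: str) -> dict:
        # Pure logic with no client access, so it is safe to share.
        return {"must": [{"key": field, "match": {"value": value}}]}

    @abstractmethod
    def execute(self, field: str, value: str):
        """Run a query; sync in Executor, a coroutine in AsyncExecutor."""


class Executor(BaseExecutor):
    def __init__(self, client, **config):
        super().__init__(**config)
        self._client = client  # sync client only

    def execute(self, field: str, value: str):
        query_filter = self._build_filter(field, value)
        return self._client.search(query_filter, self.default_limit)


class AsyncExecutor(BaseExecutor):
    def __init__(self, client, **config):
        super().__init__(**config)
        self._client = client  # async client only

    async def execute(self, field: str, value: str):
        query_filter = self._build_filter(field, value)
        return await self._client.search(query_filter, self.default_limit)
```

Because the base class never touches `self._client`, there is no inherited sync method that could be handed an async client by accident.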

result = await executor.execute(node)
assert result.success is True
mock_embed.assert_called_once_with("neurology")

Owner

test_race_condition_collection_creation is fragile and tests the mock, not the code
The test preloads a sequence of 4 CollectionTopology objects via side_effect and asserts _create_collection_and_wait was called once. But the sequence assumes very specific internal call ordering that will break if the implementation changes the number of _resolve_topology() calls. It is testing the fixture, not the race-safety logic. A better test would use real async timing (an asyncio.Event to gate creation) or at minimum mock at the get_collection level rather than _resolve_topology.
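
An Event-gated test could be shaped roughly like this. It is a self-contained sketch: `ensure_collection` and `create_collection` are hypothetical stand-ins for the code under test, written here with a lock and a re-check so the example runs on its own.

```python
import asyncio


async def race_demo() -> int:
    gate = asyncio.Event()
    lock = asyncio.Lock()
    exists = False
    create_calls = 0

    async def create_collection() -> None:
        nonlocal create_calls, exists
        create_calls += 1
        await gate.wait()  # hold creation open until the test releases it
        exists = True

    async def ensure_collection() -> None:
        # Hypothetical code under test: take the lock, then re-check.
        async with lock:
            if not exists:
                await create_collection()

    first = asyncio.create_task(ensure_collection())
    second = asyncio.create_task(ensure_collection())
    await asyncio.sleep(0)  # let both tasks start; one is now inside creation
    gate.set()  # release the in-flight creation
    await asyncio.gather(first, second)
    return create_calls


create_calls = asyncio.run(race_demo())
```

The assertion on `create_calls` depends only on the observable outcome, not on how many times topology is resolved internally, so it survives refactors that the side_effect sequence would not.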
