Adds async QQL execution and programmatic gRPC transport support.#50
Adds async QQL execution and programmatic gRPC transport support.#50srimon12 wants to merge 3 commits into
Conversation
- Moved various utility functions from `executor.py` to a new `utils.py` file for better organization and maintainability. - Simplified the `_resolve_topology` method by utilizing the new `collection_topology_kwargs` function. - Updated the `_build_dense_point_vector` and `_extract_point_id_and_payload` methods to use the new utility functions. - Enhanced error handling in the `_ensure_collection` method to catch specific exceptions. - Improved the handling of hybrid queries and prefetching by introducing `build_hybrid_prefetches` and `build_dense_query` functions. - Added tests for the `AsyncConnection` class to ensure proper initialization, query execution, and lifecycle management. - Updated existing tests to cover new functionality and ensure backward compatibility.
There was a problem hiding this comment.
Actionable comments posted: 2
🧹 Nitpick comments (1)
src/qql/connection.py (1)
80-84: ⚡ Quick winAnnotate
client_kwargsto keep strict type-checkers happy
client_kwargsis built fromurl/api_key/verifyso its value type is inferred asstr | bool | None; whenprefer_grpcis enabled,grpc_port: intis added, which can trip mypy/pyright in strict mode. CI only runspytest(no mypy/pyright step/config found), so this may not surface automatically.♻️ Proposed annotation
- client_kwargs = {"url": url, "api_key": secret, "verify": verify} + client_kwargs: dict[str, object] = {"url": url, "api_key": secret, "verify": verify} if prefer_grpc: client_kwargs["prefer_grpc"] = True client_kwargs["grpc_port"] = grpc_port🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@src/qql/connection.py` around lines 80 - 84, client_kwargs is inferred as a mixed str|bool|None and later extended with an int (grpc_port), which can break strict type-checkers; annotate client_kwargs explicitly (e.g., as Dict[str, Any] or Dict[str, object]) before populating it in the block that builds client_kwargs so adding grpc_port when prefer_grpc is True type-checks cleanly; update the local variable declaration near the use of client_kwargs in the QdrantClient construction (the block that references prefer_grpc and grpc_port) and import typing.Any/Dict as needed.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@src/qql/executor.py`:
- Around line 241-248: Non-NOT_FOUND exceptions from _fetch_collection_info()
are being re-raised raw for some transports, causing inconsistent behavior for
_resolve_topology()/SHOW COLLECTION; change the except branches in
_fetch_collection_info() so that when a caught ValueError or generic Exception
is NOT recognized as a "not found" (i.e., not matching f"Collection {name} not
found" and not passing is_grpc_not_found_error(e)), you wrap and raise a
QQLRuntimeError (preserving the original exception as the cause) instead of
re-raising the raw exception; also normalize UnexpectedResponse cases by
converting them to QQLRuntimeError so all transports surface non-NOT_FOUND
errors consistently to callers like _resolve_topology().
In `@tests/test_async_connection.py`:
- Around line 76-105: The tests are asserting AsyncMock invocation but not that
the coroutine was awaited; update the assertions in the AsyncConnection tests to
use awaited checks: replace mock_executor.execute.assert_called_once() with
mock_executor.execute.assert_awaited_once() in
test_run_query_calls_executor_execute, and replace
executor_cls.assert_called_once() / assert mock_executor.execute.call_count == 3
with executor_cls.assert_called_once() (keep as-is for construction) and assert
mock_executor.execute.await_count == 3 (or use
mock_executor.execute.assert_awaited_count(3) if available) in
test_executor_instance_reused_across_queries so you verify the coroutine was
actually awaited across runs of AsyncConnection.run_query that use
AsyncExecutor.
---
Nitpick comments:
In `@src/qql/connection.py`:
- Around line 80-84: client_kwargs is inferred as a mixed str|bool|None and
later extended with an int (grpc_port), which can break strict type-checkers;
annotate client_kwargs explicitly (e.g., as Dict[str, Any] or Dict[str, object])
before populating it in the block that builds client_kwargs so adding grpc_port
when prefer_grpc is True type-checks cleanly; update the local variable
declaration near the use of client_kwargs in the QdrantClient construction (the
block that references prefer_grpc and grpc_port) and import typing.Any/Dict as
needed.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 00eaaace-0177-41a8-a51f-2efe19ad0c56
📒 Files selected for processing (11)
README.mdbenchmark/qql_transport_benchmark.pydocs/programmatic.mdsrc/qql/__init__.pysrc/qql/async_connection.pysrc/qql/async_executor.pysrc/qql/connection.pysrc/qql/executor.pysrc/qql/utils.pytests/test_async_connection.pytests/test_connection.py
There was a problem hiding this comment.
Caution
Some comments are outside the diff and can’t be posted inline due to platform limitations.
⚠️ Outside diff range comments (2)
tests/test_async_connection.py (2)
138-138:⚠️ Potential issue | 🟠 Major | ⚡ Quick winIncomplete fix: use
assert_awaited_once()for AsyncMock.close checks.Lines 86 and 105 were correctly updated to use await-specific assertions, but these three assertions on
mock_client.close(an AsyncMock method) still useassert_called_once(). These must beassert_awaited_once()to verify the coroutine is actually awaited—otherwise the tests could pass even ifAsyncConnection.close()or the context manager stops awaiting the client'sclose().🐛 Proposed fix
async def test_close_calls_client_close(self, mocker): mock_client = AsyncMock() mocker.patch("qql.async_connection.AsyncQdrantClient", return_value=mock_client) conn = AsyncConnection("http://localhost:6333") await conn.close() - mock_client.close.assert_called_once() + mock_client.close.assert_awaited_once() async def test_context_manager_closes_on_exit(self, mocker): mock_client = AsyncMock() mocker.patch("qql.async_connection.AsyncQdrantClient", return_value=mock_client) async with AsyncConnection("http://localhost:6333") as conn: assert conn._client is mock_client - mock_client.close.assert_called_once() + mock_client.close.assert_awaited_once() async def test_context_manager_closes_on_exception(self, mocker): mock_client = AsyncMock() mocker.patch("qql.async_connection.AsyncQdrantClient", return_value=mock_client) with pytest.raises(RuntimeError, match="oops"): async with AsyncConnection("http://localhost:6333"): raise RuntimeError("oops") - mock_client.close.assert_called_once() + mock_client.close.assert_awaited_once()Also applies to: 147-147, 157-157
🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_async_connection.py` at line 138, The test uses mock_client.close (an AsyncMock) with assert_called_once(), which doesn't verify the coroutine was awaited; change those assertions to assert_awaited_once() for each occurrence (e.g., the assertion at mock_client.close.assert_called_once() and the similar occurrences noted at lines 147 and 157) to ensure AsyncConnection.close() or the async context manager actually awaits mock_client.close; search for mock_client.close in tests/test_async_connection.py and replace assert_called_once() with assert_awaited_once() where the mock is an AsyncMock.
301-301:⚠️ Potential issue | 🟠 Major | ⚡ Quick winIncomplete fix: use
assert_awaited_once()for AsyncMock on collection creation.
_create_collection_and_waitis patched withnew_callable=AsyncMock(line 209), but the assertion on line 301 still usesassert_called_once(). This must beassert_awaited_once()to verify the async method is actually awaited. Otherwise the test could pass if the executor invokes but never awaits the creation coroutine, missing concurrency bugs.🐛 Proposed fix
- executor._create_collection_and_wait.assert_called_once() + executor._create_collection_and_wait.assert_awaited_once()🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_async_connection.py` at line 301, The test currently asserts on the AsyncMock `_create_collection_and_wait` with `assert_called_once()`, which doesn't verify it was awaited; replace that call with `assert_awaited_once()` so the test verifies the async method was actually awaited (keep using the existing AsyncMock patch of `_create_collection_and_wait` to ensure the awaited assertion is available).
🧹 Nitpick comments (1)
tests/test_executor.py (1)
89-101: 💤 Low valueConsider clearer test method names.
The double-negative phrasing in
test_non_not_found_value_error_is_wrappedmakes the intent harder to parse. Consider renaming to something liketest_value_error_wrapped_as_qql_runtime_errorandtest_runtime_error_wrapped_as_qql_runtime_errorfor better clarity.♻️ Suggested rename
- def test_non_not_found_value_error_is_wrapped(self, executor, mock_client): + def test_value_error_wrapped_as_qql_runtime_error(self, executor, mock_client): mock_client.get_collection.side_effect = ValueError("transport failed") with pytest.raises(QQLRuntimeError, match="Qdrant error fetching collection"): executor._fetch_collection_info("docs") - def test_non_not_found_exception_is_wrapped(self, executor, mock_client): + def test_runtime_error_wrapped_as_qql_runtime_error(self, executor, mock_client): mock_client.get_collection.side_effect = RuntimeError("transport failed") with pytest.raises(QQLRuntimeError, match="Qdrant error fetching collection"): executor._fetch_collection_info("docs")🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the rest with a brief reason, keep changes minimal, and validate. In `@tests/test_executor.py` around lines 89 - 101, Rename the confusing double-negative test method names in TestFetchCollectionInfo for clarity: change test_non_not_found_value_error_is_wrapped to test_value_error_wrapped_as_qql_runtime_error and test_non_not_found_exception_is_wrapped to test_runtime_error_wrapped_as_qql_runtime_error; update the method names where they are defined in the class TestFetchCollectionInfo and any references to them (e.g., test discovery or calls), leaving the test bodies and assertions (executor._fetch_collection_info, mock_client.get_collection and the pytest.raises checks) unchanged.
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Outside diff comments:
In `@tests/test_async_connection.py`:
- Line 138: The test uses mock_client.close (an AsyncMock) with
assert_called_once(), which doesn't verify the coroutine was awaited; change
those assertions to assert_awaited_once() for each occurrence (e.g., the
assertion at mock_client.close.assert_called_once() and the similar occurrences
noted at lines 147 and 157) to ensure AsyncConnection.close() or the async
context manager actually awaits mock_client.close; search for mock_client.close
in tests/test_async_connection.py and replace assert_called_once() with
assert_awaited_once() where the mock is an AsyncMock.
- Line 301: The test currently asserts on the AsyncMock
`_create_collection_and_wait` with `assert_called_once()`, which doesn't verify
it was awaited; replace that call with `assert_awaited_once()` so the test
verifies the async method was actually awaited (keep using the existing
AsyncMock patch of `_create_collection_and_wait` to ensure the awaited assertion
is available).
---
Nitpick comments:
In `@tests/test_executor.py`:
- Around line 89-101: Rename the confusing double-negative test method names in
TestFetchCollectionInfo for clarity: change
test_non_not_found_value_error_is_wrapped to
test_value_error_wrapped_as_qql_runtime_error and
test_non_not_found_exception_is_wrapped to
test_runtime_error_wrapped_as_qql_runtime_error; update the method names where
they are defined in the class TestFetchCollectionInfo and any references to them
(e.g., test discovery or calls), leaving the test bodies and assertions
(executor._fetch_collection_info, mock_client.get_collection and the
pytest.raises checks) unchanged.
ℹ️ Review info
⚙️ Run configuration
Configuration used: Repository UI
Review profile: CHILL
Plan: Pro Plus
Run ID: 9acc6d91-0f08-49aa-afd2-57f6859d63f4
📒 Files selected for processing (4)
src/qql/connection.pysrc/qql/executor.pytests/test_async_connection.pytests/test_executor.py
🚧 Files skipped from review as they are similar to previous changes (2)
- src/qql/connection.py
- src/qql/executor.py
… improve error handling in executor tests
|
The lock-based collection creation guard in AsyncExecutor is a good idea for a single-process async app. But QQL is often used as a library inside FastAPI/uvicorn which runs multiple worker processes, and the lock is not shared across processes. A concurrent insert from two different workers will still race. This is a real limitation and the docs say nothing about it. At minimum, the docstring should say "this serializes concurrent creation within a single process; cross-process races are not prevented." |
|
Benchmark measures FastEmbed inference time, not Qdrant transport latency Pre-compute and cache embeddings before the timing loop (call Embedder().embed() once, store the vector, inject it directly) |
|
anyio test dependency is likely undeclared |
|
|
pavanjava
left a comment
There was a problem hiding this comment.
Needs the review comments to be addressed before merging.
| from dataclasses import dataclass | ||
| from typing import Any | ||
|
|
||
| from grpc import RpcError, StatusCode |
There was a problem hiding this comment.
utils.py is imported unconditionally by both executor.py and async_executor.py. That means every QQL user — even those who only use REST and never touch gRPC — must now have the grpc package installed. This was previously optional (a Qdrant client extra). This is a silent breaking dependency change. Fix: guard it with a try/except ImportError or use a lazy import inside is_grpc_not_found_error(). Better yet, keep this helper local to async_executor.py where it's actually needed.
There was a problem hiding this comment.
Issue #42 (previously merged) introduced _topology_from_collection_info(info) → CollectionTopology on the Executor class specifically to make topology parsing the single, canonical path — one method, one place, tested. This PR introduces collection_topology_kwargs(vectors, sparse_vectors) → dict in utils.py that does the exact same parsing and returns a raw dict. Now the topology logic lives in two places:
executor.py → _topology_from_collection_info() → returns CollectionTopology
utils.py → collection_topology_kwargs() → returns dict
These will diverge the moment someone fixes a bug in one and not the other (e.g., a future change to how dense_sizes is handled). AsyncExecutor should call _topology_from_collection_info() from the base class, not a parallel util function.
| ) | ||
|
|
||
|
|
||
| def parse_search_lookup(parser: Any) -> tuple[str, str | None] | None: |
There was a problem hiding this comment.
parse_search_using, parse_search_lookup, parse_search_with, parse_search_group_by, _parse_hybrid_using, _parse_sparse_using are all parser-layer code that calls private parser methods (_peek(), _advance(), _expect(), _parse_identifier(), _parse_field_path()). They were moved to utils.py to avoid a circular import, but the solution is worse than the problem. They belong in parser.py. The circular import can be avoided by putting the shared dataclasses (SearchUsingOptions, SearchGroupByOptions) in ast_nodes.py or a small parser_types.py that both modules import from. Using Any to paper over a circular import is a code smell that will bite you when the parser's internal API changes.
| _COLLECTION_VISIBILITY_TIMEOUT_SECONDS = 5.0 | ||
| _COLLECTION_VISIBILITY_POLL_SECONDS = 0.05 | ||
|
|
||
|
|
There was a problem hiding this comment.
AsyncExecutor inheriting from sync Executor is the wrong model
If AsyncExecutor(Executor) actually inherits from the sync class, then:
The base Executor.init stores a sync QdrantClient; AsyncExecutor.init stores an AsyncQdrantClient — the self._client type contracts are incompatible
Any sync method accidentally called through super() will hand a sync method an async client, and the failure will be a runtime error deep inside Qdrant's SDK, not a clear "you called a sync method" error
It creates confusion about which methods are overridden vs. inherited and still sync
A cleaner design is a shared abstract base (BaseExecutor) that holds config and common logic like _build_filter, _topology_from_collection_info, with no client coupling, and two concrete subclasses (Executor, AsyncExecutor) that each own their own client.
| result = await executor.execute(node) | ||
| assert result.success is True | ||
| mock_embed.assert_called_once_with("neurology") | ||
|
|
There was a problem hiding this comment.
test_race_condition_collection_creation is fragile and tests the mock, not the code
The test preloads a sequence of 4 CollectionTopology objects via side_effect and asserts _create_collection_and_wait was called once. But the sequence assumes very specific internal call ordering that will break if the implementation changes the number of _resolve_topology() calls. It is testing the fixture, not the race-safety logic. A better test would use real async timing (a asyncio.Event to gate creation) or at minimum mock at the get_collection level rather than _resolve_topology.
Summary
Adds async QQL execution and programmatic gRPC transport support.
What Changed
AsyncConnectionAsyncExecutorprefer_grpc/grpc_portsupport for sync, async, andrun_queryutils.pybenchmark/qql_transport_benchmark.pyBenchmark
Command:
Environment:
http://localhost:63336334SEARCHqueriesSingle-flight latency
Async concurrent throughput
Benchmark Notes
Async does not improve single-query latency in this benchmark. The single-flight async path is slower because QQL still performs local FastEmbed embedding before the Qdrant request, and async adds coroutine/client overhead.
The value of async here is async application support and concurrent throughput. With x10 concurrency, async gRPC reached
81.68 ops/sec, compared to71.10 ops/secfor single-flight sync gRPC.Validation
Result:
Summary by CodeRabbit
New Features
Documentation
Tests