Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
8 changes: 8 additions & 0 deletions .github/workflows/regenerate.yml
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,14 @@ jobs:
'keywords = ["hotdata", "api-client", "data-platform"]',
re.MULTILINE,
),
# Insert [project.optional-dependencies] (for hotdata.arrow) just
# before [project.urls]. Run before the urls patch so the urls
# anchor is unchanged when this fires.
(
r'(\ndependencies = \[\n(?:[^\]]|\][^\n])*\]\n)\n(\[project\.urls\])',
r'\1\n[project.optional-dependencies]\narrow = ["pyarrow >= 14"]\n\n\2',
0,
),
(
r'\[project\.urls\]\nRepository = "[^"]*"\n',
'[project.urls]\nHomepage = "https://www.hotdata.dev"\nRepository = "https://github.com/hotdata-dev/sdk-python"\n',
Expand Down
29 changes: 29 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -60,6 +60,35 @@ with hotdata.ApiClient(configuration) as api_client:

Each `Api` class groups endpoints by resource. Construct the client, then call the typed methods you need.

## Arrow results

Query results can be fetched as an [Apache Arrow](https://arrow.apache.org/) IPC stream instead of JSON, which is faster and far more memory-efficient for large result sets. Install the optional extra:

```sh
pip install 'hotdata[arrow]'
```

Use `hotdata.arrow.ResultsApi` (a drop-in subclass of `ResultsApi` that adds Arrow methods):

```python
from hotdata import ApiClient, Configuration
from hotdata.arrow import ResultsApi

with ApiClient(Configuration(api_key="...", workspace_id="...")) as client:
results = ResultsApi(client)

# Buffered: returns a pyarrow.Table.
table = results.get_result_arrow(result_id)

# Streaming: yields a pyarrow.RecordBatchStreamReader without
# materializing the full table in memory.
with results.stream_result_arrow(result_id) as reader:
for batch in reader:
...
```

Both methods accept `offset` and `limit` for pagination. They raise `hotdata.arrow.ResultNotReadyError` if the result is still pending or processing — poll `results.get_result(result_id)` until `status == "ready"` first.

## API reference

Generated Markdown for every operation and model is in [`docs/`](https://github.com/hotdata-dev/sdk-python/tree/main/docs):
Expand Down
190 changes: 190 additions & 0 deletions hotdata/arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
"""Arrow IPC helpers for ``GET /v1/results/{id}``.

The auto-generated :class:`hotdata.api.results_api.ResultsApi` understands the
``format=arrow`` query parameter but cannot decode the
``application/vnd.apache.arrow.stream`` response body — openapi-generator picks
the JSON content variant for status 200 and routes Arrow bytes through the
JSON deserializer, which raises ``Unsupported content type``.

This module wraps the generated client with a thin subclass that:

* sets ``Accept: application/vnd.apache.arrow.stream`` and ``?format=arrow``,
* uses the generator's ``*_without_preload_content`` plumbing to hold the
underlying ``urllib3.HTTPResponse`` open as a byte stream,
* hands that stream to ``pyarrow.ipc.open_stream`` so callers get a
:class:`pyarrow.Table` (or a :class:`pyarrow.RecordBatchStreamReader` for
the streaming variant).

Install with ``pip install 'hotdata[arrow]'`` to pull in pyarrow.
"""

from __future__ import annotations

from contextlib import contextmanager
from typing import TYPE_CHECKING, Any, Dict, Iterator, Optional

from hotdata.api.results_api import ResultsApi as _GeneratedResultsApi
from hotdata.models.results_format_query import ResultsFormatQuery

if TYPE_CHECKING: # pragma: no cover - import-time only for type checkers
import pyarrow as pa # type: ignore[import-untyped]


# Media type of an Arrow IPC stream; sent as the ``Accept`` header when a
# result is requested as Arrow.
ARROW_STREAM_MEDIA_TYPE = "application/vnd.apache.arrow.stream"


class ResultNotReadyError(Exception):
    """Signal that a result exists but has not reached ``ready`` yet.

    While a result is ``pending`` or ``processing`` the server answers with
    HTTP 202. Callers should poll :meth:`ResultsApi.get_result` until it
    reports ``status='ready'`` and only then request the Arrow form.

    :param status: Status reported for the result.
    :param result_id: Identifier of the result that is not ready.
    """

    def __init__(self, status: str, result_id: str) -> None:
        message = (
            f"Result {result_id} is not ready (status={status!r}); "
            "poll get_result until status='ready' before fetching as Arrow."
        )
        super().__init__(message)
        # Kept as attributes so callers can branch without parsing the text.
        self.status = status
        self.result_id = result_id


def _import_pyarrow() -> Any:
try:
import pyarrow.ipc as ipc # type: ignore[import-untyped]
except ImportError as exc: # pragma: no cover - exercised via tests
raise ImportError(
"pyarrow is required to fetch results as Arrow. "
"Install with: pip install 'hotdata[arrow]'"
) from exc
return ipc


class ResultsApi(_GeneratedResultsApi):
    """Drop-in replacement for :class:`hotdata.api.results_api.ResultsApi`
    that adds Arrow IPC fetch helpers.

    All methods on the base class continue to work unchanged.
    """

    def get_result_arrow(
        self,
        id: str,
        *,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        _request_timeout: Any = None,
    ) -> "pa.Table":
        """Fetch a ready result as a :class:`pyarrow.Table`.

        Buffers the full Arrow IPC stream into memory before returning. Use
        :meth:`stream_result_arrow` for large results where you want to
        iterate batches without materializing the whole table.

        :param id: Result ID.
        :param offset: Rows to skip (default: 0).
        :param limit: Maximum rows to return (default: unbounded).
        :param _request_timeout: Forwarded unchanged to the underlying
            ``api_client.call_api`` call.
        :returns: The decoded table.
        :raises ImportError: pyarrow is not installed.
        :raises ResultNotReadyError: result is still pending or processing.
        :raises hotdata.exceptions.ApiException: for other HTTP errors
            (400 invalid params, 404 not found, 409 failed result).
        """
        ipc = _import_pyarrow()
        response = self._call_arrow(
            id=id,
            offset=offset,
            limit=limit,
            _request_timeout=_request_timeout,
        )
        try:
            return ipc.open_stream(response).read_all()
        finally:
            # Also runs when decoding fails midway, in which case part of the
            # body is still unread.
            self._dispose_response(response)

    @contextmanager
    def stream_result_arrow(
        self,
        id: str,
        *,
        offset: Optional[int] = None,
        limit: Optional[int] = None,
        _request_timeout: Any = None,
    ) -> Iterator["pa.RecordBatchStreamReader"]:
        """Yield a :class:`pyarrow.RecordBatchStreamReader` for a ready result.

        The HTTP response is closed and its connection released when the
        context exits, so the reader must only be used inside the ``with``
        block. Iterate the reader to consume :class:`pyarrow.RecordBatch`
        messages, or call ``reader.read_all()`` for a full
        :class:`pyarrow.Table`.

        Example::

            with results.stream_result_arrow(result_id) as reader:
                for batch in reader:
                    process(batch)

        :param id: Result ID.
        :param offset: Rows to skip.
        :param limit: Maximum rows to return.
        :param _request_timeout: Forwarded unchanged to the underlying
            ``api_client.call_api`` call.
        :raises ImportError: pyarrow is not installed.
        :raises ResultNotReadyError: result is still pending or processing.
        :raises hotdata.exceptions.ApiException: for other HTTP errors.
        """
        ipc = _import_pyarrow()
        response = self._call_arrow(
            id=id,
            offset=offset,
            limit=limit,
            _request_timeout=_request_timeout,
        )
        try:
            yield ipc.open_stream(response)
        finally:
            # The caller may leave the block early (break, exception), so the
            # body cannot be assumed to have been read to the end.
            self._dispose_response(response)

    @staticmethod
    def _dispose_response(response: Any) -> None:
        """Close a raw HTTP response and give its connection back to the pool.

        ``close()`` runs first so that a body which was not read to the end
        is discarded together with its socket, instead of a connection with
        unread bytes being handed back for reuse by the next request.
        ``release_conn()`` still runs afterwards, even if ``close()`` raises.

        NOTE(review): assumes ``response`` is a ``urllib3.HTTPResponse`` and
        that ``close()`` leaves an already-released pooled connection alone
        once the body was fully consumed - confirm against the pinned
        urllib3 version.
        """
        try:
            response.close()
        finally:
            response.release_conn()

    def _call_arrow(
        self,
        *,
        id: str,
        offset: Optional[int],
        limit: Optional[int],
        _request_timeout: Any,
    ) -> Any:
        """Issue the Arrow-format request and return the raw response on 200.

        :returns: ``response_data.response`` (the raw, unread HTTP response)
            when the status is 200. The caller owns it and must dispose of it.
        :raises ResultNotReadyError: the server answered with a non-200
            status that ``response_deserialize`` did not turn into an
            exception (202 while the result is pending or processing).
        :raises hotdata.exceptions.ApiException: raised by
            ``response_deserialize`` for error statuses.
        """
        # Build the request via the generator's private serialize helper so
        # path/query/auth handling stays in lockstep with the generated client.
        # Override only what we need: the Accept header and the format query.
        headers: Dict[str, Any] = {"Accept": ARROW_STREAM_MEDIA_TYPE}
        params = self._get_result_serialize(
            id=id,
            offset=offset,
            limit=limit,
            format=ResultsFormatQuery.ARROW,
            _request_auth=None,
            _content_type=None,
            _headers=headers,
            _host_index=0,
        )
        response_data = self.api_client.call_api(
            *params,
            _request_timeout=_request_timeout,
        )

        if response_data.status == 200:
            # Hand the raw urllib3.HTTPResponse to the caller. preload_content
            # was False on the way in, so the body has not been consumed.
            return response_data.response

        # Non-200: drain, deserialize as JSON, then raise. response_deserialize
        # raises ApiException for error statuses itself; whatever falls
        # through (202 per the API contract) is reported as "not ready".
        try:
            response_data.read()
            body = self.api_client.response_deserialize(
                response_data=response_data,
                response_types_map={
                    "202": "GetResultResponse",
                    "400": "ApiErrorResponse",
                    "404": "ApiErrorResponse",
                    "409": "GetResultResponse",
                },
            ).data
        finally:
            response_data.response.release_conn()

        # Fall back to "pending" / the requested id when the decoded body
        # does not carry those attributes.
        raise ResultNotReadyError(
            status=getattr(body, "status", "pending"),
            result_id=getattr(body, "result_id", id),
        )


# Names exported by ``from hotdata.arrow import *``; the private helpers are
# deliberately left out.
__all__ = [
"ARROW_STREAM_MEDIA_TYPE",
"ResultNotReadyError",
"ResultsApi",
]
3 changes: 3 additions & 0 deletions pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,9 @@ dependencies = [
"typing-extensions (>=4.7.1)",
]

[project.optional-dependencies]
arrow = ["pyarrow >= 14"]

[project.urls]
Homepage = "https://www.hotdata.dev"
Repository = "https://github.com/hotdata-dev/sdk-python"
Expand Down
101 changes: 101 additions & 0 deletions tests/integration/test_results_arrow.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,101 @@
"""Scenario: results_arrow.

Submit a small query, poll until the result is ready, then fetch the result
as a pyarrow.Table via hotdata.arrow.ResultsApi.get_result_arrow. Verifies
that Arrow IPC content negotiation works end-to-end and that the streaming
variant yields the same data.

Skipped if pyarrow is not installed (the helper requires the ``arrow`` extra).
"""

from __future__ import annotations

import time

import pytest

pa = pytest.importorskip("pyarrow")

from hotdata.api.query_api import QueryApi
from hotdata.api.query_runs_api import QueryRunsApi
from hotdata.arrow import ResultsApi
from hotdata.models.query_request import QueryRequest


# Query-run statuses at which the test stops polling the run.
TERMINAL_STATUSES = {"succeeded", "failed", "cancelled"}
# Time budget, in seconds, for each polling loop in the test.
POLL_TIMEOUT_S = 60.0
# Pause, in seconds, between two consecutive polls.
POLL_INTERVAL_S = 1.0


@pytest.fixture
def query_api(api_client) -> QueryApi:
    """Provide a ``QueryApi`` bound to the ``api_client`` fixture."""
    api = QueryApi(api_client)
    return api


@pytest.fixture
def query_runs_api(api_client) -> QueryRunsApi:
    """Provide a ``QueryRunsApi`` bound to the ``api_client`` fixture."""
    api = QueryRunsApi(api_client)
    return api


@pytest.fixture
def results_api(api_client) -> ResultsApi:
    """Provide the Arrow-capable ``ResultsApi`` bound to ``api_client``."""
    api = ResultsApi(api_client)
    return api


def test_results_arrow(
    query_api: QueryApi,
    query_runs_api: QueryRunsApi,
    results_api: ResultsApi,
) -> None:
    """Run a two-row query and read its result back as Arrow.

    Covers the buffered fetch, the streaming fetch, and offset/limit
    forwarding.
    """
    request = QueryRequest(
        var_async=True,
        async_after_ms=1000,
        sql="SELECT 1 AS x, 'hello' AS msg UNION ALL SELECT 2, 'world'",
    )
    run_id = query_api.query(request).query_run_id
    assert run_id

    # Phase 1: poll the query run until it reaches a terminal status or the
    # time budget is spent.
    run = None
    run_deadline = time.monotonic() + POLL_TIMEOUT_S
    while time.monotonic() < run_deadline:
        run = query_runs_api.get_query_run(run_id)
        if run.status not in TERMINAL_STATUSES:
            time.sleep(POLL_INTERVAL_S)
            continue
        break
    assert run is not None
    assert run.status == "succeeded", (
        f"expected succeeded, got {run.status}: {run.error_message}"
    )
    assert run.result_id, "succeeded run must expose a result_id"
    result_id = run.result_id

    # Phase 2: the Arrow helpers raise ResultNotReadyError on 202, so wait
    # until the result itself reports "ready" before fetching it.
    became_ready = False
    result_deadline = time.monotonic() + POLL_TIMEOUT_S
    while not became_ready and time.monotonic() < result_deadline:
        if results_api.get_result(result_id).status == "ready":
            became_ready = True
        else:
            time.sleep(POLL_INTERVAL_S)
    if not became_ready:
        pytest.fail(f"result {result_id} never became ready")

    # Buffered fetch: the whole result as one pyarrow.Table.
    # NOTE(review): the row-order assertions below rely on the service
    # returning the UNION ALL branches in query order - confirm.
    buffered = results_api.get_result_arrow(result_id)
    assert isinstance(buffered, pa.Table)
    assert buffered.num_rows == 2
    assert set(buffered.column_names) == {"x", "msg"}
    assert buffered.column("x").to_pylist() == [1, 2]
    assert buffered.column("msg").to_pylist() == ["hello", "world"]

    # Streaming fetch: rebuild a table from the batches and compare.
    with results_api.stream_result_arrow(result_id) as reader:
        batches = list(reader)
        streamed = pa.Table.from_batches(batches, schema=reader.schema)
    assert streamed.equals(buffered)

    # offset/limit must reach the server: skip one row, take one row.
    page = results_api.get_result_arrow(result_id, offset=1, limit=1)
    assert page.num_rows == 1
    assert page.column("x").to_pylist() == [2]
Loading