Merged
92 changes: 92 additions & 0 deletions CONTRACT.md
@@ -0,0 +1,92 @@
# hotdata-runtime Contract

`hotdata-runtime` is the framework-agnostic runtime contract for Hotdata integrations.

## Scope

This package provides shared primitives for:

- Environment and workspace resolution
- Query execution and polling
- Normalized tabular result handling
- Basic workspace health checks

## Public Runtime Contract

The supported import surface is:

- `HotdataClient`
- `QueryResult`
- `from_env`
- `workspace_health_lines`
- `default_api_key`
- `default_host`
- `default_session_id`
- `explicit_workspace_id`
- `list_workspaces`
- `normalize_host`
- `pick_workspace`
- `resolve_workspace_selection`
- `ResultSummary`
- `RunHistoryItem`
- `WorkspaceSelection`

Adapters should import from `hotdata_runtime` and treat this surface as the stable API.

## Semantic Guarantees

### `HotdataClient`

- Represents runtime context: API key, host, workspace, optional session.
- `from_env()` resolves runtime context from env vars and selected workspace.
- `execute_sql(sql)` returns `QueryResult` or raises `RuntimeError`/`TimeoutError`.
- `get_result(result_id)` returns a ready `QueryResult` and waits for readiness when needed.
- `connections()` returns the connections API wrapper for adapter UI/status features.
- `query_runs()` returns the query-runs API wrapper for adapter history views.
- `results()` returns the results API wrapper for adapter result pickers.
- `list_recent_results(...)` returns normalized `ResultSummary` entries.
- `list_run_history(limit=...)` returns normalized `RunHistoryItem` entries.
- `list_qualified_table_names(...)` returns sorted fully qualified table names.
- `columns_for_qualified(qualified, connection_id=...)` resolves table columns; adapters should pass `connection_id` when known.
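The execute-and-poll behavior described above can be sketched with a small self-contained model. Everything here (`poll_until_terminal`, the `fetch_status` callable, the stubbed status sequence) is illustrative rather than the real client internals; only the terminal-state names and the `RuntimeError`/`TimeoutError` outcomes come from the contract.

```python
import time

# Terminal states from the contract; polling stops once a run reaches one.
TERMINAL = frozenset({"succeeded", "failed", "cancelled"})


def poll_until_terminal(fetch_status, timeout_s=30.0, interval_s=0.1):
    """Poll fetch_status() until a terminal state or timeout (hypothetical helper)."""
    deadline = time.monotonic() + timeout_s
    while time.monotonic() < deadline:
        status = fetch_status()
        if status in TERMINAL:
            if status != "succeeded":
                raise RuntimeError(f"Query run {status}")
            return status
        time.sleep(interval_s)
    raise TimeoutError("Query run did not reach a terminal state in time")


# Usage with a stubbed status sequence:
states = iter(["queued", "running", "succeeded"])
print(poll_until_terminal(lambda: next(states), interval_s=0.0))  # succeeded
```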

### `QueryResult`

- Canonical tabular result model with `columns`, `rows`, and `row_count`.
- Carries server identifiers and execution metadata when available.
- `to_pandas()` converts to a DataFrame with stable column ordering.
- `to_records(max_rows=...)` returns row dicts keyed by column names.
- `metadata_dict()` returns normalized result metadata for adapter rendering.

### Env Resolution

- `default_api_key()` reads `HOTDATA_API_KEY`.
- `default_host()` reads `HOTDATA_API_URL` (default: `https://api.hotdata.dev`) and normalizes it.
- `default_session_id()` reads `HOTDATA_SANDBOX`.
- `explicit_workspace_id()` reads `HOTDATA_WORKSPACE` (workspace public id).
- `pick_workspace()` prefers explicit env workspace, then active workspace, then first workspace.
- `resolve_workspace_selection()` is the canonical workspace selection algorithm. It returns `WorkspaceSelection` with selected workspace id, selection source, and discovered workspaces when auto-selected.

## Adapter Responsibilities

Framework packages (Jupyter, Marimo, LangChain, LangGraph, LlamaIndex, Streamlit) own:

- Framework-native lifecycle and state management
- Rendering/UI concerns
- Tool/agent wrappers and callback integration

They should not duplicate runtime env/workspace/query semantics.

## Runtime Non-Goals

`hotdata-runtime` does not define framework UI primitives and does not require framework dependencies.

## Versioning Policy

- Backward-incompatible contract changes require a major version bump.
- Additive contract changes are minor versions.
- Bug fixes that preserve contract semantics are patch versions.

## Enforcement

Contract stability is enforced by tests that verify the public export surface and key behavioral invariants.
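An export-surface check of this kind can be as simple as comparing a frozen set of names against the package's `__all__`. The sketch below demonstrates the idea against a stub module; the structure of the actual test suite is not shown in this PR.

```python
import types

# The frozen contract surface from CONTRACT.md.
CONTRACT_SURFACE = {
    "HotdataClient", "QueryResult", "from_env", "workspace_health_lines",
    "default_api_key", "default_host", "default_session_id",
    "explicit_workspace_id", "list_workspaces", "normalize_host",
    "pick_workspace", "resolve_workspace_selection", "ResultSummary",
    "RunHistoryItem", "WorkspaceSelection",
}


def check_export_surface(module) -> None:
    """Fail if the module's __all__ drifts from the documented contract."""
    exported = set(getattr(module, "__all__", ()))
    missing = CONTRACT_SURFACE - exported
    extra = exported - CONTRACT_SURFACE
    assert not missing, f"missing exports: {sorted(missing)}"
    assert not extra, f"undocumented exports: {sorted(extra)}"


# Demonstration against a stub module (a real test would import hotdata_runtime):
stub = types.ModuleType("stub")
stub.__all__ = sorted(CONTRACT_SURFACE)
check_export_surface(stub)  # passes silently
```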
19 changes: 19 additions & 0 deletions README.md
@@ -2,13 +2,32 @@

Shared runtime primitives for Hotdata integrations: workspace/session semantics, execution context, query state, run history, and replayable result handles. Framework packages (Marimo, Jupyter, Streamlit, LangGraph) depend on this package.

Runtime boundary and guarantees are defined in `CONTRACT.md`.

## Features

- **Environment-driven client setup** — create clients from `HOTDATA_API_KEY`, optional `HOTDATA_API_URL`, `HOTDATA_WORKSPACE`, and `HOTDATA_SANDBOX`.
- **Workspace resolution** — choose an explicit workspace from env, otherwise discover workspaces and select the active workspace or first available workspace.
- **Sandbox/session propagation** — pass sandbox session context through the SDK via `X-Session-Id`.
- **HTTP resilience** — configure SDK retries for transient connection failures and retry SQL execution on stale pooled sockets.
- **SQL execution helper** — run SQL through `POST /v1/query`, poll async query runs when needed, and return a `QueryResult`.
- **Result utilities** — convert query results to records, pandas DataFrames, or metadata dictionaries for adapter display layers.
- **History helpers** — list recent results and query run history with normalized dataclasses.
- **Health helpers** — build compact API/workspace health summaries for UI integrations.
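The environment variables named above can be exported before launching an adapter. The values here are placeholders, not real credentials:

```shell
# Hypothetical placeholder values; substitute your own.
export HOTDATA_API_KEY="hd_example_key"           # required
export HOTDATA_API_URL="https://api.hotdata.dev"  # optional (this is the default)
export HOTDATA_WORKSPACE="ws_example"             # optional explicit workspace public id
export HOTDATA_SANDBOX="sess_example"             # optional sandbox session id
```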

Install:

```bash
uv pip install hotdata-runtime
# or: pip install hotdata-runtime
```

Example:

```bash
python examples/basic_usage.py
```

Development (uses **uv**; creates `.venv/` in this repo):

```bash
25 changes: 25 additions & 0 deletions examples/basic_usage.py
@@ -0,0 +1,25 @@
"""Basic hotdata-runtime usage."""

from hotdata_runtime import from_env


def main() -> None:
    client = from_env()
    result = client.execute_sql("SELECT 1 AS ok")

    print("result metadata:", result.metadata_dict())
    print("records:", result.to_records(max_rows=5))

    print("recent results:")
    for item in client.list_recent_results(limit=5, offset=0):
        print(item.to_dict())

    print("run history:")
    for item in client.list_run_history(limit=5):
        print(item.to_dict())

    client.close()


if __name__ == "__main__":
    main()
13 changes: 12 additions & 1 deletion hotdata_runtime/__init__.py
@@ -2,7 +2,12 @@

from importlib.metadata import PackageNotFoundError, version

from hotdata_runtime.client import HotdataClient, from_env
from hotdata_runtime.client import (
    HotdataClient,
    ResultSummary,
    RunHistoryItem,
    from_env,
)
from hotdata_runtime.env import (
    default_api_key,
    default_host,
@@ -11,6 +16,8 @@
    list_workspaces,
    normalize_host,
    pick_workspace,
    resolve_workspace_selection,
    WorkspaceSelection,
)
from hotdata_runtime.health import workspace_health_lines
from hotdata_runtime.result import QueryResult
@@ -33,4 +40,8 @@
    "list_workspaces",
    "normalize_host",
    "pick_workspace",
    "resolve_workspace_selection",
    "ResultSummary",
    "RunHistoryItem",
    "WorkspaceSelection",
]
113 changes: 102 additions & 11 deletions hotdata_runtime/client.py
@@ -1,8 +1,12 @@
from __future__ import annotations

from dataclasses import asdict, dataclass
import time
from typing import Any, Iterator

from urllib3.exceptions import HTTPError as Urllib3HTTPError
from urllib3.exceptions import ProtocolError

from hotdata import ApiClient, Configuration
from hotdata.api.connections_api import ConnectionsApi
from hotdata.api.information_schema_api import InformationSchemaApi
@@ -22,9 +26,33 @@
    normalize_host,
    pick_workspace,
)
from hotdata_runtime.http import default_http_retries
from hotdata_runtime.result import QueryResult

_TERMINAL = frozenset({"succeeded", "failed", "cancelled"})
_RESULT_FAILURE = frozenset({"failed", "cancelled"})


@dataclass(frozen=True)
class ResultSummary:
    result_id: str
    status: str
    created_at: str | None

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)


@dataclass(frozen=True)
class RunHistoryItem:
    query_run_id: str
    status: str
    created_at: str | None
    execution_time_ms: int | None
    result_id: str | None

    def to_dict(self) -> dict[str, Any]:
        return asdict(self)


class HotdataClient:
@@ -47,16 +75,15 @@ def __init__(
            api_key=api_key,
            workspace_id=workspace_id,
            session_id=session_id,
            retries=default_http_retries(),
        )
        self._api = ApiClient(self._config)

    @classmethod
    def from_env(cls) -> HotdataClient:
        api_key = default_api_key()
        if not api_key:
            raise RuntimeError(
                "HOTDATA_API_KEY or HOTDATA_TOKEN must be set."
            )
            raise RuntimeError("HOTDATA_API_KEY must be set.")
        host = default_host()
        session = default_session_id()
        workspace_id = pick_workspace(api_key, host, session)
@@ -108,6 +135,39 @@ def query_runs(self) -> QueryRunsApi:
    def results(self) -> ResultsApi:
        return self._results_api()

    def list_recent_results(
        self,
        *,
        limit: int = 50,
        offset: int = 0,
    ) -> list[ResultSummary]:
        listing = self.results().list_results(limit=limit, offset=offset)
        return [
            ResultSummary(
                result_id=r.id,
                status=r.status,
                created_at=r.created_at,
            )
            for r in listing.results
        ]

    def list_run_history(
        self,
        *,
        limit: int = 20,
    ) -> list[RunHistoryItem]:
        listing = self.query_runs().list_query_runs(limit=limit)
        return [
            RunHistoryItem(
                query_run_id=r.id,
                status=r.status,
                created_at=r.created_at,
                execution_time_ms=r.execution_time_ms,
                result_id=r.result_id,
            )
            for r in listing.query_runs
        ]
Comment on lines +138 to +169
super nit: (not blocking) list_recent_results accepts offset but list_run_history does not, and their default limits differ (50 vs 20). Both wrap paginated server endpoints and adapters will likely want to page through both. Adding a matching offset: int = 0 to list_run_history (and considering aligning default limits) would make these helpers consistent and avoid surprise when adapters reuse pagination code across the two.


    def iter_tables(
        self,
        *,
@@ -143,9 +203,26 @@ def list_qualified_table_names(

    def connection_id_by_name(self) -> dict[str, str]:
        listing = self.connections().list_connections()
        return {c.name: c.id for c in listing.connections}
        id_map: dict[str, str] = {}
        duplicate_names: set[str] = set()
        for c in listing.connections:
            if c.name in id_map and id_map[c.name] != c.id:
                duplicate_names.add(c.name)
            id_map[c.name] = c.id
        if duplicate_names:
            names = ", ".join(sorted(duplicate_names))
            raise RuntimeError(
                f"Duplicate connection names found: {names}. "
                "Use an explicit connection_id."
            )
        return id_map

    def columns_for_qualified(self, qualified: str) -> list[TableInfo]:
    def columns_for_qualified(
        self,
        qualified: str,
        *,
        connection_id: str | None = None,
    ) -> list[TableInfo]:
        parts = qualified.split(".")
        if len(parts) < 3:
            raise ValueError(
@@ -156,10 +233,12 @@ def columns_for_qualified(self, qualified: str) -> list[TableInfo]:
            parts[1],
            ".".join(parts[2:]),
        )
        id_map = self.connection_id_by_name()
        conn_id = id_map.get(conn_name)
        if not conn_id:
            raise KeyError(f"Unknown connection {conn_name!r}")
        conn_id = connection_id
        if conn_id is None:
            id_map = self.connection_id_by_name()
            conn_id = id_map.get(conn_name)
            if not conn_id:
                raise KeyError(f"Unknown connection {conn_name!r}")
        resp = self._information_schema().information_schema(
            connection_id=conn_id,
            var_schema=schema_name,
@@ -206,9 +285,9 @@ def _wait_result_ready(
            last = results.get_result(result_id)
            if last.status == "ready":
                return last
            if last.status == "failed":
            if last.status in _RESULT_FAILURE:
                raise RuntimeError(
                    last.error_message or "Result persistence failed"
                    last.error_message or f"Result {last.status}"
                )
            time.sleep(interval_s)
        raise TimeoutError(
@@ -217,6 +296,18 @@
        )

    def execute_sql(self, sql: str) -> QueryResult:
        last_err: BaseException | None = None
        for attempt in range(3):
            try:
                return self._execute_sql_once(sql)
            except (ProtocolError, ConnectionResetError, Urllib3HTTPError) as e:
                last_err = e
                if attempt == 2:
                    raise
                time.sleep(0.2 * (2**attempt))
        raise last_err  # pragma: no cover

    def _execute_sql_once(self, sql: str) -> QueryResult:
        q = self._query_api()
        try:
            raw = q.query(QueryRequest(sql=sql))