From 866b78d346c78b45b5d6fb953cf364ae890825ac Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Tue, 2 Jun 2026 21:25:13 +0000 Subject: [PATCH 1/8] Move to a public vs. private set of endpoints; build /schemas endpoint; restructure files; delete unused files --- app/abstract_worker.py | 94 ------------- app/apis/__init__.py | 0 app/{ => apis}/abstract.py | 11 +- app/{ => apis}/ago.py | 14 +- app/{ => apis}/carto.py | 22 +-- app/config.py | 1 - app/internal.py | 34 +++++ app/main.py | 242 +++------------------------------ app/public.py | 220 ++++++++++++++++++++++++++++++ app/test_main.py | 8 +- app/{ => utils}/models.py | 0 app/{ => utils}/utils.py | 35 +++-- app/{ => utils}/utils_carto.py | 2 +- app/{ => utils}/utils_tests.py | 0 pyproject.toml | 2 +- 15 files changed, 338 insertions(+), 347 deletions(-) delete mode 100644 app/abstract_worker.py create mode 100644 app/apis/__init__.py rename app/{ => apis}/abstract.py (98%) rename app/{ => apis}/ago.py (98%) rename app/{ => apis}/carto.py (98%) delete mode 100644 app/config.py create mode 100644 app/internal.py create mode 100644 app/public.py rename app/{ => utils}/models.py (100%) rename app/{ => utils}/utils.py (91%) rename app/{ => utils}/utils_carto.py (99%) rename app/{ => utils}/utils_tests.py (100%) diff --git a/app/abstract_worker.py b/app/abstract_worker.py deleted file mode 100644 index 9b65274..0000000 --- a/app/abstract_worker.py +++ /dev/null @@ -1,94 +0,0 @@ -from __future__ import annotations -from .models import ReturnJson, GeoJsonFeature -from abc import ABC, abstractmethod -from fastapi import Request -import inspect -import datetime as dt -from collections.abc import Callable - - -class AbstractWorker(ABC): - """Abstract base class to ensure worker classes are properly implemented - See https://www.geeksforgeeks.org/factory-method-python-design-patterns/""" - CACHE_DURATION = dt.timedelta(minutes=15) - MAX_RESPONSE_SIZE = 2 * 1024 * 1024 # 2MB response limit to not crash user systems (2MB of data expands to 10MB response, which is upper limit of what Chrome browser & Postman can handle) - DEFAULT_SRID = 4326 - - - @abstractmethod - async def get_count(self) -> ReturnJson: - """Get the row count of a dataset from the API. Implementation is API-specific""" - raise NotImplementedError - - @abstractmethod - async def normalize_rv_count(self) -> ReturnJson: - """Normalize the data received from the API into a uniform response. - Implementation is API-specific""" - raise NotImplementedError - - @abstractmethod - async def get(self) -> ReturnJson: - """Get data from the API. Implementation is API-specific""" - raise NotImplementedError - - @abstractmethod - async def normalize_rv(self) -> ReturnJson: - """Normalize the data received from the API into a uniform response. - Implementation is API-specific""" - raise NotImplementedError - - def determine_function_params(self, func: Callable) -> list[str]: - """Determine the parameters used in a function so as to document which query - parameters are accepted by each particular API service, ignoring those - not passed in by the API user - - Args: - func (Callable): Any function, notably the API class' `get()` function - - Returns: - list[str]: Names of query parameters available for service's endpoint - """ - sig = inspect.signature(func) - available_parameters = [] - for param in sig.parameters: - if param not in ('session', 'kwargs', 'request'): - available_parameters.append(param) - return available_parameters - - def create_next_where_clause(self, features: list[GeoJsonFeature]) -> str: - """Create the WHERE clause for the NEXT url link to retrieve the next batch - of data. Any API not using "objectid" would need to overwrite this method - - Args: - data (list[dict]): Data records - - Returns: - str: WHERE clause restricting the data to be retrieved - """ - max_objectid = 0 - for feature in features: - objectid = int(feature.id) - max_objectid = max(objectid, max_objectid) - next_where = f"objectid > {max_objectid}" - return next_where - - def create_next_url(self, records: list[dict], request: Request) -> str: - """Create the url to access the next "page" of data, preserving any existing - WHERE clause - - Args: - records (list[dict]): Data records - request (Request): User request to this API - - Returns: - str: URL to access the next page of data - """ - old_url = request.url - old_where = request.query_params.get("where") - next_where = self.create_next_where_clause(records) - if old_where: - new_where = f"({old_where}) AND {next_where}" - else: - new_where = next_where - next_url = str(old_url.include_query_params(where=new_where)) - return next_url diff --git a/app/apis/__init__.py b/app/apis/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/app/abstract.py b/app/apis/abstract.py similarity index 98% rename from app/abstract.py rename to app/apis/abstract.py index 6348e6a..ee5a429 100644 --- a/app/abstract.py +++ b/app/apis/abstract.py @@ -1,11 +1,14 @@ from __future__ import annotations -from .models import ReturnJson, GeoJsonFeature, TableSchema + +import datetime as dt +import inspect from abc import ABC, abstractmethod +from collections.abc import Callable + from fastapi import Request from fastapi.exceptions import HTTPException -import inspect -import datetime as dt -from collections.abc import Callable + +from ..utils.models import GeoJsonFeature, ReturnJson, TableSchema class AbstractWorker(ABC): diff --git a/app/ago.py b/app/apis/ago.py similarity index 98% rename from app/ago.py rename to app/apis/ago.py index 84b28ab..73c713e 100644 --- a/app/ago.py +++ b/app/apis/ago.py @@ -1,9 +1,17 @@ -# ago.py +import datetime as dt + import aiohttp from fastapi import Request -import datetime as dt + +from ..utils.models import ( + Error, + GeoJsonFeatureCollection, + Links, + Meta, + ReturnJson, + TableSchema, +) from .abstract import AbstractWorker, check_fields_valid -from .models import ReturnJson, Meta, Links, Error, GeoJsonFeatureCollection, TableSchema class Ago(AbstractWorker): diff --git a/app/carto.py b/app/apis/carto.py similarity index 98% rename from app/carto.py rename to app/apis/carto.py index 90caa14..b164bc8 100644 --- a/app/carto.py +++ b/app/apis/carto.py @@ -1,19 +1,21 @@ -import aiohttp -import os import datetime as dt +import os + +import aiohttp from fastapi import Request from psycopg import sql as psql # Redefine to allow "sql" as a query parameter -from .abstract import AbstractWorker, check_fields_valid -from .models import ( - ReturnJson, - Meta, - Links, + +from ..utils.models import ( Error, - GeoJsonFeatureCollection, GeoJsonFeature, - TableSchema + GeoJsonFeatureCollection, + Links, + Meta, + ReturnJson, + TableSchema, ) -from .utils_carto import FULL_QUERY +from ..utils.utils_carto import FULL_QUERY +from .abstract import AbstractWorker, check_fields_valid class Carto(AbstractWorker): diff --git a/app/config.py b/app/config.py deleted file mode 100644 index e89fbdd..0000000 --- a/app/config.py +++ /dev/null @@ -1 +0,0 @@ -KEEPER_SECRET = "OIT API Wrapper GitHubActions Token" diff --git a/app/internal.py b/app/internal.py new file mode 100644 index 0000000..d0a7eff --- /dev/null +++ b/app/internal.py @@ -0,0 +1,34 @@ +from typing import Annotated + +from fastapi import APIRouter + +from .utils.utils import schema_cache + +internal_router = APIRouter(include_in_schema=False) + + +@internal_router.get("/schemas", tags=["Routes"]) +async def get_schemas( + table: Annotated[ + str | None, "Name of table to retrieve", + ] = None, +) -> dict: + rv = { + "latest_check": schema_cache.latest_check, + "latest_update": schema_cache.latest_update, + "commit_check_delay_seconds": schema_cache.commit_check_delay, + "folder": schema_cache.folder, + "latest_repo_target": schema_cache.latest_repo_target, + "invalid_fields": schema_cache.invalid_fields, + "schema_count": len(schema_cache.cache.keys()) + } + if table: + table = table.lower() + try: + rv['schema'] = {table: schema_cache.cache[table]} + except KeyError: + rv['schema'] = [] + else: + rv["schemas"] = schema_cache.cache + + return rv diff --git a/app/main.py b/app/main.py index 4a49bdc..eb33756 100644 --- a/app/main.py +++ b/app/main.py @@ -1,26 +1,18 @@ -from fastapi import FastAPI, Depends, Query, Request -from fastapi.responses import JSONResponse, RedirectResponse -from fastapi.exceptions import RequestValidationError, HTTPException +from asyncio import create_task from contextlib import asynccontextmanager -import aiohttp -from asyncio import TimeoutError, create_task -from typing import Annotated -from .models import ReturnJson, Links, Error -from .utils import ( - AbstractWorker, - Api_Manager, - SchemaCache, - SessionManager, - Service, - description, - generate_final_response, - make_param_api_descriptions, -) +from fastapi import FastAPI, Request +from fastapi.exceptions import HTTPException, RequestValidationError +from fastapi.responses import JSONResponse -session_manager = SessionManager() -schema_cache = SchemaCache() -api_manager = Api_Manager() +from .internal import internal_router +from .public import public_router, public_prefix +from .utils.models import Error, Links, ReturnJson +from .utils.utils import ( + description, + schema_cache, + session_manager, +) @asynccontextmanager @@ -40,8 +32,14 @@ async def lifespan(app: FastAPI): await session_manager.stop() -app = FastAPI(lifespan=lifespan, title="OIT API Data Wrapper", description=description) - +app = FastAPI( + lifespan=lifespan, + title="OIT API Data Wrapper", + description=description, + docs_url=f"{public_prefix}/docs", +) +app.include_router(internal_router) +app.include_router(public_router) @app.exception_handler(RequestValidationError) async def validation_exception_handler( @@ -75,203 +73,3 @@ async def http_exception_handler(request: Request, exc: HTTPException) -> JSONRe content=rv.model_dump(mode="json", exclude_none=True), ) return response - - -@app.get( - "/get", - response_model=ReturnJson, - response_model_exclude_unset=True, - tags=["Routes"], -) -async def get_data( - request: Request, - table: Annotated[ - str | None, - Query( - description=f"""Name of table to retrieve. Either `table` or `sql` - parameter is required. Ignored if `sql` parameter is provided. Need an - example tale? Try `table=dor_parcel`. - {make_param_api_descriptions(api_manager, "table")}""" - ), - ] = None, - fields: Annotated[ - str | None, - Query( - description=f"""List of fields to retrieve, taking the form - _field_1_,_field_2_,... To receive all fields, do not include this parameter. - Writing fields=* will return an error. Ignored if `sql` or - `count_only` parameters are provided.{make_param_api_descriptions(api_manager, "fields")}""" - ), - ] = None, - where: Annotated[ - str | None, - Query( - description=f"""An SQL _WHERE_ clause to filter data. Ignored if - `sql` parameter is provided.{make_param_api_descriptions(api_manager, "where")}""" - ), - ] = None, - limit: Annotated[ - int | None, - Query( - description=f"""Limit to the number of records to return. AGO enforces - a limit specific to each table (frequently 2,000 records); for Carto, this API - enforces a limit of 1,000 records as Carto otherwise does not have - limits. Any user-provided limit smaller than those takes precedence. - Ignored if `sql` or `count_only` paramaters are provided.{make_param_api_descriptions(api_manager, "limit")}""" - ), - ] = None, - out_sr: Annotated[ - int, - Query( - description=f"""Spatial Reference to return geometric records in. - Default SRID is WGS84 (4326). Ignored if dataset is not geometric, or `sql` or `count_only` - parameters are provided.{make_param_api_descriptions(api_manager, "count_only")}""" - ), - ] = AbstractWorker.DEFAULT_SRID, - count_only: Annotated[ - bool, - Query( - description=f"""Return record count of provided query. Ignored if - `sql` parameter is provided.{make_param_api_descriptions(api_manager, "count_only")}""" - ), - ] = False, - sql: Annotated[ - str | None, - Query( - description=f"""Raw SQL string to use when retrieving data. Users - should request no more than ~2,000 rows to avoid an `HTTP 413` error. - Either `table` or `sql` parameter is required. Need an example? Try - `sql=SELECT * FROM DOR_PARCEL LIMIT 10`.{make_param_api_descriptions(api_manager, "sql")}""" - ), - ] = None, - service: Annotated[ - Service | None, - Query( - description="""Name of API service to use. If not provided, the first - API service to locate the table will be used. Ignored if `sql` parameter - is provided.""" - ), - ] = None, - timeout: Annotated[ - float, - Query( - description="""Amount of time in seconds to wait for response from downstream APIs - before raising a timeout error""", - gt=0, - lt=300, - ), - ] = 30, - no_cache: Annotated[ - bool, - Query( - description=f"""Request fresh results from downstream APIs, ignoring any HTTP caching. - {make_param_api_descriptions(api_manager, "no_cache")}""" - ), - ] = False, - session: aiohttp.ClientSession = Depends(session_manager), -) -> ReturnJson | JSONResponse: - """Use this endpoint to retrieve data from the available - services. At a minimum either the `table` or `sql` parameter is required. - \nParameters are case-sensitive and those not relevant to a specific service - will be ignored.""" - if "authorization" in request.headers: - token = request.headers["authorization"] - else: - token = None - params = { - "table": table, - "fields": fields, - "where": where, - "limit": limit, - "out_sr": out_sr, - "count_only": count_only, - "sql": sql, - "session": session, - "timeout": timeout, - "no_cache": no_cache, - "request": request, - "schema_cache": schema_cache, - "token": token, - } - if sql: - if service and service.lower() != "carto": - raise HTTPException( - status_code=400, - detail="SQL parameter can only be used with `service=carto`", - headers={"title": "Bad Request"}, - ) - try: - rv = await api_manager.map_str_to_api["carto"].get(**params) - except TimeoutError: - raise HTTPException( - status_code=408, - detail="Request could not be completed. Request less data, preferably 2,000 rows or fewer, or alternatively try again.", - headers={"title": "Request Timeout"}, - ) - return generate_final_response(rv) - if not service: - links = Links(self=str(request.url)) - rv_combined = ReturnJson(links=links, errors=[]) - for api in api_manager.map_api_to_params: - if table: - try: - rv = await api.get(**params) - except TimeoutError: - api_manager.deprioritize(api) - error = Error( - code=408, - title=f"{api.name} Timeout Error", - detail="Request could not be completed. Request less data, preferably 2,000 rows or fewer, or alternatively try again.", - ) - rv_combined.errors.append(error) - else: - if not rv.errors: - rv.meta.service_available_query_parameters = ( - api_manager.map_api_to_params[api] - ) - return rv - else: - rv_combined.errors.append(rv.errors[0]) - else: - raise HTTPException( - status_code=400, - detail="'table' or 'sql' parameters are required", - headers={"title": "Bad Request"}, - ) - return generate_final_response(rv_combined) - else: - api = api_manager.map_str_to_api[service.lower()] - if table: - try: - if count_only: - rv = await api.get_count(**params) - else: - rv = await api.get(**params) - except TimeoutError: - raise HTTPException( - status_code=408, - detail="Request could not be completed. Request less data, preferably 2,000 rows or fewer, or alternatively try again", - headers={"title": "Request Timeout"}, - ) - else: - raise HTTPException( - status_code=400, - detail="'table' or 'sql' parameters are required", - headers={"title": "Bad Request"}, - ) - rv.meta.service_available_query_parameters = api_manager.map_api_to_params[api] - return generate_final_response(rv) - - -@app.get("/api_priority", tags=["Routes"]) -async def get_api_priority() -> list: - """Return the API names in the order they will be searched if no `service` - is specified. If an API returns a TimeoutError during a request, then it will be - placed last in priority order.""" - return [api.name for api in api_manager.api_priority_queue] - - -@app.get("/", tags=["Routes"]) -async def docs() -> RedirectResponse: - """Redirect to the `/docs` endpoint""" - return RedirectResponse(url="/docs") diff --git a/app/public.py b/app/public.py new file mode 100644 index 0000000..d164ee4 --- /dev/null +++ b/app/public.py @@ -0,0 +1,220 @@ +from asyncio import TimeoutError +from typing import Annotated + +import aiohttp +from fastapi import APIRouter, Depends, Query, Request +from fastapi.exceptions import HTTPException +from fastapi.responses import JSONResponse, RedirectResponse + +from .apis.abstract import AbstractWorker +from .utils.models import Error, Links, ReturnJson +from .utils.utils import ( + Service, + api_manager, + generate_final_response, + make_param_api_descriptions, + schema_cache, + session_manager, +) + +public_prefix = "/api" +public_router = APIRouter(prefix=public_prefix, tags=["Routes"]) + + +@public_router.get( + "/get", + response_model=ReturnJson, + response_model_exclude_unset=True, +) +async def get_data( + request: Request, + table: Annotated[ + str | None, + Query( + description=f"""Name of table to retrieve. Either `table` or `sql` + parameter is required. Ignored if `sql` parameter is provided. Need an + example tale? Try `table=dor_parcel`. + {make_param_api_descriptions(api_manager, "table")}""" + ), + ] = None, + fields: Annotated[ + str | None, + Query( + description=f"""List of fields to retrieve, taking the form + _field_1_,_field_2_,... To receive all fields, do not include this parameter. + Writing fields=* will return an error. Ignored if `sql` or + `count_only` parameters are provided.{make_param_api_descriptions(api_manager, "fields")}""" + ), + ] = None, + where: Annotated[ + str | None, + Query( + description=f"""An SQL _WHERE_ clause to filter data. Ignored if + `sql` parameter is provided.{make_param_api_descriptions(api_manager, "where")}""" + ), + ] = None, + limit: Annotated[ + int | None, + Query( + description=f"""Limit to the number of records to return. AGO enforces + a limit specific to each table (frequently 2,000 records); for Carto, this API + enforces a limit of 1,000 records as Carto otherwise does not have + limits. Any user-provided limit smaller than those takes precedence. + Ignored if `sql` or `count_only` paramaters are provided.{make_param_api_descriptions(api_manager, "limit")}""" + ), + ] = None, + out_sr: Annotated[ + int, + Query( + description=f"""Spatial Reference to return geometric records in. + Default SRID is WGS84 (4326). Ignored if dataset is not geometric, or `sql` or `count_only` + parameters are provided.{make_param_api_descriptions(api_manager, "count_only")}""" + ), + ] = AbstractWorker.DEFAULT_SRID, + count_only: Annotated[ + bool, + Query( + description=f"""Return record count of provided query. Ignored if + `sql` parameter is provided.{make_param_api_descriptions(api_manager, "count_only")}""" + ), + ] = False, + sql: Annotated[ + str | None, + Query( + description=f"""Raw SQL string to use when retrieving data. Users + should request no more than ~2,000 rows to avoid an `HTTP 413` error. + Either `table` or `sql` parameter is required. Need an example? Try + `sql=SELECT * FROM DOR_PARCEL LIMIT 10`.{make_param_api_descriptions(api_manager, "sql")}""" + ), + ] = None, + service: Annotated[ + Service | None, + Query( + description="""Name of API service to use. If not provided, the first + API service to locate the table will be used. Ignored if `sql` parameter + is provided.""" + ), + ] = None, + timeout: Annotated[ + float, + Query( + description="""Amount of time in seconds to wait for response from downstream APIs + before raising a timeout error""", + gt=0, + lt=300, + ), + ] = 30, + no_cache: Annotated[ + bool, + Query( + description=f"""Request fresh results from downstream APIs, ignoring any HTTP caching. + {make_param_api_descriptions(api_manager, "no_cache")}""" + ), + ] = False, + session: aiohttp.ClientSession = Depends(session_manager), +) -> ReturnJson | JSONResponse: + """Use this endpoint to retrieve data from the available + services. At a minimum either the `table` or `sql` parameter is required. + \nParameters are case-sensitive and those not relevant to a specific service + will be ignored.""" + if "authorization" in request.headers: + token = request.headers["authorization"] + else: + token = None + params = { + "table": table.lower(), + "fields": fields, + "where": where, + "limit": limit, + "out_sr": out_sr, + "count_only": count_only, + "sql": sql, + "session": session, + "timeout": timeout, + "no_cache": no_cache, + "request": request, + "schema_cache": schema_cache, + "token": token, + } + if sql: + if service and service.lower() != "carto": + raise HTTPException( + status_code=400, + detail="SQL parameter can only be used with `service=carto`", + headers={"title": "Bad Request"}, + ) + try: + rv = await api_manager.map_str_to_api["carto"].get(**params) + except TimeoutError: + raise HTTPException( + status_code=408, + detail="Request could not be completed. Request less data, preferably 2,000 rows or fewer, or alternatively try again.", + headers={"title": "Request Timeout"}, + ) + return generate_final_response(rv) + if not service: + links = Links(self=str(request.url)) + rv_combined = ReturnJson(links=links, errors=[]) + for api in api_manager.map_api_to_params: + if table: + try: + rv = await api.get(**params) + except TimeoutError: + api_manager.deprioritize(api) + error = Error( + code=408, + title=f"{api.name} Timeout Error", + detail="Request could not be completed. Request less data, preferably 2,000 rows or fewer, or alternatively try again.", + ) + rv_combined.errors.append(error) + else: + if not rv.errors: + rv.meta.service_available_query_parameters = ( + api_manager.map_api_to_params[api] + ) + return rv + else: + rv_combined.errors.append(rv.errors[0]) + else: + raise HTTPException( + status_code=400, + detail="'table' or 'sql' parameters are required", + headers={"title": "Bad Request"}, + ) + return generate_final_response(rv_combined) + else: + api = api_manager.map_str_to_api[service.lower()] + if table: + try: + if count_only: + rv = await api.get_count(**params) + else: + rv = await api.get(**params) + except TimeoutError: + raise HTTPException( + status_code=408, + detail="Request could not be completed. Request less data, preferably 2,000 rows or fewer, or alternatively try again", + headers={"title": "Request Timeout"}, + ) + else: + raise HTTPException( + status_code=400, + detail="'table' or 'sql' parameters are required", + headers={"title": "Bad Request"}, + ) + rv.meta.service_available_query_parameters = api_manager.map_api_to_params[api] + return generate_final_response(rv) + + +@public_router.get("/api_priority") +async def get_api_priority() -> list: + """Return the API names in the order they will be searched if no `service` + is specified. If an API returns a TimeoutError during a request, then it will be + placed last in priority order.""" + return [api.name for api in api_manager.api_priority_queue] + + +@public_router.get("/") +async def docs() -> RedirectResponse: + """Redirect to the `docs` endpoint""" + return RedirectResponse(url=f"{public_prefix}/docs") diff --git a/app/test_main.py b/app/test_main.py index 188119f..d25bf80 100644 --- a/app/test_main.py +++ b/app/test_main.py @@ -1,8 +1,10 @@ -from fastapi.testclient import TestClient +from collections.abc import Generator + import pytest +from fastapi.testclient import TestClient + from .main import api_manager, app -from .utils_tests import generate_ago_token -from collections.abc import Generator +from .utils.utils_tests import generate_ago_token # Response validation handled by pydantic on API server itself # Still have to coerce FastAPI default validation errors to JSON:API spec diff --git a/app/models.py b/app/utils/models.py similarity index 100% rename from app/models.py rename to app/utils/models.py diff --git a/app/utils.py b/app/utils/utils.py similarity index 91% rename from app/utils.py rename to app/utils/utils.py index b6cd960..a511a0a 100644 --- a/app/utils.py +++ b/app/utils/utils.py @@ -1,5 +1,6 @@ from __future__ import annotations +import datetime as dt import json import os from asyncio import sleep @@ -9,9 +10,9 @@ from fastapi.exceptions import HTTPException from fastapi.responses import JSONResponse -from .abstract import AbstractWorker -from .ago import Ago -from .carto import Carto +from ..apis.abstract import AbstractWorker +from ..apis.ago import Ago +from ..apis.carto import Carto from .models import ReturnJson, TableSchema @@ -23,7 +24,9 @@ class SchemaCache: """ def __init__(self): - self.folder = "/var/git/databridge-schemas" + self._prod_folder = "/var/git/databridge-schemas" + self._local_dev_folder = "/scripts/databridge-schemas" + self.folder = self.set_folder() self.commit_check_delay = 300 self.latest_repo_target: str = None self.cache: dict[str, TableSchema] = {} @@ -33,7 +36,19 @@ def __init__(self): "Shape__Length", # AGO (some tables, such as dor_parcel) "gdb_geomattr_data", # AGO (some tables, such as dor_parcel) ] - + self.latest_check: dt.datetime = None + self.latest_update: dt.datetime = None + + def set_folder(self): + if os.path.isdir(self._prod_folder): + return self._prod_folder + elif os.path.isdir(self._local_dev_folder): + return self._local_dev_folder + else: + raise AssertionError( + f'databridge-schemas repo not found at "{self._prod_folder}" or "{self._local_dev_folder}"' + ) + async def loop_commit_check(self): """Run a continuous asynchronous loop to quickly absorb any updates to the schemas repository @@ -48,6 +63,7 @@ def check_latest_commit(self): # Either will work with realpath(). # (e.g., /var/git/.worktrees//) current_target = os.path.realpath(self.folder) + self.latest_check = dt.datetime.now(tz=dt.UTC) if getattr(self, 'latest_repo_target', None) != current_target: print(f"New symlink target detected: {current_target} Updating SchemaCache.") @@ -72,9 +88,7 @@ def update(self): functions block the API from responding to network requests. """ print("Updating SchemaCache") - assert os.path.isdir(self.folder), ( - f"databridge-schemas repo not found at {self.folder}!!" - ) + self.latest_update = dt.datetime.now(tz=dt.UTC) replacement_cache = self.search_recursively(self.folder) self.cache = replacement_cache print(f"Cache successfully updated. {len(self.cache):,} tables in cache.") @@ -288,3 +302,8 @@ def make_param_api_descriptions(api_manager: Api_Manager, param: str) -> str: if param in api_manager.map_api_to_params[api]: s.append(api.name) return "\n\n_Used by:_ " + ", ".join(s) + + +schema_cache = SchemaCache() +api_manager = Api_Manager() +session_manager = SessionManager() diff --git a/app/utils_carto.py b/app/utils/utils_carto.py similarity index 99% rename from app/utils_carto.py rename to app/utils/utils_carto.py index ef434e8..c37c390 100644 --- a/app/utils_carto.py +++ b/app/utils/utils_carto.py @@ -23,4 +23,4 @@ ) FROM geojson -""" \ No newline at end of file +""" diff --git a/app/utils_tests.py b/app/utils/utils_tests.py similarity index 100% rename from app/utils_tests.py rename to app/utils/utils_tests.py diff --git a/pyproject.toml b/pyproject.toml index cd1d5a9..16815ea 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [project] name = "oit-api-wrapper" -version = "0.1.1" +version = "0.2.0" description = "Add your description here" readme = "README.md" requires-python = ">=3.14" From d2f962e8b48af4872cb0a27746f8765bc73a4574 Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Thu, 4 Jun 2026 19:46:54 +0000 Subject: [PATCH 2/8] Revise openapi_paths; fix tests after restructuring; update README & docstrings --- README.md | 12 +++++++++- app/internal.py | 12 +++++++++- app/main.py | 19 +++++++++++++++ app/public.py | 4 +++- app/test_main.py | 60 +++++++++++++++++++++++++----------------------- uv.lock | 2 +- 6 files changed, 76 insertions(+), 33 deletions(-) diff --git a/README.md b/README.md index 332ac17..57e3d12 100644 --- a/README.md +++ b/README.md @@ -3,6 +3,16 @@ Read the API docs at `/docs` +The publicly accessible endpoints are actually at the path `/api`, i.e. the standard +`GET` request goes to `/api/get`, but this is hidden from the user by the reverse proxy, Mulesoft. + +### Debugging +There is an internal endpoint available for testing. This will only be accessible on the City network, i.e. not by the reverse proxy. + +It's available at `/schemas`, and you can pass `?table=` to see a specific table's schema + +### Development + For local development and testing, copy `env.example` to `.env` and populate it. Then run `export $(grep -v '^#' .env | xargs)` to export them as environment variables so the python program can access it. Running the API locally: @@ -13,6 +23,6 @@ To run in a docker container, make sure your .env file is setup then run: `docker-compose up --build -d` -Testing: +### Testing `uv run --env-file=.env pytest --maxfail=4 --tb=short -v` \ No newline at end of file diff --git a/app/internal.py b/app/internal.py index d0a7eff..b5c6c69 100644 --- a/app/internal.py +++ b/app/internal.py @@ -13,6 +13,16 @@ async def get_schemas( str | None, "Name of table to retrieve", ] = None, ) -> dict: + """Return schema information for debugging purposes. This endpoint is meant to + be internally accessible only + + Args: + table (str, optional): If provided, only return the schema for the specified + table if it exists. Defaults to None, in which case all schemas are returned. + + Returns: + dict: Information about the schema cache + """ rv = { "latest_check": schema_cache.latest_check, "latest_update": schema_cache.latest_update, @@ -27,7 +37,7 @@ async def get_schemas( try: rv['schema'] = {table: schema_cache.cache[table]} except KeyError: - rv['schema'] = [] + rv['schema'] = {table: "Schema Not Found"} else: rv["schemas"] = schema_cache.cache diff --git a/app/main.py b/app/main.py index eb33756..b82591f 100644 --- a/app/main.py +++ b/app/main.py @@ -15,6 +15,23 @@ ) +def revise_openapi_paths(): + """Revise the URL paths in the OpenAPI docs to remove the public path prefix. The + plan is to host this App behind a reverse proxy and only allow public access + to the public endpoints covered by the Public Prefix (currently "/api"). Because + this directory will be the root for the reverse proxy and users will not know this, + the URL paths need to remove the public prefix. + """ + app.openapi() + revised_openapi_paths = {} + openapi_paths = app.openapi_schema["paths"] + for path in openapi_paths: + if path.startswith(public_prefix): + new_path = path.removeprefix(public_prefix) + revised_openapi_paths[new_path] = openapi_paths[path] + app.openapi_schema["paths"] = revised_openapi_paths + + @asynccontextmanager async def lifespan(app: FastAPI): """Define code to run before the FastAPI app starts and after it shuts down, @@ -40,6 +57,8 @@ async def lifespan(app: FastAPI): ) app.include_router(internal_router) app.include_router(public_router) +revise_openapi_paths() + @app.exception_handler(RequestValidationError) async def validation_exception_handler( diff --git a/app/public.py b/app/public.py index d164ee4..c0643de 100644 --- a/app/public.py +++ b/app/public.py @@ -121,8 +121,10 @@ async def get_data( token = request.headers["authorization"] else: token = None + if table: + table = table.lower() params = { - "table": table.lower(), + "table": table, "fields": fields, "where": where, "limit": limit, diff --git a/app/test_main.py b/app/test_main.py index d25bf80..1f07737 100644 --- a/app/test_main.py +++ b/app/test_main.py @@ -3,7 +3,9 @@ import pytest from fastapi.testclient import TestClient -from .main import api_manager, app +from .main import app +from .public import public_prefix +from .utils.utils import api_manager from .utils.utils_tests import generate_ago_token # Response validation handled by pydantic on API server itself @@ -48,7 +50,7 @@ def token() -> str: def test_valid(client: TestClient, service: str, table: str): """Test that each service works""" params = {"table": table, "service": service, "no_cache": True} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) rv = response.json() assert response.status_code == 200 assert rv["links"]["self"] == response.url @@ -67,11 +69,11 @@ def test_valid_private(client: TestClient, token: str, service: str, count_only: "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code <= 500 headers = {"Authorization": f"Bearer {token}"} - response = client.get("/get", params=params, headers=headers) + response = client.get(f"{public_prefix}/get", params=params, headers=headers) rv = response.json() assert response.status_code == 200 assert rv["links"]["self"] == response.url @@ -88,7 +90,7 @@ def test_valid_private_no_interfere(client: TestClient, service: str, token: str table = GOOD_TABLES[0] params = {"table": table, "limit": 5, "service": service, "no_cache": True} response = client.get( - "/get", params=params, headers={"Authorization": f"Bearer {token}"} + f"{public_prefix}/get", params=params, headers={"Authorization": f"Bearer {token}"} ) assert response.status_code == 200 rv = response.json() @@ -106,7 +108,7 @@ def test_valid_fields(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 rv = response.json() data = rv["data"] @@ -129,7 +131,7 @@ def test_valid_fields2(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 rv = response.json() data = rv["data"] @@ -147,7 +149,7 @@ def test_valid_where(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) rv = response.json() assert response.status_code == 200 assert rv["meta"]["record_count"] == 2 @@ -169,7 +171,7 @@ def test_valid_where_parethesization(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) rv = response.json() assert response.status_code == 200 next_url = rv["links"]["next"] @@ -188,7 +190,7 @@ def test_valid_limit_next(client: TestClient, service: str, table: str): """Test that the `limit` parameter works and that the `next` url works""" LIMIT = 2 params = {"table": table, "limit": LIMIT, "service": service, "no_cache": True} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) rv = response.json() assert response.status_code == 200 assert rv["data"]["features"], f'Service {service} found zero features in table {table}' @@ -224,7 +226,7 @@ def test_valid_count_only(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 rv = response.json() assert rv["meta"]["records_total"] == 5 @@ -241,14 +243,14 @@ def test_valid_srid(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) rv = response.json() assert response.status_code == 200 data = rv["data"] assert rv["data"]["features"], f'Service {service} found zero features in table {table}' params["out_sr"] = 2272 - response2 = client.get("/get", params=params) + response2 = client.get(f"{public_prefix}/get", params=params) rv2 = response2.json() assert response2.status_code == 200 data2 = rv2["data"] @@ -268,7 +270,7 @@ def test_valid_sql(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 rv = response.json() assert rv["meta"]["record_count"] == 5 @@ -283,7 +285,7 @@ def test_valid_sql_too_large(client: TestClient, service: str): "sql": f"SELECT * FROM {GOOD_TABLES[0]} LIMIT 50000", "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 413 @@ -292,7 +294,7 @@ def test_valid_sql_too_large(client: TestClient, service: str): def test_valid_no_service(client: TestClient, table: str, service: None): """Test that the API works if no `service` is provided""" params = {"table": table, "limit": 1, "no_cache": True} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 rv = response.json() assert rv["data"]["features"], f'Service {service} found zero features in table {table}' @@ -307,7 +309,7 @@ def test_valid_timeout(client: TestClient, service: str): "service": service, "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 408 @@ -320,8 +322,8 @@ def test_same_response(client: TestClient, table: str): params = {"table": table, "limit": 1, "no_cache": True} ago_params = params | {"service": "ago"} carto_params = params | {"service": "carto"} - ago_response = client.get("/get", params=ago_params) - carto_response = client.get("/get", params=carto_params) + ago_response = client.get(f"{public_prefix}/get", params=ago_params) + carto_response = client.get(f"{public_prefix}/get", params=carto_params) assert ago_response.status_code == 200 and carto_response.status_code == 200 ago_json = ago_response.json() ago_properties = ago_json["data"]["features"][0]['properties'] @@ -360,7 +362,7 @@ def compare_dicts( @pytest.mark.parametrize("service", [None]) def test_invalid_nothing(client: TestClient, service: None): """Test that the API fails if no `sql` or `table` parameters passed""" - response = client.get("/get") + response = client.get(f"{public_prefix}/get") assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -369,7 +371,7 @@ def test_invalid_nothing(client: TestClient, service: None): def test_invalid_nothing2(client: TestClient, service: str): """Test that the API fails if no `sql` or `table` parameters passed""" params = {"service": service} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -379,7 +381,7 @@ def test_invalid_nothing2(client: TestClient, service: str): def test_invalid_table(client: TestClient, service: str): """Test that the API fails if an invalid `table` parameter is passed""" params = {"table": "bad_table", "service": service} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -394,7 +396,7 @@ def test_invalid_fields(client: TestClient, service: str): "fields": "badfield", "service": service, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -408,7 +410,7 @@ def test_invalid_where(client: TestClient, service: str): "where": "not_a_column <= 2", "service": service, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -423,7 +425,7 @@ def test_invalid_limit(client: TestClient, service: str, limit: str): "limit": limit, "service": service, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert ( response.status_code >= 400 and response.status_code <= 500 ) # Carto returns a 500 error here @@ -438,7 +440,7 @@ def test_invalid_count_only( ): """Test that the API fails if an invalid `count_only` table is passed""" params = {"table": "ASNTEHUSA", "count_only": "true", "service": service} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -448,7 +450,7 @@ def test_invalid_count_only( def test_invalid_sql(client: TestClient, service: str): """Test that the API fails if invalid `sql` parameter is passed""" params = {"sql": "SELECT * FROM ANSTEHUSANTH", "service": service} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -461,7 +463,7 @@ def test_invalid_sql_large_payload(client: TestClient, service: None): "sql": f"SELECT * FROM {GOOD_TABLES[0]}", "no_cache": True, } - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data @@ -472,7 +474,7 @@ def test_invalid_no_service(client: TestClient, service: None): """Test that the API fails if an invalid `table` parameter is passed and no `service` is selected""" params = {"table": "bad_table", "limit": 1} - response = client.get("/get", params=params) + response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 data = response.json() assert "errors" in data diff --git a/uv.lock b/uv.lock index 1836a73..8cf4342 100644 --- a/uv.lock +++ b/uv.lock @@ -603,7 +603,7 @@ wheels = [ [[package]] name = "oit-api-wrapper" -version = "0.1.1" +version = "0.2.0" source = { virtual = "." } dependencies = [ { name = "aiohttp", extra = ["speedups"] }, From 5379c991941e21ecc5de1fbd466d611ca4459834 Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Thu, 4 Jun 2026 19:55:29 +0000 Subject: [PATCH 3/8] Change no_cache to max_age for Carto --- app/apis/carto.py | 12 ++++++------ app/public.py | 14 ++++++++------ app/test_main.py | 32 ++++++++++++++++---------------- app/utils/utils_carto.py | 3 +++ 4 files changed, 33 insertions(+), 28 deletions(-) diff --git a/app/apis/carto.py b/app/apis/carto.py index b164bc8..5ba6adb 100644 --- a/app/apis/carto.py +++ b/app/apis/carto.py @@ -39,7 +39,7 @@ async def get_count( table: str | None, where: str | None, timeout: float, - no_cache: bool, + max_age: int, session: aiohttp.ClientSession, request: Request, **kwargs, @@ -54,8 +54,8 @@ async def get_count( query = query + q_where params = {"q": query.as_string()} headers = self.headers - if no_cache: - headers['cache-control'] = "max-age=0" + if max_age: + headers["cache-control"] = f"max-age={max_age}" async with session.get( self.base_url, params=params, headers=headers, timeout=timeout ) as response: @@ -91,7 +91,7 @@ async def get( out_sr: int | None, sql: str | None, timeout: float, - no_cache: bool, + max_age: int, session: aiohttp.ClientSession, request: Request, **kwargs, @@ -145,8 +145,8 @@ async def get( table_schema = None params = {"q": query.as_string()} headers = self.headers - if no_cache: - headers['cache-control'] = "max-age=0" + if max_age: + headers["cache-control"] = f"max-age={max_age}" async with session.get( self.base_url, params=params, headers=headers, timeout=timeout ) as response: diff --git a/app/public.py b/app/public.py index c0643de..96da34d 100644 --- a/app/public.py +++ b/app/public.py @@ -104,13 +104,15 @@ async def get_data( lt=300, ), ] = 30, - no_cache: Annotated[ - bool, + max_age: Annotated[ + int, Query( - description=f"""Request fresh results from downstream APIs, ignoring any HTTP caching. - {make_param_api_descriptions(api_manager, "no_cache")}""" + description=f"""Request fresh results if cached results are older than _max_age_ in seconds. 0 means request fresh results only, but a request will take longer and demand more server resources to complete. 31,536,000 seconds equals 365 days (default). Use this parameter if you are concerned that table data is incorrect. + {make_param_api_descriptions(api_manager, "max_age")}""", + ge=0, + le=31536000, ), - ] = False, + ] = 31536000, session: aiohttp.ClientSession = Depends(session_manager), ) -> ReturnJson | JSONResponse: """Use this endpoint to retrieve data from the available @@ -133,7 +135,7 @@ async def get_data( "sql": sql, "session": session, "timeout": timeout, - "no_cache": no_cache, + "max_age": max_age, "request": request, "schema_cache": schema_cache, "token": token, diff --git a/app/test_main.py b/app/test_main.py index 1f07737..cfae0da 100644 --- a/app/test_main.py +++ b/app/test_main.py @@ -49,7 +49,7 @@ def token() -> str: @pytest.mark.parametrize("service", api_manager.map_str_to_api.keys()) def test_valid(client: TestClient, service: str, table: str): """Test that each service works""" - params = {"table": table, "service": service, "no_cache": True} + params = {"table": table, "service": service, "max_age": 0} response = client.get(f"{public_prefix}/get", params=params) rv = response.json() assert response.status_code == 200 @@ -67,7 +67,7 @@ def test_valid_private(client: TestClient, token: str, service: str, count_only: "limit": 5, "count_only": count_only, "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code <= 500 @@ -88,7 +88,7 @@ def test_valid_private(client: TestClient, token: str, service: str, count_only: def test_valid_private_no_interfere(client: TestClient, service: str, token: str): """Test that a private token doesn't interfere with other APIs""" table = GOOD_TABLES[0] - params = {"table": table, "limit": 5, "service": service, "no_cache": True} + params = {"table": table, "limit": 5, "service": service, "max_age": 0} response = client.get( f"{public_prefix}/get", params=params, headers={"Authorization": f"Bearer {token}"} ) @@ -106,7 +106,7 @@ def test_valid_fields(client: TestClient, service: str): "limit": 2, "fields": "objectid,document_id,document_type,display_date", "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 @@ -129,7 +129,7 @@ def test_valid_fields2(client: TestClient, service: str): "limit": 2, "fields": "addr_std", "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 @@ -147,7 +147,7 @@ def test_valid_where(client: TestClient, service: str): "table": table, "where": "objectid <= 2", "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) rv = response.json() @@ -169,7 +169,7 @@ def test_valid_where_parethesization(client: TestClient, service: str): "where": "objectid >= 1 OR objectid >= 3", "limit": LIMIT, "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) rv = response.json() @@ -189,7 +189,7 @@ def test_valid_where_parethesization(client: TestClient, service: str): def test_valid_limit_next(client: TestClient, service: str, table: str): """Test that the `limit` parameter works and that the `next` url works""" LIMIT = 2 - params = {"table": table, "limit": LIMIT, "service": service, "no_cache": True} + params = {"table": table, "limit": LIMIT, "service": service, "max_age": 0} response = client.get(f"{public_prefix}/get", params=params) rv = response.json() assert response.status_code == 200 @@ -224,7 +224,7 @@ def test_valid_count_only(client: TestClient, service: str): "count_only": "true", "where": "objectid <= 5", "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 @@ -241,7 +241,7 @@ def test_valid_srid(client: TestClient, service: str): "limit": 2, "out_sr": 4326, "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) rv = response.json() @@ -268,7 +268,7 @@ def test_valid_sql(client: TestClient, service: str): "limit": 3, # Should have no effect "sql": f"SELECT * FROM {table} LIMIT 5", "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 @@ -283,7 +283,7 @@ def test_valid_sql_too_large(client: TestClient, service: str): is too large to handle but smaller than a timeout""" params = { "sql": f"SELECT * FROM {GOOD_TABLES[0]} LIMIT 50000", - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 413 @@ -293,7 +293,7 @@ def test_valid_sql_too_large(client: TestClient, service: str): @pytest.mark.parametrize("service", [None]) def test_valid_no_service(client: TestClient, table: str, service: None): """Test that the API works if no `service` is provided""" - params = {"table": table, "limit": 1, "no_cache": True} + params = {"table": table, "limit": 1, "max_age": 0} response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 200 rv = response.json() @@ -307,7 +307,7 @@ def test_valid_timeout(client: TestClient, service: str): "table": GOOD_TABLES[0], "timeout": 0.001, "service": service, - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code == 408 @@ -319,7 +319,7 @@ def test_valid_timeout(client: TestClient, service: str): @pytest.mark.parametrize("table", GOOD_TABLES) def test_same_response(client: TestClient, table: str): """Test that the API timeout parameter returns the correct error code""" - params = {"table": table, "limit": 1, "no_cache": True} + params = {"table": table, "limit": 1, "max_age": 0} ago_params = params | {"service": "ago"} carto_params = params | {"service": "carto"} ago_response = client.get(f"{public_prefix}/get", params=ago_params) @@ -461,7 +461,7 @@ def test_invalid_sql_large_payload(client: TestClient, service: None): """Test that the API fails if too large of a dataset is requsted""" params = { "sql": f"SELECT * FROM {GOOD_TABLES[0]}", - "no_cache": True, + "max_age": 0, } response = client.get(f"{public_prefix}/get", params=params) assert response.status_code >= 400 and response.status_code < 500 diff --git a/app/utils/utils_carto.py b/app/utils/utils_carto.py index c37c390..8eee80a 100644 --- a/app/utils/utils_carto.py +++ b/app/utils/utils_carto.py @@ -1,3 +1,6 @@ +# Requires PostGIS version >= 3.5 +# As of 2026-05, Carto V2/V3 both use 3.6 +# while Databridge-V2 uses version 3.4 FULL_QUERY = """ WITH subq as ( {subq} From 5319af43cf7dbf2b0087f3c8de15fe2845a15f0c Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Thu, 4 Jun 2026 19:56:08 +0000 Subject: [PATCH 4/8] Code improvement --- app/apis/ago.py | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/app/apis/ago.py b/app/apis/ago.py index 73c713e..46e2eb5 100644 --- a/app/apis/ago.py +++ b/app/apis/ago.py @@ -131,7 +131,7 @@ async def normalize_rv( if "error" not in data: records = data["features"] records = self.harmonize_timestamp_fields(records, table_schema) - gjfc = GeoJsonFeatureCollection(features=records) + gjfc = GeoJsonFeatureCollection(**data) meta.record_count = len(gjfc.features) try: data["properties"]["exceededTransferLimit"] @@ -139,7 +139,6 @@ async def normalize_rv( links.next = next_url except KeyError: pass - gjfc = GeoJsonFeatureCollection(**data) rv = ReturnJson(data=gjfc, links=links, meta=meta) return rv else: From 8201f6709f840445db3518a3e374ec33b11228df Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Thu, 4 Jun 2026 19:56:40 +0000 Subject: [PATCH 5/8] Begin adding Databridge via PostgREST --- app/apis/databridge.py | 156 +++++++++++++++++++++++++++++++++++++++++ app/utils/utils.py | 3 + 2 files changed, 159 insertions(+) create mode 100644 app/apis/databridge.py diff --git a/app/apis/databridge.py b/app/apis/databridge.py new file mode 100644 index 0000000..5c3cc29 --- /dev/null +++ b/app/apis/databridge.py @@ -0,0 +1,156 @@ +import aiohttp +import os +import datetime as dt +from fastapi import Request +from psycopg import sql as psql # Redefine to allow "sql" as a query parameter +from .abstract import AbstractWorker, check_fields_valid +from .models import ( + ReturnJson, + Meta, + Links, + Error, + GeoJsonFeatureCollection, + GeoJsonFeature, + TableSchema, +) + + +class Databridge(AbstractWorker): + def __init__(self): + self.name = "Databridge-Public PostgREST API" + self.base_url = "https://postgrest-public-dev.citygeo.phila.city" + + async def get_count( + self, + table: str | None, + where: str | None, + timeout: float, + session: aiohttp.ClientSession, + request: Request, + **kwargs, + ) -> ReturnJson: + url = f'{self.base_url}/{table}' + headers = {"prefer": "count=exact"} + async with session.head(url, headers=headers, timeout=timeout) as response: + return await self.normalize_rv_count(request, response) + + async def normalize_rv_count( + self, request: Request, response: aiohttp.ClientResponse + ) -> ReturnJson: + links = Links(self=str(request.url)) + meta = Meta(service=self.name, service_url=str(response.url)) + if response.ok: + meta.records_total = response.headers.get("Content-Range").split("/")[1] + rv = ReturnJson(links=links, meta=meta) + return rv + else: + error = Error( + code=response.status, + title=f"{self.name} Error", + detail=response.reason, + ) + rv = ReturnJson(errors=[error], links=links, meta=meta) + return rv + + # Do not remove any unused parameters as they are crucial to the documentation + async def get( + self, + table: str | None, + fields: str | None, + where: str | None, + limit: int | None, + count_only: bool, + out_sr: int | None, + sql: str | None, + timeout: float, + session: aiohttp.ClientSession, + request: Request, + **kwargs, + ) -> ReturnJson: + schema_cache = kwargs["schema_cache"] + table_schema = schema_cache.retrieve_table_schema(table) + geom_column = table_schema._api_geom_column + valid_fields = table_schema._api_valid_fields + + url = f'{self.base_url}/{table}' + params = {} + + if not fields: + fields = ", ".join([field for field in valid_fields]) + else: + field_list = [field.strip() for field in fields.split(",")] + check_fields_valid(field_list, valid_fields, table) + if geom_column: + fields = f'{geom_column}, ' + fields + fields = "objectid, " + fields + params = {"select": fields} + headers = {"prefer": "count=exact"} + + if limit: + params["limit"] = limit + async with session.get( + url, params=params, headers=headers, timeout=timeout + ) as response: + return await self.normalize_rv(request, response, table_schema) + + async def normalize_rv( + self, + request: Request, + response: aiohttp.ClientResponse, + table_schema: TableSchema | None, + ) -> ReturnJson: + links = Links(self=str(request.url)) + meta = Meta(service=self.name, service_url=str(response.url)) + # Now how we gonna transform this to geojson? Should we make database views for that instead? + data = await response.json() + if response.ok: + geojsons = [] + for record in data["rows"]: + geom_column = table_schema._api_geom_column + if geom_column: + geojson = GeoJsonFeature( + id=record.pop("objectid"), + properties=record, + geometry=record.pop(geom_column), + ) + else: + geojson = GeoJsonFeature( + id=record.pop("objectid"), + properties=record, + ) + geojsons.append(geojson) + gjfc = GeoJsonFeatureCollection(features=geojsons) + + meta.record_count = len(gjfc.features) + next_url = self.create_next_url(gjfc.features, request) + links.next = next_url + rv = ReturnJson(data=gjfc, links=links, meta=meta) + return rv + else: + error = Error( + code=response.status, + title=f"{self.name} Error", + detail=data["error"], + ) + rv = ReturnJson(errors=[error], links=links, meta=meta) + return rv + + def harmonize_timestamp_fields(self, records: list[dict], table_schema: TableSchema) -> list[dict]: + """Return a consistent representation of timestamp fields. AGO returns + timestamp fields as milliseconds since the epoch + + Args: + records (list[dict]): Data records + table_schema (TableSchema): TableSchema + + Returns: + list[dict]: Updated records + """ + for record in records: + for field in record["properties"]: + if field in table_schema._api_timestamp_fields: + if record["properties"][field]: + record["properties"][field] = dt.datetime.fromtimestamp( + record["properties"][field] / 1000 + ) + return records \ No newline at end of file diff --git a/app/utils/utils.py b/app/utils/utils.py index a511a0a..e83f7f4 100644 --- a/app/utils/utils.py +++ b/app/utils/utils.py @@ -13,6 +13,7 @@ from ..apis.abstract import AbstractWorker from ..apis.ago import Ago from ..apis.carto import Carto +from ..apis.databridge import Databridge from .models import ReturnJson, TableSchema @@ -251,6 +252,7 @@ class Service(str, Enum): AGO = "ago" CARTO = "carto" + DATABRIDGE = "databridge" class Api_Manager: @@ -263,6 +265,7 @@ def __init__(self): self.map_str_to_api: dict[str, AbstractWorker] = { "ago": Ago(), "carto": Carto(), + "databridge": Databridge(), } # This is the initial order searched if no API is specified, and is the query param the user must submit self.map_api_to_params: dict[AbstractWorker, list[str]] = {} self.api_priority_queue: list[AbstractWorker] = [] From c551a30a841e73bdd26a6affbfa3fb3c305dc415 Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Thu, 11 Jun 2026 20:45:53 +0000 Subject: [PATCH 6/8] Remove improper assignment; deactivate Databridge API for now --- app/apis/abstract.py | 9 ++------- app/apis/ago.py | 10 +++------- app/apis/carto.py | 12 +++--------- app/utils/utils.py | 2 +- 4 files changed, 9 insertions(+), 24 deletions(-) diff --git a/app/apis/abstract.py b/app/apis/abstract.py index ee5a429..e5437e2 100644 --- a/app/apis/abstract.py +++ b/app/apis/abstract.py @@ -44,18 +44,13 @@ async def normalize_rv(self) -> ReturnJson: raise NotImplementedError @abstractmethod - def harmonize_timestamp_fields( - self, records: list[dict], table_schema: TableSchema - ) -> list[dict]: - """Return a consistent representation of timestamp fields. Implementation + def harmonize_timestamp_fields(self, records: list[dict], table_schema: TableSchema): + """Coerce to a consistent representation of timestamp fields. Implementation is API-specific Args: records (list[dict]): Data records table_schema (TableSchema): TableSchema - - Returns: - list[dict]: Updated records """ raise NotImplementedError diff --git a/app/apis/ago.py b/app/apis/ago.py index 46e2eb5..9c593dd 100644 --- a/app/apis/ago.py +++ b/app/apis/ago.py @@ -130,7 +130,7 @@ async def normalize_rv( data = await response.json() if "error" not in data: records = data["features"] - records = self.harmonize_timestamp_fields(records, table_schema) + self.harmonize_timestamp_fields(records, table_schema) gjfc = GeoJsonFeatureCollection(**data) meta.record_count = len(gjfc.features) try: @@ -162,16 +162,13 @@ async def normalize_rv( rv = ReturnJson(errors=[error], links=links, meta=meta) return rv - def harmonize_timestamp_fields(self, records: list[dict], table_schema: TableSchema) -> list[dict]: - """Return a consistent representation of timestamp fields. AGO returns + def harmonize_timestamp_fields(self, records: list[dict], table_schema: TableSchema): + """Coerce to a consistent representation of timestamp fields. AGO returns timestamp fields as milliseconds since the epoch Args: records (list[dict]): Data records table_schema (TableSchema): TableSchema - - Returns: - list[dict]: Updated records """ for record in records: for field in record["properties"]: @@ -180,7 +177,6 @@ def harmonize_timestamp_fields(self, records: list[dict], table_schema: TableSch record["properties"][field] = dt.datetime.fromtimestamp( record["properties"][field] / 1000 ) - return records def mask_service_url( self, request: Request, response: aiohttp.ClientResponse diff --git a/app/apis/carto.py b/app/apis/carto.py index 5ba6adb..fa6ee77 100644 --- a/app/apis/carto.py +++ b/app/apis/carto.py @@ -174,7 +174,7 @@ async def normalize_rv( if response.ok: if not sql: records = data["rows"][0]["jsonb_build_object"]['features'] - records = self.harmonize_timestamp_fields(records, table_schema) + self.harmonize_timestamp_fields(records, table_schema) gjfc = GeoJsonFeatureCollection( type="FeatureCollection", features=records ) @@ -200,18 +200,13 @@ async def normalize_rv( rv = ReturnJson(errors=[error], links=links, meta=meta) return rv - def harmonize_timestamp_fields( - self, records: list[dict], table_schema: TableSchema - ) -> list[dict]: - """Return a consistent representation of timestamp fields. Carto returns + def harmonize_timestamp_fields(self, records: list[dict], table_schema: TableSchema): + """Coerce to a consistent representation of timestamp fields. Carto returns timestamps in ISO format Args: records (list[dict]): Data records table_schema (TableSchema): TableSchema - - Returns: - list[dict]: Updated records """ for record in records: for field in record["properties"]: @@ -220,4 +215,3 @@ def harmonize_timestamp_fields( record["properties"][field] = dt.datetime.fromisoformat( record["properties"][field] ) - return records diff --git a/app/utils/utils.py b/app/utils/utils.py index e83f7f4..809990c 100644 --- a/app/utils/utils.py +++ b/app/utils/utils.py @@ -265,7 +265,7 @@ def __init__(self): self.map_str_to_api: dict[str, AbstractWorker] = { "ago": Ago(), "carto": Carto(), - "databridge": Databridge(), + # "databridge": Databridge(), } # This is the initial order searched if no API is specified, and is the query param the user must submit self.map_api_to_params: dict[AbstractWorker, list[str]] = {} self.api_priority_queue: list[AbstractWorker] = [] From b5c1a918ff8883d2dd859a6e5f78d1e81033eaa3 Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Fri, 12 Jun 2026 15:32:07 +0000 Subject: [PATCH 7/8] Change missing schema to 404 on interanl route --- app/internal.py | 9 +++++++-- 1 file changed, 7 insertions(+), 2 deletions(-) diff --git a/app/internal.py b/app/internal.py index b5c6c69..323e2fb 100644 --- a/app/internal.py +++ b/app/internal.py @@ -1,6 +1,7 @@ from typing import Annotated from fastapi import APIRouter +from fastapi.exceptions import HTTPException from .utils.utils import schema_cache @@ -36,8 +37,12 @@ async def get_schemas( table = table.lower() try: rv['schema'] = {table: schema_cache.cache[table]} - except KeyError: - rv['schema'] = {table: "Schema Not Found"} + except KeyError: + raise HTTPException( + status_code=404, + detail=f"The requested schema '{table}' was not found", + headers={"title": "Not Found"}, + ) else: rv["schemas"] = schema_cache.cache From 56baac009b740bfe1fef9016f5b423ce38566c6b Mon Sep 17 00:00:00 2001 From: James Midkiff Date: Fri, 12 Jun 2026 15:35:46 +0000 Subject: [PATCH 8/8] Atomic commits are important --- app/apis/databridge.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/app/apis/databridge.py b/app/apis/databridge.py index 5c3cc29..da5d476 100644 --- a/app/apis/databridge.py +++ b/app/apis/databridge.py @@ -4,7 +4,7 @@ from fastapi import Request from psycopg import sql as psql # Redefine to allow "sql" as a query parameter from .abstract import AbstractWorker, check_fields_valid -from .models import ( +from ..utils.models import ( ReturnJson, Meta, Links,