Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
146 changes: 121 additions & 25 deletions app/platforms/implementations/ogc_api_process.py
Original file line number Diff line number Diff line change
Expand Up @@ -27,8 +27,10 @@
"https://schemas.stacspec.org/v1.0.0/collection-spec/json-schema/collection.json"
)

GEOJSON_FEATURECOLLECTION_SCHEMA = "https://schemas.opengis.net/ogcapi/" \
GEOJSON_FEATURECOLLECTION_SCHEMA = (
"https://schemas.opengis.net/ogcapi/"
"features/part1/1.0/openapi/schemas/featureCollectionGeoJSON.yaml"
)


@register_platform(ProcessTypeEnum.OGC_API_PROCESS)
Expand Down Expand Up @@ -292,6 +294,102 @@ def _map_ogcapi_status(self, ogcapi_status: StatusCode) -> ProcessingStatusEnum:
logger.warning(f"Mapping of unknown OGC API status: {ogcapi_status}")
return ProcessingStatusEnum.UNKNOWN

def _extract_download_link_from_asset(self, asset: dict) -> str | None:
"""
Extracts the download link from an asset dictionary. Checks if the
`href` field is present and contains an HTTPS URL. If this is not
the case, look for an alternative link in `alternate`
field. If no valid link is found, return None.

Args:
asset (dict): The asset dictionary.

Returns:
str | None: The download link if available, otherwise None.
"""
refs = [
asset.get("href"),
asset.get("alternate", {}).get("https", {}).get("href"),
]
for href in refs:
if href and href.startswith("https://"):
return href
return None

def _generate_signed_url(self, href: str, user_token: str) -> str:
"""
Generate a signed URL for the given href using the provided user token.
This is a placeholder implementation and should be replaced with
actual logic to generate signed URLs.

Args:
href (str): The original href.
user_token (str): The user token to be used for signing.
"""
# TODO - Add implementation
logger.debug(f"Generating signed URL for href: {href} with user token.")
return href

def _update_assets_hrefs(self, assets: dict, user_token: str) -> dict:
"""
Update the hrefs of the assets to be HTTPS URLs. If the current
href is an S3 URL, the code will look into `alternate` links to
find an HTTPS URL. If no HTTPS URL is found, the original href
will be kept.
"""
updated_assets = {}
for asset_name, asset in assets.items():
updated_asset = asset.copy()
href = self._extract_download_link_from_asset(asset)
if not href:
logger.warning(
"No valid HTTPS download link found for asset "
f"'{asset_name}'. Keeping original href. "
"Skipping asset..."
)
else:
href = self._generate_signed_url(href, user_token)
updated_asset["href"] = href
updated_assets[asset_name] = updated_asset

return updated_assets

def _extract_assets_from_feature_collection(
self, feature_collection: dict, *, result_name: str, user_token: str
) -> tuple[dict, Collection | None]:
assets: dict = {}
for feature in feature_collection.get("features", []):
feature_assets = feature.get("assets")
if isinstance(feature_assets, dict):
assets.update(feature_assets)
continue

# Some providers expose assets through an item link
# instead of inlining them in the feature.
for link in feature.get("links", []):
if "collection" == link.get("rel") and link.get("href"):
collection_link: str = link.get("href")
logger.debug(
f"GeoJSON FeatureCollection results: '{result_name}' "
f"points to a valid collection URL: {collection_link}"
)

response: HTTPXResponse = http_get(
collection_link,
follow_redirects=True,
headers={"Authorization": f"Bearer {user_token}"},
)
response.raise_for_status()
collection = Collection.model_validate(response.json())
collection_assets = collection.assets or {}
logger.debug(
f"Extracted collection '{collection.id}' "
f"with assets: {list(collection_assets.keys())}"
)
assets.update(collection.to_dict().get("assets", {}))
return assets, collection
return assets, None

async def get_job_status(
self, user_token: str, job_id: str, details: ServiceDetails
) -> ProcessingStatusEnum:
Expand All @@ -314,6 +412,7 @@ async def get_job_status(
async def get_job_results(
self, user_token: str, job_id: str, details: ServiceDetails
) -> Collection:
assets: dict = {}
logger.debug(f"Fetching job result for opfenEO job with ID {job_id}")

logger.debug("Exchanging user token for OGC API Process execution...")
Expand Down Expand Up @@ -352,17 +451,18 @@ async def get_job_results(
and qualified_value.var_schema.actual_instance
):
schema_reference = qualified_value.var_schema.actual_instance
media_type = getattr(qualified_value, "media_type", None)
logger.debug(
f"Processing result\n* Name: '{result_name}'\n"
"* media type: {qualified_value.media_type}\n"
"* Python type: {type(qualified_value.value)}\n"
"* schema {qualified_value.var_schema}..."
f"* media type: {media_type}\n"
f"* Python type: {type(qualified_value.value)}\n"
f"* schema {qualified_value.var_schema}..."
)

if not isinstance(schema_reference, str):
logger.warning(
f"Processing result name: '{result_name}' can not be processed, "
"schema of type {type(schema_reference)} not recognized"
f"schema of type {type(schema_reference)} not recognized"
)
continue

Expand All @@ -375,29 +475,24 @@ async def get_job_results(
logger.success(
f"GeoJSON FeatureCollection found in results: '{result_name}'"
)
feature_collection = qualified_value.value.oneof_schema_2_validator or {}
for feature in feature_collection.get("features", []):
for link in feature.get("links", []):
if "collection" == link.get("rel") and link.get("href"):
collection_link: str = link.get("href")
logger.success(
f"GeoJSON FeatureCollection results: '{result_name}' "
"points to a valid collection URL: {collection_link}"
)

response: HTTPXResponse = http_get(
collection_link,
follow_redirects=True,
headers={
"Authorization": f"Bearer {exchanged_token}"
},
)
response.raise_for_status()
return Collection.model_validate(response.json())
feature_collection = (
qualified_value.value.oneof_schema_2_validator or {}
)
download_token = exchanged_token or user_token
extracted_assets, linked_collection = (
self._extract_assets_from_feature_collection(
feature_collection,
result_name=result_name,
user_token=download_token,
)
)
if linked_collection:
return linked_collection
assets.update(extracted_assets)
else:
logger.warning(
f"Processing result: '{result_name}' can not be processed, "
"schema {schema_reference} not yet managed"
f"schema {schema_reference} not yet managed"
)

# result not found, send back an empty collection
Expand All @@ -417,6 +512,7 @@ async def get_job_results(
spatial=SpatialExtent(bbox=[(-180.0, -90.0, 180.0, 90.0)]),
temporal=TimeInterval(interval=[[None, None]]),
),
assets=self._update_assets_hrefs(assets, exchanged_token),
)

async def get_service_parameters(
Expand Down
Loading