diff --git a/app/platforms/implementations/ogc_api_process.py b/app/platforms/implementations/ogc_api_process.py
index 546f74f..a0b84da 100644
--- a/app/platforms/implementations/ogc_api_process.py
+++ b/app/platforms/implementations/ogc_api_process.py
@@ -27,8 +27,10 @@
     "https://schemas.stacspec.org/v1.0.0/collection-spec/json-schema/collection.json"
 )
 
-GEOJSON_FEATURECOLLECTION_SCHEMA = "https://schemas.opengis.net/ogcapi/" \
+GEOJSON_FEATURECOLLECTION_SCHEMA = (
+    "https://schemas.opengis.net/ogcapi/"
     "features/part1/1.0/openapi/schemas/featureCollectionGeoJSON.yaml"
+)
 
 
 @register_platform(ProcessTypeEnum.OGC_API_PROCESS)
@@ -292,6 +294,135 @@ def _map_ogcapi_status(self, ogcapi_status: StatusCode) -> ProcessingStatusEnum:
         logger.warning(f"Mapping of unknown OGC API status: {ogcapi_status}")
         return ProcessingStatusEnum.UNKNOWN
 
+    def _extract_download_link_from_asset(self, asset: dict) -> str | None:
+        """
+        Extract an HTTPS download link from an asset dictionary.
+
+        The asset's own `href` is preferred. If it is missing or is not an
+        HTTPS URL, the `alternate.https.href` entry is tried instead. If no
+        valid link is found, return None.
+
+        Args:
+            asset (dict): The asset dictionary.
+
+        Returns:
+            str | None: The download link if available, otherwise None.
+        """
+        # `alternate` and its `https` entry are optional and may be null or
+        # of an unexpected type, so normalise them before reading `href`.
+        alternate = asset.get("alternate")
+        if not isinstance(alternate, dict):
+            alternate = {}
+        https_link = alternate.get("https")
+        if not isinstance(https_link, dict):
+            https_link = {}
+
+        for href in (asset.get("href"), https_link.get("href")):
+            if isinstance(href, str) and href.startswith("https://"):
+                return href
+        return None
+
+    def _generate_signed_url(self, href: str, user_token: str) -> str:
+        """
+        Generate a signed URL for the given href using the provided user token.
+        This is a placeholder implementation and should be replaced with
+        actual logic to generate signed URLs.
+
+        Args:
+            href (str): The original href.
+            user_token (str): The user token to be used for signing.
+
+        Returns:
+            str: The signed URL; for now the href is returned unchanged.
+        """
+        # TODO - Add implementation
+        logger.debug(f"Generating signed URL for href: {href} with user token.")
+        return href
+
+    def _update_assets_hrefs(self, assets: dict, user_token: str) -> dict:
+        """
+        Return a copy of the assets with their hrefs switched to HTTPS URLs.
+
+        If the current href is not an HTTPS URL (e.g. an S3 URL), the
+        `alternate` links are searched for one. If no HTTPS URL is found,
+        the asset is kept with its original href.
+
+        Args:
+            assets (dict): Mapping of asset name to asset dictionary.
+            user_token (str): The user token used to sign the links.
+
+        Returns:
+            dict: Mapping of asset name to a shallow copy of the asset.
+        """
+        updated_assets = {}
+        for asset_name, asset in assets.items():
+            updated_asset = asset.copy()
+            href = self._extract_download_link_from_asset(asset)
+            if href:
+                updated_asset["href"] = self._generate_signed_url(href, user_token)
+            else:
+                logger.warning(
+                    "No valid HTTPS download link found for asset "
+                    f"'{asset_name}'. Keeping original href."
+                )
+            updated_assets[asset_name] = updated_asset
+
+        return updated_assets
+
+    def _extract_assets_from_feature_collection(
+        self, feature_collection: dict, *, result_name: str, user_token: str
+    ) -> tuple[dict, Collection | None]:
+        """
+        Collect the assets referenced by a GeoJSON FeatureCollection.
+
+        Assets inlined in a feature are merged directly. For a feature
+        without inline assets, the first `collection` link is fetched and
+        returned at once together with the assets gathered so far.
+
+        Args:
+            feature_collection (dict): The GeoJSON FeatureCollection.
+            result_name (str): Name of the result, used for logging.
+            user_token (str): Bearer token used to fetch the collection.
+
+        Returns:
+            tuple[dict, Collection | None]: The collected assets and the
+            linked collection, or None if no collection link was followed.
+        """
+        assets: dict = {}
+        for feature in feature_collection.get("features") or []:
+            feature_assets = feature.get("assets")
+            if isinstance(feature_assets, dict):
+                assets.update(feature_assets)
+                continue
+
+            # Some providers expose assets through a collection link
+            # instead of inlining them in the feature.
+            for link in feature.get("links") or []:
+                collection_link = link.get("href")
+                if link.get("rel") != "collection" or not collection_link:
+                    continue
+
+                logger.debug(
+                    f"GeoJSON FeatureCollection results: '{result_name}' "
+                    f"points to a valid collection URL: {collection_link}"
+                )
+                response: HTTPXResponse = http_get(
+                    collection_link,
+                    follow_redirects=True,
+                    headers={"Authorization": f"Bearer {user_token}"},
+                )
+                response.raise_for_status()
+                collection = Collection.model_validate(response.json())
+                # `assets` may be absent or null in the serialised collection.
+                collection_assets = collection.to_dict().get("assets") or {}
+                logger.debug(
+                    f"Extracted collection '{collection.id}' "
+                    f"with assets: {list(collection_assets.keys())}"
+                )
+                assets.update(collection_assets)
+                return assets, collection
+        return assets, None
+
     async def get_job_status(
         self, user_token: str, job_id: str, details: ServiceDetails
     ) -> ProcessingStatusEnum:
@@ -314,6 +445,7 @@ async def get_job_status(
     async def get_job_results(
         self, user_token: str, job_id: str, details: ServiceDetails
     ) -> Collection:
+        assets: dict = {}
         logger.debug(f"Fetching job result for opfenEO job with ID {job_id}")
 
         logger.debug("Exchanging user token for OGC API Process execution...")
@@ -352,17 +484,18 @@ async def get_job_results(
                 and qualified_value.var_schema.actual_instance
             ):
                 schema_reference = qualified_value.var_schema.actual_instance
+                media_type = getattr(qualified_value, "media_type", None)
                 logger.debug(
                     f"Processing result\n* Name: '{result_name}'\n"
-                    "* media type: {qualified_value.media_type}\n"
-                    "* Python type: {type(qualified_value.value)}\n"
-                    "* schema {qualified_value.var_schema}..."
+                    f"* media type: {media_type}\n"
+                    f"* Python type: {type(qualified_value.value)}\n"
+                    f"* schema {qualified_value.var_schema}..."
                 )
 
                 if not isinstance(schema_reference, str):
                     logger.warning(
                         f"Processing result name: '{result_name}' can not be processed, "
-                        "schema of type {type(schema_reference)} not recognized"
+                        f"schema of type {type(schema_reference)} not recognized"
                     )
                     continue
 
@@ -375,29 +508,24 @@ async def get_job_results(
                     logger.success(
                         f"GeoJSON FeatureCollection found in results: '{result_name}'"
                     )
-                    feature_collection = qualified_value.value.oneof_schema_2_validator or {}
-                    for feature in feature_collection.get("features", []):
-                        for link in feature.get("links", []):
-                            if "collection" == link.get("rel") and link.get("href"):
-                                collection_link: str = link.get("href")
-                                logger.success(
-                                    f"GeoJSON FeatureCollection results: '{result_name}' "
-                                    "points to a valid collection URL: {collection_link}"
-                                )
-
-                                response: HTTPXResponse = http_get(
-                                    collection_link,
-                                    follow_redirects=True,
-                                    headers={
-                                        "Authorization": f"Bearer {exchanged_token}"
-                                    },
-                                )
-                                response.raise_for_status()
-                                return Collection.model_validate(response.json())
+                    feature_collection = (
+                        qualified_value.value.oneof_schema_2_validator or {}
+                    )
+                    download_token = exchanged_token or user_token
+                    extracted_assets, linked_collection = (
+                        self._extract_assets_from_feature_collection(
+                            feature_collection,
+                            result_name=result_name,
+                            user_token=download_token,
+                        )
+                    )
+                    if linked_collection:
+                        return linked_collection
+                    assets.update(extracted_assets)
                 else:
                     logger.warning(
                         f"Processing result: '{result_name}' can not be processed, "
-                        "schema {schema_reference} not yet managed"
+                        f"schema {schema_reference} not yet managed"
                     )
 
         # result not found, send back an empty collection
@@ -417,6 +545,7 @@ async def get_job_results(
                 spatial=SpatialExtent(bbox=[(-180.0, -90.0, 180.0, 90.0)]),
                 temporal=TimeInterval(interval=[[None, None]]),
             ),
+            assets=self._update_assets_hrefs(assets, exchanged_token or user_token),
         )
 
     async def get_service_parameters(