diff --git a/apigw-lambda-opensearch-serverless-nextgen/.cfnlintrc b/apigw-lambda-opensearch-serverless-nextgen/.cfnlintrc new file mode 100644 index 000000000..641a4b26f --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/.cfnlintrc @@ -0,0 +1,5 @@ +configure_rules: + E3030: + # python3.14 is valid but not yet in cfn-lint's schema + exceptions: + - python3.14 diff --git a/apigw-lambda-opensearch-serverless-nextgen/.checkov.yaml b/apigw-lambda-opensearch-serverless-nextgen/.checkov.yaml new file mode 100644 index 000000000..a3176031b --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/.checkov.yaml @@ -0,0 +1,10 @@ +# Checkov suppressions for sample application +# These controls are intentionally skipped as this is a demonstration/sample project. +# Production workloads should implement these controls. + +skip-checks: + - CKV_AWS_115 # Lambda reserved concurrency — not required for sample application + - CKV_AWS_116 # Lambda DLQ — not required for sample application with synchronous API handlers + - CKV_AWS_117 # Lambda in VPC — not required for sample application + - CKV_AWS_158 # CloudWatch LogGroup KMS encryption — not required for sample application log data + - CKV_AWS_173 # Lambda env var encryption — no secrets stored, only configuration values in sample application diff --git a/apigw-lambda-opensearch-serverless-nextgen/.gitignore b/apigw-lambda-opensearch-serverless-nextgen/.gitignore new file mode 100644 index 000000000..75700a25c --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/.gitignore @@ -0,0 +1,49 @@ +# AWS SAM +.aws-sam/ +packaged.yaml + +# Python +__pycache__/ +*.py[cod] +*$py.class +*.so +*.egg-info/ +*.egg +dist/ +build/ +.eggs/ + +# Virtual environments +.venv/ +venv/ +ENV/ + +# IDE +.idea/ +.vscode/ +*.swp +*.swo +*~ + +# OS +.DS_Store +Thumbs.db + +# Environment variables +.env +.env.local + +# Coverage & testing +htmlcov/ +.coverage +.coverage.* +.pytest_cache/ +.mypy_cache/ + +# Distribution +*.whl + +# 
Local Config +mise.local.toml +.kiro +blog/ \ No newline at end of file diff --git a/apigw-lambda-opensearch-serverless-nextgen/README.md b/apigw-lambda-opensearch-serverless-nextgen/README.md new file mode 100644 index 000000000..ad106b78d --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/README.md @@ -0,0 +1,176 @@ +# Amazon API Gateway to AWS Lambda to Amazon OpenSearch Serverless NextGen + +This pattern deploys a serverless semantic search API using Amazon API Gateway, AWS Lambda, and Amazon OpenSearch Serverless with the NextGen architecture. All three services operate on a pay-per-use model with no minimum baseline cost, meaning the entire stack incurs zero compute charges when idle. You pay only for storage of indexed data. + +Learn more about this pattern at Serverless Land Patterns: https://serverlessland.com/patterns/apigw-lambda-opensearch-serverless-nextgen + +Important: this application uses various AWS services and there are costs associated with these services after the Free Tier usage - please see the [AWS Pricing page](https://aws.amazon.com/pricing/) for details. You are responsible for any AWS costs incurred. No warranty is implied in this example. + +## Requirements + +* [Create an AWS account](https://portal.aws.amazon.com/gp/aws/developer/registration/index.html) if you do not already have one and log in. The IAM user that you use must have sufficient permissions to make necessary AWS service calls and manage AWS resources. 
+* [AWS CLI installed and configured](https://docs.aws.amazon.com/cli/latest/userguide/install-cliv2.html) +* [Git installed](https://git-scm.com/book/en/v2/Getting-Started-Installing-Git) +* [AWS SAM CLI](https://docs.aws.amazon.com/serverless-application-model/latest/developerguide/serverless-sam-cli-install.html) installed +* [Python 3.14](https://www.python.org/downloads/) + +**Region availability:** This pattern uses OpenSearch Serverless AI connectors and hybrid search, which are available in the following regions: US East (N. Virginia), US East (Ohio), US West (Oregon), Asia Pacific (Mumbai), Asia Pacific (Singapore), Asia Pacific (Sydney), Asia Pacific (Tokyo), Europe (Frankfurt), Europe (Ireland), Europe (Spain), and Europe (Stockholm). See the [launch announcement](https://aws.amazon.com/about-aws/whats-new/2025/08/amazon-opensearch-serverless-ai-connectors-hybrid-search/) for details. + + +## Deployment Instructions + +1. Create a new directory, navigate to that directory in a terminal and clone the GitHub repository: + ``` + git clone https://github.com/aws-samples/serverless-patterns + ``` +1. Change directory to the pattern directory: + ``` + cd serverless-patterns/apigw-lambda-opensearch-serverless-nextgen + ``` +1. Build the application: + ``` + sam build + ``` +1. Deploy the application: + ``` + sam deploy --guided + ``` +1. During the prompts: + * Enter a stack name + * Enter the desired AWS Region + * Accept the default parameter values or customize them + * Allow SAM CLI to create IAM roles with the required permissions + + Once you have run `sam deploy --guided` once and saved the arguments to a configuration file (samconfig.toml), you can run `sam deploy` in future to reuse these defaults.
+ + +## How it works + +![Architecture diagram](images/architecture.png) + +Figure 1 - Architecture + +This pattern creates a REST API backed by three AWS Lambda functions that interact with an OpenSearch Serverless NextGen collection configured for vector search: + +1. The client sends an HTTPS request (SigV4-signed) to Amazon API Gateway with IAM authorization. +2. API Gateway routes the request to the appropriate Lambda function based on path: Search (`POST /search`), Index (`POST /index`), or Delete (`DELETE /documents`). +3. The Lambda function calls the OpenSearch Serverless collection — performing a neural/lexical/hybrid query, bulk indexing via the ingest pipeline, or a bulk delete. +4. For semantic and hybrid search, and during document indexing, the OpenSearch ML model calls Amazon Bedrock (Amazon Titan Text Embeddings V2) to generate 1024-dimensional embeddings server-side. +5. For hybrid search, the search pipeline applies min-max score normalization to combine BM25 (lexical) and k-NN (semantic) results with configurable weights (0.3 lexical / 0.7 semantic). + +The OpenSearch collection lives inside a NextGen collection group, which enables scale-to-zero behavior. When idle, both indexing and search OCUs (OpenSearch Compute Units) drop to zero. When a request arrives, capacity provisions in approximately 10 seconds. Requests are queued (not dropped) during this window. + +The NextGen collection group is created using a Lambda-backed custom resource since CloudFormation doesn't yet natively support the `Generation` parameter. + +### Scale-to-zero in action + +The chart below shows the OCU (OpenSearch Compute Unit) metrics from CloudWatch during a test run: + +![CloudWatch metrics showing Search and Indexing OCUs scaling from zero, handling traffic, then returning to zero](images/search-acu-scaling.png) + +*Figure 2 — Two test runs separated by a period of no activity. 
Search OCUs scale 0 → 2 during queries, Indexing OCUs scale 0 → 1 during document ingestion. Both return to 0 after the idle timeout.* + +The pattern: +1. **Idle** — Both indexing and search OCUs sit at 0. No compute cost. +2. **Traffic arrives** — First request triggers provisioning (~10 seconds). Requests are queued during this window. +3. **Active** — OCUs scale up to match demand, up to the configured maximum. +4. **Traffic subsides** — After 10 minutes of no requests, OCUs scale back to 0. + +## Testing + +Install the test dependencies: + +```bash +python -m venv .venv +source .venv/bin/activate # On Windows: .venv\Scripts\activate +pip install -r tests/requirements.txt +``` + +### Unit tests + +Run the unit tests (no deployed stack or AWS credentials required): + +```bash +pytest tests/unit/ -v +``` + +### Integration tests + +The repository includes integration tests that exercise all three search modes against a 50-product outdoor equipment catalog: + +```bash +# Set your stack name and region +export STACK_NAME="your-stack-name" +export AWS_REGION="your-region" + +# Run integration tests (requires a deployed stack) +pytest tests/integration/ -v -s +``` + +The tests demonstrate semantic understanding: `"shoes for the beach"` matches "Summer Beach Sandals" (no keyword overlap), `"charging phone while camping"` matches "Solar Power Bank" (intent matching), and hybrid mode combines both signals for queries like `"waterproof bag for kayaking"` → "Dry Bag 20L". 
+ +### Manual testing with awscurl + +Install the project dependencies (includes `awscurl`): + +```bash +pip install -r requirements.txt +``` + +Set your stack name and region (if not already set): + +```bash +STACK_NAME="your-stack-name" +AWS_REGION="your-region" +``` + +Index a document: + +```bash +awscurl --service execute-api --region $AWS_REGION -X POST \ + -H "Content-Type: application/json" \ + -d '{ + "documents": [{ + "id": "doc-1", + "title": "OpenSearch Serverless NextGen", + "content": "The next generation architecture scales to zero and provisions in seconds." + }] + }' \ + "$(aws cloudformation describe-stacks --stack-name $STACK_NAME --region $AWS_REGION --query 'Stacks[0].Outputs[?OutputKey==`IndexApiUrl`].OutputValue' --output text)" +``` + +Search for it: + +```bash +awscurl --service execute-api --region $AWS_REGION -X POST \ + -H "Content-Type: application/json" \ + -d '{"query": "serverless scaling", "mode": "hybrid"}' \ + "$(aws cloudformation describe-stacks --stack-name $STACK_NAME --region $AWS_REGION --query 'Stacks[0].Outputs[?OutputKey==`SearchApiUrl`].OutputValue' --output text)" +``` + +Delete a document: + +```bash +awscurl --service execute-api --region $AWS_REGION -X DELETE \ + -H "Content-Type: application/json" \ + -d '{"ids": ["doc-1"]}' \ + "$(aws cloudformation describe-stacks --stack-name $STACK_NAME --region $AWS_REGION --query 'Stacks[0].Outputs[?OutputKey==`DeleteApiUrl`].OutputValue' --output text)" +``` + +> **Note:** The first request after an idle period takes approximately 10 seconds while OpenSearch provisions compute from zero. Subsequent requests respond at normal latency. + +## Cleanup + +> **Warning:** This will permanently delete all indexed documents in the OpenSearch collection. Back up any data you need to retain before proceeding. + +1. 
Delete the stack: + ```bash + sam delete + ``` + + This removes all resources including the API Gateway, Lambda functions, OpenSearch collection, collection group, security policies, IAM roles, and CloudWatch log groups. + +---- +Copyright 2026 Amazon.com, Inc. or its affiliates. All Rights Reserved. + +SPDX-License-Identifier: MIT-0 diff --git a/apigw-lambda-opensearch-serverless-nextgen/example-pattern.json b/apigw-lambda-opensearch-serverless-nextgen/example-pattern.json new file mode 100644 index 000000000..1c3f1a9f5 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/example-pattern.json @@ -0,0 +1,68 @@ +{ + "title": "API Gateway to Lambda to OpenSearch Serverless NextGen", + "description": "Deploy a serverless semantic search API with zero baseline compute cost using Lambda and OpenSearch Serverless NextGen (scale-to-zero).", + "language": "Python", + "level": "300", + "framework": "SAM", + "introBox": { + "headline": "How it works", + "text": [ + "This pattern deploys an Amazon API Gateway REST API backed by three AWS Lambda functions that search (semantic, lexical, and hybrid modes), index, and delete documents in an Amazon OpenSearch Serverless NextGen collection.", + "Amazon OpenSearch Serverless NextGen scales compute to zero when idle and provisions in approximately 10 seconds when traffic arrives. Combined with Lambda's own scale-to-zero, the entire stack incurs zero compute cost when not in use.", + "Embeddings are generated server-side by an OpenSearch ML model connected to Amazon Bedrock (Amazon Titan Text Embeddings V2) — Lambda functions send and receive plain text only.", + "A hybrid search pipeline applies min-max score normalization to combine BM25 (lexical) and k-NN (semantic) results with configurable weights."
+ ] + }, + "gitHub": { + "template": { + "repoURL": "https://github.com/aws-samples/serverless-patterns/tree/main/apigw-lambda-opensearch-serverless-nextgen", + "templateURL": "serverless-patterns/apigw-lambda-opensearch-serverless-nextgen", + "projectFolder": "apigw-lambda-opensearch-serverless-nextgen", + "templateFile": "template.yaml" + } + }, + "resources": { + "bullets": [ + { + "text": "Introducing the next generation of Amazon OpenSearch Serverless", + "link": "https://aws.amazon.com/blogs/aws/introducing-the-next-generation-of-amazon-opensearch-serverless-for-building-your-agentic-ai-applications/" + }, + { + "text": "Amazon OpenSearch Serverless", + "link": "https://docs.aws.amazon.com/opensearch-service/latest/developerguide/serverless.html" + }, + { + "text": "OpenSearch neural search", + "link": "https://opensearch.org/docs/latest/search-plugins/neural-search/" + }, + { + "text": "Amazon Titan Text Embeddings V2", + "link": "https://docs.aws.amazon.com/bedrock/latest/userguide/titan-embedding-models.html" + } + ] + }, + "deploy": { + "text": [ + "sam build", + "sam deploy --guided" + ] + }, + "testing": { + "text": [ + "See the GitHub repo for detailed testing instructions." + ] + }, + "cleanup": { + "text": [ + "Delete the stack: sam delete --stack-name STACK_NAME." 
+ ] + }, + "authors": [ + { + "name": "Pete Davis", + "image": "https://github.com/pjdavis-aws.png", + "bio": "Senior Partner Solution Architect at AWS", + "linkedin": "peter-davis-2676585" + } + ] +} diff --git a/apigw-lambda-opensearch-serverless-nextgen/images/architecture.drawio b/apigw-lambda-opensearch-serverless-nextgen/images/architecture.drawio new file mode 100644 index 000000000..007d80c1b --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/images/architecture.drawio @@ -0,0 +1,55 @@ + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + + \ No newline at end of file diff --git a/apigw-lambda-opensearch-serverless-nextgen/images/architecture.png b/apigw-lambda-opensearch-serverless-nextgen/images/architecture.png new file mode 100644 index 000000000..94226166c Binary files /dev/null and b/apigw-lambda-opensearch-serverless-nextgen/images/architecture.png differ diff --git a/apigw-lambda-opensearch-serverless-nextgen/images/search-acu-scaling.png b/apigw-lambda-opensearch-serverless-nextgen/images/search-acu-scaling.png new file mode 100644 index 000000000..aa7a4b004 Binary files /dev/null and b/apigw-lambda-opensearch-serverless-nextgen/images/search-acu-scaling.png differ diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/nextgen_collection_group/app.py b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/nextgen_collection_group/app.py new file mode 100644 index 000000000..0f61aceec --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/nextgen_collection_group/app.py @@ -0,0 +1,97 @@ +"""Custom Resource handler for OpenSearch Serverless NextGen Collection Group. + +CloudFormation doesn't yet support the 'Generation' parameter on +AWS::OpenSearchServerless::CollectionGroup. This custom resource uses +the boto3 API directly to create/delete NextGen collection groups. 
+ +Properties: + Name: Collection group name (3-32 chars, lowercase alphanumeric + hyphens) + Description: Optional description + MaxIndexingCapacityInOCU: Max indexing OCUs (default: 2) + MaxSearchCapacityInOCU: Max search OCUs (default: 2) +""" + +from __future__ import annotations + +import re + +import boto3 +from crhelper import CfnResource + +helper = CfnResource(json_logging=True, log_level="INFO", sleep_on_delete=30) +client = boto3.client("opensearchserverless") + + +@helper.create +def on_create(event, context): + """Create a NextGen collection group.""" + props = event["ResourceProperties"] + name = props["Name"] + description = props.get("Description", "") + max_indexing = int(props.get("MaxIndexingCapacityInOCU", 2)) + max_search = int(props.get("MaxSearchCapacityInOCU", 2)) + + resp = client.create_collection_group( + name=name, + standbyReplicas="ENABLED", + generation="NEXTGEN", + description=description, + capacityLimits={ + "maxIndexingCapacityInOCU": max_indexing, + "maxSearchCapacityInOCU": max_search, + }, + ) + + detail = resp["createCollectionGroupDetail"] + helper.Data["Id"] = detail["id"] + helper.Data["Arn"] = detail["arn"] + helper.Data["Name"] = detail["name"] + helper.Data["Generation"] = "NEXTGEN" + + return detail["id"] + + +@helper.update +def on_update(event, context): + """Update the collection group capacity limits.""" + props = event["ResourceProperties"] + physical_id = event["PhysicalResourceId"] + max_indexing = int(props.get("MaxIndexingCapacityInOCU", 2)) + max_search = int(props.get("MaxSearchCapacityInOCU", 2)) + description = props.get("Description", "") + + client.update_collection_group( + id=physical_id, + description=description, + capacityLimits={ + "maxIndexingCapacityInOCU": max_indexing, + "maxSearchCapacityInOCU": max_search, + }, + ) + + helper.Data["Id"] = physical_id + helper.Data["Name"] = props["Name"] + helper.Data["Generation"] = "NEXTGEN" + + return physical_id + + +@helper.delete +def on_delete(event, 
context): + """Delete the collection group.""" + physical_id = event["PhysicalResourceId"] + + # If the create failed, the physical ID will be the CFN logical ID + # rather than a valid collection group ID (lowercase alphanumeric, 3-40 chars) + if not re.match(r'^[a-z0-9]{3,40}$', physical_id): + return + + try: + client.delete_collection_group(id=physical_id) + except client.exceptions.ResourceNotFoundException: + pass + + +def lambda_handler(event, context): + """Main Lambda handler — delegates to crhelper.""" + helper(event, context) diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/nextgen_collection_group/requirements.txt b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/nextgen_collection_group/requirements.txt new file mode 100644 index 000000000..0350e9b1c --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/nextgen_collection_group/requirements.txt @@ -0,0 +1,2 @@ +crhelper +boto3>=1.43.17 diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/setup_pipeline/app.py b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/setup_pipeline/app.py new file mode 100644 index 000000000..5a2ca39c0 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/setup_pipeline/app.py @@ -0,0 +1,260 @@ +"""Custom Resource handler to set up ML model and pipelines at deploy time. + +Creates: +- An Amazon Bedrock Titan V2 ML connector +- A registered and deployed ML model +- An ingest pipeline with text_embedding processor +- A hybrid search pipeline with min-max normalization + +Properties: + CollectionEndpoint: The AOSS collection endpoint URL + ModelRoleArn: IAM role ARN for OpenSearch ML to call Bedrock + EmbeddingModelId: Bedrock model ID (e.g. amazon.titan-embed-text-v2:0) + EmbeddingDimension: Vector dimension (e.g. 
1024) +""" + +from __future__ import annotations + +import logging +import os +import time + +from opensearch_client import get_client +from crhelper import CfnResource + +logger = logging.getLogger(__name__) + +helper = CfnResource(json_logging=True, log_level="INFO", sleep_on_delete=30) + +SEARCH_PIPELINE_NAME = "hybrid-search-pipeline" +INGEST_PIPELINE_NAME = "embedding-ingest-pipeline" +REGION = os.environ.get("AWS_REGION") + + +def _wait_for_model_deployed(client, model_id, max_attempts=30, delay=5): + """Poll model status until deployed or timeout.""" + for _ in range(max_attempts): + resp = client.transport.perform_request( + "GET", f"/_plugins/_ml/models/{model_id}" + ) + status = resp.get("model_state") + if status == "DEPLOYED": + return True + if status in ("DEPLOY_FAILED", "REGISTER_FAILED"): + raise RuntimeError(f"Model {model_id} failed with state: {status}") + time.sleep(delay) + raise TimeoutError(f"Model {model_id} did not reach DEPLOYED state") + + +def _retry_with_backoff(func, max_attempts=6, initial_delay=10): + """Retry a function with exponential backoff for auth propagation.""" + delay = initial_delay + last_error = None + for attempt in range(max_attempts): + try: + return func() + except Exception as e: + last_error = e + error_str = str(e) + # Only retry on authorization/forbidden errors (policy propagation) + if "403" in error_str or "Forbidden" in error_str or "Authorization" in error_str: + logger.warning( + "Authorization error on attempt %d/%d, retrying in %ds: %s", + attempt + 1, max_attempts, delay, error_str, + ) + time.sleep(delay) + delay = min(delay * 2, 60) + else: + raise + raise last_error + + +def _setup_ml_and_pipelines(event): + """Core logic shared by create and update handlers.""" + props = event["ResourceProperties"] + endpoint = props["CollectionEndpoint"] + model_role_arn = props["ModelRoleArn"] + embedding_model_id = props.get("EmbeddingModelId", "amazon.titan-embed-text-v2:0") + embedding_dimension = 
int(props.get("EmbeddingDimension", "1024")) + + client = get_client(endpoint, REGION) + + # Step 1: Create the Bedrock connector (with retry for policy propagation) + connector_body = { + "name": "Amazon Bedrock Titan Embeddings V2", + "description": "Connector for Amazon Bedrock Titan Text Embeddings V2", + "version": "1.0", + "protocol": "aws_sigv4", + "credential": { + "roleArn": model_role_arn, + }, + "parameters": { + "region": REGION, + "service_name": "bedrock", + "model": embedding_model_id, + }, + "actions": [ + { + "action_type": "predict", + "method": "POST", + "headers": {"content-type": "application/json"}, + "url": f"https://bedrock-runtime.{REGION}.amazonaws.com/model/{embedding_model_id}/invoke", + "request_body": '{"inputText": "${parameters.inputText}", "dimensions": ' + + str(embedding_dimension) + + ', "normalize": true}', + "pre_process_function": "connector.pre_process.bedrock.embedding", + "post_process_function": "connector.post_process.bedrock.embedding", + } + ], + } + + connector_resp = _retry_with_backoff( + lambda: client.transport.perform_request( + "POST", "/_plugins/_ml/connectors/_create", body=connector_body + ) + ) + connector_id = connector_resp["connector_id"] + logger.info("Created connector: %s", connector_id) + + # Step 2: Register the model + register_body = { + "name": "Bedrock Titan Embed V2", + "function_name": "remote", + "description": "Titan Text Embeddings V2 via Bedrock connector", + "connector_id": connector_id, + } + + register_resp = client.transport.perform_request( + "POST", "/_plugins/_ml/models/_register", body=register_body + ) + model_id = register_resp["model_id"] + logger.info("Registered model: %s", model_id) + + # Step 3: Deploy the model + client.transport.perform_request( + "POST", f"/_plugins/_ml/models/{model_id}/_deploy" + ) + logger.info("Deploy initiated for model: %s", model_id) + + # Wait for deployment + _wait_for_model_deployed(client, model_id) + logger.info("Model deployed successfully: 
%s", model_id) + + # Step 4: Create the ingest pipeline with text_embedding processor + ingest_pipeline_body = { + "description": "Ingest pipeline that generates embeddings via Bedrock Titan V2", + "processors": [ + { + "text_embedding": { + "model_id": model_id, + "field_map": { + "embedding_text": "embedding", + }, + } + } + ], + } + + client.transport.perform_request( + "PUT", + f"/_ingest/pipeline/{INGEST_PIPELINE_NAME}", + body=ingest_pipeline_body, + ) + logger.info("Created ingest pipeline: %s", INGEST_PIPELINE_NAME) + + # Step 5: Create/update the search pipeline with normalization + search_pipeline_body = { + "description": "Normalization pipeline for hybrid search", + "phase_results_processors": [ + { + "normalization-processor": { + "normalization": {"technique": "min_max"}, + "combination": { + "technique": "arithmetic_mean", + "parameters": {"weights": [0.3, 0.7]}, + }, + } + } + ], + } + + client.transport.perform_request( + "PUT", + f"/_search/pipeline/{SEARCH_PIPELINE_NAME}", + body=search_pipeline_body, + ) + logger.info("Created search pipeline: %s", SEARCH_PIPELINE_NAME) + + # Store outputs for !GetAtt + helper.Data["SearchPipeline"] = SEARCH_PIPELINE_NAME + helper.Data["IngestPipeline"] = INGEST_PIPELINE_NAME + helper.Data["ModelId"] = model_id + helper.Data["ConnectorId"] = connector_id + + return f"{connector_id}/{model_id}" + + +@helper.create +def on_create(event, _context): + """Create ML connector, model, ingest pipeline, and search pipeline.""" + return _setup_ml_and_pipelines(event) + + +@helper.update +def on_update(event, _context): + """Update recreates all ML resources and pipelines.""" + return _setup_ml_and_pipelines(event) + + +@helper.delete +def on_delete(event, _context): + """Delete pipelines and ML resources on stack deletion.""" + props = event["ResourceProperties"] + endpoint = props["CollectionEndpoint"] + + client = get_client(endpoint, REGION) + + # Delete ingest pipeline + try: + client.transport.perform_request( + 
"DELETE", f"/_ingest/pipeline/{INGEST_PIPELINE_NAME}" + ) + except Exception: + logger.warning("Failed to delete ingest pipeline", exc_info=True) + + # Delete search pipeline + try: + client.transport.perform_request( + "DELETE", f"/_search/pipeline/{SEARCH_PIPELINE_NAME}" + ) + except Exception: + logger.warning("Failed to delete search pipeline", exc_info=True) + + # Undeploy and delete model (best effort from physical resource ID) + physical_id = event.get("PhysicalResourceId", "") + if "/" in physical_id: + connector_id, model_id = physical_id.split("/", 1) + try: + client.transport.perform_request( + "POST", f"/_plugins/_ml/models/{model_id}/_undeploy" + ) + time.sleep(5) + client.transport.perform_request( + "DELETE", f"/_plugins/_ml/models/{model_id}" + ) + except Exception: + logger.warning("Failed to delete model %s", model_id, exc_info=True) + + try: + client.transport.perform_request( + "DELETE", f"/_plugins/_ml/connectors/{connector_id}" + ) + except Exception: + logger.warning( + "Failed to delete connector %s", connector_id, exc_info=True + ) + + +def lambda_handler(event, context): + """Main Lambda handler — delegates to crhelper.""" + helper(event, context) diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/setup_pipeline/requirements.txt b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/setup_pipeline/requirements.txt new file mode 100644 index 000000000..832072fd1 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/custom_resources/setup_pipeline/requirements.txt @@ -0,0 +1 @@ +crhelper diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/delete_documents/app.py b/apigw-lambda-opensearch-serverless-nextgen/lambda/delete_documents/app.py new file mode 100644 index 000000000..73557202b --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/delete_documents/app.py @@ -0,0 +1,63 @@ +"""Lambda handler for deleting documents from OpenSearch Serverless NextGen.""" 
+ +import json +import os + +from aws_lambda_powertools import Logger, Tracer +from opensearch_client import get_client + +logger = Logger() +tracer = Tracer() + +INDEX_NAME = os.environ.get("INDEX_NAME", "documents") +COLLECTION_ENDPOINT = os.environ["COLLECTION_ENDPOINT"] +REGION = os.environ.get("AWS_REGION", "eu-west-1") + + +@tracer.capture_lambda_handler +@logger.inject_lambda_context +def handler(event, context): + """Delete one or more documents by ID. + + Expected request body: + { + "ids": ["doc-1", "doc-2", ...] + } + """ + try: + body = json.loads(event.get("body") or "{}")  # API Gateway sends "body": null for requests without a payload + doc_ids = body.get("ids", []) + + if not doc_ids: + return { + "statusCode": 400, + "body": json.dumps({"error": "No document IDs provided"}), + } + + client = get_client(COLLECTION_ENDPOINT, REGION) + + bulk_body = [] + for doc_id in doc_ids: + bulk_body.append({"delete": {"_index": INDEX_NAME, "_id": doc_id}}) + + response = client.bulk(body=bulk_body) + + logger.info("Deleted documents", extra={"count": len(doc_ids), "errors": response.get("errors", False)}) + + return { + "statusCode": 200, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps( + { + "message": f"Deleted {len(doc_ids)} document(s)", + "errors": response.get("errors", False), + } + ), + } + + except Exception as e: + logger.exception("Error deleting documents") + return { + "statusCode": 500, + "body": json.dumps({"error": str(e)}), + } diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/delete_documents/requirements.txt b/apigw-lambda-opensearch-serverless-nextgen/lambda/delete_documents/requirements.txt new file mode 100644 index 000000000..bd9f77520 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/delete_documents/requirements.txt @@ -0,0 +1 @@ +# Dependencies provided by OpenSearchClientLayer diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/index_documents/app.py b/apigw-lambda-opensearch-serverless-nextgen/lambda/index_documents/app.py new file
mode 100644 index 000000000..1148b9707 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/index_documents/app.py @@ -0,0 +1,105 @@ +"""Lambda handler for indexing documents into OpenSearch Serverless NextGen. + +Documents are indexed with a text field that the OpenSearch ingest pipeline +automatically converts to embeddings via Bedrock Titan V2. No client-side +embedding generation is needed. +""" + +import json +import os + +from aws_lambda_powertools import Logger, Tracer +from opensearch_client import get_client + +logger = Logger() +tracer = Tracer() + +INDEX_NAME = os.environ.get("INDEX_NAME", "documents") +COLLECTION_ENDPOINT = os.environ["COLLECTION_ENDPOINT"] +INGEST_PIPELINE = os.environ.get("INGEST_PIPELINE", "embedding-ingest-pipeline") +REGION = os.environ.get("AWS_REGION", "eu-west-1") + + +@tracer.capture_lambda_handler +@logger.inject_lambda_context +def handler(event, context): + """Index one or more documents. + + Expected request body: + { + "documents": [ + { + "id": "doc-1", + "title": "Example Document", + "content": "Full text content..." + } + ] + } + + Embeddings are generated automatically by the OpenSearch ingest pipeline. + """ + try: + body = json.loads(event.get("body") or "{}")  # API Gateway sends "body": null for requests without a payload + documents = body.get("documents", []) + + if not documents: + return { + "statusCode": 400, + "body": json.dumps({"error": "No documents provided"}), + } + + client = get_client(COLLECTION_ENDPOINT, REGION) + + # Build bulk request — the ingest pipeline handles embedding generation + bulk_body = [] + for doc in documents: + doc_id = doc.get("id") + title = doc.get("title", "") + content = doc.get("content", "") + + # Combine title and content for the embedding source field + embedding_text = f"{title}.
{content}" if title else content + + bulk_body.append({"index": {"_index": INDEX_NAME, "_id": doc_id}}) + bulk_body.append( + { + "title": title, + "content": content, + "embedding_text": embedding_text, + } + ) + + # Use the ingest pipeline to auto-generate embeddings + response = client.bulk(body=bulk_body, pipeline=INGEST_PIPELINE) + + # Extract individual item errors for debugging + item_errors = [] + if response.get("errors"): + for item in response.get("items", []): + for action, result in item.items(): + if "error" in result: + item_errors.append({"id": result.get("_id"), "error": result["error"]}) + + logger.info("Indexed documents", extra={ + "count": len(documents), + "errors": response.get("errors", False), + }) + + return { + "statusCode": 200, + "headers": {"Content-Type": "application/json"}, + "body": json.dumps( + { + "message": f"Indexed {len(documents)} document(s)", + "errors": response.get("errors", False), + "item_errors": item_errors[:5] if item_errors else [], + } + ), + } + + except Exception as e: + logger.exception("Error indexing documents") + return { + "statusCode": 500, + "body": json.dumps({"error": str(e)}), + } diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/index_documents/requirements.txt b/apigw-lambda-opensearch-serverless-nextgen/lambda/index_documents/requirements.txt new file mode 100644 index 000000000..bd9f77520 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/index_documents/requirements.txt @@ -0,0 +1 @@ +# Dependencies provided by OpenSearchClientLayer diff --git a/apigw-lambda-opensearch-serverless-nextgen/lambda/search/app.py b/apigw-lambda-opensearch-serverless-nextgen/lambda/search/app.py new file mode 100644 index 000000000..9bdc17b37 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/lambda/search/app.py @@ -0,0 +1,159 @@ +"""Lambda handler for search against OpenSearch Serverless NextGen.
# Largest "size" accepted from a request.
# NOTE(review): chosen to match OpenSearch's documented k-NN ``k`` ceiling —
# confirm the limit that applies to this collection type.
_MAX_RESULT_SIZE = 10000

_SEARCH_MODES = ("semantic", "lexical", "hybrid")


def _json_response(status_code, payload):
    """Build an API Gateway proxy response whose body is ``payload`` as JSON."""
    return {
        "statusCode": status_code,
        "headers": {"Content-Type": "application/json"},
        "body": json.dumps(payload),
    }


@tracer.capture_lambda_handler
@logger.inject_lambda_context
def handler(event, context):
    """Perform search in the specified mode.

    Expected request body:
    {
        "query": "search terms",
        "mode": "semantic" | "lexical" | "hybrid",  // optional, default "semantic"
        "size": 5  // optional, default 5
    }

    Args:
        event: API Gateway proxy event; only ``event["body"]`` is read.
        context: Lambda context object (used by the Powertools decorators).

    Returns:
        An API Gateway proxy response dict:
        * 400 when the body is not a JSON object, ``query`` is missing,
          ``mode`` is unknown, or ``size`` is not an integer in
          ``1.._MAX_RESULT_SIZE``.
        * 200 with ``results`` (id, score, title, content per hit),
          ``total`` and ``mode`` on success.
        * 500 with the exception text for any other failure.
    """
    try:
        # API Gateway sends "body": null for requests without a payload, so a
        # .get() default is not enough; fall back explicitly.
        try:
            body = json.loads(event.get("body") or "{}")
        except (TypeError, ValueError):
            return _json_response(400, {"error": "Request body must be valid JSON"})

        if not isinstance(body, dict):
            return _json_response(400, {"error": "Request body must be a JSON object"})

        query_text = body.get("query")
        mode = body.get("mode", "semantic")
        size = body.get("size", 5)

        if not query_text:
            return _json_response(400, {"error": "'query' is required"})

        if mode not in _SEARCH_MODES:
            return _json_response(
                400, {"error": "mode must be 'semantic', 'lexical', or 'hybrid'"}
            )

        # "size" is forwarded as both the page size and the neural "k", so it
        # must be a bounded positive integer. bool is an int subclass and is
        # rejected explicitly.
        if (
            isinstance(size, bool)
            or not isinstance(size, int)
            or not 1 <= size <= _MAX_RESULT_SIZE
        ):
            return _json_response(
                400,
                {"error": f"size must be an integer between 1 and {_MAX_RESULT_SIZE}"},
            )

        client = get_client(COLLECTION_ENDPOINT, REGION)
        search_body = _build_query(query_text, mode, size)

        # Hybrid queries are run through the named search pipeline.
        params = {}
        if mode == "hybrid":
            params["search_pipeline"] = "hybrid-search-pipeline"

        response = client.search(index=INDEX_NAME, body=search_body, params=params)

        hits = [
            {
                "id": hit["_id"],
                "score": hit["_score"],
                "title": hit["_source"].get("title"),
                "content": hit["_source"].get("content"),
            }
            for hit in response["hits"]["hits"]
        ]
        total = response["hits"]["total"]["value"]

        logger.info("Search completed", extra={
            "mode": mode,
            "total_hits": total,
        })

        return _json_response(200, {"results": hits, "total": total, "mode": mode})

    except Exception as e:
        # Top-level boundary: log the traceback and report a 500.
        logger.exception("Error performing search")
        return _json_response(500, {"error": str(e)})


def _neural_clause(query_text, size):
    """Return the ``neural`` clause on the ``embedding`` field using MODEL_ID."""
    return {
        "neural": {
            "embedding": {
                "query_text": query_text,
                "model_id": MODEL_ID,
                "k": size,
            }
        }
    }


def _build_query(query_text, mode, size):
    """Build the OpenSearch query body based on mode.

    Args:
        query_text: The user's search string.
        mode: ``"hybrid"``, ``"lexical"``, or anything else for semantic.
        size: Number of hits requested; also used as the neural ``k``.

    Returns:
        The request body dict to pass to ``client.search``.
    """
    if mode == "hybrid":
        return _build_hybrid_query(query_text, size)

    if mode == "lexical":
        return {
            "size": size,
            "query": {
                "multi_match": {
                    "query": query_text,
                    "fields": ["title^2", "content"],
                    "fuzziness": 1,
                }
            },
        }

    # semantic — uses neural query (OpenSearch generates embedding server-side)
    return {
        "size": size,
        "min_score": 0.55,
        "query": _neural_clause(query_text, size),
    }


def _build_hybrid_query(query_text, size):
    """Build a hybrid query combining lexical and neural search.

    The lexical leg is a ``multi_match`` over ``title^2`` and ``content``
    (no fuzziness, unlike lexical mode); the neural leg is the same clause
    used for semantic mode.
    """
    return {
        "size": size,
        "query": {
            "hybrid": {
                "queries": [
                    {
                        "multi_match": {
                            "query": query_text,
                            "fields": ["title^2", "content"],
                        }
                    },
                    _neural_clause(query_text, size),
                ]
            }
        },
    }
def get_client(endpoint: str, region: str) -> OpenSearch:
    """Create an OpenSearch client authenticated with SigV4 for AOSS.

    Args:
        endpoint: The collection endpoint URL (https://...).
        region: AWS region code (e.g. eu-west-1).

    Returns:
        An ``OpenSearch`` client targeting port 443 of the endpoint host over
        TLS with certificate verification and a 25 second timeout.

    Raises:
        RuntimeError: If boto3 cannot resolve any AWS credentials.
    """
    # Strip only a leading scheme; the client takes host and port separately.
    host = endpoint.removeprefix("https://").rstrip("/")

    credentials = boto3.Session().get_credentials()
    if credentials is None:
        # Fail with a clear message rather than an AttributeError on None.
        raise RuntimeError(
            "No AWS credentials available to sign OpenSearch Serverless requests"
        )

    # Take one consistent snapshot of key/secret/token instead of three
    # separate attribute reads.
    frozen = credentials.get_frozen_credentials()
    auth = AWS4Auth(
        frozen.access_key,
        frozen.secret_key,
        region,
        "aoss",
        session_token=frozen.token,
    )

    return OpenSearch(
        hosts=[{"host": host, "port": 443}],
        http_auth=auth,
        use_ssl=True,
        verify_certs=True,
        connection_class=RequestsHttpConnection,
        timeout=25,
    )
"lambda-aoss-nextgen" +ENABLE_API_ACCESS_LOGS = "false" +COLLECTION_GROUP_NAME = "{{exec(command='echo cg-$STACK_NAME | tr A-Z a-z')}}" +COLLECTION_NAME = "{{exec(command='echo col-$STACK_NAME | tr A-Z a-z')}}" + +[tools] +python = "3.14" +uv = "latest" +aws-sam = "latest" + +[tasks] + +[tasks.init] +description = "Install all Lambda, layer, and test dependencies into the local .venv" +run = """ +find lambda layers tests -name 'requirements.txt' | while read -r req; do + echo "Installing ${req}..." + uv pip install --upgrade -r "${req}" +done +echo "Installing ./requirements.txt" +uv pip install --upgrade -r requirements.txt +""" + +[tasks.clean] +description = "Remove all generated and temporary files" +run = """ +rm -rf .aws-sam/ .pytest_cache/ __pycache__/ .venv/ +find . -type d -name '__pycache__' -exec rm -rf {} + 2>/dev/null || true +find . -type f -name '*.py[cod]' -delete 2>/dev/null || true +""" + +[tasks."sam:validate"] +description = "Validate the SAM template against AWS CloudFormation rules and lint for best practices" +run = "sam validate --lint" + +[tasks."sam:build"] +description = "Build the SAM application artifacts (validates template first)" +depends = "sam:validate" +run = "sam build" + +[tasks."sam:deploy"] +description = "Deploy the SAM application to AWS without requiring changeset confirmation (builds first)" +depends = "sam:build" +run = """ + sam deploy --stack-name ${STACK_NAME} \ + --parameter-overrides EnableApiAccessLogs=${ENABLE_API_ACCESS_LOGS} \ + CollectionGroupName=${COLLECTION_GROUP_NAME} \ + CollectionName=${COLLECTION_NAME} +""" + +[tasks."sam:sync"] +description = "Watch for file changes and automatically sync code and infra to AWS" +run = """sam sync --stack-name ${STACK_NAME} \ + --watch \ + --parameter-overrides EnableApiAccessLogs=${ENABLE_API_ACCESS_LOGS} \ + CollectionGroupName=${COLLECTION_GROUP_NAME} \ + CollectionName=${COLLECTION_NAME} +""" + +[tasks."sam:delete"] +description = "Delete the SAM stack and all 
associated resources from AWS" +depends_post = "clean:loggroups" +run = "sam delete --no-prompts --stack-name ${STACK_NAME}" + +[tasks."clean:loggroups"] +description = "Delete log groups that may have been recreated as part of the delete process" +run = """ +aws logs describe-log-groups \ + --log-group-name-prefix "/aws/lambda/${STACK_NAME}-" \ + --query 'logGroups[].logGroupName' --output text \ + | tr '\t' '\n' \ + | while read -r lg; do + [ -z "${lg}" ] && continue + echo "Deleting log group ${lg}" + aws logs delete-log-group --log-group-name "${lg}" + done +""" + +[tasks."test:unit"] +description = "Run unit tests" +run = "pytest tests/unit/ -v" + +[tasks."test:integration"] +description = "Run integration tests against the deployed stack" +run = "pytest tests/integration/ -v -s" + +[tasks.test] +description = "Run all tests (unit and integration)" +depends = "test:unit" +depends_post = "test:integration" diff --git a/apigw-lambda-opensearch-serverless-nextgen/requirements.txt b/apigw-lambda-opensearch-serverless-nextgen/requirements.txt new file mode 100644 index 000000000..8e9837f55 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/requirements.txt @@ -0,0 +1 @@ +awscurl>=0.32 diff --git a/apigw-lambda-opensearch-serverless-nextgen/template.yaml b/apigw-lambda-opensearch-serverless-nextgen/template.yaml new file mode 100644 index 000000000..4cbf3e06a --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/template.yaml @@ -0,0 +1,630 @@ +AWSTemplateFormatVersion: '2010-09-09' +Transform: AWS::Serverless-2016-10-31 +Description: > + Serverless Search API — Lambda + OpenSearch Serverless NextGen. + Demonstrates zero-baseline cost with both Lambda and OpenSearch scaling to zero when idle. 
+ +Parameters: + CollectionName: + Type: String + Default: semantic-search + Description: Name of the OpenSearch Serverless collection + AllowedPattern: '^[a-z][a-z0-9-]{2,27}$' + + CollectionGroupName: + Type: String + Default: semantic-search-cg + Description: Name of the OpenSearch Serverless collection group + AllowedPattern: '^[a-z][a-z0-9-]{2,31}$' + + LogRetentionDays: + Type: Number + Default: 7 + Description: Number of days to retain CloudWatch log events + AllowedValues: [1, 3, 5, 7, 14, 30, 60, 90, 120, 150, 180, 365, 400, 545, 731, 1827, 3653] + + EnableApiAccessLogs: + Type: String + Default: 'false' + Description: > + Enable API Gateway access logging. Prerequisite: the account-level API Gateway + CloudWatch Logs role must already be configured before setting this to true. + AllowedValues: ['true', 'false'] + +Conditions: + ApiAccessLogsEnabled: !Equals [!Ref EnableApiAccessLogs, 'true'] + +Globals: + Function: + Timeout: 30 + MemorySize: 256 + Runtime: python3.14 + Architectures: + - arm64 + Tracing: Active + Environment: + Variables: + POWERTOOLS_SERVICE_NAME: opensearch-nextgen + POWERTOOLS_METRICS_NAMESPACE: opensearch-nextgen + Layers: + - !Sub arn:${AWS::Partition}:lambda:${AWS::Region}:017000801446:layer:AWSLambdaPowertoolsPythonV3-python314-arm64:33 + +Resources: + # ============================================================ + # API Gateway + # ============================================================ + + SearchApi: + Type: AWS::Serverless::Api + Metadata: + cfn_nag: + rules_to_suppress: + - id: W64 + reason: "Sample application — usage plan not required for demonstration purposes" + - id: W68 + reason: "Sample application — usage plan not required for demonstration purposes" + - id: W69 + reason: "Sample application — access logging controlled by EnableApiAccessLogs parameter" + Properties: + StageName: Prod + TracingEnabled: true + Auth: + DefaultAuthorizer: AWS_IAM + AccessLogSetting: !If + - ApiAccessLogsEnabled + - DestinationArn: 
!GetAtt ApiAccessLogGroup.Arn + Format: '{"requestId":"$context.requestId","ip":"$context.identity.sourceIp","caller":"$context.identity.caller","user":"$context.identity.user","requestTime":"$context.requestTime","httpMethod":"$context.httpMethod","resourcePath":"$context.resourcePath","status":"$context.status","protocol":"$context.protocol","responseLength":"$context.responseLength"}' + - !Ref AWS::NoValue + + # ============================================================ + # API Gateway Access Logging (conditional) + # Prerequisite: The account-level API Gateway CloudWatch Logs role must be configured + # before enabling. Set via AWS Console (API Gateway > Settings) or a separate stack. + # ============================================================ + + ApiAccessLogGroup: + Type: AWS::Logs::LogGroup + Condition: ApiAccessLogsEnabled + DeletionPolicy: Delete + UpdateReplacePolicy: Delete + Metadata: + cfn_nag: + rules_to_suppress: + - id: W84 + reason: "Sample application — KMS encryption not required for demonstration log data" + Properties: + LogGroupName: !Sub /aws/apigateway/${AWS::StackName}-access-logs + RetentionInDays: !Ref LogRetentionDays + + # ============================================================ + # Shared Lambda Layer + # ============================================================ + + OpenSearchClientLayer: + Type: AWS::Serverless::LayerVersion + DeletionPolicy: Delete + UpdateReplacePolicy: Delete + Properties: + LayerName: opensearch-client + Description: OpenSearch Serverless client with SigV4 auth + ContentUri: layers/opensearch_client/ + CompatibleRuntimes: + - python3.14 + CompatibleArchitectures: + - arm64 + Metadata: + BuildMethod: python3.14 + BuildArchitecture: arm64 + + # ============================================================ + # Custom Resource — NextGen Collection Group + # ============================================================ + + CollectionGroupFunctionLogGroup: + Type: AWS::Logs::LogGroup + DeletionPolicy: Delete 
+ UpdateReplacePolicy: Delete + Metadata: + cfn_nag: + rules_to_suppress: + - id: W84 + reason: "Sample application — KMS encryption not required for demonstration log data" + Properties: + LogGroupName: !Sub /aws/lambda/${AWS::StackName}-CollectionGroupFunction + RetentionInDays: !Ref LogRetentionDays + + CollectionGroupFunction: + Type: AWS::Serverless::Function + Metadata: + cfn_nag: + rules_to_suppress: + - id: W89 + reason: "Sample application — VPC deployment not required for demonstration purposes" + - id: W92 + reason: "Sample application — reserved concurrency not required for demonstration purposes" + Properties: + Handler: app.lambda_handler + CodeUri: lambda/custom_resources/nextgen_collection_group/ + Description: Custom resource handler for NextGen collection group + Timeout: 60 + LoggingConfig: + LogGroup: !Ref CollectionGroupFunctionLogGroup + Policies: + - Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - aoss:CreateCollectionGroup + - aoss:UpdateCollectionGroup + - aoss:DeleteCollectionGroup + Resource: !Sub 'arn:${AWS::Partition}:aoss:${AWS::Region}:${AWS::AccountId}:collection-group/*' + - Effect: Allow + Action: + - aoss:BatchGetCollectionGroup + Resource: !Sub 'arn:${AWS::Partition}:aoss:${AWS::Region}:${AWS::AccountId}:collection-group/*' + Environment: + Variables: + POWERTOOLS_SERVICE_NAME: opensearch-nextgen-cr + + CollectionGroup: + Type: AWS::CloudFormation::CustomResource + Properties: + ServiceToken: !GetAtt CollectionGroupFunction.Arn + ServiceTimeout: '120' + Name: !Ref CollectionGroupName + Description: Collection group for semantic search sample — NextGen with scale-to-zero + MaxIndexingCapacityInOCU: '8' + MaxSearchCapacityInOCU: '8' + + # ============================================================ + # OpenSearch Serverless — Security Policies & Collection + # ============================================================ + + EncryptionPolicy: + Type: AWS::OpenSearchServerless::SecurityPolicy + Properties: + Name: 
!Sub '${AWS::StackName}-enc' + Type: encryption + Description: Encryption policy for semantic search collection + Policy: !Sub | + { + "Rules": [ + { + "ResourceType": "collection", + "Resource": ["collection/${CollectionName}"] + } + ], + "AWSOwnedKey": true + } + + NetworkPolicy: + Type: AWS::OpenSearchServerless::SecurityPolicy + Properties: + Name: !Sub '${AWS::StackName}-net' + Type: network + Description: Network policy — public access with semantic enrichment service access + Policy: !Sub | + [ + { + "Rules": [ + { + "ResourceType": "collection", + "Resource": ["collection/${CollectionName}"] + }, + { + "ResourceType": "dashboard", + "Resource": ["collection/${CollectionName}"] + } + ], + "AllowFromPublic": true + }, + { + "Rules": [ + { + "ResourceType": "collection", + "Resource": ["collection/${CollectionName}"] + } + ], + "AllowFromPublic": false, + "SourceServices": ["aoss.amazonaws.com"] + } + ] + + DataAccessPolicy: + Type: AWS::OpenSearchServerless::AccessPolicy + Properties: + Name: !Sub '${AWS::StackName}-access' + Type: data + Description: Data access policy for Lambda function role + Policy: !Sub + - | + [ + { + "Description": "Search read and ML execute access", + "Rules": [{"ResourceType": "index", "Resource": ["index/${CollectionName}/*"], "Permission": ["aoss:DescribeIndex", "aoss:ReadDocument"]}, {"ResourceType": "model", "Resource": ["model/${CollectionName}/*"], "Permission": ["aoss:DescribeMLResource", "aoss:ExecuteMLResource"]}], + "Principal": ["${SearchRoleArn}"] + }, + { + "Description": "Index write access", + "Rules": [{"ResourceType": "index", "Resource": ["index/${CollectionName}/*"], "Permission": ["aoss:CreateIndex", "aoss:DescribeIndex", "aoss:UpdateIndex", "aoss:ReadDocument", "aoss:WriteDocument"]}], + "Principal": ["${IndexRoleArn}"] + }, + { + "Description": "Delete write access", + "Rules": [{"ResourceType": "index", "Resource": ["index/${CollectionName}/*"], "Permission": ["aoss:DescribeIndex", "aoss:WriteDocument"]}], + 
"Principal": ["${DeleteRoleArn}"] + }, + { + "Description": "Index pipeline and ML full access", + "Rules": [{"ResourceType": "collection", "Resource": ["collection/${CollectionName}"], "Permission": ["aoss:CreateCollectionItems", "aoss:DescribeCollectionItems", "aoss:UpdateCollectionItems"]}, {"ResourceType": "model", "Resource": ["model/${CollectionName}/*"], "Permission": ["aoss:CreateMLResource", "aoss:DescribeMLResource", "aoss:UpdateMLResource", "aoss:DeleteMLResource", "aoss:ExecuteMLResource"]}], + "Principal": ["${IndexRoleArn}"] + }, + { + "Description": "Admin full access", + "Rules": [{"ResourceType": "index", "Resource": ["index/${CollectionName}/*"], "Permission": ["aoss:CreateIndex", "aoss:DescribeIndex", "aoss:UpdateIndex", "aoss:DeleteIndex", "aoss:ReadDocument", "aoss:WriteDocument"]}, {"ResourceType": "model", "Resource": ["model/${CollectionName}/*"], "Permission": ["aoss:CreateMLResource", "aoss:DescribeMLResource", "aoss:UpdateMLResource", "aoss:DeleteMLResource", "aoss:ExecuteMLResource"]}], + "Principal": ["arn:aws:iam::${AWS::AccountId}:role/Admin"] + } + ] + - SearchRoleArn: !GetAtt SearchFunctionRole.Arn + IndexRoleArn: !GetAtt IndexFunctionRole.Arn + DeleteRoleArn: !GetAtt DeleteFunctionRole.Arn + + Collection: + Type: AWS::OpenSearchServerless::Collection + DependsOn: + - EncryptionPolicy + - NetworkPolicy + - DataAccessPolicy + - CollectionGroup + Properties: + Name: !Ref CollectionName + Type: VECTORSEARCH + Description: NextGen vector search collection for semantic search API + CollectionGroupName: !Ref CollectionGroupName + + # ============================================================ + # ML Model Role — allows OpenSearch ML to invoke Bedrock + # ============================================================ + + OpenSearchMLRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: + - opensearchservice.amazonaws.com + - 
ml.opensearchservice.amazonaws.com + Action: sts:AssumeRole + Policies: + - PolicyName: BedrockInvoke + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - bedrock:InvokeModel + Resource: !Sub 'arn:aws:bedrock:${AWS::Region}::foundation-model/amazon.titan-embed-text-v2:0' + + # ============================================================ + # Index & Pipeline Setup (runs at deploy time) + # ============================================================ + + SetupPipelineFunctionLogGroup: + Type: AWS::Logs::LogGroup + DeletionPolicy: Delete + UpdateReplacePolicy: Delete + Metadata: + cfn_nag: + rules_to_suppress: + - id: W84 + reason: "Sample application — KMS encryption not required for demonstration log data" + Properties: + LogGroupName: !Sub /aws/lambda/${AWS::StackName}-SetupPipelineFunction + RetentionInDays: !Ref LogRetentionDays + + SetupPipelineFunction: + Type: AWS::Serverless::Function + Metadata: + cfn_nag: + rules_to_suppress: + - id: W89 + reason: "Sample application — VPC deployment not required for demonstration purposes" + - id: W92 + reason: "Sample application — reserved concurrency not required for demonstration purposes" + Properties: + Handler: app.lambda_handler + CodeUri: lambda/custom_resources/setup_pipeline/ + Description: Creates ML model, ingest pipeline, and search pipeline at deploy time + Timeout: 300 + Role: !GetAtt IndexFunctionRole.Arn + LoggingConfig: + LogGroup: !Ref SetupPipelineFunctionLogGroup + Layers: + - !Ref OpenSearchClientLayer + Environment: + Variables: + POWERTOOLS_SERVICE_NAME: opensearch-nextgen-setup + + VectorIndex: + Type: AWS::OpenSearchServerless::Index + Properties: + CollectionEndpoint: !GetAtt Collection.CollectionEndpoint + IndexName: documents + Mappings: + Properties: + title: + Type: text + content: + Type: text + embedding_text: + Type: text + embedding: + Type: knn_vector + Dimension: 1024 + Method: + Name: hnsw + SpaceType: cosinesimil + Settings: + Index: + Knn: true + + 
SetupSearchPipeline: + Type: AWS::CloudFormation::CustomResource + DependsOn: VectorIndex + Properties: + ServiceToken: !GetAtt SetupPipelineFunction.Arn + ServiceTimeout: '300' + CollectionEndpoint: !GetAtt Collection.CollectionEndpoint + ModelRoleArn: !GetAtt OpenSearchMLRole.Arn + EmbeddingModelId: amazon.titan-embed-text-v2:0 + EmbeddingDimension: '1024' + + # ============================================================ + # Lambda Functions + # ============================================================ + + SearchFunctionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess + Policies: + - PolicyName: OpenSearchServerlessRead + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - aoss:APIAccessAll + Resource: !Sub 'arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*' + + SearchFunctionLogGroup: + Type: AWS::Logs::LogGroup + DeletionPolicy: Delete + UpdateReplacePolicy: Delete + Metadata: + cfn_nag: + rules_to_suppress: + - id: W84 + reason: "Sample application — KMS encryption not required for demonstration log data" + Properties: + LogGroupName: !Sub /aws/lambda/${AWS::StackName}-SearchFunction + RetentionInDays: !Ref LogRetentionDays + + SearchFunction: + Type: AWS::Serverless::Function + Metadata: + cfn_nag: + rules_to_suppress: + - id: W89 + reason: "Sample application — VPC deployment not required for demonstration purposes" + - id: W92 + reason: "Sample application — reserved concurrency not required for demonstration purposes" + Properties: + Handler: app.handler + CodeUri: lambda/search/ + Description: Performs semantic, lexical, and hybrid search (neural queries via OpenSearch ML) + Role: !GetAtt SearchFunctionRole.Arn + 
LoggingConfig: + LogGroup: !Ref SearchFunctionLogGroup + Layers: + - !Ref OpenSearchClientLayer + Environment: + Variables: + COLLECTION_ENDPOINT: !GetAtt Collection.CollectionEndpoint + COLLECTION_NAME: !Ref CollectionName + INDEX_NAME: documents + MODEL_ID: !GetAtt SetupSearchPipeline.ModelId + Events: + SearchApi: + Type: Api + Properties: + RestApiId: !Ref SearchApi + Path: /search + Method: POST + + IndexFunctionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess + Policies: + - PolicyName: OpenSearchServerlessWrite + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - aoss:APIAccessAll + - aoss:CreateIndex + - aoss:GetIndex + Resource: !Sub 'arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*' + - PolicyName: PassMLRole + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - iam:PassRole + Resource: !GetAtt OpenSearchMLRole.Arn + + IndexFunctionLogGroup: + Type: AWS::Logs::LogGroup + DeletionPolicy: Delete + UpdateReplacePolicy: Delete + Metadata: + cfn_nag: + rules_to_suppress: + - id: W84 + reason: "Sample application — KMS encryption not required for demonstration log data" + Properties: + LogGroupName: !Sub /aws/lambda/${AWS::StackName}-IndexFunction + RetentionInDays: !Ref LogRetentionDays + + IndexFunction: + Type: AWS::Serverless::Function + Metadata: + cfn_nag: + rules_to_suppress: + - id: W89 + reason: "Sample application — VPC deployment not required for demonstration purposes" + - id: W92 + reason: "Sample application — reserved concurrency not required for demonstration purposes" + Properties: + Handler: app.handler + CodeUri: lambda/index_documents/ + Description: Indexes documents with text 
(embeddings generated by OpenSearch ML) + Timeout: 120 + Role: !GetAtt IndexFunctionRole.Arn + LoggingConfig: + LogGroup: !Ref IndexFunctionLogGroup + Layers: + - !Ref OpenSearchClientLayer + Environment: + Variables: + COLLECTION_ENDPOINT: !GetAtt Collection.CollectionEndpoint + COLLECTION_NAME: !Ref CollectionName + INDEX_NAME: documents + INGEST_PIPELINE: !GetAtt SetupSearchPipeline.IngestPipeline + Events: + IndexApi: + Type: Api + Properties: + RestApiId: !Ref SearchApi + Path: /index + Method: POST + + DeleteFunctionRole: + Type: AWS::IAM::Role + Properties: + AssumeRolePolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Principal: + Service: lambda.amazonaws.com + Action: sts:AssumeRole + ManagedPolicyArns: + - arn:aws:iam::aws:policy/service-role/AWSLambdaBasicExecutionRole + - arn:aws:iam::aws:policy/AWSXRayDaemonWriteAccess + Policies: + - PolicyName: OpenSearchServerlessDelete + PolicyDocument: + Version: '2012-10-17' + Statement: + - Effect: Allow + Action: + - aoss:APIAccessAll + Resource: !Sub 'arn:aws:aoss:${AWS::Region}:${AWS::AccountId}:collection/*' + + DeleteFunctionLogGroup: + Type: AWS::Logs::LogGroup + DeletionPolicy: Delete + UpdateReplacePolicy: Delete + Metadata: + cfn_nag: + rules_to_suppress: + - id: W84 + reason: "Sample application — KMS encryption not required for demonstration log data" + Properties: + LogGroupName: !Sub /aws/lambda/${AWS::StackName}-DeleteFunction + RetentionInDays: !Ref LogRetentionDays + + DeleteFunction: + Type: AWS::Serverless::Function + Metadata: + cfn_nag: + rules_to_suppress: + - id: W89 + reason: "Sample application — VPC deployment not required for demonstration purposes" + - id: W92 + reason: "Sample application — reserved concurrency not required for demonstration purposes" + Properties: + Handler: app.handler + CodeUri: lambda/delete_documents/ + Description: Deletes documents by ID from the index + Role: !GetAtt DeleteFunctionRole.Arn + LoggingConfig: + LogGroup: !Ref 
DeleteFunctionLogGroup + Layers: + - !Ref OpenSearchClientLayer + Environment: + Variables: + COLLECTION_ENDPOINT: !GetAtt Collection.CollectionEndpoint + COLLECTION_NAME: !Ref CollectionName + INDEX_NAME: documents + Events: + DeleteApi: + Type: Api + Properties: + RestApiId: !Ref SearchApi + Path: /documents + Method: DELETE + +Outputs: + SearchApiUrl: + Description: API Gateway endpoint URL for the search function + Value: !Sub 'https://${SearchApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/search' + + IndexApiUrl: + Description: API Gateway endpoint URL for the index function + Value: !Sub 'https://${SearchApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/index' + + DeleteApiUrl: + Description: API Gateway endpoint URL for the delete function + Value: !Sub 'https://${SearchApi}.execute-api.${AWS::Region}.amazonaws.com/Prod/documents' + + CollectionEndpoint: + Description: OpenSearch Serverless collection endpoint + Value: !GetAtt Collection.CollectionEndpoint + + CollectionArn: + Description: OpenSearch Serverless collection ARN + Value: !GetAtt Collection.Arn + + CollectionGroupId: + Description: NextGen Collection Group ID + Value: !GetAtt CollectionGroup.Id + + MLModelId: + Description: OpenSearch ML model ID for neural search + Value: !GetAtt SetupSearchPipeline.ModelId diff --git a/apigw-lambda-opensearch-serverless-nextgen/tests/__init__.py b/apigw-lambda-opensearch-serverless-nextgen/tests/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apigw-lambda-opensearch-serverless-nextgen/tests/integration/__init__.py b/apigw-lambda-opensearch-serverless-nextgen/tests/integration/__init__.py new file mode 100644 index 000000000..e69de29bb diff --git a/apigw-lambda-opensearch-serverless-nextgen/tests/integration/test_data.json b/apigw-lambda-opensearch-serverless-nextgen/tests/integration/test_data.json new file mode 100644 index 000000000..65f1aacb3 --- /dev/null +++ 
b/apigw-lambda-opensearch-serverless-nextgen/tests/integration/test_data.json @@ -0,0 +1,54 @@ +{ + "documents": [ + {"id": "prod-01", "title": "Waterproof Hiking Boots", "content": "Durable leather boots with Gore-Tex lining for all-weather trail use. Vibram sole provides excellent grip on wet rock and muddy paths."}, + {"id": "prod-02", "title": "Summer Beach Sandals", "content": "Lightweight open-toe sandals with quick-dry straps. Perfect for shoreline walks and water activities at the coast."}, + {"id": "prod-03", "title": "Running Sneakers Pro", "content": "Breathable mesh upper with responsive foam cushioning. Designed for long-distance road running and marathon training."}, + {"id": "prod-04", "title": "Wool Winter Socks", "content": "Merino wool blend socks with reinforced heel and toe. Moisture-wicking and thermal insulation keep feet warm in cold weather."}, + {"id": "prod-05", "title": "Portable Bluetooth Speaker", "content": "Waterproof wireless speaker with 12-hour battery life. Rich bass and 360-degree sound for outdoor gatherings and pool parties."}, + {"id": "prod-06", "title": "Noise-Cancelling Headphones", "content": "Over-ear headphones with active noise cancellation. 30-hour battery and premium memory foam ear cushions for long flights."}, + {"id": "prod-07", "title": "Camping Hammock", "content": "Ultralight nylon hammock for relaxing outdoors between trees. Integrated mosquito net and supports up to 200kg. Packs down to fist size for easy carrying on hikes and camping trips."}, + {"id": "prod-08", "title": "Stainless Steel Water Bottle", "content": "Double-wall vacuum insulated bottle for hydration on the go. Keeps drinks cold 24 hours or hot 12 hours. BPA-free and leak-proof."}, + {"id": "prod-09", "title": "Yoga Mat Premium", "content": "Non-slip natural rubber mat with alignment markers. 
Extra thick 6mm cushioning protects joints during floor exercises and stretching."}, + {"id": "prod-10", "title": "Cycling Helmet", "content": "Aerodynamic road cycling helmet with MIPS brain protection system. Lightweight polycarbonate shell with 14 ventilation channels."}, + {"id": "prod-11", "title": "Camping Tent 3-Person", "content": "Freestanding dome tent with waterproof rainfly. Sets up in under 5 minutes. Two vestibules provide gear storage space for weekend trips."}, + {"id": "prod-12", "title": "LED Head Torch", "content": "Rechargeable headlamp with 800 lumens and red night-vision mode. Lightweight at 75g with adjustable beam angle for trail running after dark."}, + {"id": "prod-13", "title": "Insulated Ski Jacket", "content": "Down-filled jacket with 10,000mm waterproof rating. Helmet-compatible hood and powder skirt keep snow out during alpine skiing."}, + {"id": "prod-14", "title": "Fitness Tracker Watch", "content": "Heart rate monitor with GPS tracking and sleep analysis. Tracks steps, calories, and workouts. Water-resistant to 50 meters."}, + {"id": "prod-15", "title": "Climbing Rope 60m", "content": "Dynamic single rope rated for lead climbing. Dry-treated sheath resists moisture absorption. 9.8mm diameter balances handling and durability."}, + {"id": "prod-16", "title": "Kayak Paddle Carbon", "content": "Lightweight carbon fibre paddle with adjustable ferrule. Dihedral blade design reduces flutter for efficient forward paddling on lakes and rivers."}, + {"id": "prod-17", "title": "Compression Running Tights", "content": "Graduated compression leggings improve blood circulation during runs. Reflective details for visibility and zip pocket for keys and phone."}, + {"id": "prod-18", "title": "Dry Bag 20L", "content": "Roll-top waterproof bag keeps belongings dry during kayaking, rafting, and beach trips. 
Welded seams and transparent window panel."}, + {"id": "prod-19", "title": "Trail Mix Energy Bars", "content": "Pack of 12 nut and seed bars with dark chocolate chips. High protein snack for hiking and endurance sports. No artificial flavours."}, + {"id": "prod-20", "title": "Polarised Sunglasses", "content": "UV400 polarised lenses reduce glare from water and snow. Lightweight frame with non-slip nose pads for cycling and fishing."}, + {"id": "prod-21", "title": "Foam Roller Recovery", "content": "High-density EVA foam roller for deep tissue massage and muscle recovery after intense workouts. Textured surface targets trigger points."}, + {"id": "prod-22", "title": "Snorkel Mask Full-Face", "content": "180-degree panoramic viewing with anti-fog design. Dry-top snorkel prevents water entry. GoPro mount for underwater photography."}, + {"id": "prod-23", "title": "Trekking Poles Pair", "content": "Adjustable aluminium trekking poles with cork grips. Shock-absorbing tips reduce knee strain on steep descents and long hikes."}, + {"id": "prod-24", "title": "Solar Power Bank", "content": "20,000mAh portable charger with built-in solar panel. Dual USB outputs charge phone and tablet simultaneously while camping off-grid."}, + {"id": "prod-25", "title": "Wetsuit 3mm Full", "content": "Neoprene full-body wetsuit for surfing and open-water swimming. Sealed seams and back zip entry. Provides warmth in waters down to 15°C."}, + {"id": "prod-26", "title": "Resistance Bands Set", "content": "Set of 5 latex bands in graduated strengths for home workouts. Includes door anchor, ankle straps, and carrying bag for travel fitness."}, + {"id": "prod-27", "title": "Mountain Bike Gloves", "content": "Padded gel cycling gloves with touchscreen-compatible fingertips. Breathable mesh back and silicone grip palm for handlebar control."}, + {"id": "prod-28", "title": "Camping Stove Portable", "content": "Compact gas canister stove that boils water in 3 minutes. 
Foldable legs and piezo ignition. Weighs just 350g for lightweight backpacking."}, + {"id": "prod-29", "title": "Swim Goggles Racing", "content": "Low-profile competitive swimming goggles with mirrored lenses. Adjustable nose bridge and silicone gasket seal prevent leaking during laps."}, + {"id": "prod-30", "title": "Down Sleeping Bag", "content": "Lightweight 800-fill goose down mummy bag rated to -10°C. Compression sack included for compact packing on multi-day treks."}, + {"id": "prod-31", "title": "Basketball Indoor/Outdoor", "content": "Official size 7 composite leather basketball with deep channel design for superior grip. Suitable for both indoor courts and concrete playgrounds."}, + {"id": "prod-32", "title": "Skateboard Complete", "content": "Canadian maple deck with 52mm wheels and ABEC-7 bearings. Concave shape suits tricks and street skating for beginners and intermediates."}, + {"id": "prod-33", "title": "GPS Handheld Navigator", "content": "Rugged outdoor GPS device with topographic maps and 16-hour battery. Barometric altimeter and electronic compass for backcountry navigation."}, + {"id": "prod-34", "title": "Protein Shaker Bottle", "content": "700ml leak-proof shaker with wire whisk ball for smooth protein shakes. BPA-free tritan plastic and measurement markings on the side."}, + {"id": "prod-35", "title": "Surfboard Shortboard 6ft", "content": "High-performance epoxy shortboard with thruster fin setup. Rounded pin tail for quick turns on steep hollow waves."}, + {"id": "prod-36", "title": "Hiking Backpack 45L", "content": "Multi-day trekking pack with adjustable torso length and padded hip belt. Rain cover included. Multiple compartments for organised packing."}, + {"id": "prod-37", "title": "Table Tennis Set", "content": "Retractable net with two paddles and three balls. 
Clamps to any table up to 2 inches thick for instant ping pong games at home or office."}, + {"id": "prod-38", "title": "Ski Goggles Anti-Fog", "content": "Dual-lens ski goggles with magnetic lens swap system. OTG design fits over prescription glasses. Helmet-compatible adjustable strap."}, + {"id": "prod-39", "title": "Jump Rope Speed", "content": "Ball-bearing speed rope with adjustable steel cable. Lightweight handles with foam grip. Ideal for double-unders and HIIT cardio training."}, + {"id": "prod-40", "title": "Fishing Rod Telescopic", "content": "Carbon fibre telescopic rod that collapses to 45cm for travel. Medium action suits freshwater lake and river fishing for trout and bass."}, + {"id": "prod-41", "title": "Climbing Chalk Bag", "content": "Drawstring chalk bag with fleece lining and brush holder loop. Belt attachment and zippered pocket for keys. Essential for bouldering."}, + {"id": "prod-42", "title": "Inflatable Stand-Up Paddleboard", "content": "All-round SUP board that inflates to 15 PSI in 5 minutes. Non-slip deck pad and bungee storage for calm lake paddling and coastal touring."}, + {"id": "prod-43", "title": "Weightlifting Belt Leather", "content": "10mm thick genuine leather powerlifting belt with single prong buckle. Provides lumbar support for heavy squats and deadlifts."}, + {"id": "prod-44", "title": "Badminton Racket Pair", "content": "Two lightweight graphite rackets with carrying case and three shuttlecocks. Pre-strung at 22 lbs tension for recreational garden play."}, + {"id": "prod-45", "title": "Bike Repair Multi-Tool", "content": "16-function cycling multi-tool with allen keys, screwdrivers, chain breaker, and tyre levers. Compact enough to fit in a jersey pocket."}, + {"id": "prod-46", "title": "Scuba Diving Fins", "content": "Open-heel adjustable fins with channel thrust blade design. Spring heel straps for easy donning. 
Suitable for warm and cold water diving."}, + {"id": "prod-47", "title": "Boxing Gloves 12oz", "content": "Multi-layer foam padding with ventilated palm and secure velcro wrist closure. Durable synthetic leather for bag work and sparring."}, + {"id": "prod-48", "title": "Thermal Base Layer", "content": "Merino wool and polyester blend long-sleeve top. Flat-lock seams prevent chafing. Regulates body temperature for skiing and winter hiking."}, + {"id": "prod-49", "title": "Frisbee Golf Disc Set", "content": "Set of 3 discs — driver, midrange, and putter — for disc golf courses. Durable plastic in high-visibility colours with flight ratings printed."}, + {"id": "prod-50", "title": "Action Camera Waterproof", "content": "4K video at 60fps with electronic image stabilisation. Waterproof to 10m without housing. Voice control and WiFi live preview on phone."} + ] +} diff --git a/apigw-lambda-opensearch-serverless-nextgen/tests/integration/test_search.py b/apigw-lambda-opensearch-serverless-nextgen/tests/integration/test_search.py new file mode 100644 index 000000000..b961e138d --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/tests/integration/test_search.py @@ -0,0 +1,268 @@ +"""Integration tests for the search API. + +Tests semantic, lexical, and hybrid search modes against a 50-product +catalog to demonstrate how each mode handles different query types. 
@pytest.fixture(scope="module")
def api_urls():
    """Resolve the deployed API endpoints once per test module.

    Describes the CloudFormation stack named by STACK_NAME in REGION and
    returns a dict mapping "index", "search" and "delete" to the values of
    the IndexApiUrl, SearchApiUrl and DeleteApiUrl stack outputs.
    """
    cloudformation = boto3.client("cloudformation", region_name=REGION)
    stack = cloudformation.describe_stacks(StackName=STACK_NAME)["Stacks"][0]

    # Flatten the [{"OutputKey": ..., "OutputValue": ...}, ...] list.
    values_by_key = {}
    for entry in stack["Outputs"]:
        values_by_key[entry["OutputKey"]] = entry["OutputValue"]

    output_for = {
        "index": "IndexApiUrl",
        "search": "SearchApiUrl",
        "delete": "DeleteApiUrl",
    }
    return {alias: values_by_key[key] for alias, key in output_for.items()}
def search(api_url, query, mode="semantic", size=5):
    """POST a query to the search endpoint and return the decoded JSON body.

    The request is signed with _get_auth(). Any status other than 200 fails
    the calling test immediately. The query and each hit's score and title
    are echoed to stdout so a test run doubles as a demonstration.
    """
    request_body = {"query": query, "mode": mode, "size": size}
    response = requests.post(api_url, json=request_body, auth=_get_auth(), timeout=30)
    assert response.status_code == 200, f"Search failed: {response.text}"

    result = response.json()
    hits = result["results"]

    # Echo the query and what came back.
    print(f"\n [{mode}] Query: \"{query}\"")
    print(" Results:")
    if hits:
        for hit in hits:
            print(f" [{hit['score']:.4f}] {hit['title']}")
    else:
        print(" (no results)")

    return result
class TestLexicalSearch:
    """Keyword-oriented (BM25 + fuzzy) behaviour of the search API."""

    @staticmethod
    def _matched_ids(api_urls, query):
        """Run *query* in lexical mode and return the set of returned ids."""
        response = search(api_urls["search"], query, mode="lexical")
        return {hit["id"] for hit in response["results"]}

    def test_exact_keyword(self, api_urls, seed_data):
        """'bluetooth speaker' should return the Bluetooth Speaker product."""
        assert "prod-05" in self._matched_ids(api_urls, "bluetooth speaker")

    def test_fuzzy_typo_tolerance(self, api_urls, seed_data):
        """The misspelling 'headpohnes' should still return the Headphones."""
        assert "prod-06" in self._matched_ids(api_urls, "headpohnes")

    def test_partial_product_name(self, api_urls, seed_data):
        """'kayak paddle' should return the carbon kayak paddle."""
        assert "prod-16" in self._matched_ids(api_urls, "kayak paddle")

    def test_specific_attribute(self, api_urls, seed_data):
        """'gore-tex' should return the hiking boots."""
        assert "prod-01" in self._matched_ids(api_urls, "gore-tex")

    def test_lexical_fails_on_synonyms(self, api_urls, seed_data):
        """'trainers' must NOT return Running Sneakers in lexical mode.

        Keyword matching cannot bridge the trainers/sneakers vocabulary gap;
        that is the limitation this test demonstrates.
        """
        assert "prod-03" not in self._matched_ids(api_urls, "trainers")
class TestHybridSearch:
    """Tests that demonstrate hybrid (lexical + semantic) with normalization.

    Every test queries with ``mode="hybrid"`` and checks that an expected
    product id is among the returned hits.
    """

    def test_hybrid_boosts_exact_match(self, api_urls, seed_data):
        """'camping tent' should match the tent product in hybrid mode."""
        result = search(api_urls["search"], "camping tent", mode="hybrid")
        ids = [hit["id"] for hit in result["results"]]
        assert "prod-11" in ids

    def test_hybrid_finds_synonyms_and_keywords(self, api_urls, seed_data):
        """'waterproof bag for kayaking' matches Dry Bag via both modes."""
        result = search(api_urls["search"], "waterproof bag for kayaking", mode="hybrid")
        ids = [hit["id"] for hit in result["results"]]
        assert "prod-18" in ids

    def test_hybrid_handles_typo(self, api_urls, seed_data):
        """'skiign goggles' (typo) should still find ski goggles."""
        # The query must carry the misspelling itself: sending the correctly
        # spelled "ski goggles" passes without exercising typo tolerance.
        result = search(api_urls["search"], "skiign goggles", mode="hybrid")
        ids = [hit["id"] for hit in result["results"]]
        assert "prod-38" in ids

    def test_hybrid_multi_intent(self, api_urls, seed_data):
        """'swimming equipment' should return swim-related products."""
        result = search(api_urls["search"], "swimming equipment", mode="hybrid")
        ids = [hit["id"] for hit in result["results"]]
        # Snorkel mask, wetsuit and swim goggles; any one of them is enough.
        swim_products = {"prod-22", "prod-25", "prod-29"}
        assert not swim_products.isdisjoint(ids)
# Prevent any real AWS service calls — fake credentials and region.
# An AWS_REGION already present in the environment wins; otherwise eu-west-1.
_REGION = os.environ.get("AWS_REGION", "eu-west-1")

# Credentials are overwritten unconditionally, replacing any that were set.
os.environ["AWS_ACCESS_KEY_ID"] = "testing"
os.environ["AWS_SECRET_ACCESS_KEY"] = "testing"
os.environ["AWS_SECURITY_TOKEN"] = "testing"
os.environ["AWS_SESSION_TOKEN"] = "testing"
# setdefault leaves a region variable that is already set untouched.
os.environ.setdefault("AWS_DEFAULT_REGION", _REGION)
os.environ.setdefault("AWS_REGION", _REGION)

# Disable X-Ray tracing so Tracer uses a no-op provider
os.environ["POWERTOOLS_TRACE_DISABLED"] = "true"
os.environ["POWERTOOLS_SERVICE_NAME"] = "test"

# Lambda function env vars
# NOTE(review): the endpoint host hard-codes eu-west-1 even when _REGION
# differs — confirm that is intentional.
os.environ["COLLECTION_ENDPOINT"] = "https://test-collection.eu-west-1.aoss.amazonaws.com"
os.environ["COLLECTION_NAME"] = "semantic-search"
os.environ["INDEX_NAME"] = "documents"
os.environ["MODEL_ID"] = "test-model-id"
os.environ["INGEST_PIPELINE"] = "embedding-ingest-pipeline"

# Add the layer source to the path so Lambda functions can import opensearch_client
sys.path.insert(0, os.path.join(os.path.dirname(__file__), "..", "..", "layers", "opensearch_client"))
"test-function" + memory_limit_in_mb: int = 256 + invoked_function_arn: str = f"arn:aws:lambda:{_REGION}:123456789012:function:test" + aws_request_id: str = "test-request-id" + + +@pytest.fixture +def lambda_context(): + """Provide a fake Lambda context for Powertools.""" + return FakeLambdaContext() + + +@pytest.fixture +def apigw_event(): + """Build a minimal API Gateway proxy event.""" + def _make(body=None, method="POST", path="/"): + return { + "body": body if isinstance(body, str) else __import__("json").dumps(body or {}), + "httpMethod": method, + "path": path, + "requestContext": { + "requestId": "test-request-id", + "stage": "Prod", + }, + "headers": {"Content-Type": "application/json"}, + } + return _make diff --git a/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_collection_group.py b/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_collection_group.py new file mode 100644 index 000000000..de82ac874 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_collection_group.py @@ -0,0 +1,151 @@ +"""Unit tests for the NextGen collection group custom resource handler.""" + +import sys +import os +import importlib.util +from unittest.mock import patch, MagicMock + +_cg_path = os.path.join(os.path.dirname(__file__), "..", "..", "lambda", "custom_resources", "nextgen_collection_group", "app.py") +_spec = importlib.util.spec_from_file_location("cg_app", _cg_path) +cg_app = importlib.util.module_from_spec(_spec) +sys.modules["cg_app"] = cg_app +_spec.loader.exec_module(cg_app) + +on_create = cg_app.on_create +on_update = cg_app.on_update +on_delete = cg_app.on_delete + + +def _make_event(props=None, physical_id=None): + """Build a minimal CloudFormation custom resource event.""" + event = { + "ResourceProperties": props or { + "Name": "test-collection-group", + "Description": "Test group", + "MaxIndexingCapacityInOCU": "8", + "MaxSearchCapacityInOCU": "8", + }, + } + if physical_id: + event["PhysicalResourceId"] = 
physical_id + return event + + +@patch("cg_app.helper") +@patch("cg_app.client") +def test_create_calls_api_with_correct_params(mock_client, mock_helper, lambda_context): + """Create handler calls create_collection_group with NextGen generation.""" + mock_client.create_collection_group.return_value = { + "createCollectionGroupDetail": { + "id": "abc123def456", + "arn": "arn:aws:aoss:eu-west-1:123456789012:collection-group/abc123def456", + "name": "test-collection-group", + } + } + + result = on_create(_make_event(), lambda_context) + + mock_client.create_collection_group.assert_called_once_with( + name="test-collection-group", + standbyReplicas="ENABLED", + generation="NEXTGEN", + description="Test group", + capacityLimits={ + "maxIndexingCapacityInOCU": 8, + "maxSearchCapacityInOCU": 8, + }, + ) + assert result == "abc123def456" + + +@patch("cg_app.helper") +@patch("cg_app.client") +def test_create_sets_helper_data(mock_client, mock_helper, lambda_context): + """Create handler populates helper.Data with outputs.""" + mock_helper.Data = {} + mock_client.create_collection_group.return_value = { + "createCollectionGroupDetail": { + "id": "abc123def456", + "arn": "arn:aws:aoss:eu-west-1:123456789012:collection-group/abc123def456", + "name": "test-collection-group", + } + } + + on_create(_make_event(), lambda_context) + + assert mock_helper.Data["Id"] == "abc123def456" + assert mock_helper.Data["Generation"] == "NEXTGEN" + assert mock_helper.Data["Name"] == "test-collection-group" + + +@patch("cg_app.helper") +@patch("cg_app.client") +def test_update_calls_api_with_physical_id(mock_client, mock_helper, lambda_context): + """Update handler uses the physical resource ID to update.""" + event = _make_event(physical_id="abc123def456") + + on_update(event, lambda_context) + + mock_client.update_collection_group.assert_called_once_with( + id="abc123def456", + description="Test group", + capacityLimits={ + "maxIndexingCapacityInOCU": 8, + "maxSearchCapacityInOCU": 8, + }, + ) 
@patch("cg_app.helper")
@patch("cg_app.client")
def test_create_default_capacity(mock_client, mock_helper, lambda_context):
    """Both OCU limits fall back to 2 when the properties omit them."""
    created = {
        "id": "abc123",
        "arn": "arn:aws:aoss:eu-west-1:123456789012:collection-group/abc123",
        "name": "minimal",
    }
    mock_client.create_collection_group.return_value = {"createCollectionGroupDetail": created}

    # Only a name is supplied, so no capacity properties reach the handler.
    on_create(_make_event(props={"Name": "minimal"}), lambda_context)

    limits = mock_client.create_collection_group.call_args.kwargs["capacityLimits"]
    assert limits["maxIndexingCapacityInOCU"] == 2
    assert limits["maxSearchCapacityInOCU"] == 2
@patch("delete_app.get_client")
def test_delete_single_document(mock_get_client, apigw_event, lambda_context):
    """A one-id request succeeds and reports a single deleted document."""
    opensearch = MagicMock(**{"bulk.return_value": {"errors": False, "items": []}})
    mock_get_client.return_value = opensearch

    response = handler(apigw_event(body={"ids": ["doc-1"]}), lambda_context)

    assert response["statusCode"] == 200
    payload = json.loads(response["body"])
    assert "1 document" in payload["message"]
    assert payload["errors"] is False
@patch("delete_app.get_client")
def test_empty_ids_returns_400(mock_get_client, apigw_event, lambda_context):
    """An empty ``ids`` list is rejected with 400 and never reaches bulk()."""
    opensearch = MagicMock()
    mock_get_client.return_value = opensearch

    response = handler(apigw_event(body={"ids": []}), lambda_context)

    assert response["statusCode"] == 400
    error_text = json.loads(response["body"])["error"]
    assert "no document" in error_text.lower()
    opensearch.bulk.assert_not_called()
@patch("delete_app.get_client")
def test_opensearch_error_returns_500(mock_get_client, apigw_event, lambda_context):
    """An exception raised by bulk() becomes a 500 carrying its message."""
    failure = Exception("Service unavailable")
    opensearch = MagicMock(**{"bulk.side_effect": failure})
    mock_get_client.return_value = opensearch

    response = handler(apigw_event(body={"ids": ["doc-1"]}), lambda_context)

    assert response["statusCode"] == 500
    error_text = json.loads(response["body"])["error"]
    assert "Service unavailable" in error_text
@patch("index_app.get_client")
def test_index_multiple_documents(mock_get_client, apigw_event, lambda_context):
    """Five documents are reported as indexed and sent in one bulk body."""
    opensearch = MagicMock(**{"bulk.return_value": {"errors": False, "items": []}})
    mock_get_client.return_value = opensearch

    documents = []
    for number in range(5):
        documents.append(
            {"id": f"doc-{number}", "title": f"Title {number}", "content": f"Content {number}"}
        )

    response = handler(apigw_event(body={"documents": documents}), lambda_context)

    assert response["statusCode"] == 200
    payload = json.loads(response["body"])
    assert "5 document" in payload["message"]

    # Each document contributes an action entry plus a source entry.
    sent = opensearch.bulk.call_args.kwargs["body"]
    assert len(sent) == 10
content"}] + }) + handler(event, lambda_context) + + bulk_body = mock_client.bulk.call_args[1]["body"] + doc_body = bulk_body[1] + assert doc_body["embedding_text"] == "My Title. My content" + + +@patch("index_app.get_client") +def test_embedding_text_without_title(mock_get_client, apigw_event, lambda_context): + """When title is empty, embedding_text is just the content.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.bulk.return_value = {"errors": False, "items": []} + + event = apigw_event(body={ + "documents": [{"id": "doc-1", "title": "", "content": "Only content"}] + }) + handler(event, lambda_context) + + bulk_body = mock_client.bulk.call_args[1]["body"] + doc_body = bulk_body[1] + assert doc_body["embedding_text"] == "Only content" + + +@patch("index_app.get_client") +def test_empty_documents_returns_400(mock_get_client, apigw_event, lambda_context): + """Empty documents array returns 400 without calling OpenSearch.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + event = apigw_event(body={"documents": []}) + result = handler(event, lambda_context) + + assert result["statusCode"] == 400 + assert "no documents" in json.loads(result["body"])["error"].lower() + mock_client.bulk.assert_not_called() + + +@patch("index_app.get_client") +def test_no_documents_key_returns_400(mock_get_client, apigw_event, lambda_context): + """Missing documents key returns 400.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + event = apigw_event(body={"data": "something"}) + result = handler(event, lambda_context) + + assert result["statusCode"] == 400 + mock_client.bulk.assert_not_called() + + +@patch("index_app.get_client") +def test_bulk_errors_reported_in_response(mock_get_client, apigw_event, lambda_context): + """Bulk errors from OpenSearch are surfaced in the response.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + 
mock_client.bulk.return_value = { + "errors": True, + "items": [ + {"index": {"_id": "doc-1", "error": {"type": "mapper_parsing_exception", "reason": "bad field"}}} + ], + } + + event = apigw_event(body={ + "documents": [{"id": "doc-1", "title": "Test", "content": "Content"}] + }) + result = handler(event, lambda_context) + + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["errors"] is True + assert len(body["item_errors"]) == 1 + assert body["item_errors"][0]["id"] == "doc-1" + + +@patch("index_app.get_client") +def test_opensearch_error_returns_500(mock_get_client, apigw_event, lambda_context): + """OpenSearch client exception returns 500.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.bulk.side_effect = Exception("Connection refused") + + event = apigw_event(body={ + "documents": [{"id": "doc-1", "title": "Test", "content": "Content"}] + }) + result = handler(event, lambda_context) + + assert result["statusCode"] == 500 + assert "Connection refused" in json.loads(result["body"])["error"] diff --git a/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_search.py b/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_search.py new file mode 100644 index 000000000..4dd15680c --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_search.py @@ -0,0 +1,171 @@ +"""Unit tests for the search Lambda handler.""" + +import json +import sys +import os +import importlib.util +from unittest.mock import patch, MagicMock + +# Import the search handler from its specific path +_search_path = os.path.join(os.path.dirname(__file__), "..", "..", "lambda", "search", "app.py") +_spec = importlib.util.spec_from_file_location("search_app", _search_path) +search_app = importlib.util.module_from_spec(_spec) +sys.modules["search_app"] = search_app +_spec.loader.exec_module(search_app) + +handler = search_app.handler + + +def _opensearch_response(hits, total=None): + """Build 
a mock OpenSearch search response.""" + if total is None: + total = len(hits) + return { + "hits": { + "total": {"value": total, "relation": "eq"}, + "hits": [ + { + "_id": h["id"], + "_score": h.get("score", 0.9), + "_source": {"title": h.get("title", ""), "content": h.get("content", "")}, + } + for h in hits + ], + } + } + + +@patch("search_app.get_client") +def test_semantic_search_returns_results(mock_get_client, apigw_event, lambda_context): + """Semantic search with valid query returns formatted results.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.search.return_value = _opensearch_response( + [{"id": "doc-1", "score": 0.92, "title": "Test Doc", "content": "Test content"}] + ) + + event = apigw_event(body={"query": "test query", "mode": "semantic"}) + result = handler(event, lambda_context) + + assert result["statusCode"] == 200 + body = json.loads(result["body"]) + assert body["mode"] == "semantic" + assert len(body["results"]) == 1 + assert body["results"][0]["id"] == "doc-1" + assert body["results"][0]["score"] == 0.92 + + +@patch("search_app.get_client") +def test_semantic_is_default_mode(mock_get_client, apigw_event, lambda_context): + """When no mode is specified, defaults to semantic.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.search.return_value = _opensearch_response([]) + + event = apigw_event(body={"query": "test"}) + result = handler(event, lambda_context) + + body = json.loads(result["body"]) + assert body["mode"] == "semantic" + + +@patch("search_app.get_client") +def test_lexical_search_uses_multi_match(mock_get_client, apigw_event, lambda_context): + """Lexical mode sends a multi_match query with fuzzy matching.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.search.return_value = _opensearch_response([]) + + event = apigw_event(body={"query": "hiking boots", "mode": "lexical"}) + handler(event, lambda_context) 
+ + call_kwargs = mock_client.search.call_args[1] + assert "multi_match" in json.dumps(call_kwargs["body"]) + + +@patch("search_app.get_client") +def test_hybrid_search_uses_search_pipeline(mock_get_client, apigw_event, lambda_context): + """Hybrid mode passes search_pipeline param to OpenSearch.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.search.return_value = _opensearch_response([]) + + event = apigw_event(body={"query": "waterproof bag", "mode": "hybrid"}) + handler(event, lambda_context) + + call_kwargs = mock_client.search.call_args[1] + assert call_kwargs["params"]["search_pipeline"] == "hybrid-search-pipeline" + + +@patch("search_app.get_client") +def test_hybrid_search_combines_lexical_and_neural(mock_get_client, apigw_event, lambda_context): + """Hybrid query body contains both multi_match and neural queries.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.search.return_value = _opensearch_response([]) + + event = apigw_event(body={"query": "camping gear", "mode": "hybrid"}) + handler(event, lambda_context) + + search_body = mock_client.search.call_args[1]["body"] + assert "hybrid" in search_body["query"] + queries = search_body["query"]["hybrid"]["queries"] + query_str = json.dumps(queries) + assert "multi_match" in query_str + assert "neural" in query_str + + +@patch("search_app.get_client") +def test_custom_size_parameter(mock_get_client, apigw_event, lambda_context): + """Size parameter controls the number of results requested.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.search.return_value = _opensearch_response([]) + + event = apigw_event(body={"query": "test", "size": 10}) + handler(event, lambda_context) + + search_body = mock_client.search.call_args[1]["body"] + assert search_body["size"] == 10 + + +@patch("search_app.get_client") +def test_missing_query_returns_400(mock_get_client, apigw_event, lambda_context): + 
"""Missing query field returns 400 without calling OpenSearch.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + event = apigw_event(body={"mode": "semantic"}) + result = handler(event, lambda_context) + + assert result["statusCode"] == 400 + assert "query" in json.loads(result["body"])["error"].lower() + mock_client.search.assert_not_called() + + +@patch("search_app.get_client") +def test_invalid_mode_returns_400(mock_get_client, apigw_event, lambda_context): + """Invalid mode value returns 400.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + event = apigw_event(body={"query": "test", "mode": "invalid"}) + result = handler(event, lambda_context) + + assert result["statusCode"] == 400 + assert "mode" in json.loads(result["body"])["error"].lower() + mock_client.search.assert_not_called() + + +@patch("search_app.get_client") +def test_opensearch_error_returns_500(mock_get_client, apigw_event, lambda_context): + """OpenSearch client exception returns 500 with error message.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.search.side_effect = Exception("Connection timeout") + + event = apigw_event(body={"query": "test"}) + result = handler(event, lambda_context) + + assert result["statusCode"] == 500 + assert "Connection timeout" in json.loads(result["body"])["error"] diff --git a/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_setup_pipeline.py b/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_setup_pipeline.py new file mode 100644 index 000000000..1faee5923 --- /dev/null +++ b/apigw-lambda-opensearch-serverless-nextgen/tests/unit/test_setup_pipeline.py @@ -0,0 +1,188 @@ +"""Unit tests for the setup pipeline custom resource handler.""" + +import sys +import os +import importlib.util +from unittest.mock import patch, MagicMock + +import pytest + +_setup_path = os.path.join(os.path.dirname(__file__), "..", "..", "lambda", "custom_resources", 
"setup_pipeline", "app.py") +_spec = importlib.util.spec_from_file_location("setup_app", _setup_path) +setup_app = importlib.util.module_from_spec(_spec) +sys.modules["setup_app"] = setup_app +_spec.loader.exec_module(setup_app) + +on_create = setup_app.on_create +on_delete = setup_app.on_delete +_wait_for_model_deployed = setup_app._wait_for_model_deployed +_retry_with_backoff = setup_app._retry_with_backoff + + +def _make_event(physical_id=None): + """Build a minimal CloudFormation custom resource event.""" + event = { + "ResourceProperties": { + "CollectionEndpoint": "https://test.eu-west-1.aoss.amazonaws.com", + "ModelRoleArn": "arn:aws:iam::123456789012:role/MLRole", + "EmbeddingModelId": "amazon.titan-embed-text-v2:0", + "EmbeddingDimension": "1024", + }, + } + if physical_id: + event["PhysicalResourceId"] = physical_id + return event + + +@patch("setup_app.helper") +@patch("setup_app.get_client") +def test_create_registers_connector_and_model(mock_get_client, mock_helper, lambda_context): + """Create handler creates connector, registers model, deploys, and creates pipelines.""" + mock_helper.Data = {} + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_client.transport.perform_request.side_effect = [ + # Step 1: Create connector + {"connector_id": "conn-123"}, + # Step 2: Register model + {"model_id": "model-456"}, + # Step 3: Deploy model + {}, + # Step 3b: Wait for deploy (poll) + {"model_state": "DEPLOYED"}, + # Step 4: Create ingest pipeline + {"acknowledged": True}, + # Step 5: Create search pipeline + {"acknowledged": True}, + ] + + result = on_create(_make_event(), lambda_context) + + assert result == "conn-123/model-456" + assert mock_helper.Data["ModelId"] == "model-456" + assert mock_helper.Data["ConnectorId"] == "conn-123" + assert mock_helper.Data["IngestPipeline"] == "embedding-ingest-pipeline" + assert mock_helper.Data["SearchPipeline"] == "hybrid-search-pipeline" + + +@patch("setup_app.helper") 
+@patch("setup_app.get_client") +def test_create_connector_body_contains_bedrock_config(mock_get_client, mock_helper, lambda_context): + """Connector body includes correct Bedrock model and role ARN.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + + mock_client.transport.perform_request.side_effect = [ + {"connector_id": "conn-123"}, + {"model_id": "model-456"}, + {}, + {"model_state": "DEPLOYED"}, + {"acknowledged": True}, + {"acknowledged": True}, + ] + + on_create(_make_event(), lambda_context) + + # First call is the connector creation + first_call = mock_client.transport.perform_request.call_args_list[0] + assert first_call[0][0] == "POST" + assert "/_plugins/_ml/connectors/_create" in first_call[0][1] + connector_body = first_call[1]["body"] + assert connector_body["credential"]["roleArn"] == "arn:aws:iam::123456789012:role/MLRole" + + +@patch("setup_app.helper") +@patch("setup_app.get_client") +def test_delete_cleans_up_resources(mock_get_client, mock_helper, lambda_context): + """Delete handler removes pipelines, undeploys model, and deletes connector.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.transport.perform_request.return_value = {} + + event = _make_event(physical_id="conn-123/model-456") + on_delete(event, lambda_context) + + calls = mock_client.transport.perform_request.call_args_list + methods_and_paths = [(c[0][0], c[0][1]) for c in calls] + + assert ("DELETE", "/_ingest/pipeline/embedding-ingest-pipeline") in methods_and_paths + assert ("DELETE", "/_search/pipeline/hybrid-search-pipeline") in methods_and_paths + assert ("POST", "/_plugins/_ml/models/model-456/_undeploy") in methods_and_paths + assert ("DELETE", "/_plugins/_ml/models/model-456") in methods_and_paths + assert ("DELETE", "/_plugins/_ml/connectors/conn-123") in methods_and_paths + + +@patch("setup_app.helper") +@patch("setup_app.get_client") +def 
test_delete_handles_pipeline_errors_gracefully(mock_get_client, mock_helper, lambda_context): + """Delete handler continues even if pipeline deletion fails.""" + mock_client = MagicMock() + mock_get_client.return_value = mock_client + mock_client.transport.perform_request.side_effect = Exception("Not found") + + event = _make_event(physical_id="conn-123/model-456") + + # Should not raise + on_delete(event, lambda_context) + + +def test_wait_for_model_deployed_success(): + """_wait_for_model_deployed returns True when model reaches DEPLOYED.""" + mock_client = MagicMock() + mock_client.transport.perform_request.side_effect = [ + {"model_state": "REGISTERING"}, + {"model_state": "DEPLOYING"}, + {"model_state": "DEPLOYED"}, + ] + + result = _wait_for_model_deployed(mock_client, "model-1", max_attempts=5, delay=0) + assert result is True + + +def test_wait_for_model_deployed_failure(): + """_wait_for_model_deployed raises on DEPLOY_FAILED.""" + mock_client = MagicMock() + mock_client.transport.perform_request.return_value = {"model_state": "DEPLOY_FAILED"} + + with pytest.raises(RuntimeError, match="DEPLOY_FAILED"): + _wait_for_model_deployed(mock_client, "model-1", max_attempts=3, delay=0) + + +def test_wait_for_model_deployed_timeout(): + """_wait_for_model_deployed raises TimeoutError if model doesn't deploy.""" + mock_client = MagicMock() + mock_client.transport.perform_request.return_value = {"model_state": "DEPLOYING"} + + with pytest.raises(TimeoutError): + _wait_for_model_deployed(mock_client, "model-1", max_attempts=2, delay=0) + + +def test_retry_with_backoff_succeeds_on_first_try(): + """_retry_with_backoff returns immediately when function succeeds.""" + result = _retry_with_backoff(lambda: "success", max_attempts=3, initial_delay=0) + assert result == "success" + + +def test_retry_with_backoff_retries_on_403(): + """_retry_with_backoff retries on 403 Forbidden errors.""" + attempts = {"count": 0} + + def flaky(): + attempts["count"] += 1 + if 
attempts["count"] < 3: + raise Exception("403 Forbidden") + return "ok" + + result = _retry_with_backoff(flaky, max_attempts=5, initial_delay=0) + assert result == "ok" + assert attempts["count"] == 3 + + +def test_retry_with_backoff_raises_non_auth_errors(): + """_retry_with_backoff does not retry non-authorization errors.""" + def always_fails(): + raise ValueError("Something else broke") + + with pytest.raises(ValueError, match="Something else broke"): + _retry_with_backoff(always_fails, max_attempts=3, initial_delay=0)