-
Notifications
You must be signed in to change notification settings - Fork 17
feat: parallel and map branch name #387
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -31,4 +31,6 @@ dist/ | |
| .kiro/ | ||
|
|
||
| /examples/build/* | ||
| /examples/*.zip | ||
| /examples/*.zip | ||
|
|
||
| .env | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,30 @@ | ||
| """Example demonstrating map operations with custom iteration naming.""" | ||
|
|
||
| from typing import Any | ||
|
|
||
| from aws_durable_execution_sdk_python.config import MapConfig | ||
| from aws_durable_execution_sdk_python.context import DurableContext | ||
| from aws_durable_execution_sdk_python.execution import durable_execution | ||
|
|
||
|
|
||
| @durable_execution | ||
| def handler(_event: Any, context: DurableContext) -> list[str]: | ||
| """Process orders using context.map() with custom iteration names.""" | ||
| orders = [ | ||
| {"id": "order-101", "amount": 25}, | ||
| {"id": "order-102", "amount": 50}, | ||
| {"id": "order-103", "amount": 75}, | ||
| ] | ||
|
|
||
| return context.map( | ||
| inputs=orders, | ||
| func=lambda ctx, order, index, _: ctx.step( | ||
| lambda _: f"processed-{order['id']}-${order['amount']}", | ||
| name=f"process_{order['id']}", | ||
| ), | ||
| name="process_orders", | ||
| config=MapConfig( | ||
| max_concurrency=2, | ||
| item_namer=lambda order, index: f"order-{order['id']}", | ||
| ), | ||
| ).get_results() |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,51 @@ | ||
| """Example demonstrating all parallel branch patterns.""" | ||
|
|
||
| from typing import Any | ||
|
|
||
| from aws_durable_execution_sdk_python.config import ParallelBranch, ParallelConfig | ||
| from aws_durable_execution_sdk_python.context import ( | ||
| DurableContext, | ||
| durable_parallel_branch, | ||
| ) | ||
| from aws_durable_execution_sdk_python.execution import durable_execution | ||
|
|
||
|
|
||
| @durable_parallel_branch(name="fetch-orders") | ||
| def fetch_orders(ctx: DurableContext) -> str: | ||
| return ctx.step(lambda _: "orders-loaded", name="load_orders") | ||
|
|
||
|
|
||
| @durable_parallel_branch() | ||
| def fetch_preferences(ctx: DurableContext) -> str: | ||
| return ctx.step(lambda _: "prefs-loaded", name="load_prefs") | ||
|
|
||
|
|
||
| @durable_execution | ||
| def handler(_event: Any, context: DurableContext) -> list[str]: | ||
| """Execute parallel branches using all supported patterns.""" | ||
|
|
||
| return context.parallel( | ||
| functions=[ | ||
| # 1. Named parallel branch with ParallelBranch | ||
| ParallelBranch( | ||
| func=lambda ctx: ctx.step( | ||
| lambda _: "user-data-loaded", name="load_user" | ||
| ), | ||
| name="fetch-user-data", | ||
| ), | ||
| # 2. Named parallel branch with decorator | ||
| fetch_orders(), | ||
| # 3. Unnamed parallel branch with decorator | ||
| fetch_preferences(), | ||
| # 4. Unnamed parallel branch with ParallelBranch | ||
| ParallelBranch( | ||
| func=lambda ctx: ctx.step( | ||
| lambda _: "metrics-loaded", name="load_metrics" | ||
| ), | ||
| ), | ||
| # 5. No wrapper, just a raw callable | ||
| lambda ctx: ctx.step(lambda _: "config-loaded", name="load_config"), | ||
| ], | ||
| name="load_all_data", | ||
| config=ParallelConfig(max_concurrency=3), | ||
| ).get_results() | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -941,6 +941,42 @@ | |
| "ExecutionTimeout": 300 | ||
| } | ||
| } | ||
| }, | ||
| "MapWithItemNamer": { | ||
| "Type": "AWS::Serverless::Function", | ||
| "Properties": { | ||
| "CodeUri": "build/", | ||
| "Handler": "map_with_item_namer.handler", | ||
| "Description": "Map operation with custom item_namer for iteration naming", | ||
| "Role": { | ||
| "Fn::GetAtt": [ | ||
| "DurableFunctionRole", | ||
| "Arn" | ||
| ] | ||
| }, | ||
| "DurableConfig": { | ||
| "RetentionPeriodInDays": 7, | ||
| "ExecutionTimeout": 300 | ||
| } | ||
| } | ||
| }, | ||
| "ParallelWithNamedBranches": { | ||
| "Type": "AWS::Serverless::Function", | ||
| "Properties": { | ||
| "CodeUri": "build/", | ||
| "Handler": "parallel_with_named_branches.handler", | ||
| "Description": "Parallel operation with named branches using ParallelBranch", | ||
| "Role": { | ||
| "Fn::GetAtt": [ | ||
| "DurableFunctionRole", | ||
| "Arn" | ||
| ] | ||
| }, | ||
| "DurableConfig": { | ||
| "RetentionPeriodInDays": 7, | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: this could be in Globals |
||
| "ExecutionTimeout": 300 | ||
| } | ||
| } | ||
| } | ||
| } | ||
| } | ||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,39 @@ | ||
| """Tests for map_with_item_namer example.""" | ||
|
|
||
| import pytest | ||
| from src.map import map_with_item_namer | ||
| from test.conftest import deserialize_operation_payload | ||
|
|
||
| from aws_durable_execution_sdk_python.execution import InvocationStatus | ||
| from aws_durable_execution_sdk_python.lambda_service import ( | ||
| OperationStatus, | ||
| ) | ||
|
|
||
|
|
||
| @pytest.mark.example | ||
| @pytest.mark.durable_execution( | ||
| handler=map_with_item_namer.handler, | ||
| lambda_function_name="map with item namer", | ||
| ) | ||
| def test_map_with_item_namer(durable_runner): | ||
| """Test map example with custom item_namer for iteration naming.""" | ||
| with durable_runner: | ||
| result = durable_runner.run(input="test", timeout=10) | ||
|
|
||
| assert result.status is InvocationStatus.SUCCEEDED | ||
| assert deserialize_operation_payload(result.result) == [ | ||
| "processed-order-101-$25", | ||
| "processed-order-102-$50", | ||
| "processed-order-103-$75", | ||
| ] | ||
|
|
||
| # Get the map operation | ||
| map_op = result.get_context("process_orders") | ||
| assert map_op is not None | ||
| assert map_op.status is OperationStatus.SUCCEEDED | ||
|
|
||
| # Verify custom iteration names from item_namer | ||
| assert len(map_op.child_operations) == 3 | ||
| child_names = {op.name for op in map_op.child_operations} | ||
| expected_names = {"order-order-101", "order-order-102", "order-order-103"} | ||
| assert child_names == expected_names |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,57 @@ | ||
| """Tests for parallel_with_named_branches example.""" | ||
|
|
||
| import pytest | ||
| from src.parallel import parallel_with_named_branches | ||
| from test.conftest import deserialize_operation_payload | ||
|
|
||
| from aws_durable_execution_sdk_python.execution import InvocationStatus | ||
| from aws_durable_execution_sdk_python.lambda_service import ( | ||
| OperationStatus, | ||
| OperationType, | ||
| ) | ||
|
|
||
|
|
||
| @pytest.mark.example | ||
| @pytest.mark.durable_execution( | ||
| handler=parallel_with_named_branches.handler, | ||
| lambda_function_name="parallel with named branches", | ||
| ) | ||
| def test_parallel_with_named_branches(durable_runner): | ||
| """Test parallel example with all branch patterns.""" | ||
| with durable_runner: | ||
| result = durable_runner.run(input="test", timeout=10) | ||
|
|
||
| assert result.status is InvocationStatus.SUCCEEDED | ||
| assert deserialize_operation_payload(result.result) == [ | ||
| "user-data-loaded", | ||
| "orders-loaded", | ||
| "prefs-loaded", | ||
| "metrics-loaded", | ||
| "config-loaded", | ||
| ] | ||
|
|
||
| # Get the parallel operation | ||
| parallel_op = result.get_context("load_all_data") | ||
| assert parallel_op is not None | ||
| assert parallel_op.status is OperationStatus.SUCCEEDED | ||
|
|
||
| # Verify branch names: named branches have custom names, unnamed use defaults | ||
| assert len(parallel_op.child_operations) == 5 | ||
|
|
||
| child_names = [op.name for op in parallel_op.child_operations] | ||
|
|
||
| # 1. Named ParallelBranch | ||
| assert child_names[0] == "fetch-user-data" | ||
| # 2. Named decorator | ||
| assert child_names[1] == "fetch-orders" | ||
| # 3. Unnamed decorator (None name falls back to index-based default) | ||
| assert child_names[2] == "parallel-branch-2" | ||
| # 4. Unnamed ParallelBranch (None name falls back to index-based default) | ||
| assert child_names[3] == "parallel-branch-3" | ||
| # 5. Raw callable (no ParallelBranch wrapper, index-based default) | ||
| assert child_names[4] == "parallel-branch-4" | ||
|
|
||
| # Verify all children succeeded | ||
| for child in parallel_op.child_operations: | ||
| assert child.operation_type == OperationType.CONTEXT | ||
| assert child.status is OperationStatus.SUCCEEDED |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -194,6 +194,14 @@ def execute_item( | |
| """Execute a single executable in a child context and return the result.""" | ||
| raise NotImplementedError | ||
|
|
||
| def get_iteration_name(self, index: int) -> str: | ||
| """Get the display name for an iteration/branch at the given index. | ||
|
|
||
| Subclasses can override this to provide custom naming (e.g., from item_namer | ||
| or branch names). The default returns "{name_prefix}{index}". | ||
| """ | ||
| return f"{self.name_prefix}{index}" | ||
|
|
||
| def execute( | ||
| self, execution_state: ExecutionState, executor_context: DurableContext | ||
| ) -> BatchResult[ResultType]: | ||
|
|
@@ -410,7 +418,7 @@ def _execute_item_in_child_context( | |
| operation_id: str = executor_context._create_step_id_for_logical_step( # noqa: SLF001 | ||
| executable.index | ||
| ) | ||
| name: str = f"{self.name_prefix}{executable.index}" | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Not sure why we did this for both parallel and map. For parallel branches that use Python functions, they already have their unique names in |
||
| name: str = self.get_iteration_name(executable.index) | ||
| is_virtual: bool = self.nesting_type is NestingType.FLAT | ||
|
|
||
| child_context: DurableContext = executor_context.create_child_context( | ||
|
|
||
Uh oh!
There was an error while loading. Please reload this page.