Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
65 changes: 65 additions & 0 deletions examples/src/with_retry/with_retry_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,65 @@
"""Demonstrates with_retry wrapping a wait_for_callback operation.

The callback may fail multiple times before succeeding. The with_retry helper
retries the entire callback flow (including creating a new callback each attempt)
with exponential backoff between attempts.
"""

from typing import Any

from aws_durable_execution_sdk_python.config import Duration, WaitForCallbackConfig
from aws_durable_execution_sdk_python.context import DurableContext
from aws_durable_execution_sdk_python.execution import durable_execution
from aws_durable_execution_sdk_python.retries import (
RetryStrategyConfig,
WithRetryConfig,
with_retry,
)


@durable_execution
def handler(_event: Any, context: DurableContext) -> dict[str, Any]:
"""Handler demonstrating with_retry around a wait_for_callback.

The external system may fail to process the callback multiple times.
with_retry will re-create the callback and wait again on each retry,
with exponential backoff between attempts.
"""

def retryable_callback_flow(ctx: DurableContext, attempt: int) -> str:
"""The retryable block: create a callback and wait for the result."""

def submitter(callback_id: str, _callback_ctx) -> None:
"""Submit the callback ID to an external system."""
# In real usage, this would send the callback_id to an external
# system (e.g., via API call, SQS message, etc.)
pass

config = WaitForCallbackConfig(
timeout=Duration.from_seconds(30),
heartbeat_timeout=Duration.from_seconds(60),
)

return ctx.wait_for_callback(
submitter, name=f"external-call-attempt-{attempt}", config=config
)

retry_config = WithRetryConfig(
retry_strategy_config=RetryStrategyConfig(
max_attempts=5,
initial_delay=Duration.from_seconds(2),
backoff_rate=1.0,
),
)

result = with_retry(
context,
func=retryable_callback_flow,
config=retry_config,
name="callback-with-retry",
)

return {
"success": True,
"result": result,
}
69 changes: 69 additions & 0 deletions examples/test/with_retry/test_with_retry_callback.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,69 @@
"""Tests for with_retry_callback example.

Demonstrates that with_retry retries the entire wait_for_callback flow
when the callback fails. The external system fails 2 times before
succeeding on the 3rd attempt.
"""

import pytest
from src.with_retry import with_retry_callback
from test.conftest import deserialize_operation_payload

from aws_durable_execution_sdk_python.execution import InvocationStatus
from aws_durable_execution_sdk_python.lambda_service import ErrorObject


@pytest.mark.example
@pytest.mark.durable_execution(
handler=with_retry_callback.handler,
lambda_function_name="With Retry Callback",
)
def test_with_retry_callback_fails_twice_then_succeeds(durable_runner):
"""Test that with_retry retries the callback flow after failures.

The external system sends callback failure 2 times, then succeeds
on the 3rd attempt. with_retry handles the failures and retries
the entire wait_for_callback block.
"""
with durable_runner:
execution_arn = durable_runner.run_async(input=None, timeout=60)

# Attempt 1: external system fails
callback_id_1 = durable_runner.wait_for_callback(
execution_arn=execution_arn,
name="external-call-attempt-1 create callback id",
)
durable_runner.send_callback_failure(
callback_id=callback_id_1,
error=ErrorObject.from_message("External system unavailable"),
)

# Attempt 2: external system fails again
callback_id_2 = durable_runner.wait_for_callback(
execution_arn=execution_arn,
name="external-call-attempt-2 create callback id",
)
durable_runner.send_callback_failure(
callback_id=callback_id_2,
error=ErrorObject.from_message("External system timeout"),
)

# Attempt 3: external system succeeds
callback_id_3 = durable_runner.wait_for_callback(
execution_arn=execution_arn,
name="external-call-attempt-3 create callback id",
)
durable_runner.send_callback_success(
callback_id=callback_id_3,
result="approval granted".encode(),
)

result = durable_runner.wait_for_result(execution_arn=execution_arn)

assert result.status is InvocationStatus.SUCCEEDED

result_data = deserialize_operation_payload(result.result)
assert result_data == {
"success": True,
"result": "approval granted",
}
6 changes: 6 additions & 0 deletions src/aws_durable_execution_sdk_python/__init__.py
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,25 @@
# Core decorator - used in every durable function
from aws_durable_execution_sdk_python.execution import durable_execution

# Retry helpers
from aws_durable_execution_sdk_python.retries import WithRetryConfig, with_retry

# Essential context types - passed to user functions
from aws_durable_execution_sdk_python.types import StepContext


__all__ = [
"BatchResult",
"DurableContext",
"DurableExecutionsError",
"InvocationError",
"StepContext",
"ValidationError",
"WithRetryConfig",
"__version__",
"durable_execution",
"durable_step",
"durable_wait_for_callback",
"durable_with_child_context",
"with_retry",
]
97 changes: 96 additions & 1 deletion src/aws_durable_execution_sdk_python/retries.py
Original file line number Diff line number Diff line change
Expand Up @@ -5,13 +5,20 @@
import math
import re
from dataclasses import dataclass, field
from typing import TYPE_CHECKING
from typing import TYPE_CHECKING, TypeVar

from aws_durable_execution_sdk_python.config import Duration, JitterStrategy
from aws_durable_execution_sdk_python.exceptions import SuspendExecution


if TYPE_CHECKING:
from collections.abc import Callable

from aws_durable_execution_sdk_python.config import ChildConfig
from aws_durable_execution_sdk_python.types import DurableContext

T = TypeVar("T")

Numeric = int | float

# Default pattern that matches all error messages
Expand Down Expand Up @@ -172,3 +179,91 @@ def critical(cls) -> Callable[[Exception, int], RetryDecision]:
jitter_strategy=JitterStrategy.NONE,
)
)


@dataclass(frozen=True)
class WithRetryConfig:
"""Configuration for with_retry.

Wraps the existing RetryStrategyConfig (same config used for step
retries) and adds execution-mode options specific to with_retry.

Attributes:
retry_strategy_config: RetryStrategyConfig controlling retry
behavior (max_attempts, initial_delay, backoff_rate, jitter,
error filtering). The same config used for step retries.
wrap_with_run_in_child_context: Whether to wrap the retry loop in
a child context for isolation. Default True.
child_context_config: Optional ChildConfig forwarded to
run_in_child_context when wrapping is enabled. Ignored when
wrap_with_run_in_child_context is False.
"""

retry_strategy_config: RetryStrategyConfig
wrap_with_run_in_child_context: bool = True
child_context_config: ChildConfig | None = None


def with_retry(
context: DurableContext,
func: Callable[[DurableContext, int], T],
config: WithRetryConfig,
name: str | None = None,
) -> T:
"""Retry a block of durable logic with configurable backoff.

Semantically a run_in_child_context with a retry policy wrapped around
it — on failure the whole function body is re-run from the beginning
with configurable backoff.

Unlike context.step() which retries a single atomic operation,
with_retry retries an entire function body that may contain multiple
durable operations (steps, waits, invokes, callbacks, etc.).

Uses the existing RetryStrategyConfig (via WithRetryConfig), so retry
configuration is consistent across the SDK.

Args:
context: The DurableContext to execute within.
func: A callable that accepts (DurableContext, attempt: int) and
returns T. The function body may contain multiple durable
operations.
config: WithRetryConfig containing a RetryStrategyConfig plus
execution-mode options.
name: Optional name for the child context and backoff waits.
When provided, backoff waits are named
"{name}-backoff-{attempt}".

Returns:
The result of func on successful execution.

Raises:
The exception from the last failed attempt when retries are
exhausted or the retry strategy returns should_retry=False.
SuspendExecution: Re-raised immediately (SDK control flow).
"""
retry_strategy = create_retry_strategy(config.retry_strategy_config)

def run_loop(ctx: DurableContext) -> T:
attempt = 0
while True:
attempt += 1
try:
return func(ctx, attempt)
except SuspendExecution:
raise # SDK control flow - never intercept
except Exception as err:
decision = retry_strategy(err, attempt)
if not decision.should_retry:
raise
wait_name = f"{name}-backoff-{attempt}" if name else None
ctx.wait(duration=decision.delay, name=wait_name)

if config.wrap_with_run_in_child_context:
return context.run_in_child_context(
run_loop,
name=name,
config=config.child_context_config,
)
else:
return run_loop(context)
Loading