Refactor RetryStrategy to be async#671

Open
JordonPhillips wants to merge 2 commits into develop from async-retry-2

@JordonPhillips
Contributor

This refactors RetryStrategy to be async. This is necessary because a strategy may need to use a synchronization primitive or may need to internally wait.

For example, for long-poll operations, AWS retry strategies may need to wait internally even when a retry would not be permitted.

By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.

@JordonPhillips JordonPhillips requested a review from a team as a code owner March 27, 2026 16:10
Contributor

@ubaskota ubaskota left a comment

Thanks for the PR. It looks good overall. I have some clarifying questions inline where I could add them, plus two more below that didn't fit in any specific part:



  • The PR description mentions long-poll operations as motivation, and notes that a strategy "may need to wait internally." The current implementations only do computation (backoff math, quota checks) and no I/O operations. What does "waiting internally" mean concretely here?

  • Should designs/retries.md file be updated with the change? Adding it here since GitHub doesn't let me comment on unchanged files.


@lru_cache
def _create_retry_strategy(
    self, retry_mode: RetryStrategyType, max_attempts: int | None
Contributor

_create_retry_strategy is synchronous with @lru_cache. Is this intentional? It's just creating and returning retry strategy today, but do you see a case where this would need to be async?

Contributor Author

@JordonPhillips JordonPhillips May 28, 2026

Anything is possible I suppose. This is an internal method, so it could be changed if needed. The calling method is already async.
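As a minimal sketch of the pattern under discussion, the following shows a synchronous, `lru_cache`-decorated factory called from an async function. The names `SimpleStrategy`, `create_retry_strategy`, and `resolve_strategy` are hypothetical stand-ins, not the PR's code. One reason to keep such a factory synchronous is that `lru_cache` on an `async def` would cache the coroutine object itself, and a coroutine can only be awaited once.

```python
import asyncio
from dataclasses import dataclass
from functools import lru_cache
from typing import Optional


@dataclass(frozen=True)
class SimpleStrategy:
    """Hypothetical stand-in for a retry strategy object."""

    retry_mode: str
    max_attempts: Optional[int]


@lru_cache
def create_retry_strategy(
    retry_mode: str, max_attempts: Optional[int]
) -> SimpleStrategy:
    # Pure construction with nothing to await, so a sync cached factory suffices.
    return SimpleStrategy(retry_mode, max_attempts)


async def resolve_strategy(
    retry_mode: str, max_attempts: Optional[int]
) -> SimpleStrategy:
    # The async caller invokes the sync factory directly; no await is needed.
    return create_retry_strategy(retry_mode, max_attempts)


async def main() -> bool:
    first = await resolve_strategy("standard", 3)
    second = await resolve_strategy("standard", 3)
    # Identical arguments hit the cache and yield the same instance.
    return first is second
```

If the factory ever did need to await something, the cache would have to move to a different mechanism, which is consistent with the reply above that the internal method could be changed if needed.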

"""
self._max_capacity = initial_capacity
self._available_capacity = initial_capacity
self._lock = threading.Lock()
Contributor

Is leaving threading.Lock intentional? If so, do you think it's worth adding a short comment explaining the choice? And do you see a scenario where we may need to make this async?

Contributor Author

This is existing code. At one point I had a thing where I completely rewrote all of this but it got too unwieldy and was missing the forest for the trees.

This is a thread lock to make sure it's thread safe. Since this is the kind of thing you might want to share across threads, that makes sense, even in an async context. You do stop the world (the event loop is blocked while the lock is held), but in this case you aren't doing so for long.

An async lock actually would not make any sense in a token bucket class like this. In a threading world, you have no idea who's touching your memory and when. In an async world, you know exactly when you're potentially passing priority to another coroutine. None of the methods in this would actually pass priority if converted to async.

That could change if you add a wait-for-capacity mode, however. For example:

import asyncio


class AsyncTokenBucket:
    def __init__(self, capacity: int = 500) -> None:
        self._capacity = capacity
        self._max_capacity = capacity
        self._capacity_condition = asyncio.Condition()

    async def drain(self, amount: int, wait: bool = False) -> bool:
        assert amount > 0

        # Immediately check the capacity without passing priority. If there's enough,
        # go ahead and drain it.
        if self._capacity >= amount:
            self._capacity -= amount
            return True

        # If there's not enough capacity, wait for there to be capacity if waiting is
        # enabled.
        if wait:
            async with self._capacity_condition:
                # Wait until there's enough capacity to drain the requested amount.
                await self._capacity_condition.wait_for(
                    lambda: self._capacity >= amount
                )
                self._capacity -= amount
                return True

        return False

    async def fill(self, amount: int) -> None:
        assert amount > 0
        # Cap the refilled capacity at the bucket's maximum.
        self._capacity = min(self._max_capacity, self._capacity + amount)

        # Notify any waiting coroutines that there's more capacity.
        async with self._capacity_condition:
            self._capacity_condition.notify_all()

This class IS NOT thread safe. But it does showcase an instance where an async lock makes sense. When there's not enough capacity in the bucket and waiting is enabled, priority is passed to the next coroutine. The waiting coroutine won't become active again until notify_all is called. When it becomes active, it'll check to see if the condition is met. If it is, it'll drain its capacity and return. If it isn't, it'll become inactive again as it waits for another notify.

Note the start of the drain method in particular. It immediately checks and drains the capacity if there is enough. This is safe because there is no await between the check and the drain, so no other coroutine on the same event loop can run in between. In an equivalent threading method, the capacity could have changed between the time you check it and the time you drain it, thus requiring a lock.
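The check-then-drain point can be illustrated with a small sketch. The `Counter` class and `run_demo` function are hypothetical and exist only for this illustration: many coroutines compete for a shared capacity with no lock at all, and because no `await` sits between the check and the subtraction, the capacity never goes negative.

```python
import asyncio


class Counter:
    """Hypothetical shared capacity with no lock of any kind."""

    def __init__(self, capacity: int) -> None:
        self.capacity = capacity
        self.successes = 0

    async def try_drain(self, amount: int) -> bool:
        # Yield to the event loop first so the coroutines actually interleave.
        await asyncio.sleep(0)
        # No await between the check and the update: no other coroutine on
        # this event loop can run in between, so the pair cannot be interrupted.
        if self.capacity >= amount:
            self.capacity -= amount
            self.successes += 1
            return True
        return False


async def run_demo() -> tuple[int, int]:
    counter = Counter(capacity=10)
    # 100 coroutines compete for 10 units of capacity.
    await asyncio.gather(*(counter.try_drain(1) for _ in range(100)))
    return counter.capacity, counter.successes
```

Running `asyncio.run(run_demo())` leaves the capacity at 0 with exactly 10 successful drains. This guarantee only holds within a single event loop; sharing the object across threads would bring back the race.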

Now if you remove the waiting capability, then the condition can go away and you no longer need async methods. It still won't be thread safe unless thread locks are added.
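For comparison, here is a minimal sketch of the non-waiting variant with thread locks added. It is not the PR's token bucket; `ThreadSafeTokenBucket` and `run_threads` are hypothetical names, and the structure simply mirrors the async example above with the condition removed.

```python
import threading


class ThreadSafeTokenBucket:
    """Hypothetical non-waiting token bucket guarded by a thread lock."""

    def __init__(self, capacity: int = 500) -> None:
        self._capacity = capacity
        self._max_capacity = capacity
        self._lock = threading.Lock()

    def drain(self, amount: int) -> bool:
        assert amount > 0
        # Another thread could change the capacity between the check and the
        # subtraction, so both happen while holding the lock.
        with self._lock:
            if self._capacity >= amount:
                self._capacity -= amount
                return True
            return False

    def fill(self, amount: int) -> None:
        assert amount > 0
        with self._lock:
            # Never exceed the configured maximum.
            self._capacity = min(self._max_capacity, self._capacity + amount)

    @property
    def capacity(self) -> int:
        with self._lock:
            return self._capacity


def run_threads() -> tuple[int, int]:
    bucket = ThreadSafeTokenBucket(capacity=50)
    results: list[bool] = []
    results_lock = threading.Lock()

    def worker() -> None:
        ok = bucket.drain(1)
        with results_lock:
            results.append(ok)

    # 200 threads compete for 50 units of capacity.
    threads = [threading.Thread(target=worker) for _ in range(200)]
    for thread in threads:
        thread.start()
    for thread in threads:
        thread.join()
    return bucket.capacity, sum(results)
```

The lock is held only for a comparison and an integer update, which is why blocking the event loop for that long is usually acceptable when such a bucket is called from async code.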

@JordonPhillips
Contributor Author

JordonPhillips commented May 28, 2026

> The PR description mentions long-poll operations as motivation, and notes that a strategy "may need to wait internally." The current implementations only do computation (backoff math, quota checks) and no I/O operations. What does "waiting internally" mean concretely here?

The proposed changes to the AWS standard retry mode require a delay even if the token bucket is empty. That is, you delay and then return the empty bucket error.
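A rough sketch of what "delay and then return the empty bucket error" could look like follows. Everything here is an assumption made for illustration: `RetryQuotaExceeded`, `DelayingStrategy`, `next_retry_delay`, and the cost and delay values are hypothetical and are not the library's API or the proposed AWS behavior in detail. The point is only that the wait happens inside the strategy, which is why the method has to be a coroutine.

```python
import asyncio


class RetryQuotaExceeded(Exception):
    """Hypothetical error raised when the token bucket has no capacity."""


class DelayingStrategy:
    """Hypothetical strategy that waits internally before reporting failure."""

    def __init__(
        self, capacity: int, retry_cost: int = 5, delay: float = 0.01
    ) -> None:
        self._capacity = capacity
        self._retry_cost = retry_cost
        self._delay = delay

    async def next_retry_delay(self) -> float:
        if self._capacity >= self._retry_cost:
            # Enough quota: pay the cost and hand the delay back to the caller.
            self._capacity -= self._retry_cost
            return self._delay
        # Not enough quota: wait inside the strategy first, then surface the
        # error. This internal sleep is what forces the method to be async.
        await asyncio.sleep(self._delay)
        raise RetryQuotaExceeded("retry quota exhausted")
```

A synchronous version of `next_retry_delay` could only block the thread with `time.sleep`, which would stall every other coroutine on the event loop for the duration of the delay.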

> Should designs/retries.md file be updated with the change? Adding it here since GitHub doesn't let me comment on unchanged files.

good point

This refactors retry strategies to be async. This is needed for
when strategies need to internally wait or if they need to
protect access to a shared resource.