Refactor RetryStrategy to be async #671
ubaskota
left a comment
Thanks for the PR. It looks good overall. I have some clarifying questions inline where I could add them, plus two more below that didn't fit in any specific part:
- The PR description mentions long-poll operations as motivation, and notes that a strategy "may need to wait internally." The current implementations only do computation (backoff math, quota checks) and no I/O operations. What does "waiting internally" mean concretely here?
- Should the designs/retries.md file be updated with the change? Adding it here since GitHub doesn't let me comment on unchanged files.
    @lru_cache
    def _create_retry_strategy(
        self, retry_mode: RetryStrategyType, max_attempts: int | None
_create_retry_strategy is synchronous with @lru_cache. Is this intentional? It's just creating and returning retry strategy today, but do you see a case where this would need to be async?
Anything is possible I suppose. This is an internal method, so it could be changed if needed. The calling method is already async.
    """
    self._max_capacity = initial_capacity
    self._available_capacity = initial_capacity
    self._lock = threading.Lock()
Is leaving threading.Lock intentional? If so, do you think it's worth adding a short comment explaining the choice? And do you see a scenario where we may need to make this async?
This is existing code. At one point I had a thing where I completely rewrote all of this but it got too unwieldy and was missing the forest for the trees.
This is a thread lock to make sure it's thread safe. Since this is the kind of thing you might want to share across threads, that makes sense, even in an async context. You do stop the world, but in this case you aren't doing so for long.
An async lock actually would not make any sense in a token bucket class like this. In a threading world, you have no idea who's touching your memory and when. In an async world, you know exactly when you're potentially passing priority to another coroutine. None of the methods in this would actually pass priority if converted to async.
That could change if you add a wait-for-capacity mode, however. For example:
    import asyncio


    class AsyncTokenBucket:
        def __init__(self, capacity: int = 500) -> None:
            self._capacity = capacity
            self._max_capacity = capacity
            self._capacity_condition = asyncio.Condition()

        async def drain(self, amount: int, wait: bool = False) -> bool:
            assert amount > 0

            # Immediately check the capacity without passing priority. If there's
            # enough, go ahead and drain it.
            if self._capacity >= amount:
                self._capacity -= amount
                return True

            # If there's not enough capacity, wait for there to be capacity if
            # waiting is enabled.
            if wait:
                async with self._capacity_condition:
                    # Wait until there's enough capacity to drain the requested
                    # amount.
                    await self._capacity_condition.wait_for(
                        lambda: self._capacity >= amount
                    )
                    self._capacity -= amount
                    return True

            return False

        async def fill(self, amount: int) -> None:
            assert amount > 0
            # Add capacity, but never exceed the bucket's maximum.
            self._capacity = min(self._max_capacity, self._capacity + amount)

            # Notify any waiting coroutines that there's more capacity.
            async with self._capacity_condition:
                self._capacity_condition.notify_all()

This class IS NOT thread safe. But it does showcase an instance where an async lock makes sense. When there's not enough capacity in the bucket, priority is passed to the next coroutine. The waiting coroutine won't become active again until notify_all is called. When it becomes active, it'll check to see if the condition is met. If it is, it'll drain its capacity and return. If it isn't, it'll become inactive again as it waits for another notify.
Note the start of the drain method in particular. It immediately checks and drains the capacity if there is enough. This is safe because it's async and we know that no other coroutine is running. In an equivalent threading method, the capacity could have changed between the time you check it and the time you drain it, thus requiring a lock.
Now if you remove the waiting capability, then the condition can go away and you no longer need async methods. It still won't be thread safe unless thread locks are added.
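For comparison, a minimal sketch of what the thread-locked, non-waiting variant might look like. `ThreadSafeTokenBucket` is a hypothetical name for illustration and is not the token bucket in this PR. The check and the subtraction happen under one lock, so no other thread can change the capacity in between.

```python
import threading


class ThreadSafeTokenBucket:
    # Hypothetical name; a sketch, not the library's actual token bucket.

    def __init__(self, capacity: int = 500) -> None:
        self._capacity = capacity
        self._max_capacity = capacity
        self._lock = threading.Lock()

    def drain(self, amount: int) -> bool:
        # Check and subtract under the same lock so the capacity cannot
        # change between the check and the drain.
        with self._lock:
            if self._capacity >= amount:
                self._capacity -= amount
                return True
            return False

    def fill(self, amount: int) -> None:
        with self._lock:
            self._capacity = min(self._max_capacity, self._capacity + amount)

    @property
    def capacity(self) -> int:
        with self._lock:
            return self._capacity


# Twenty threads each try to drain 10 from a bucket of 100.
bucket = ThreadSafeTokenBucket(capacity=100)
results: list[bool] = []
results_lock = threading.Lock()


def worker() -> None:
    ok = bucket.drain(10)
    with results_lock:
        results.append(ok)


threads = [threading.Thread(target=worker) for _ in range(20)]
for t in threads:
    t.start()
for t in threads:
    t.join()

successes = sum(results)
```

The lock is only held for a comparison and a subtraction, which is why briefly blocking the event loop on it is tolerable in an async caller.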
The proposed changes to the AWS standard retry mode require a delay even if the token bucket is empty. That is, you delay and then return the empty bucket error.
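A rough sketch of that "delay, then fail" behavior, to show why the strategy method has to be awaitable. Everything here is an assumption for illustration: `DelayingRetryStrategy`, `RetryQuotaExhausted`, `next_retry_delay`, the retry cost, and the backoff formula are all hypothetical and not taken from the PR or from the AWS specification.

```python
import asyncio


class RetryQuotaExhausted(Exception):
    """Hypothetical error raised when the bucket cannot cover a retry."""


class DelayingRetryStrategy:
    # Hypothetical strategy; the cost and delay values are illustrative only.

    def __init__(
        self, capacity: int, retry_cost: int = 5, base_delay: float = 0.01
    ) -> None:
        self._capacity = capacity
        self._retry_cost = retry_cost
        self._base_delay = base_delay

    async def next_retry_delay(self, attempt: int) -> float:
        # Simple exponential backoff, assumed for this sketch.
        delay = self._base_delay * (2 ** (attempt - 1))
        if self._capacity < self._retry_cost:
            # The bucket is empty: wait out the backoff internally, then
            # report the empty bucket. This await is why the method is async.
            await asyncio.sleep(delay)
            raise RetryQuotaExhausted("retry quota exhausted")
        self._capacity -= self._retry_cost
        return delay


async def main() -> tuple[float, bool]:
    strategy = DelayingRetryStrategy(capacity=5)
    # The first retry drains the whole bucket.
    first_delay = await strategy.next_retry_delay(attempt=1)
    try:
        # The second retry finds the bucket empty, sleeps, then raises.
        await strategy.next_retry_delay(attempt=2)
    except RetryQuotaExhausted:
        return first_delay, True
    return first_delay, False


first_delay, exhausted = asyncio.run(main())
```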
good point
This refactors retry strategies to be async. This is needed for when strategies need to internally wait or if they need to protect access to a shared resource.
64efedf to cc8a0bb
This refactors RetryStrategy to be async. This is necessary because a strategy may need to use a synchronization primitive or may need to internally wait.
For example, when the operation is a long-poll operation, AWS retry strategies may need to wait internally even if a retry would not be permitted.
By submitting this pull request, I confirm that you can use, modify, copy, and redistribute this contribution, under the terms of your choice.