Skip to content

[SLOP(claude-opus-4-8)] feat(util): add rate limiter primitive and ingress throttles for actor create and gateway websocket#5326

Open
MasterPtato wants to merge 1 commit into
stack/slop-claude-opus-4-8-feat-rivetkit-core-add-serde-duration-and-size-metrics-nqkrytvrfrom
stack/slop-claude-opus-4-8-feat-util-add-rate-limiter-primitive-and-ingress-throttles-for-actor-create-and-gateway-websocket-pnyswwtm
Open

[SLOP(claude-opus-4-8)] feat(util): add rate limiter primitive and ingress throttles for actor create and gateway websocket#5326
MasterPtato wants to merge 1 commit into
stack/slop-claude-opus-4-8-feat-rivetkit-core-add-serde-duration-and-size-metrics-nqkrytvrfrom
stack/slop-claude-opus-4-8-feat-util-add-rate-limiter-primitive-and-ingress-throttles-for-actor-create-and-gateway-websocket-pnyswwtm

Conversation

@MasterPtato

Copy link
Copy Markdown
Contributor

No description provided.

…gress throttles for actor create and gateway websocket
@MasterPtato

MasterPtato commented Jun 24, 2026

Copy link
Copy Markdown
Contributor Author

@claude

claude Bot commented Jun 24, 2026

Copy link
Copy Markdown
Contributor

Code Review

This PR adds a throttle module to rivet_util that unifies the existing Backoff type with a new RateLimiter supporting fixed-window and leaky-bucket algorithms, then wires up rate limiting for actor creation (per namespace) and WebSocket message ingestion (per connection).

The approach is sound and the test suite is well-structured with start_paused = true virtual-time tests. However, four issues survived verification:


1. FixedWindow::acquire() sleeps for the full period instead of the remaining window time

engine/packages/util/src/throttle.rs:242

} else {
    tokio::time::sleep(*period).await;   // ← wrong
    *requests_remaining = *requests_limit;
    *reset_time = Instant::now() + *period;
}

When tokens are exhausted mid-window the caller should sleep until reset_time, not for period more milliseconds. If the window is 100 ms and tokens are consumed at T=90 ms, the caller waits 100 ms (until T=190 ms) instead of 10 ms (until T=100 ms). The existing test fixed_window_acquire_blocks_until_window_resets does not catch this because it exhausts the window at T=0, making both values accidentally equal.

Fix: tokio::time::sleep_until(*reset_time).await;


2. FixedWindow::acquire() doesn't decrement after sleeping

engine/packages/util/src/throttle.rs:244

After the sleep the code resets to a full window without consuming the token for the caller:

*requests_remaining = *requests_limit;
*reset_time = Instant::now() + *period;
// ← missing: *requests_remaining -= 1;

A window configured for 2 requests will allow 3 — the one that slept plus 2 more — before sleeping again. The test never makes a fourth call, so it misses this.


3. LeakyBucket::try_acquire() unconditionally advances last_acquire, starving the bucket under rejection load

engine/packages/util/src/throttle.rs:199

let dt = now - *last_acquire;
*last_acquire = now;     // ← updated even on failed acquire

When the bucket is empty, a flood of failing try_acquire() calls each reset last_acquire to "now", so dt for the next call is nearly 0 and almost no credit drips in. A namespace that is being hammered can exhaust the burst and then never recover tokens even after the flood stops — the sustained drip rate is only meaningful if time actually passes between calls.

This affects actor-creation rate limiting directly (create.rs:387 calls try_acquire()).

Fix: only update last_acquire (and accumulate drip credit) when a token is successfully consumed, or separate the "compute drip" step from the "consume token" step so failed calls still accumulate fractional credit without preventing future recovery.


4. ws_to_tunnel_task: acquire() outside select! blocks shutdown signals

engine/packages/pegboard-gateway2/src/ws_to_tunnel_task.rs:39

loop {
    rate_limit.acquire().await;   // ← can sleep up to 10 ms; abort signal unreachable here

    tokio::select! {
        res = ws_rx.try_next() => { ... }
        _ = ws_to_tunnel_abort_rx.changed() => { ... }
    }
}

acquire() calls tokio::time::sleep() directly. The task's abort/shutdown watch channel is only polled inside the select!, so a shutdown signal cannot preempt an in-progress sleep. With the default drip rate of 10 ms this is a bounded delay, but it means graceful shutdown is not truly prompt under rate-limited conditions.

Consider placing rate_limit.acquire() inside the select! alongside the abort arm, or using a select! that can interrupt the sleep directly.

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant