Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

5 changes: 5 additions & 0 deletions engine/artifacts/errors/actor.creation_rate_limit.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

30 changes: 30 additions & 0 deletions engine/packages/config/src/config/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -117,6 +117,12 @@ pub struct Pegboard {
pub gateway_hws_max_pending_size: Option<u64>,
/// Max HTTP request body size in bytes for requests to actors.
pub gateway_http_max_request_body_size: Option<usize>,
/// Max burst of inbound WebSocket messages on a single connection before throttling.
pub gateway_websocket_rate_limit_requests: Option<u64>,
/// Time to regain one inbound WebSocket message token on a single connection.
///
/// Unit is in milliseconds.
pub gateway_websocket_rate_limit_drip_rate_ms: Option<u64>,

// === Envoy Settings ===
/// How long to wait before considering an envoy lost and evicting all of its actors.
Expand Down Expand Up @@ -162,6 +168,14 @@ pub struct Pegboard {
///
/// Unit is in bytes. Default: 1,048,576 (1 MiB).
pub preload_max_total_bytes: Option<u64>,

// === Rate Limiting ===
/// Max burst of actor creations per namespace before throttling.
pub actor_create_rate_limit_requests: Option<u64>,
/// Time to regain one actor creation token per namespace.
///
/// Unit is in milliseconds.
pub actor_create_rate_limit_drip_rate_ms: Option<u64>,
}

impl Pegboard {
Expand Down Expand Up @@ -369,6 +383,22 @@ impl Pegboard {
self.serverless_drain_grace_period.unwrap_or(10_000)
}

pub fn gateway_websocket_rate_limit_requests(&self) -> u64 {
self.gateway_websocket_rate_limit_requests.unwrap_or(2_000)
}

pub fn gateway_websocket_rate_limit_drip_rate_ms(&self) -> u64 {
self.gateway_websocket_rate_limit_drip_rate_ms.unwrap_or(10)
}

pub fn actor_create_rate_limit_requests(&self) -> u64 {
self.actor_create_rate_limit_requests.unwrap_or(500)
}

pub fn actor_create_rate_limit_drip_rate_ms(&self) -> u64 {
self.actor_create_rate_limit_drip_rate_ms.unwrap_or(10)
}

pub fn preload_max_total_bytes(&self) -> u64 {
self.preload_max_total_bytes.unwrap_or(1_048_576)
}
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/ctx/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -139,7 +139,7 @@ impl MessageCtx {
M: Message,
{
// Infinite backoff since we want to wait until the service reboots.
let mut backoff = rivet_util::backoff::Backoff::default_infinite();
let mut backoff = rivet_util::throttle::Backoff::default_infinite();
loop {
// Ignore for infinite backoff
backoff.tick().await;
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/error.rs
Original file line number Diff line number Diff line change
Expand Up @@ -193,7 +193,7 @@ impl WorkflowError {
| WorkflowError::ActivityTimeout(_, error_count)
| WorkflowError::OperationTimeout(_, error_count) => {
// NOTE: Max retry is handled in `WorkflowCtx::activity`
let mut backoff = rivet_util::backoff::Backoff::new_at(
let mut backoff = rivet_util::throttle::Backoff::new_at(
8,
None,
RETRY_TIMEOUT_MS,
Expand Down
16 changes: 9 additions & 7 deletions engine/packages/guard-core/src/proxy_service.rs
Original file line number Diff line number Diff line change
Expand Up @@ -32,7 +32,7 @@ use crate::RouteTarget;
use crate::request_context::RequestContext;
use crate::response_body::ResponseBody;
use crate::route::{CacheKeyFn, ResolveRouteOutput, RouteCache, RoutingFn, RoutingOutput};
use crate::utils::{InFlightCounter, RateLimiter};
use crate::utils::InFlightCounter;
use crate::{
WebSocketHandle, custom_serve::HibernationResult, errors, metrics, task_group::TaskGroup, utils,
};
Expand All @@ -56,7 +56,7 @@ pub struct ProxyState {
>,
route_cache: RouteCache,
// We use moka::Cache instead of scc::HashMap because it automatically handles TTL and capacity
rate_limiters: Cache<std::net::IpAddr, Arc<Mutex<RateLimiter>>>,
rate_limiters: Cache<std::net::IpAddr, Arc<Mutex<rivet_util::throttle::RateLimiter>>>,
in_flight_counters: Cache<std::net::IpAddr, Arc<Mutex<InFlightCounter>>>,
in_flight_requests: Cache<protocol::RequestId, ()>,

Expand Down Expand Up @@ -98,11 +98,11 @@ impl ProxyState {
route_cache: RouteCache::new(route_cache_ttl),
rate_limiters: Cache::builder()
.max_capacity(10_000)
.time_to_live(PROXY_STATE_CACHE_TTL)
.time_to_idle(PROXY_STATE_CACHE_TTL)
.build(),
in_flight_counters: Cache::builder()
.max_capacity(10_000)
.time_to_live(PROXY_STATE_CACHE_TTL)
.time_to_idle(PROXY_STATE_CACHE_TTL)
.build(),
in_flight_requests: Cache::builder().max_capacity(10_000_000).build(),
tasks: TaskGroup::new(),
Expand Down Expand Up @@ -217,9 +217,11 @@ impl ProxyState {
if let Some(existing_limiter) = self.rate_limiters.get(&req_ctx.client_ip).await {
existing_limiter
} else {
let new_limiter = Arc::new(Mutex::new(RateLimiter::new(
req_ctx.rate_limit.requests,
req_ctx.rate_limit.period,
let new_limiter = Arc::new(Mutex::new(rivet_util::throttle::RateLimiter::new(
rivet_util::throttle::RateLimitMethod::FixedWindow {
requests: req_ctx.rate_limit.requests,
period: Duration::from_secs(req_ctx.rate_limit.period),
},
)));
self.rate_limiters
.insert(req_ctx.client_ip, new_limiter.clone())
Expand Down
44 changes: 4 additions & 40 deletions engine/packages/guard-core/src/utils.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,7 @@ use hyper::header::HeaderName;
use rivet_api_builder::{ErrorResponse, RawErrorResponse};
use rivet_error::{INTERNAL_ERROR, RivetError};
use rivet_util::Id;
use std::time::{Duration, Instant};
use std::time::Duration;
use tokio_tungstenite::tungstenite::protocol::{CloseFrame, frame::coding::CloseCode};
use url::Url;

Expand All @@ -19,7 +19,7 @@ const X_RIVET_TARGET: HeaderName = HeaderName::from_static("x-rivet-target");
const X_RIVET_ACTOR: HeaderName = HeaderName::from_static("x-rivet-actor");
const X_RIVET_TOKEN: HeaderName = HeaderName::from_static("x-rivet-token");

// In-flight requests counter
// In-flight requests counter (semaphore)
pub(crate) struct InFlightCounter {
count: usize,
max: usize,
Expand All @@ -44,43 +44,6 @@ impl InFlightCounter {
}
}

// Rate limiter
pub(crate) struct RateLimiter {
requests_remaining: u64,
reset_time: Instant,
requests_limit: u64,
period: Duration,
}

impl RateLimiter {
pub(crate) fn new(requests: u64, period_seconds: u64) -> Self {
Self {
requests_remaining: requests,
reset_time: Instant::now() + Duration::from_secs(period_seconds),
requests_limit: requests,
period: Duration::from_secs(period_seconds),
}
}

pub(crate) fn try_acquire(&mut self) -> bool {
let now = Instant::now();

// Check if we need to reset the counter
if now >= self.reset_time {
self.requests_remaining = self.requests_limit;
self.reset_time = now + self.period;
}

// Try to consume a request
if self.requests_remaining > 0 {
self.requests_remaining -= 1;
true
} else {
false
}
}
}

// Calculate backoff duration for a given retry attempt
pub(crate) fn calculate_backoff(attempt: u32, initial_interval: u64) -> Duration {
Duration::from_millis(initial_interval * 2u64.pow(attempt - 1))
Expand Down Expand Up @@ -177,7 +140,6 @@ pub(crate) fn err_into_response(err: anyhow::Error) -> Result<Response<ResponseB
("guard", "routing_error") => StatusCode::BAD_GATEWAY,
("guard", "request_timeout") => StatusCode::GATEWAY_TIMEOUT,
("guard", "retry_attempts_exceeded") => StatusCode::BAD_GATEWAY,
("actor", "not_found") => StatusCode::NOT_FOUND,
("guard", "service_unavailable") => StatusCode::SERVICE_UNAVAILABLE,
("guard", "actor_stopped_while_waiting") => StatusCode::SERVICE_UNAVAILABLE,
("guard", "tunnel_request_aborted") => StatusCode::SERVICE_UNAVAILABLE,
Expand All @@ -188,6 +150,8 @@ pub(crate) fn err_into_response(err: anyhow::Error) -> Result<Response<ResponseB
("guard", "no_route") => StatusCode::NOT_FOUND,
("guard", "invalid_request_body") => StatusCode::PAYLOAD_TOO_LARGE,
("guard", "invalid_response_body") => StatusCode::BAD_GATEWAY,
("actor", "creation_rate_limit") => StatusCode::TOO_MANY_REQUESTS,
("actor", "not_found") => StatusCode::NOT_FOUND,
_ => StatusCode::BAD_REQUEST,
};

Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard-gateway2/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -674,6 +674,7 @@ impl PegboardGateway2 {
);
let ws_to_tunnel = tokio::spawn(
ws_to_tunnel_task::task(
ctx.clone(),
in_flight_req.clone(),
ws_rx,
ingress_bytes.clone(),
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard-gateway2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -661,7 +661,7 @@ impl InFlightRequestHandle {
// Cap retries so a permanently-gone receiver fails fast instead of pinning the
// request forever. Worst-case backoff total is ~19s, which stays under the default
// tunnel ping timeout (30s) so the ping path can take over if the receiver is truly lost.
let mut backoff = rivet_util::backoff::Backoff::new(6, Some(8), 100, 5);
let mut backoff = rivet_util::throttle::Backoff::new(6, Some(8), 100, 5);
let first_attempt_at = Instant::now();
let mut attempt = 0;
loop {
Expand Down
16 changes: 16 additions & 0 deletions engine/packages/pegboard-gateway2/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
@@ -1,11 +1,13 @@
use anyhow::Result;
use futures_util::TryStreamExt;
use gas::prelude::*;
use rivet_envoy_protocol as protocol;
use rivet_guard_core::websocket_handle::WebSocketReceiver;
use std::sync::{
Arc,
atomic::{AtomicU64, Ordering},
};
use std::time::Duration;
use tokio::sync::{Mutex, watch};
use tokio_tungstenite::tungstenite::Message;

Expand All @@ -14,14 +16,28 @@ use crate::shared_state::{InFlightRequestHandle, display_id};

#[tracing::instrument(name = "ws_to_tunnel_task", skip_all)]
pub async fn task(
ctx: StandaloneCtx,
in_flight_req: InFlightRequestHandle,
ws_rx: Arc<Mutex<WebSocketReceiver>>,
ingress_bytes: Arc<AtomicU64>,
mut ws_to_tunnel_abort_rx: watch::Receiver<()>,
) -> Result<LifecycleResult> {
let mut ws_rx = ws_rx.lock().await;

// Leaky bucket rate limit on consuming ws messages
let pegboard_config = ctx.config().pegboard();
let mut rate_limit = rivet_util::throttle::RateLimiter::new(
rivet_util::throttle::RateLimitMethod::LeakyBucket {
requests: pegboard_config.gateway_websocket_rate_limit_requests(),
drip_rate: Duration::from_millis(
pegboard_config.gateway_websocket_rate_limit_drip_rate_ms(),
),
},
);

loop {
rate_limit.acquire().await;

tokio::select! {
res = ws_rx.try_next() => {
if let Some(msg) = res? {
Expand Down
1 change: 1 addition & 0 deletions engine/packages/pegboard/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ foundationdb-tuple.workspace = true
futures-util.workspace = true
gas.workspace = true
lazy_static.workspace = true
moka.workspace = true
namespace.workspace = true
nix.workspace = true
rand.workspace = true
Expand Down
6 changes: 6 additions & 0 deletions engine/packages/pegboard/src/errors.rs
Original file line number Diff line number Diff line change
Expand Up @@ -13,6 +13,12 @@ pub enum Actor {
#[error("namespace_not_found", "The namespace does not exist.")]
NamespaceNotFound,

#[error(
"creation_rate_limit",
"Too many actors created at once. Try again later."
)]
CreationRateLimit,

#[error(
"input_too_large",
"Actor input too large.",
Expand Down
34 changes: 34 additions & 0 deletions engine/packages/pegboard/src/ops/actor/create.rs
Original file line number Diff line number Diff line change
@@ -1,7 +1,15 @@
use anyhow::{Context, Result};
use gas::prelude::*;
use moka::future::Cache;
use rivet_api_util::{Method, request_remote_datacenter};
use rivet_types::actors::{Actor, CrashPolicy};
use std::sync::{Arc, OnceLock};
use std::time::Duration;
use tokio::sync::Mutex;

const RATE_LIMITER_CACHE_TTL: Duration = Duration::from_secs(60 * 60);
static RATE_LIMITERS: OnceLock<Cache<Id, Arc<Mutex<rivet_util::throttle::RateLimiter>>>> =
OnceLock::new();

#[derive(Debug)]
pub struct Input {
Expand Down Expand Up @@ -29,6 +37,32 @@ pub struct Output {

#[operation]
pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result<Output> {
let rate_limiter = RATE_LIMITERS
.get_or_init(|| {
Cache::builder()
.max_capacity(10_000)
.time_to_idle(RATE_LIMITER_CACHE_TTL)
.build()
})
.entry(input.namespace_id)
.or_insert_with(async {
let pegboard_config = ctx.config().pegboard();
Arc::new(Mutex::new(rivet_util::throttle::RateLimiter::new(
rivet_util::throttle::RateLimitMethod::LeakyBucket {
requests: pegboard_config.actor_create_rate_limit_requests(),
drip_rate: Duration::from_millis(
pegboard_config.actor_create_rate_limit_drip_rate_ms(),
),
},
)))
})
.await;

// Limit actor creation per namespace id
if !rate_limiter.value().lock().await.try_acquire() {
return Err(crate::errors::Actor::CreationRateLimit.build());
}

// Set up subscriptions before dispatching workflow
let (
mut create_sub,
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/pegboard/src/workflows/actor/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -1307,8 +1307,8 @@ fn reschedule_backoff(
retry_count: usize,
base_retry_timeout: usize,
max_exponent: usize,
) -> util::backoff::Backoff {
util::backoff::Backoff::new_at(max_exponent, None, base_retry_timeout, 500, retry_count)
) -> util::throttle::Backoff {
util::throttle::Backoff::new_at(max_exponent, None, base_retry_timeout, 500, retry_count)
}

#[derive(Debug, Serialize, Deserialize)]
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/pegboard/src/workflows/actor2/runtime.rs
Original file line number Diff line number Diff line change
Expand Up @@ -830,7 +830,7 @@ async fn compare_retry(
if reset {
state.reschedule_ts = None;
} else {
let backoff = util::backoff::Backoff::new_at(
let backoff = util::throttle::Backoff::new_at(
ctx.config().pegboard().reschedule_backoff_max_exponent(),
None,
ctx.config().pegboard().base_retry_timeout(),
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -164,7 +164,7 @@ async fn poll_metadata(ctx: &ActivityCtx, input: &PollMetadataInput) -> Result<P
let base_poll_interval = metadata_poll_interval
.unwrap_or(default_poll_interval)
.max(min_poll_interval);
let backoff = util::backoff::Backoff::new_at(
let backoff = util::throttle::Backoff::new_at(
8,
None,
base_poll_interval as usize,
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/pegboard/src/workflows/serverless/conn.rs
Original file line number Diff line number Diff line change
Expand Up @@ -635,8 +635,8 @@ fn reconnect_backoff(
retry_count: usize,
base_retry_timeout: usize,
max_exponent: usize,
) -> util::backoff::Backoff {
util::backoff::Backoff::new_at(max_exponent, None, base_retry_timeout, 500, retry_count)
) -> util::throttle::Backoff {
util::throttle::Backoff::new_at(max_exponent, None, base_retry_timeout, 500, retry_count)
}

/// Report an error to the error tracker workflow.
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/universalpubsub/src/driver/postgres/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,7 +5,7 @@ use base64::engine::general_purpose::STANDARD_NO_PAD as BASE64;
use deadpool_postgres::{Config, ManagerConfig, Pool, PoolConfig, RecyclingMethod, Runtime};
use futures_util::future::poll_fn;
use rivet_postgres_util::build_tls_config;
use rivet_util::backoff::Backoff;
use rivet_util::throttle::Backoff;
use scc::HashMap;
use std::hash::{DefaultHasher, Hash, Hasher};
use std::path::PathBuf;
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/universalpubsub/src/pubsub.rs
Original file line number Diff line number Diff line change
Expand Up @@ -8,7 +8,7 @@ use scc::HashMap;
use tokio::sync::broadcast;
use uuid::Uuid;

use rivet_util::backoff::Backoff;
use rivet_util::throttle::Backoff;

use crate::chunking::{ChunkTracker, FastPath, encode_chunk, split_payload_into_chunks};
use crate::driver::{PubSubDriverHandle, PublishOpts, SubscriberDriverHandle};
Expand Down
Loading
Loading