diff --git a/Cargo.lock b/Cargo.lock index bea893b3b6..fa28937016 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -4001,6 +4001,7 @@ dependencies = [ "rivet-pools", "rivet-runtime", "rivet-types", + "rivet-util", "rusqlite", "scc", "serde", diff --git a/engine/artifacts/config-schema.json b/engine/artifacts/config-schema.json index 5b7f92ffe2..6a08895e60 100644 --- a/engine/artifacts/config-schema.json +++ b/engine/artifacts/config-schema.json @@ -837,6 +837,24 @@ ], "format": "int64" }, + "actor_create_rate_limit_drip_rate_ms": { + "description": "Time to regain one actor creation token per namespace.\n\nUnit is in milliseconds.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "actor_create_rate_limit_requests": { + "description": "Max burst of actor creations per namespace before throttling.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "actor_retry_duration_threshold": { "description": "How long to wait after starting to attempt to reallocate before before setting actor to sleep.\n\nUnit is in milliseconds.", "type": [ @@ -968,6 +986,24 @@ "format": "uint64", "minimum": 0.0 }, + "envoy_websocket_rate_limit_drip_rate_us": { + "description": "Time to regain one inbound WebSocket message token on a single envoy connection.\n\nUnit is in microseconds. The envoy connection multiplexes every actor on a runner, so the sustained ceiling is far higher than the per-client gateway limit and needs sub-millisecond granularity to express.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "envoy_websocket_rate_limit_requests": { + "description": "Max burst of inbound WebSocket messages on a single envoy connection before throttling.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "gateway_gc_interval_ms": { "description": "GC interval for in-flight requests in milliseconds.", "type": [ @@ -1039,6 +1075,24 @@ "format": "uint64", "minimum": 0.0 }, + "gateway_websocket_rate_limit_drip_rate_ms": { + "description": "Time to regain one inbound WebSocket message token on a single connection.\n\nUnit is in milliseconds.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, + "gateway_websocket_rate_limit_requests": { + "description": "Max burst of inbound WebSocket messages on a single connection before throttling.", + "type": [ + "integer", + "null" + ], + "format": "uint64", + "minimum": 0.0 + }, "hibernating_request_eligible_threshold": { "description": "How long after last ping before considering a hibernating request disconnected.\n\nUnit is in milliseconds.", "type": [ diff --git a/engine/packages/config/src/config/pegboard.rs b/engine/packages/config/src/config/pegboard.rs index f05a98319f..f0b660401e 100644 --- a/engine/packages/config/src/config/pegboard.rs +++ b/engine/packages/config/src/config/pegboard.rs @@ -149,6 +149,14 @@ pub struct Pegboard { pub envoy_expire_scheduler_max_concurrent_expires: Option, /// Maximum pending envoys tracked by the read-path envoy expire scheduler. pub envoy_expire_scheduler_max_pending: Option, + /// Max burst of inbound WebSocket messages on a single envoy connection before throttling. + pub envoy_websocket_rate_limit_requests: Option, + /// Time to regain one inbound WebSocket message token on a single envoy connection. + /// + /// Unit is in microseconds. The envoy connection multiplexes every actor on a runner, so the + /// sustained ceiling is far higher than the per-client gateway limit and needs sub-millisecond + /// granularity to express. + pub envoy_websocket_rate_limit_drip_rate_us: Option, // === Serverless Settings === /// **Deprecated** Configure the drain period in the runner config. @@ -391,6 +399,14 @@ impl Pegboard { self.gateway_websocket_rate_limit_drip_rate_ms.unwrap_or(10) } + pub fn envoy_websocket_rate_limit_requests(&self) -> u64 { + self.envoy_websocket_rate_limit_requests.unwrap_or(16_384) + } + + pub fn envoy_websocket_rate_limit_drip_rate_us(&self) -> u64 { + self.envoy_websocket_rate_limit_drip_rate_us.unwrap_or(200) + } + pub fn actor_create_rate_limit_requests(&self) -> u64 { self.actor_create_rate_limit_requests.unwrap_or(500) } diff --git a/engine/packages/guard/src/routing/envoy.rs b/engine/packages/guard/src/routing/envoy.rs index f2f6ae6ea7..c7895068bf 100644 --- a/engine/packages/guard/src/routing/envoy.rs +++ b/engine/packages/guard/src/routing/envoy.rs @@ -93,6 +93,6 @@ async fn route_envoy_internal( tracing::debug!("authenticated envoy connection"); } - let tunnel = pegboard_envoy::PegboardEnvoyWs::new(ctx.clone()); + let tunnel = pegboard_envoy::PegboardEnvoyWs::new(&ctx); Ok(RoutingOutput::CustomServe(Arc::new(tunnel))) } diff --git a/engine/packages/pegboard-envoy/Cargo.toml b/engine/packages/pegboard-envoy/Cargo.toml index 29ad9a85be..4d7f5f54ab 100644 --- a/engine/packages/pegboard-envoy/Cargo.toml +++ b/engine/packages/pegboard-envoy/Cargo.toml @@ -32,6 +32,7 @@ depot-client.workspace = true depot-client-embedded.workspace = true rivet-runtime.workspace = true rivet-types.workspace = true +rivet-util.workspace = true scc.workspace = true serde_bare.workspace = true serde_json.workspace = true diff --git a/engine/packages/pegboard-envoy/src/lib.rs b/engine/packages/pegboard-envoy/src/lib.rs index bcb5d2c77d..99ebe04cc7 100644 --- a/engine/packages/pegboard-envoy/src/lib.rs +++ b/engine/packages/pegboard-envoy/src/lib.rs @@ -45,12 +45,8 @@ pub struct PegboardEnvoyWs { } impl PegboardEnvoyWs { - pub fn new(ctx: StandaloneCtx) -> Self { - metrics::prepopulate(); - - let service = Self { ctx: ctx.clone() }; - - service + pub fn new(ctx: &StandaloneCtx) -> Self { + Self { ctx: ctx.clone() } } } diff --git a/engine/packages/pegboard-envoy/src/metrics.rs b/engine/packages/pegboard-envoy/src/metrics.rs index d44f177081..ad8303ac40 100644 --- a/engine/packages/pegboard-envoy/src/metrics.rs +++ b/engine/packages/pegboard-envoy/src/metrics.rs @@ -371,72 +371,3 @@ pub fn set_envoy_connection_state( (None, None) => {} } } - -pub fn prepopulate() { - ENVOY_CONNECTED.with_label_values(&["", ""]).set(0); - for state in EnvoyState::ALL { - ENVOY_CONNECTIONS_BY_STATE - .with_label_values(&["", "", "", state.as_str()]) - .set(0); - } - for (state, reasons) in [ - (EnvoyState::Starting, &["websocket_accepted"][..]), - (EnvoyState::Connected, &["init_complete"][..]), - (EnvoyState::Stopping, &["envoy_reported_stopping"][..]), - ( - EnvoyState::Disconnected, - &[ - "init_failed", - "websocket_closed", - "evicted", - "going_away", - "connection_error", - ][..], - ), - (EnvoyState::Lost, &["ping_timeout"][..]), - (EnvoyState::Stopped, &["graceful_shutdown_complete"][..]), - ] { - for reason in reasons { - ENVOY_STATE_TRANSITION_TOTAL - .with_label_values(&["", "", "", state.as_str(), reason]) - .inc_by(0); - } - } - let _ = ENVOY_LIFETIME_SECONDS.with_label_values(&["", ""]); - let _ = ENVOY_PING_LAG_SECONDS.with_label_values(&["", ""]); - for result in ["ok", "no_subscribers", "error"] { - TUNNEL_PUBLISH_TOTAL - .with_label_values(&["", "", result]) - .inc_by(0); - } - TUNNEL_TASKS_ACTIVE.with_label_values(&["", ""]).set(0); - WS_RESPONSES_IN_FLIGHT.set(0); - for task_kind in ["kv", "sqlite_page", "remote_sqlite", "tunnel_message"] { - ACTOR_TASKS_ACTIVE.with_label_values(&[task_kind]).set(0); - } - for branch in ["ws_msg", "completed_task"] { - let _ = WS_TO_TUNNEL_BRANCH_DURATION.with_label_values(&[branch]); - } - for result in ["ok", "error", "timeout"] { - let _ = ACTOR_WAKE_DURATION.with_label_values(&["", "", result]); - } - let _ = SQLITE_COMMIT_ENVOY_DISPATCH_DURATION.with_label_values(&["", ""]); - let _ = SQLITE_COMMIT_ENVOY_RESPONSE_DURATION.with_label_values(&["", ""]); - for request_type in ["get_pages", "commit", "exec", "execute"] { - for result in ["ok", "error"] { - SQLITE_REQUEST_TOTAL - .with_label_values(&["", "", request_type, result]) - .inc_by(0); - let _ = SQLITE_REQUEST_DURATION.with_label_values(&["", "", request_type, result]); - } - for direction in ["request", "response"] { - let _ = SQLITE_REQUEST_PAGES.with_label_values(&["", "", request_type, direction]); - } - let _ = SQLITE_REQUEST_DIRTY_PAGES.with_label_values(&["", "", request_type]); - for direction in ["request", "response"] { - SQLITE_REQUEST_PAYLOAD_BYTES - .with_label_values(&["", "", request_type, direction]) - .inc_by(0); - } - } -} diff --git a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs index 1e3bedc558..746bea158d 100644 --- a/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs @@ -45,6 +45,13 @@ use crate::{ const MAX_REMOTE_SQL_BIND_BYTES: usize = 128 * 1024; +/// Max number of pages a single `get_pages` request may ask for. Each requested page number is ~4 +/// bytes on the wire but forces the engine to fetch and materialize up to a full 4 KiB page inside +/// one UDB transaction, so an uncapped list is a large cost-asymmetry amplifier from an untrusted +/// runner. Sized well above observed production read batches (max ~1024 pages); the commit path has +/// its own lower cap (`MAX_COMMIT_DIRTY_PAGES`) since writes batch smaller than reads. +const MAX_GET_PAGES_PER_REQUEST: usize = 8192; + /// Wall-clock threshold above which a single handle_message invocation is logged as a head-of-line /// blocking risk. The ws_to_tunnel_task loop is strictly serial per envoy, so any handler that /// spends longer than this delays every subsequent WS message from the same envoy (including @@ -461,9 +468,25 @@ pub async fn task_inner( let mut term_signal = rivet_runtime::TermSignal::get(); let mut task_manager = TaskManager::new(ctx.clone(), conn.clone()); + // Leaky bucket rate limit on consuming envoy ws messages. The envoy connection multiplexes + // every actor on a runner, so this bounds the rate at which a single untrusted runner can drive + // engine work (task spawns, KV/SQLite ops). Reads are paused while empty, applying TCP + // backpressure to the runner rather than dropping protocol messages. + let mut rate_limit = rivet_util::throttle::RateLimiter::new( + rivet_util::throttle::RateLimitMethod::LeakyBucket { + requests: ctx.config().pegboard().envoy_websocket_rate_limit_requests(), + drip_rate: Duration::from_micros( + ctx.config().pegboard().envoy_websocket_rate_limit_drip_rate_us(), + ), + }, + ); + loop { tokio::select! { - recv = recv_msg(&mut ws_rx, &mut ws_to_tunnel_abort_rx, &mut term_signal) => { + recv = async { + rate_limit.acquire().await; + recv_msg(&mut ws_rx, &mut ws_to_tunnel_abort_rx, &mut term_signal).await + } => { let branch_start = Instant::now(); let branch_result: Result> = async { match recv? { @@ -1413,6 +1436,16 @@ async fn handle_sqlite_get_pages( conn: &Conn, request: protocol::SqliteGetPagesRequest, ) -> Result { + if request.pgnos.len() > MAX_GET_PAGES_PER_REQUEST { + return Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse( + sqlite_protocol_error_response(&format!( + "sqlite get_pages requested {} pages, exceeding limit {}", + request.pgnos.len(), + MAX_GET_PAGES_PER_REQUEST, + )), + )); + } + validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation) .await?; diff --git a/engine/packages/pegboard-gateway2/src/metrics.rs b/engine/packages/pegboard-gateway2/src/metrics.rs index e9c7a4acfe..244df7f882 100644 --- a/engine/packages/pegboard-gateway2/src/metrics.rs +++ b/engine/packages/pegboard-gateway2/src/metrics.rs @@ -66,42 +66,3 @@ lazy_static::lazy_static! { *REGISTRY ).unwrap(); } - -pub fn prepopulate() { - const RESULTS: &[&str] = &[ - "success", - "client_disconnect", - "actor_ready_timeout", - "request_timeout", - "envoy_error", - ]; - - for protocol in ["http", "websocket"] { - IN_FLIGHT.with_label_values(&["", "", protocol]).set(0); - IN_FLIGHT_DROPPED_TOTAL - .with_label_values(&["", "", protocol, "client_disconnect"]) - .inc_by(0); - TUNNEL_PING_DURATION.with_label_values(&["", "", protocol]); - LAST_PONG_AGE_SECONDS.with_label_values(&["", "", protocol]); - REQUEST_RETRIES_TOTAL.with_label_values(&["", "", protocol, "1"]); - - for result in RESULTS { - REQUEST_DURATION_SECONDS.with_label_values(&["", "", protocol, result]); - } - - for reason in [ - "server_close", - "client_close", - "abort", - "gc_timeout", - "shutdown", - ] { - CLOSE_SENT_TOTAL - .with_label_values(&["", "", protocol, reason]) - .inc_by(0); - } - } - for result in ["ok", "error", "timeout"] { - WEBSOCKET_OPEN_WAIT_SECONDS.with_label_values(&["", "", result]); - } -} diff --git a/engine/packages/pegboard-gateway2/src/shared_state.rs b/engine/packages/pegboard-gateway2/src/shared_state.rs index 67ad84a7b8..b3de6e5887 100644 --- a/engine/packages/pegboard-gateway2/src/shared_state.rs +++ b/engine/packages/pegboard-gateway2/src/shared_state.rs @@ -160,7 +160,6 @@ pub struct SharedState(Arc); impl SharedState { pub fn new(config: &rivet_config::Config, ups: PubSub) -> Self { - metrics::prepopulate(); init_slow_ping_threshold_from_env(); let gateway_id = protocol::util::generate_gateway_id(); diff --git a/engine/packages/pegboard-gateway2/src/ws_to_tunnel_task.rs b/engine/packages/pegboard-gateway2/src/ws_to_tunnel_task.rs index 66ac4836d3..27dec6914c 100644 --- a/engine/packages/pegboard-gateway2/src/ws_to_tunnel_task.rs +++ b/engine/packages/pegboard-gateway2/src/ws_to_tunnel_task.rs @@ -25,21 +25,26 @@ pub async fn task( let mut ws_rx = ws_rx.lock().await; // Leaky bucket rate limit on consuming ws messages - let pegboard_config = ctx.config().pegboard(); let mut rate_limit = rivet_util::throttle::RateLimiter::new( rivet_util::throttle::RateLimitMethod::LeakyBucket { - requests: pegboard_config.gateway_websocket_rate_limit_requests(), + requests: ctx + .config() + .pegboard() + .gateway_websocket_rate_limit_requests(), drip_rate: Duration::from_millis( - pegboard_config.gateway_websocket_rate_limit_drip_rate_ms(), + ctx.config() + .pegboard() + .gateway_websocket_rate_limit_drip_rate_ms(), ), }, ); loop { - rate_limit.acquire().await; - tokio::select! { - res = ws_rx.try_next() => { + res = async { + rate_limit.acquire().await; + ws_rx.try_next().await + } => { if let Some(msg) = res? { ingress_bytes.fetch_add(msg.len() as u64, Ordering::AcqRel); diff --git a/engine/packages/pegboard-outbound/src/lib.rs b/engine/packages/pegboard-outbound/src/lib.rs index 5d3f3a325d..00697ea7b0 100644 --- a/engine/packages/pegboard-outbound/src/lib.rs +++ b/engine/packages/pegboard-outbound/src/lib.rs @@ -25,8 +25,6 @@ const SSE_OPEN_WARN_THRESHOLD: Duration = Duration::from_secs(5); #[tracing::instrument(skip_all)] pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> { - metrics::prepopulate(); - let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?; let ctx = StandaloneCtx::new( db::DatabaseKv::new(config.clone(), pools.clone()).await?, diff --git a/engine/packages/pegboard-outbound/src/metrics.rs b/engine/packages/pegboard-outbound/src/metrics.rs index 3f74c822e7..a61cd4fefc 100644 --- a/engine/packages/pegboard-outbound/src/metrics.rs +++ b/engine/packages/pegboard-outbound/src/metrics.rs @@ -42,50 +42,3 @@ lazy_static::lazy_static! { *REGISTRY ).unwrap(); } - -pub fn prepopulate() { - const ERRORS: &[&str] = &[ - "http_error", - "connection_error", - "stream_ended_early", - "invalid_payload", - "downgrade", - "internal", - ]; - const STATUSES: &[&str] = &["429", "503", "5xx", "4xx", "2xx", "other", ""]; - const RESULTS: &[&str] = &[ - "success", - "error_http_429", - "error_http_503", - "error_http_5xx", - "error_http_4xx", - "error_http_other", - "error_connection", - "error_stream_ended", - "error_invalid_payload", - "error_downgrade", - "error_internal", - ]; - const DRAIN_REASONS: &[&str] = &[ - "lifespan_reached", - "going_away", - "actor_lost", - "connection_lost", - "term_signal", - "", - ]; - - for error in ERRORS { - for status in STATUSES { - REQ_ERROR_TOTAL - .with_label_values(&["", "", error, status]) - .inc_by(0); - } - } - - for result in RESULTS { - for drain_reason in DRAIN_REASONS { - REQ_DURATION_SECONDS.with_label_values(&["", "", result, drain_reason]); - } - } -} diff --git a/engine/packages/pegboard/src/ops/actor/create.rs b/engine/packages/pegboard/src/ops/actor/create.rs index a0cb51bca0..b2695c16d9 100644 --- a/engine/packages/pegboard/src/ops/actor/create.rs +++ b/engine/packages/pegboard/src/ops/actor/create.rs @@ -37,7 +37,7 @@ pub struct Output { #[operation] pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result { - let rate_limiter = RATE_LIMITERS + let rate_limit = RATE_LIMITERS .get_or_init(|| { Cache::builder() .max_capacity(10_000) @@ -59,7 +59,7 @@ pub async fn pegboard_actor_create(ctx: &OperationCtx, input: &Input) -> Result< .await; // Limit actor creation per namespace id - if !rate_limiter.value().lock().await.try_acquire() { + if !rate_limit.value().lock().await.try_acquire() { return Err(crate::errors::Actor::CreationRateLimit.build()); }