Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

54 changes: 54 additions & 0 deletions engine/artifacts/config-schema.json

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

16 changes: 16 additions & 0 deletions engine/packages/config/src/config/pegboard.rs
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,14 @@ pub struct Pegboard {
pub envoy_expire_scheduler_max_concurrent_expires: Option<usize>,
/// Maximum pending envoys tracked by the read-path envoy expire scheduler.
pub envoy_expire_scheduler_max_pending: Option<usize>,
/// Max burst of inbound WebSocket messages on a single envoy connection before throttling.
pub envoy_websocket_rate_limit_requests: Option<u64>,
/// Time to regain one inbound WebSocket message token on a single envoy connection.
///
/// Unit is in microseconds. The envoy connection multiplexes every actor on a runner, so the
/// sustained ceiling is far higher than the per-client gateway limit and needs sub-millisecond
/// granularity to express.
pub envoy_websocket_rate_limit_drip_rate_us: Option<u64>,

// === Serverless Settings ===
/// **Deprecated** Configure the drain period in the runner config.
Expand Down Expand Up @@ -391,6 +399,14 @@ impl Pegboard {
self.gateway_websocket_rate_limit_drip_rate_ms.unwrap_or(10)
}

pub fn envoy_websocket_rate_limit_requests(&self) -> u64 {
self.envoy_websocket_rate_limit_requests.unwrap_or(16_384)
}

pub fn envoy_websocket_rate_limit_drip_rate_us(&self) -> u64 {
self.envoy_websocket_rate_limit_drip_rate_us.unwrap_or(200)
}

pub fn actor_create_rate_limit_requests(&self) -> u64 {
self.actor_create_rate_limit_requests.unwrap_or(500)
}
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/guard/src/routing/envoy.rs
Original file line number Diff line number Diff line change
Expand Up @@ -93,6 +93,6 @@ async fn route_envoy_internal(
tracing::debug!("authenticated envoy connection");
}

let tunnel = pegboard_envoy::PegboardEnvoyWs::new(ctx.clone());
let tunnel = pegboard_envoy::PegboardEnvoyWs::new(&ctx);
Ok(RoutingOutput::CustomServe(Arc::new(tunnel)))
}
1 change: 1 addition & 0 deletions engine/packages/pegboard-envoy/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ depot-client.workspace = true
depot-client-embedded.workspace = true
rivet-runtime.workspace = true
rivet-types.workspace = true
rivet-util.workspace = true
scc.workspace = true
serde_bare.workspace = true
serde_json.workspace = true
Expand Down
8 changes: 2 additions & 6 deletions engine/packages/pegboard-envoy/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,12 +45,8 @@ pub struct PegboardEnvoyWs {
}

impl PegboardEnvoyWs {
pub fn new(ctx: StandaloneCtx) -> Self {
metrics::prepopulate();

let service = Self { ctx: ctx.clone() };

service
pub fn new(ctx: &StandaloneCtx) -> Self {
Self { ctx: ctx.clone() }
}
}

Expand Down
69 changes: 0 additions & 69 deletions engine/packages/pegboard-envoy/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -371,72 +371,3 @@ pub fn set_envoy_connection_state(
(None, None) => {}
}
}

pub fn prepopulate() {
ENVOY_CONNECTED.with_label_values(&["", ""]).set(0);
for state in EnvoyState::ALL {
ENVOY_CONNECTIONS_BY_STATE
.with_label_values(&["", "", "", state.as_str()])
.set(0);
}
for (state, reasons) in [
(EnvoyState::Starting, &["websocket_accepted"][..]),
(EnvoyState::Connected, &["init_complete"][..]),
(EnvoyState::Stopping, &["envoy_reported_stopping"][..]),
(
EnvoyState::Disconnected,
&[
"init_failed",
"websocket_closed",
"evicted",
"going_away",
"connection_error",
][..],
),
(EnvoyState::Lost, &["ping_timeout"][..]),
(EnvoyState::Stopped, &["graceful_shutdown_complete"][..]),
] {
for reason in reasons {
ENVOY_STATE_TRANSITION_TOTAL
.with_label_values(&["", "", "", state.as_str(), reason])
.inc_by(0);
}
}
let _ = ENVOY_LIFETIME_SECONDS.with_label_values(&["", ""]);
let _ = ENVOY_PING_LAG_SECONDS.with_label_values(&["", ""]);
for result in ["ok", "no_subscribers", "error"] {
TUNNEL_PUBLISH_TOTAL
.with_label_values(&["", "", result])
.inc_by(0);
}
TUNNEL_TASKS_ACTIVE.with_label_values(&["", ""]).set(0);
WS_RESPONSES_IN_FLIGHT.set(0);
for task_kind in ["kv", "sqlite_page", "remote_sqlite", "tunnel_message"] {
ACTOR_TASKS_ACTIVE.with_label_values(&[task_kind]).set(0);
}
for branch in ["ws_msg", "completed_task"] {
let _ = WS_TO_TUNNEL_BRANCH_DURATION.with_label_values(&[branch]);
}
for result in ["ok", "error", "timeout"] {
let _ = ACTOR_WAKE_DURATION.with_label_values(&["", "", result]);
}
let _ = SQLITE_COMMIT_ENVOY_DISPATCH_DURATION.with_label_values(&["", ""]);
let _ = SQLITE_COMMIT_ENVOY_RESPONSE_DURATION.with_label_values(&["", ""]);
for request_type in ["get_pages", "commit", "exec", "execute"] {
for result in ["ok", "error"] {
SQLITE_REQUEST_TOTAL
.with_label_values(&["", "", request_type, result])
.inc_by(0);
let _ = SQLITE_REQUEST_DURATION.with_label_values(&["", "", request_type, result]);
}
for direction in ["request", "response"] {
let _ = SQLITE_REQUEST_PAGES.with_label_values(&["", "", request_type, direction]);
}
let _ = SQLITE_REQUEST_DIRTY_PAGES.with_label_values(&["", "", request_type]);
for direction in ["request", "response"] {
SQLITE_REQUEST_PAYLOAD_BYTES
.with_label_values(&["", "", request_type, direction])
.inc_by(0);
}
}
}
35 changes: 34 additions & 1 deletion engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,13 @@

const MAX_REMOTE_SQL_BIND_BYTES: usize = 128 * 1024;

/// Max number of pages a single `get_pages` request may ask for. Each requested page number is ~4
/// bytes on the wire but forces the engine to fetch and materialize up to a full 4 KiB page inside
/// one UDB transaction, so an uncapped list is a large cost-asymmetry amplifier from an untrusted
/// runner. Sized well above observed production read batches (max ~1024 pages); the commit path has
/// its own lower cap (`MAX_COMMIT_DIRTY_PAGES`) since writes batch smaller than reads.
const MAX_GET_PAGES_PER_REQUEST: usize = 8192;

/// Wall-clock threshold above which a single handle_message invocation is logged as a head-of-line
/// blocking risk. The ws_to_tunnel_task loop is strictly serial per envoy, so any handler that
/// spends longer than this delays every subsequent WS message from the same envoy (including
Expand Down Expand Up @@ -461,9 +468,25 @@
let mut term_signal = rivet_runtime::TermSignal::get();
let mut task_manager = TaskManager::new(ctx.clone(), conn.clone());

// Leaky bucket rate limit on consuming envoy ws messages. The envoy connection multiplexes
// every actor on a runner, so this bounds the rate at which a single untrusted runner can drive
// engine work (task spawns, KV/SQLite ops). Reads are paused while empty, applying TCP
// backpressure to the runner rather than dropping protocol messages.

Check warning on line 474 in engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs

View workflow job for this annotation

GitHub Actions / Rustfmt

Diff in /home/runner/work/rivet/rivet/engine/packages/pegboard-envoy/src/ws_to_tunnel_task.rs
let mut rate_limit = rivet_util::throttle::RateLimiter::new(
rivet_util::throttle::RateLimitMethod::LeakyBucket {
requests: ctx.config().pegboard().envoy_websocket_rate_limit_requests(),
drip_rate: Duration::from_micros(
ctx.config().pegboard().envoy_websocket_rate_limit_drip_rate_us(),
),
},
);

loop {
tokio::select! {
recv = recv_msg(&mut ws_rx, &mut ws_to_tunnel_abort_rx, &mut term_signal) => {
recv = async {
rate_limit.acquire().await;
recv_msg(&mut ws_rx, &mut ws_to_tunnel_abort_rx, &mut term_signal).await
} => {
let branch_start = Instant::now();
let branch_result: Result<Option<LifecycleResult>> = async {
match recv? {
Expand Down Expand Up @@ -1413,6 +1436,16 @@
conn: &Conn,
request: protocol::SqliteGetPagesRequest,
) -> Result<protocol::SqliteGetPagesResponse> {
if request.pgnos.len() > MAX_GET_PAGES_PER_REQUEST {
return Ok(protocol::SqliteGetPagesResponse::SqliteErrorResponse(
sqlite_protocol_error_response(&format!(
"sqlite get_pages requested {} pages, exceeding limit {}",
request.pgnos.len(),
MAX_GET_PAGES_PER_REQUEST,
)),
));
}

validate_sqlite_actor_for_request(ctx, conn, &request.actor_id, request.expected_generation)
.await?;

Expand Down
39 changes: 0 additions & 39 deletions engine/packages/pegboard-gateway2/src/metrics.rs
Original file line number Diff line number Diff line change
Expand Up @@ -66,42 +66,3 @@ lazy_static::lazy_static! {
*REGISTRY
).unwrap();
}

pub fn prepopulate() {
const RESULTS: &[&str] = &[
"success",
"client_disconnect",
"actor_ready_timeout",
"request_timeout",
"envoy_error",
];

for protocol in ["http", "websocket"] {
IN_FLIGHT.with_label_values(&["", "", protocol]).set(0);
IN_FLIGHT_DROPPED_TOTAL
.with_label_values(&["", "", protocol, "client_disconnect"])
.inc_by(0);
TUNNEL_PING_DURATION.with_label_values(&["", "", protocol]);
LAST_PONG_AGE_SECONDS.with_label_values(&["", "", protocol]);
REQUEST_RETRIES_TOTAL.with_label_values(&["", "", protocol, "1"]);

for result in RESULTS {
REQUEST_DURATION_SECONDS.with_label_values(&["", "", protocol, result]);
}

for reason in [
"server_close",
"client_close",
"abort",
"gc_timeout",
"shutdown",
] {
CLOSE_SENT_TOTAL
.with_label_values(&["", "", protocol, reason])
.inc_by(0);
}
}
for result in ["ok", "error", "timeout"] {
WEBSOCKET_OPEN_WAIT_SECONDS.with_label_values(&["", "", result]);
}
}
1 change: 0 additions & 1 deletion engine/packages/pegboard-gateway2/src/shared_state.rs
Original file line number Diff line number Diff line change
Expand Up @@ -160,7 +160,6 @@ pub struct SharedState(Arc<SharedStateInner>);

impl SharedState {
pub fn new(config: &rivet_config::Config, ups: PubSub) -> Self {
metrics::prepopulate();
init_slow_ping_threshold_from_env();

let gateway_id = protocol::util::generate_gateway_id();
Expand Down
17 changes: 11 additions & 6 deletions engine/packages/pegboard-gateway2/src/ws_to_tunnel_task.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,21 +25,26 @@ pub async fn task(
let mut ws_rx = ws_rx.lock().await;

// Leaky bucket rate limit on consuming ws messages
let pegboard_config = ctx.config().pegboard();
let mut rate_limit = rivet_util::throttle::RateLimiter::new(
rivet_util::throttle::RateLimitMethod::LeakyBucket {
requests: pegboard_config.gateway_websocket_rate_limit_requests(),
requests: ctx
.config()
.pegboard()
.gateway_websocket_rate_limit_requests(),
drip_rate: Duration::from_millis(
pegboard_config.gateway_websocket_rate_limit_drip_rate_ms(),
ctx.config()
.pegboard()
.gateway_websocket_rate_limit_drip_rate_ms(),
),
},
);

loop {
rate_limit.acquire().await;

tokio::select! {
res = ws_rx.try_next() => {
res = async {
rate_limit.acquire().await;
ws_rx.try_next().await
} => {
if let Some(msg) = res? {
ingress_bytes.fetch_add(msg.len() as u64, Ordering::AcqRel);

Expand Down
2 changes: 0 additions & 2 deletions engine/packages/pegboard-outbound/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,6 @@ const SSE_OPEN_WARN_THRESHOLD: Duration = Duration::from_secs(5);

#[tracing::instrument(skip_all)]
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
metrics::prepopulate();

let cache = rivet_cache::CacheInner::from_env(&config, pools.clone())?;
let ctx = StandaloneCtx::new(
db::DatabaseKv::new(config.clone(), pools.clone()).await?,
Expand Down
Loading
Loading