diff --git a/packages/dashmate/configs/defaults/getBaseConfigFactory.js b/packages/dashmate/configs/defaults/getBaseConfigFactory.js index 00ab3486264..6aad7e7bd3b 100644 --- a/packages/dashmate/configs/defaults/getBaseConfigFactory.js +++ b/packages/dashmate/configs/defaults/getBaseConfigFactory.js @@ -241,6 +241,9 @@ export default function getBaseConfigFactory() { blacklist: [], whitelist: [], enabled: true, + responseHeaders: { + enabled: true, + }, }, ssl: { enabled: false, diff --git a/packages/dashmate/configs/getConfigFileMigrationsFactory.js b/packages/dashmate/configs/getConfigFileMigrationsFactory.js index da87a2b3516..90f5b770978 100644 --- a/packages/dashmate/configs/getConfigFileMigrationsFactory.js +++ b/packages/dashmate/configs/getConfigFileMigrationsFactory.js @@ -1520,6 +1520,25 @@ export default function getConfigFileMigrationsFactory(homeDir, defaultConfigs) return configFile; }, + '4.0.0-rc.3': (configFile) => { + Object.entries(configFile.configs) + .forEach(([, options]) => { + // Add responseHeaders toggle to rate limiter (default true so existing + // deployments keep emitting RateLimit-* headers; rs-dapi-client depends + // on RateLimit-Reset to apply precise ban windows instead of the + // exponential health-ban ladder). + // Keyed at the next release (4.0.0-rc.3), not the already-released + // rc.2: the runner skips fromVersion===toVersion, so a key equal to + // an operator's current version never fires. Backfill runs once the + // package bumps to rc.3 (mirrors the 3.1.0 migration added at 3.1.0-dev.1). + if (options.platform?.gateway?.rateLimiter + && typeof options.platform.gateway.rateLimiter.responseHeaders === 'undefined') { + options.platform.gateway.rateLimiter.responseHeaders = base.get('platform.gateway.rateLimiter.responseHeaders'); + } + }); + + return configFile; + }, }; } diff --git a/packages/dashmate/docker-compose.rate_limiter.yml b/packages/dashmate/docker-compose.rate_limiter.yml index d652b7fa22d..cb6103febe0 100644 --- a/packages/dashmate/docker-compose.rate_limiter.yml +++ b/packages/dashmate/docker-compose.rate_limiter.yml @@ -42,6 +42,11 @@ services: - GRPC_MAX_CONNECTION_AGE=1h - GRPC_MAX_CONNECTION_AGE_GRACE=10m - GRPC_PORT=8081 + # Emit RateLimit-Limit / RateLimit-Remaining / RateLimit-Reset response + # headers so rs-dapi-client can read the exact reset window and ban the + # node for that duration instead of the exponential health-ban ladder. + # Controlled by platform.gateway.rateLimiter.responseHeaders.enabled. + - LIMIT_RESPONSE_HEADERS_ENABLED=${PLATFORM_GATEWAY_RATE_LIMITER_RESPONSE_HEADERS_ENABLED:?err} expose: - 8081 profiles: diff --git a/packages/dashmate/docs/config/gateway.md b/packages/dashmate/docs/config/gateway.md index bb2ccc063d5..11e6d17c2cd 100644 --- a/packages/dashmate/docs/config/gateway.md +++ b/packages/dashmate/docs/config/gateway.md @@ -159,6 +159,7 @@ The rate limiter protects the Platform from excessive requests: | `platform.gateway.rateLimiter.unit` | Time unit for rate limiting | `minute` | `hour` | | `platform.gateway.rateLimiter.whitelist` | IPs exempt from rate limiting | `[]` | `["192.168.1.1"]` | | `platform.gateway.rateLimiter.blacklist` | IPs blocked from all requests | `[]` | `["10.0.0.1"]` | +| `platform.gateway.rateLimiter.responseHeaders.enabled` | Emit `RateLimit-Limit`, `RateLimit-Remaining`, and `RateLimit-Reset` response headers. `rs-dapi-client` reads the Reset header to apply a precise ban window instead of the exponential health-ban ladder. Disable only for privacy reasons. | `true` | `false` | Available time units: - `second`: Per-second rate limiting diff --git a/packages/dashmate/src/config/configJsonSchema.js b/packages/dashmate/src/config/configJsonSchema.js index 075a7f30adf..127d9902f8e 100644 --- a/packages/dashmate/src/config/configJsonSchema.js +++ b/packages/dashmate/src/config/configJsonSchema.js @@ -690,8 +690,22 @@ export default { enabled: { type: 'boolean', }, + responseHeaders: { + type: 'object', + description: 'Control emission of RateLimit-* response headers (RateLimit-Limit, ' + + 'RateLimit-Remaining, RateLimit-Reset). When enabled, rs-dapi-client reads ' + + 'the Reset header to ban the node for the server-advertised window instead ' + + 'of the exponential health-ban ladder. Disable only for privacy reasons.', + properties: { + enabled: { + type: 'boolean', + }, + }, + additionalProperties: false, + required: ['enabled'], + }, }, - required: ['docker', 'enabled', 'unit', 'requestsPerUnit', 'blacklist', 'whitelist', 'metrics'], + required: ['docker', 'enabled', 'unit', 'requestsPerUnit', 'blacklist', 'whitelist', 'metrics', 'responseHeaders'], additionalProperties: false, }, ssl: { diff --git a/packages/dashmate/templates/platform/gateway/envoy.yaml.dot b/packages/dashmate/templates/platform/gateway/envoy.yaml.dot index 0712e7ccf1a..4340ccabe98 100644 --- a/packages/dashmate/templates/platform/gateway/envoy.yaml.dot +++ b/packages/dashmate/templates/platform/gateway/envoy.yaml.dot @@ -78,6 +78,23 @@ {{?}} http_filters: # TODO: Introduce when stable https://www.envoyproxy.io/docs/envoy/latest/configuration/http/http_filters/adaptive_concurrency_filter.html + # Filter order matters: cors and grpc_web MUST precede ratelimit so the + # over-limit RESOURCE_EXHAUSTED *local reply* gets CORS headers + grpc-web + # content-type framing on encode, letting browser (grpc-web) clients read + # RateLimit-Reset for the same node-backoff the native client does. A local + # reply only traverses encoder filters positioned ABOVE its generating + # filter (Envoy #11776), so ratelimit must come last (before router). + # TODO(verify on pinned Envoy build): the grpc_web encode behavior on a + # trailers-only over-limit local reply (keeps grpc-status+ratelimit-reset in + # HTTP headers vs reframing into a body trailer) is inferred from the + # gRPC-Web spec, not confirmed against dashmate's pinned image. Smoke-test + # before relying on browser parity (see PR #3951). + - name: envoy.filters.http.cors + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors + - name: envoy.filters.http.grpc_web + typed_config: + "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb {{? it.platform.gateway.rateLimiter.enabled}} - name: envoy.filters.http.ratelimit typed_config: @@ -96,12 +113,6 @@ timeout: 0.5s transport_api_version: V3 {{?}} - - name: envoy.filters.http.grpc_web - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.grpc_web.v3.GrpcWeb - - name: envoy.filters.http.cors - typed_config: - "@type": type.googleapis.com/envoy.extensions.filters.http.cors.v3.Cors - name: envoy.filters.http.router typed_config: "@type": type.googleapis.com/envoy.extensions.filters.http.router.v3.Router @@ -198,7 +209,7 @@ allow_methods: GET, PUT, DELETE, POST, OPTIONS allow_headers: keep-alive,user-agent,cache-control,content-type,content-transfer-encoding,custom-header-1,x-accept-content-transfer-encoding,x-accept-response-streaming,x-user-agent,x-grpc-web,grpc-timeout max_age: "1728000" - expose_headers: custom-header-1,grpc-status,grpc-message,code,drive-error-data-bin,dash-serialized-consensus-error-bin,stack-bin + expose_headers: custom-header-1,grpc-status,grpc-message,code,drive-error-data-bin,dash-serialized-consensus-error-bin,stack-bin,ratelimit-reset,ratelimit-limit,ratelimit-remaining static_resources: listeners: diff --git a/packages/rs-dapi-client/src/address_list.rs b/packages/rs-dapi-client/src/address_list.rs index fc1d734ccfe..5bac1674666 100644 --- a/packages/rs-dapi-client/src/address_list.rs +++ b/packages/rs-dapi-client/src/address_list.rs @@ -90,10 +90,18 @@ impl AddressStatus { /// Ban the [Address] and record the `reason` for the ban. /// - /// Applies the same exponential backoff as [`AddressStatus::ban`]; - /// the only difference is that `ban_reason` is stored so callers - /// (diagnostics, the iOS explorer) can surface why a node was - /// banned. + /// Applies exponential backoff: the ban window is `base × e^ban_count` + /// (where `ban_count` is the value *before* this call), and `banned_until` + /// is always re-based to `now + window` unconditionally, regardless of any + /// existing active ban. Concretely, a health failure on a node that already + /// holds a longer rate-limit window (set via [`AddressStatus::ban_for`]) will + /// re-base `banned_until` to the exponential value, which may be shorter. + /// This is intentional: the exponential health-ban ladder owns the window for + /// genuinely-unhealthy nodes; the no-shorten guarantee is deliberately scoped + /// to `ban_for → ban_for` sequences only. + /// + /// `ban_count` is incremented and `ban_reason` is updated unconditionally. + /// The counter resets to 0 on [`AddressStatus::unban`]. pub fn ban_with_reason(&mut self, base_ban_period: &Duration, reason: Option) { let coefficient = (self.ban_count as f64).exp(); let ban_period = Duration::from_secs_f64(base_ban_period.as_secs_f64() * coefficient); @@ -103,6 +111,38 @@ impl AddressStatus { self.ban_reason = reason; } + /// Ban the address for an exact `period` (server-advertised), bypassing the + /// exponential ladder used by [`AddressStatus::ban_with_reason`]. + /// + /// The ban window is flat (not exponential). `banned_until` is advanced to + /// `now + period` only when that timestamp is **later** than the current + /// `banned_until`, so a short-reset call never shortens a longer active ban + /// (health ban or a prior longer rate-limit ban). `ban_reason` is updated + /// only when the window is extended. `ban_count` is raised to + /// `max(ban_count, 1)` unconditionally so that `is_banned()` and + /// `ban_info()` correctly report the node as banned. Side-effect: a + /// previously-clean node (ban_count 0) enters the ladder at floor 1, + /// meaning its *next* genuine health failure via + /// [`AddressStatus::ban_with_reason`] uses `60 s × e¹ ≈ 163 s` rather + /// than the first-rung `60 s × e⁰ = 60 s`. The counter resets to 0 on + /// [`AddressStatus::unban`]. + /// + /// Note: the no-shorten guard applies only to `ban_for → ban_for` call + /// sequences. [`AddressStatus::ban_with_reason`] re-bases `banned_until` + /// unconditionally — see its docs for the intentional cross-method semantics. + pub fn ban_for(&mut self, period: Duration, reason: Option) { + let advertised_until = chrono::Utc::now() + period; + if self + .banned_until + .map(|current| current < advertised_until) + .unwrap_or(true) + { + self.banned_until = Some(advertised_until); + self.ban_reason = reason; + } + self.ban_count = self.ban_count.max(1); + } + /// Check if [Address] is banned. pub fn is_banned(&self) -> bool { self.ban_count > 0 @@ -182,6 +222,23 @@ impl AddressList { true } + /// Ban the address for an exact `period` (server-advertised); delegates to + /// [`AddressStatus::ban_for`] — see that method for the full contract + /// including the `ban_count` floor and ladder side-effect. + /// + /// Returns `false` if the address is not in the list. + pub fn ban_for(&self, address: &Address, period: Duration, reason: Option) -> bool { + let mut guard = self.addresses.write().unwrap(); + + let Some(status) = guard.get_mut(address) else { + return false; + }; + + status.ban_for(period, reason); + + true + } + /// Clears address' ban record /// Returns false if the address is not in the list. pub fn unban(&self, address: &Address) -> bool { @@ -237,12 +294,16 @@ impl AddressList { self.add(Address::try_from(uri).expect("valid uri")) } - /// Randomly select a not banned address. + /// Randomly select a not-banned address. + /// + /// An address is considered live when it has never been banned or when its + /// ban period has already expired. pub fn get_live_address(&self) -> Option
{ + // TODO(low): module-wide `.read()/.write().unwrap()` panics on a + // poisoned lock; adopt poison-tolerant locking consistently (SEC-003). let guard = self.addresses.read().unwrap(); let mut rng = SmallRng::from_entropy(); - let now = chrono::Utc::now(); guard @@ -755,4 +816,181 @@ mod tests { let list = AddressList::new(); assert!(list.ban_info().is_empty()); } + + #[test] + fn test_address_status_ban_for_sets_exact_window_and_min_ban_count() { + let mut status = AddressStatus::default(); + assert_eq!(status.ban_count, 0); + assert!(status.banned_until.is_none()); + + let before = chrono::Utc::now(); + status.ban_for(Duration::from_secs(45), Some("rate limited".into())); + let after = chrono::Utc::now(); + + // ban_count must be at least 1 so is_banned() / ban_info().banned are consistent. + assert_eq!(status.ban_count, 1, "ban_for sets ban_count to max(0,1)=1"); + + // banned_until should be roughly now + 45 s. + let until = status.banned_until.expect("banned_until must be set"); + let lower = (until - before).num_milliseconds() as f64 / 1000.0; + let upper = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lower >= 44.9, + "banned_until lower bound too short: {lower}s" + ); + assert!(upper <= 45.1, "banned_until upper bound too long: {upper}s"); + assert_eq!(status.ban_reason.as_deref(), Some("rate limited")); + } + + /// `ban_for` on a fresh node (ban_count = 0) raises ban_count to 1 (the + /// ladder floor). That means the *next* genuine health ban will escalate + /// from position 1 (~163 s) instead of position 0 (~60 s). This pins the + /// documented side-effect so regressions are caught. + #[test] + fn test_ban_for_raises_fresh_node_to_ladder_floor() { + let mut status = AddressStatus::default(); + assert_eq!(status.ban_count, 0, "starts clean"); + + // Rate-limit ban on a never-before-banned node. + status.ban_for(Duration::from_secs(10), Some("rl".into())); + assert_eq!( + status.ban_count, 1, + "ban_for must raise ban_count 0 → 1 (ladder floor)" + ); + + // Subsequent genuine health failure must escalate from the floor (1), + // yielding ~60 s × e^1 ≈ 163 s, NOT the first-rung ~60 s × e^0 = 60 s. + let base = Duration::from_secs(60); + let before = chrono::Utc::now(); + status.ban_with_reason(&base, None); // ban_count 1 → 2; window = 60s × e^1 + let after = chrono::Utc::now(); + assert_eq!(status.ban_count, 2); + + let until = status.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + let expected = 60.0_f64 * std::f64::consts::E; // ≈ 163 s + assert!( + lo >= expected - 0.5, + "window lower {lo:.1}s < expected {expected:.1}s (should escalate from floor 1)" + ); + assert!( + hi <= expected + 0.5, + "window upper {hi:.1}s > expected {expected:.1}s" + ); + } + + #[test] + fn test_address_status_ban_for_does_not_inflate_existing_ban_count() { + // A node already health-banned (ban_count = 3) gets rate-limited. + // ban_count must stay at 3, not grow to 4. + let mut status = AddressStatus::default(); + let base = Duration::from_secs(60); + status.ban_with_reason(&base, None); // → 1 + status.ban_with_reason(&base, None); // → 2 + status.ban_with_reason(&base, None); // → 3 + status.ban_for(Duration::from_secs(30), Some("rl".into())); + assert_eq!( + status.ban_count, 3, + "ban_for must not inflate ban_count above its existing value" + ); + } + + #[test] + fn test_address_list_ban_for_returns_false_for_unknown() { + let list = AddressList::new(); + let addr: Address = "http://127.0.0.1:3000".parse().unwrap(); + assert!(!list.ban_for(&addr, Duration::from_secs(5), None)); + } + + #[test] + fn test_address_list_ban_for_bans_known_address() { + let mut list = AddressList::new(); + let addr: Address = "http://127.0.0.1:3000".parse().unwrap(); + list.add(addr.clone()); + + assert!(list.ban_for(&addr, Duration::from_secs(60), Some("rl".into()))); + // The address must now be hidden from get_live_address. + assert!(list.get_live_address().is_none()); + // ban_count is 1 (ban_for sets max(0,1)). + let info = list.ban_info(); + assert_eq!(info.len(), 1); + assert!(info[0].banned); + assert_eq!(info[0].ban_count, 1); + } + + /// After `ban_for`'s window expires the address re-enters rotation via + /// `get_live_address`. We verify both directions: the node is hidden during + /// an active window, and becomes live once the window has passed. + /// + /// Window-expiry reinstatement is orthogonal to `unban()`: `get_live_address` + /// reinstates a node purely on `banned_until < now` regardless of `ban_count`, + /// so after expiry the node is live again while `is_banned()` (ban_count > 0) + /// is still true. This is a different path from `unban()`, which also zeroes + /// `ban_count`. + #[test] + fn test_ban_for_address_re_enters_rotation_after_window_expires() { + let mut list = AddressList::new(); + let addr: Address = "http://127.0.0.1:3000".parse().unwrap(); + list.add(addr.clone()); + + // Active 300-second window → node hidden. + assert!(list.ban_for(&addr, Duration::from_secs(300), Some("rl".into()))); + assert!( + list.get_live_address().is_none(), + "node must be hidden during active ban window" + ); + + // Simulate window expiry by back-dating banned_until — do NOT touch ban_count. + { + let mut guard = list.addresses.write().unwrap(); + let status = guard.get_mut(&addr).expect("addr must be in list"); + status.banned_until = Some(chrono::Utc::now() - Duration::from_secs(1)); + } + + // After window expiry the node must re-enter rotation … + assert!( + list.get_live_address().is_some(), + "address must re-enter rotation after ban window expires" + ); + // … but ban_count is still > 0, so is_banned() remains true. + // This distinguishes window-expiry from an explicit unban(). + assert!( + list.is_banned(&addr), + "is_banned() must still be true after window expiry (ban_count not reset)" + ); + } + + /// Invariant 1 at the ladder source: the exponential ban window is + /// `base × e^ban_count`, `ban_count` incrementing on each ban. This pins the + /// exact formula independently of the `update_address_ban_status` entrypoint. + #[test] + fn test_ban_ladder_windows_match_exponential_formula() { + let mut status = AddressStatus::default(); + let base_secs = 60.0_f64; + let base = Duration::from_secs(60); + + for n in 0..3usize { + // coefficient uses ban_count BEFORE this ban (== n here). + let before = chrono::Utc::now(); + status.ban(&base); + let after = chrono::Utc::now(); + + assert_eq!(status.ban_count, n + 1, "ban_count must increment"); + let period = base_secs * (n as f64).exp(); + let banned_until = status.banned_until.expect("banned_until is set"); + let lower = (banned_until - before).num_milliseconds() as f64 / 1000.0; + let upper = (banned_until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lower >= period - 0.05, + "ban #{} window lower bound {lower}s < expected {period}s", + n + 1 + ); + assert!( + upper <= period + 0.05, + "ban #{} window upper bound {upper}s > expected {period}s", + n + 1 + ); + } + } } diff --git a/packages/rs-dapi-client/src/dapi_client.rs b/packages/rs-dapi-client/src/dapi_client.rs index 72ccf31c378..aa7aea5dbd2 100644 --- a/packages/rs-dapi-client/src/dapi_client.rs +++ b/packages/rs-dapi-client/src/dapi_client.rs @@ -18,6 +18,17 @@ use crate::{ RequestSettings, }; +/// Intended minimum for the Envoy-advertised `RateLimit-Reset` ban duration. +/// Note: the `> 0` filter applied before the clamp already rejects 0 → `None`, +/// so this constant never actively clamps the lower bound — it documents intent +/// (the smallest meaningful reset is 1 s) and acts as the `.clamp(MIN, MAX)` +/// lower argument for clarity. +pub(crate) const MIN_RATE_LIMIT_BAN_SECS: u64 = 1; +/// Ceiling for the Envoy-advertised `RateLimit-Reset` ban duration. +/// Prevents a misconfigured or hostile header from parking a healthy node for +/// an unreasonably long time. +pub(crate) const MAX_RATE_LIMIT_BAN_SECS: u64 = 600; + /// General DAPI request error type. #[derive(Debug, thiserror::Error, Clone)] #[cfg_attr(feature = "mocks", derive(serde::Serialize, serde::Deserialize))] @@ -67,6 +78,13 @@ impl CanRetry for DapiClientError { DapiClientError::NoAvailableAddresses | DapiClientError::NoAvailableAddressesToRetry(_) ) } + + fn rate_limit_ban_duration(&self) -> Option { + match self { + DapiClientError::Transport(te) => te.rate_limit_ban_duration(), + _ => None, + } + } } /// Serialization of [DapiClientError]. @@ -187,7 +205,25 @@ pub fn update_address_ban_status( if error.can_retry() { if let Some(address) = error.address.as_ref() { if applied_settings.ban_failed_address { - if address_list.ban_with_reason(address, Some(error.to_string())) { + let reason = Some(error.to_string()); + let period_opt = error.rate_limit_ban_duration(); + let banned = match period_opt { + // Envoy advertised a reset window: ban for exactly that period. + // ban_count is set to max(ban_count,1) so diagnostics see the node + // as banned, but the exponential ladder is not inflated. + Some(period) => address_list.ban_for(address, period, reason), + // No rate-limit hint: normal exponential health-ban ladder. + None => address_list.ban_with_reason(address, reason), + }; + if banned { + if let Some(period) = period_opt { + tracing::debug!( + ?address, + ban_secs = period.as_secs(), + "rate-limited (ResourceExhausted): banning {address} for {}s (from RateLimit-Reset header)", + period.as_secs() + ); + } tracing::warn!( ?address, ?error, @@ -276,6 +312,150 @@ mod tests { assert!(!err.can_retry()); } + /// `rate_limit_ban_duration` returns `Some` only when the `ratelimit-reset` + /// header is present and positive on a `ResourceExhausted` response, and + /// the value is clamped to `[MIN_RATE_LIMIT_BAN_SECS, MAX_RATE_LIMIT_BAN_SECS]`. + #[test] + fn test_rate_limit_ban_duration_header_parse() { + use dapi_grpc::tonic::metadata::MetadataValue; + + // Helper: build a ResourceExhausted status with a ratelimit-reset header. + let make_rl_status = |header: Option<&str>| -> dapi_grpc::tonic::Status { + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + if let Some(v) = header { + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from(v).unwrap()); + } + status + }; + + // Normal header value: returned clamped. + let s = make_rl_status(Some("45")); + let dur = TransportError::Grpc(s).rate_limit_ban_duration(); + assert_eq!(dur, Some(Duration::from_secs(45))); + + // Value above MAX → clamped to MAX. + let s = make_rl_status(Some("9999")); + let dur = TransportError::Grpc(s).rate_limit_ban_duration(); + assert_eq!(dur, Some(Duration::from_secs(MAX_RATE_LIMIT_BAN_SECS))); + + // Clamp edge: exactly MIN (1) → 1 s (passes through unchanged). + let s = make_rl_status(Some("1")); + assert_eq!( + TransportError::Grpc(s).rate_limit_ban_duration(), + Some(Duration::from_secs(1)) + ); + + // Clamp edge: exactly MAX (600) → 600 s (not clamped). + let s = make_rl_status(Some("600")); + assert_eq!( + TransportError::Grpc(s).rate_limit_ban_duration(), + Some(Duration::from_secs(600)) + ); + + // One above MAX (601) → clamped to 600 s. + let s = make_rl_status(Some("601")); + assert_eq!( + TransportError::Grpc(s).rate_limit_ban_duration(), + Some(Duration::from_secs(600)) + ); + + // Value below MIN (0) → filtered to None before clamp. + let s = make_rl_status(Some("0")); + assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none()); + + // Non-numeric → None. + let s = make_rl_status(Some("garbage")); + assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none()); + + // Header absent → None. + let s = make_rl_status(None); + assert!(TransportError::Grpc(s).rate_limit_ban_duration().is_none()); + + // Non-ResourceExhausted code → None regardless of header. + let mut unavail = dapi_grpc::tonic::Status::unavailable("down"); + unavail + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + assert!(TransportError::Grpc(unavail) + .rate_limit_ban_duration() + .is_none()); + } + + /// When `ResourceExhausted` carries a valid `ratelimit-reset` header, + /// `update_address_ban_status` calls `ban_for` (exact period, no ladder + /// inflation); when the header is absent it falls through to `ban_with_reason` + /// (normal exponential ladder). + #[test] + fn test_update_address_ban_status_rate_limit_ban_path() { + use dapi_grpc::tonic::metadata::MetadataValue; + + let mut address_list = AddressList::new(); + let addr = mock_address(); + address_list.add(addr.clone()); + + // Build a ResourceExhausted status with ratelimit-reset: 45. + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &make_applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + // Node is banned for ~45 s. + assert!(entry.banned, "rate-limited node must be banned"); + assert_eq!(entry.ban_count, 1, "ban_count must be 1 after ban_for"); + let until = entry.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= 44.9 && hi <= 45.1, + "ban window must be ~45 s, got lo={lo} hi={hi}" + ); + } + + /// When `ResourceExhausted` has NO `ratelimit-reset` header, + /// `update_address_ban_status` must fall back to the normal `ban_with_reason` + /// ladder (not produce a zero-second or panic ban). + #[test] + fn test_update_address_ban_status_rate_limit_no_header_uses_ladder() { + let mut address_list = AddressList::new(); + let addr = mock_address(); + address_list.add(addr.clone()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc( + dapi_grpc::tonic::Status::resource_exhausted("429"), + )), + retries: 0, + address: Some(addr.clone()), + }); + update_address_ban_status(&address_list, &result, &make_applied_settings(true)); + + // The ban ladder is invoked: first ban → ban_count = 1, window = 60 s. + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + assert!( + entry.banned, + "node must be banned on ResourceExhausted without header" + ); + assert_eq!( + entry.ban_count, 1, + "first health-ladder ban → ban_count = 1" + ); + } + #[cfg(feature = "mocks")] #[test] fn test_can_retry_mock_error() { diff --git a/packages/rs-dapi-client/src/executor.rs b/packages/rs-dapi-client/src/executor.rs index 963383742ae..cdab8c6db1a 100644 --- a/packages/rs-dapi-client/src/executor.rs +++ b/packages/rs-dapi-client/src/executor.rs @@ -83,6 +83,10 @@ impl CanRetry for ExecutionError { fn is_no_available_addresses(&self) -> bool { self.inner.is_no_available_addresses() } + + fn rate_limit_ban_duration(&self) -> Option { + self.inner.rate_limit_ban_duration() + } } /// Request execution response. diff --git a/packages/rs-dapi-client/src/lib.rs b/packages/rs-dapi-client/src/lib.rs index b31d2b35a2c..2f77b2f7fcc 100644 --- a/packages/rs-dapi-client/src/lib.rs +++ b/packages/rs-dapi-client/src/lib.rs @@ -95,6 +95,15 @@ pub trait CanRetry { false } + /// If this error is a gRPC `ResourceExhausted` (Envoy rate-limit) that + /// carries a `RateLimit-Reset` metadata header, returns the server-advertised + /// ban duration (clamped to a safe range). Returns `None` for all other + /// errors and for rate-limit errors that carry no usable header (the caller + /// falls back to the normal exponential ban ladder in that case). + fn rate_limit_ban_duration(&self) -> Option { + None + } + /// Get boolean flag that indicates if the error is retryable. /// /// Deprecated in favor of [CanRetry::can_retry]. diff --git a/packages/rs-dapi-client/src/transport.rs b/packages/rs-dapi-client/src/transport.rs index f488aeaffa2..bea0968fd2f 100644 --- a/packages/rs-dapi-client/src/transport.rs +++ b/packages/rs-dapi-client/src/transport.rs @@ -106,6 +106,12 @@ impl CanRetry for TransportError { TransportError::Grpc(status) => status.can_retry(), } } + + fn rate_limit_ban_duration(&self) -> Option { + match self { + TransportError::Grpc(status) => status.rate_limit_ban_duration(), + } + } } /// Serialization of [TransportError]. @@ -212,6 +218,65 @@ mod tests { assert!(!non_retryable.can_retry()); } + /// `rate_limit_ban_duration` returns `Some` only for `ResourceExhausted` with + /// a parseable positive `ratelimit-reset` header. Every other code returns + /// `None` regardless of headers. + #[test] + fn test_tonic_status_rate_limit_ban_duration() { + use dapi_grpc::tonic::metadata::MetadataValue; + + // ResourceExhausted + valid header → Some. + let mut status = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + assert_eq!( + status.rate_limit_ban_duration(), + Some(std::time::Duration::from_secs(30)) + ); + + // ResourceExhausted without header → None. + let no_header = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429"); + assert!(no_header.rate_limit_ban_duration().is_none()); + + // Non-ResourceExhausted codes → None regardless. + for code in [ + Code::Ok, + Code::Unavailable, + Code::Internal, + Code::DeadlineExceeded, + ] { + let mut s = dapi_grpc::tonic::Status::new(code, "x"); + s.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + assert!( + s.rate_limit_ban_duration().is_none(), + "code {code:?} must return None" + ); + } + } + + #[test] + fn test_transport_error_rate_limit_ban_duration_delegates() { + use dapi_grpc::tonic::metadata::MetadataValue; + + let mut status = dapi_grpc::tonic::Status::new(Code::ResourceExhausted, "429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + let rate_limited = TransportError::Grpc(status); + assert_eq!( + rate_limited.rate_limit_ban_duration(), + Some(std::time::Duration::from_secs(45)) + ); + // Still retryable — rate-limit ban duration doesn't affect can_retry. + assert!(rate_limited.can_retry()); + + let unavailable = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("down")); + assert!(unavailable.rate_limit_ban_duration().is_none()); + assert!(unavailable.can_retry()); + } + #[test] fn test_transport_error_clone() { let original = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("test message")); diff --git a/packages/rs-dapi-client/src/transport/grpc.rs b/packages/rs-dapi-client/src/transport/grpc.rs index 68e0c5ae684..4253ea3d95a 100644 --- a/packages/rs-dapi-client/src/transport/grpc.rs +++ b/packages/rs-dapi-client/src/transport/grpc.rs @@ -146,6 +146,39 @@ impl CanRetry for dapi_grpc::tonic::Status { | Unimplemented ) } + + /// Returns the Envoy-advertised ban duration for a `ResourceExhausted` + /// response, or `None` if this is not a rate-limit or carries no usable + /// `RateLimit-Reset` header. + /// + /// Envoy's global rate-limit filter emits `RateLimit-Reset: ` when + /// `LIMIT_RESPONSE_HEADERS_ENABLED=true` is set on the Lyft RLS container + /// (see `packages/dashmate/docker-compose.rate_limiter.yml`). The value is + /// the whole-second count until the per-IP window resets. + /// + /// Parse rules (adversarial-input safe): + /// * Non-`ResourceExhausted` code → `None`. + /// * Header absent, non-numeric, or `0` → `None` (caller uses normal ban + /// ladder). + /// * Valid positive integer → clamped to + /// [`[MIN_RATE_LIMIT_BAN_SECS, MAX_RATE_LIMIT_BAN_SECS]`] + /// (`dapi_client.rs`) and returned as `Some(Duration)`. + fn rate_limit_ban_duration(&self) -> Option { + use crate::dapi_client::{MAX_RATE_LIMIT_BAN_SECS, MIN_RATE_LIMIT_BAN_SECS}; + use dapi_grpc::tonic::Code; + if self.code() != Code::ResourceExhausted { + return None; + } + let secs = self + .metadata() + .get("ratelimit-reset") + .and_then(|v| v.to_str().ok()) + .and_then(|s| s.trim().parse::().ok()) + .filter(|&s| s > 0)?; + Some(std::time::Duration::from_secs( + secs.clamp(MIN_RATE_LIMIT_BAN_SECS, MAX_RATE_LIMIT_BAN_SECS), + )) + } } /// Macro to implement the `TransportRequest` trait for a given request type, response type, client type, and settings. diff --git a/packages/rs-dapi-client/tests/common/mod.rs b/packages/rs-dapi-client/tests/common/mod.rs new file mode 100644 index 00000000000..5a0c56a9c43 --- /dev/null +++ b/packages/rs-dapi-client/tests/common/mod.rs @@ -0,0 +1,106 @@ +//! Shared fake-transport harness for integration tests. +//! +//! Lives in `tests/common/mod.rs` (subdir form) so Cargo does **not** treat +//! it as a standalone test target. Include it from a test file with +//! `mod common;`. + +use std::sync::{Arc, Mutex}; + +use dapi_grpc::mock::Mockable; +use rs_dapi_client::transport::{ + AppliedRequestSettings, BoxFuture, TransportClient, TransportError, TransportRequest, +}; +use rs_dapi_client::{ConnectionPool, RequestSettings, Uri}; + +// ── Fake client ───────────────────────────────────────────────────────────── + +/// Minimal fake transport client: remembers the URI it was built for. +/// There is no real socket — `execute_transport` is never dispatched to the +/// network; the `ScriptedRequest` responder drives it synthetically. +pub struct FakeClient { + pub uri: Uri, +} + +impl TransportClient for FakeClient { + fn with_uri(uri: Uri, _pool: &ConnectionPool) -> Result { + Ok(Self { uri }) + } + + fn with_uri_and_settings( + uri: Uri, + _settings: &AppliedRequestSettings, + _pool: &ConnectionPool, + ) -> Result { + Ok(Self { uri }) + } +} + +// ── Fake response ──────────────────────────────────────────────────────────── + +/// Trivial fake response returned by the scripted responder on success. +#[derive(Debug, Clone)] +pub struct FakeResponse; + +impl Mockable for FakeResponse {} + +// ── Scripted request ───────────────────────────────────────────────────────── + +/// Generic scripted fake request. +/// +/// A caller-supplied `responder` closure decides what to return for each +/// attempt. The `hit_uris` field records every URI that +/// `execute_transport` was invoked with, in order, so tests can inspect +/// how many nodes were tried and which ones. +/// +/// The design handles both "N failures then success" (via a counter captured +/// in the closure) and "always ResourceExhausted+header" patterns. +#[derive(Clone)] +pub struct ScriptedRequest { + pub responder: Arc Result + Send + Sync>, + /// Every URI that `execute_transport` was called with, in invocation order. + pub hit_uris: Arc>>, +} + +impl ScriptedRequest { + /// Create a new `ScriptedRequest` driven by `responder`. + pub fn new( + responder: impl Fn(Uri) -> Result + Send + Sync + 'static, + ) -> Self { + Self { + responder: Arc::new(responder), + hit_uris: Arc::new(Mutex::new(Vec::new())), + } + } +} + +impl std::fmt::Debug for ScriptedRequest { + fn fmt(&self, f: &mut std::fmt::Formatter<'_>) -> std::fmt::Result { + f.debug_struct("ScriptedRequest") + .field("hit_uris", &self.hit_uris) + .finish() + } +} + +impl Mockable for ScriptedRequest {} + +impl TransportRequest for ScriptedRequest { + type Client = FakeClient; + type Response = FakeResponse; + + const SETTINGS_OVERRIDES: RequestSettings = RequestSettings::default(); + + fn method_name(&self) -> &'static str { + "fake_method" + } + + fn execute_transport<'c>( + self, + client: &'c mut Self::Client, + _settings: &AppliedRequestSettings, + ) -> BoxFuture<'c, Result> { + let uri = client.uri.clone(); + self.hit_uris.lock().unwrap().push(uri.clone()); + let result = (self.responder)(uri); + Box::pin(async move { result }) + } +} diff --git a/packages/rs-dapi-client/tests/rate_limit_ban.rs b/packages/rs-dapi-client/tests/rate_limit_ban.rs new file mode 100644 index 00000000000..6f63722a1c7 --- /dev/null +++ b/packages/rs-dapi-client/tests/rate_limit_ban.rs @@ -0,0 +1,454 @@ +//! Integration test: `ResourceExhausted` with a `RateLimit-Reset` header causes +//! the node to be banned for that exact period (`ban_for`), while a missing +//! header falls back to the normal exponential health-ban ladder. + +mod common; + +use common::ScriptedRequest; +use dapi_grpc::tonic::metadata::MetadataValue; +use rs_dapi_client::transport::{AppliedRequestSettings, TransportError}; +use rs_dapi_client::{ + update_address_ban_status, Address, AddressList, CanRetry, DapiClient, DapiClientError, + DapiRequestExecutor, ExecutionError, ExecutionResult, RequestSettings, +}; +use std::time::Duration; + +fn make_address() -> rs_dapi_client::Address { + "http://127.0.0.1:3000".parse().expect("valid address") +} + +fn applied_settings(ban: bool) -> AppliedRequestSettings { + AppliedRequestSettings { + connect_timeout: None, + timeout: Duration::from_secs(10), + retries: 5, + ban_failed_address: ban, + max_decoding_message_size: None, + #[cfg(not(target_arch = "wasm32"))] + ca_certificate: None, + } +} + +/// `ResourceExhausted` + `ratelimit-reset: 45` → `ban_for` with a ~45s window. +/// `ban_count` must be set to at least 1 (diagnostics) but NOT escalated further. +#[test] +fn test_resource_exhausted_with_header_bans_for_advertised_period() { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + assert!( + entry.banned, + "node must be banned after ResourceExhausted+header" + ); + assert_eq!(entry.ban_count, 1, "ban_for sets ban_count to max(0,1)=1"); + + // Ban window must be approximately 45 s. + let until = entry.banned_until.expect("banned_until must be set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= 44.9 && hi <= 45.1, + "ban window must be ~45 s; got lo={lo:.2}s hi={hi:.2}s" + ); +} + +/// Large `ratelimit-reset` values are clamped to MAX_RATE_LIMIT_BAN_SECS (600). +#[test] +fn test_ratelimit_reset_clamped_to_max() { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("9999").unwrap()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + let until = entry.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + // Clamped at 600 s. + assert!( + lo >= 599.5 && hi <= 600.5, + "9999s must be clamped to 600s; got lo={lo:.2} hi={hi:.2}" + ); +} + +/// `ratelimit-reset: 0` or non-numeric → `None` → normal `ban_with_reason` ladder. +/// +/// The key assertion is the resulting `banned_until` window: the ladder's first +/// ban is `60 s × e^0 = 60 s`, **not** some header-derived value. Checking only +/// `ban_count == 1` would pass even if the wrong path (ban_for) were taken. +#[test] +fn test_zero_and_garbage_header_falls_back_to_ladder() { + // Default AddressList uses DEFAULT_BASE_BAN_PERIOD = 60 s. + // First ladder ban: coefficient = e^0 = 1.0, window = 60 s. + const EXPECTED_WINDOW_SECS: f64 = 60.0; + + for bad in &["0", "garbage", ""] { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + if !bad.is_empty() { + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from(*bad).unwrap()); + } + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc(status)), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + assert!( + entry.banned, + "bad header '{bad}' must still result in a ban via the ladder" + ); + assert_eq!( + entry.ban_count, 1, + "ladder ban → ban_count = 1 for header '{bad}'" + ); + + // The ban window must be the exponential ladder's first rung (~60 s), + // NOT a header-derived value. This assertion fails if ban_for were + // mistakenly called instead of ban_with_reason. + let until = entry + .banned_until + .expect("banned_until set for header '{bad}'"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= EXPECTED_WINDOW_SECS - 0.5, + "bad header '{bad}': ban window lower {lo:.1}s < expected ~{EXPECTED_WINDOW_SECS}s (ladder path)" + ); + assert!( + hi <= EXPECTED_WINDOW_SECS + 0.5, + "bad header '{bad}': ban window upper {hi:.1}s > expected ~{EXPECTED_WINDOW_SECS}s (should be ladder, not ban_for)" + ); + } +} + +/// Missing `ratelimit-reset` header → `None` → normal exponential health-ban ladder. +/// +/// Asserts the `banned_until` window is ~60 s (first ladder rung), NOT a +/// header-derived value. A mere `ban_count == 1` check would pass even if +/// `ban_for` were wrongly invoked (both paths yield ban_count 1 on first ban). +#[test] +fn test_missing_header_falls_back_to_ladder() { + let mut address_list = AddressList::new(); + let addr = make_address(); + address_list.add(addr.clone()); + + let result: ExecutionResult = Err(ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc( + dapi_grpc::tonic::Status::resource_exhausted("429"), + )), + retries: 0, + address: Some(addr.clone()), + }); + + let before = chrono::Utc::now(); + update_address_ban_status(&address_list, &result, &applied_settings(true)); + let after = chrono::Utc::now(); + + let info = address_list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + assert!( + entry.banned, + "missing header must still result in a ladder ban" + ); + assert_eq!(entry.ban_count, 1, "first ladder ban → ban_count = 1"); + + // Window must be the first exponential rung: 60 s × e^0 = 60 s. + let until = entry.banned_until.expect("banned_until set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= 59.5, + "ladder window lower {lo:.1}s < expected ~60 s (missing header must use ladder)" + ); + assert!( + hi <= 60.5, + "ladder window upper {hi:.1}s > expected ~60 s (should be ladder, not ban_for)" + ); +} + +/// `ban_for` must never shorten an already-active ban. +/// +/// (a) LONG then SHORT → `banned_until` stays at the LONG window; `ban_reason` +/// is preserved from the LONG call (SHORT must not overwrite it). +/// (b) SHORT then LONG → `banned_until` extends to the LONG window; `ban_reason` +/// is adopted from the LONG call (window was extended, so reason updates). +/// (c) `ban_count` ends at ≥ 1 in both scenarios. +/// +/// QA-007: case (a) uses an exact `assert_eq!` on `banned_until` snapshots — +/// a last-wins regression would produce ~30 s and fail unambiguously. +/// QA-008: named reasons in both cases pin the `ban_reason` update contract. +#[test] +fn test_ban_for_never_shortens_active_ban() { + let addr: rs_dapi_client::Address = "http://127.0.0.1:3000".parse().expect("valid address"); + + // (a) LONG then SHORT — SHORT must NOT change banned_until or ban_reason. + { + let mut list = rs_dapi_client::AddressList::new(); + list.add(addr.clone()); + + let long_period = Duration::from_secs(300); + let short_period = Duration::from_secs(30); + + list.ban_for(&addr, long_period, Some("long-reason".into())); + + // Snapshot banned_until immediately after the LONG ban. + let snapshot = list + .ban_info() + .into_iter() + .find(|i| i.uri == addr.to_string()) + .unwrap() + .banned_until + .expect("banned_until set after LONG ban"); + + list.ban_for(&addr, short_period, Some("short-reason".into())); + + let info = list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + // Exact equality: last-wins regression would produce ~30 s here. + assert_eq!( + entry.banned_until.expect("banned_until must still be set"), + snapshot, + "(a) LONG→SHORT must not change banned_until (last-wins regression guard)" + ); + // ban_reason must be preserved from the LONG call. + assert_eq!( + entry.reason.as_deref(), + Some("long-reason"), + "(a) SHORT ban must not overwrite ban_reason set by LONG ban" + ); + assert!( + entry.ban_count >= 1, + "(a) ban_count must be >= 1; got {}", + entry.ban_count + ); + } + + // (b) SHORT then LONG — LONG must extend banned_until AND adopt ban_reason. + // A trailing ban_for(SHORT) must then be a no-op (kills last-wins regression + // in this direction; case (a) kills the symmetric MIN-wins regression). + { + let mut list = rs_dapi_client::AddressList::new(); + list.add(addr.clone()); + + let short_period = Duration::from_secs(30); + let long_period = Duration::from_secs(300); + + let before_long = chrono::Utc::now(); + list.ban_for(&addr, short_period, Some("short-reason".into())); + list.ban_for(&addr, long_period, Some("long-reason".into())); + + // Snapshot banned_until after LONG extension. + let snapshot_after_long = list + .ban_info() + .into_iter() + .find(|i| i.uri == addr.to_string()) + .unwrap() + .banned_until + .expect("banned_until set after LONG ban"); + + let before_trailing = before_long; // already elapsed < 1 ms + let until_after_long = snapshot_after_long; + let remaining = (until_after_long - before_trailing).num_milliseconds() as f64 / 1000.0; + assert!( + remaining >= 299.0, + "(b) SHORT→LONG must extend ban; remaining={remaining:.1}s, expected >=299s" + ); + + // Trailing SHORT — must not reduce the LONG window (last-wins guard). + list.ban_for(&addr, short_period, Some("trailing-short".into())); + + let info = list.ban_info(); + let entry = info.iter().find(|i| i.uri == addr.to_string()).unwrap(); + + // Exact equality: a last-wins impl would drop banned_until to ~30 s here. + assert_eq!( + entry.banned_until.expect("banned_until must still be set"), + snapshot_after_long, + "(b) trailing SHORT must not change banned_until set by LONG (last-wins regression guard)" + ); + // ban_reason must be adopted from the LONG call (window was extended). + assert_eq!( + entry.reason.as_deref(), + Some("long-reason"), + "(b) LONG ban must update ban_reason when it extends the window" + ); + assert!( + entry.ban_count >= 1, + "(b) ban_count must be >= 1; got {}", + entry.ban_count + ); + } +} + +/// `rate_limit_ban_duration` on `CanRetry` returns `Some` only for +/// `ResourceExhausted` with a parseable positive `ratelimit-reset`. +#[test] +fn test_rate_limit_ban_duration_trait_delegation() { + use rs_dapi_client::ExecutionError; + + // With header → Some(45s). + let mut s = dapi_grpc::tonic::Status::resource_exhausted("429"); + s.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("45").unwrap()); + let te = TransportError::Grpc(s); + assert_eq!(te.rate_limit_ban_duration(), Some(Duration::from_secs(45))); + + // Unavailable → None. + let unavail = TransportError::Grpc(dapi_grpc::tonic::Status::unavailable("down")); + assert!(unavail.rate_limit_ban_duration().is_none()); + + // ResourceExhausted without header → None. + let re_no_header = TransportError::Grpc(dapi_grpc::tonic::Status::resource_exhausted("429")); + assert!(re_no_header.rate_limit_ban_duration().is_none()); + + // DapiClientError delegates. + let dce = DapiClientError::Transport(TransportError::Grpc({ + let mut s2 = dapi_grpc::tonic::Status::resource_exhausted("429"); + s2.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("30").unwrap()); + s2 + })); + assert_eq!(dce.rate_limit_ban_duration(), Some(Duration::from_secs(30))); + + // ExecutionError delegates. + let ee: ExecutionError = ExecutionError { + inner: DapiClientError::Transport(TransportError::Grpc({ + let mut s3 = dapi_grpc::tonic::Status::resource_exhausted("429"); + s3.metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("20").unwrap()); + s3 + })), + retries: 0, + address: Some(make_address()), + }; + assert_eq!(ee.rate_limit_ban_duration(), Some(Duration::from_secs(20))); +} + +/// End-to-end proof: a `ResourceExhausted` response carrying `ratelimit-reset: +/// 300` that travels through the *real* `DapiClient::execute()` path causes the +/// node to be banned for the Envoy-advertised 300 s window (`ban_for`), NOT the +/// exponential ladder's first rung (~60 s). +/// +/// The 300 s figure is deliberately chosen so that: +/// - it differs unambiguously from the ladder's first rung (60 s), and +/// - it is within the [1, 600] s clamp range. +/// +/// `ban_count == 1` confirms the flat `ban_for` path, not the ladder. +#[tokio::test] +async fn rate_limited_node_banned_for_advertised_window_via_execute() { + // Single-node pool: after one ResourceExhausted the node is banned and no + // live address remains, which collapses to NoAvailableAddressesToRetry. + let node: Address = "http://127.0.0.1:10201".parse().expect("valid address"); + let address_list: AddressList = "http://127.0.0.1:10201" + .parse() + .expect("valid address list"); + + // The responder returns ResourceExhausted + ratelimit-reset: 300 on every call. + let request = ScriptedRequest::new(|_uri| { + let mut status = dapi_grpc::tonic::Status::resource_exhausted("429"); + status + .metadata_mut() + .insert("ratelimit-reset", MetadataValue::try_from("300").unwrap()); + Err(TransportError::Grpc(status)) + }); + + let client = DapiClient::new(address_list, RequestSettings::default()); + + let before = chrono::Utc::now(); + let err = client + .execute(request.clone(), RequestSettings::default()) + .await + .expect_err("single rate-limited node: expect NoAvailableAddressesToRetry"); + let after = chrono::Utc::now(); + + // The only node must be banned. + assert!( + client.address_list().is_banned(&node), + "rate-limited node must be banned after execute()" + ); + + // The error must be the address-exhaustion variant (single node, now banned). + match err.inner { + DapiClientError::NoAvailableAddressesToRetry(_) => {} + other => panic!( + "expected NoAvailableAddressesToRetry from exhausted single-node pool, got: {other:?}" + ), + } + + // The ban window must be ~300 s (from the ratelimit-reset header), NOT ~60 s + // (the ladder's first rung). This is the key assertion that proves ban_for + // was called through execute(), not ban_with_reason. + let info = client.address_list().ban_info(); + let entry = info + .iter() + .find(|i| i.uri == node.to_string()) + .expect("ban_info entry for the rate-limited node"); + + assert!(entry.banned, "node must be effectively banned"); + + // ban_count == 1 proves the flat ban_for floor, not the exponential ladder + // (both yield ban_count 1 on first ban, but the window assertion below + // disambiguates; this assertion additionally documents the intent). + assert_eq!( + entry.ban_count, 1, + "ban_for sets ban_count to 1 (flat floor, not ladder escalation)" + ); + + let until = entry.banned_until.expect("banned_until must be set"); + let lo = (until - before).num_milliseconds() as f64 / 1000.0; + let hi = (until - after).num_milliseconds() as f64 / 1000.0; + assert!( + lo >= 299.5 && hi <= 300.5, + "ban window must be ~300 s (ratelimit-reset header); got lo={lo:.3}s hi={hi:.3}s" + ); +} diff --git a/packages/rs-dapi-client/tests/unimplemented_failover.rs b/packages/rs-dapi-client/tests/unimplemented_failover.rs index 285f712175a..cd91c8fecd1 100644 --- a/packages/rs-dapi-client/tests/unimplemented_failover.rs +++ b/packages/rs-dapi-client/tests/unimplemented_failover.rs @@ -4,108 +4,39 @@ //! retry the request on another address. If every node is unimplemented, the //! error must still surface instead of retrying forever. +mod common; + use std::sync::{Arc, Mutex}; -use dapi_grpc::mock::Mockable; +use common::{FakeResponse, ScriptedRequest}; use dapi_grpc::tonic::{Code, Status}; -use rs_dapi_client::transport::{ - AppliedRequestSettings, BoxFuture, TransportClient, TransportError, TransportRequest, -}; +use rs_dapi_client::transport::TransportError; use rs_dapi_client::{ - Address, AddressList, CanRetry, ConnectionPool, DapiClient, DapiClientError, - DapiRequestExecutor, RequestSettings, Uri, + Address, AddressList, CanRetry, DapiClient, DapiClientError, DapiRequestExecutor, + RequestSettings, Uri, }; -/// Transport client that only remembers which node it was created for. -struct FakeClient { - uri: Uri, -} - -impl TransportClient for FakeClient { - fn with_uri(uri: Uri, _pool: &ConnectionPool) -> Result { - Ok(Self { uri }) - } - - fn with_uri_and_settings( - uri: Uri, - _settings: &AppliedRequestSettings, - _pool: &ConnectionPool, - ) -> Result { - Ok(Self { uri }) - } -} - -#[derive(Debug)] -struct FakeResponse; - -impl Mockable for FakeResponse {} - -#[derive(Debug, Default)] -struct State { - /// How many more attempts should be answered with UNIMPLEMENTED, - /// simulating nodes that run an older build. - unimplemented_responses_left: usize, - /// Nodes that answered UNIMPLEMENTED, in order. - unimplemented_uris: Vec, -} - -/// Request that answers UNIMPLEMENTED for the first N attempts (each failing -/// node gets banned by the executor, so each attempt hits a different node) -/// and succeeds afterwards. -#[derive(Clone, Debug)] -struct MixedVersionRequest { - state: Arc>, -} - -impl MixedVersionRequest { - fn with_unimplemented_responses(count: usize) -> Self { - Self { - state: Arc::new(Mutex::new(State { - unimplemented_responses_left: count, - unimplemented_uris: Vec::new(), - })), - } - } -} - -impl Mockable for MixedVersionRequest {} - -impl TransportRequest for MixedVersionRequest { - type Client = FakeClient; - type Response = FakeResponse; - - const SETTINGS_OVERRIDES: RequestSettings = RequestSettings::default(); - - fn method_name(&self) -> &'static str { - "fake_shielded_method" - } - - fn execute_transport<'c>( - self, - client: &'c mut Self::Client, - _settings: &AppliedRequestSettings, - ) -> BoxFuture<'c, Result> { - let result = { - let mut state = self.state.lock().unwrap(); - if state.unimplemented_responses_left > 0 { - state.unimplemented_responses_left -= 1; - state.unimplemented_uris.push(client.uri.clone()); - Err(TransportError::Grpc(Status::unimplemented( - "Operation is not implemented or not supported", - ))) - } else { - Ok(FakeResponse) - } - }; - - Box::pin(async move { result }) - } -} - #[tokio::test] async fn unimplemented_node_is_banned_and_request_retried_on_another() { - // One of the two nodes still runs an older build without the method. - let request = MixedVersionRequest::with_unimplemented_responses(1); + // The closure captures its own state: exactly one UNIMPLEMENTED response, + // then success. `error_uris` records which node answered UNIMPLEMENTED. + let error_uris: Arc>> = Default::default(); + let error_uris_c = error_uris.clone(); + + let request = ScriptedRequest::new(move |uri| { + let mut uris = error_uris_c.lock().unwrap(); + if uris.is_empty() { + // First call: simulate an old node that doesn't have the method yet. + uris.push(uri); + Err(TransportError::Grpc(Status::unimplemented( + "Operation is not implemented or not supported", + ))) + } else { + // Subsequent calls: upgraded node responds successfully. + Ok(FakeResponse) + } + }); + let address_list: AddressList = "http://127.0.0.1:10001,http://127.0.0.1:10002" .parse() .expect("valid address list"); @@ -117,13 +48,13 @@ async fn unimplemented_node_is_banned_and_request_retried_on_another() { .expect("request should succeed on the upgraded node"); let old_node_uri = { - let state = request.state.lock().unwrap(); + let uris = error_uris.lock().unwrap(); assert_eq!( - state.unimplemented_uris.len(), + uris.len(), 1, "exactly one node should have answered UNIMPLEMENTED" ); - state.unimplemented_uris[0].clone() + uris[0].clone() }; assert_eq!(response.retries, 1); @@ -143,7 +74,12 @@ async fn unimplemented_node_is_banned_and_request_retried_on_another() { #[tokio::test] async fn unimplemented_on_all_nodes_still_surfaces_error() { // No node implements the method: every attempt answers UNIMPLEMENTED. - let request = MixedVersionRequest::with_unimplemented_responses(usize::MAX); + // `hit_uris` counts total attempts (all of which are errors here). + let request = ScriptedRequest::new(|_uri| { + Err(TransportError::Grpc(Status::unimplemented( + "Operation is not implemented or not supported", + ))) + }); let address_list: AddressList = "http://127.0.0.1:10003,http://127.0.0.1:10004" .parse() .expect("valid address list"); @@ -155,7 +91,7 @@ async fn unimplemented_on_all_nodes_still_surfaces_error() { .expect_err("request must fail when no node implements the method"); // Both nodes were tried once each before the address list was exhausted. - assert_eq!(request.state.lock().unwrap().unimplemented_uris.len(), 2); + assert_eq!(request.hit_uris.lock().unwrap().len(), 2); assert!( !error.can_retry(), "exhausted-addresses error must not be retried by callers" @@ -186,7 +122,11 @@ async fn unimplemented_surfaces_retryable_when_pool_exceeds_retry_budget() { // Three live nodes, retry budget of 1 → at most retries + 1 = 2 attempts, so // the retry cap trips before all three addresses are banned. - let request = MixedVersionRequest::with_unimplemented_responses(usize::MAX); + let request = ScriptedRequest::new(|_uri| { + Err(TransportError::Grpc(Status::unimplemented( + "Operation is not implemented or not supported", + ))) + }); let address_list: AddressList = "http://127.0.0.1:10005,http://127.0.0.1:10006,http://127.0.0.1:10007" .parse() @@ -201,7 +141,7 @@ async fn unimplemented_surfaces_retryable_when_pool_exceeds_retry_budget() { // retries + 1 = 2 nodes were tried (and banned) before the retry cap tripped; // the third address is never reached. assert_eq!( - request.state.lock().unwrap().unimplemented_uris.len(), + request.hit_uris.lock().unwrap().len(), 2, "retry budget (1) caps attempts at 2 before the 3-node pool is exhausted" );