From b6fefaaeea812c87b9b1d883f1a3d872a09adb20 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 14:07:56 +1200 Subject: [PATCH 01/15] Add slow sync strike out logic --- crates/block-producer/src/lib.rs | 2 +- crates/rpc/src/server/api.rs | 3 + .../samples/02-with-account-files/bridge.mac | Bin 37762 -> 37762 bytes crates/store/src/state/subscription.rs | 113 +++++++++++++----- crates/utils/src/lib.rs | 5 + 5 files changed, 89 insertions(+), 34 deletions(-) diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index 35691a6e6..c9f460b50 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -64,7 +64,7 @@ const SERVER_MEMPOOL_EXPIRATION_SLACK: u32 = 2; const CACHED_MEMPOOL_STATS_UPDATE_INTERVAL: Duration = Duration::from_secs(5); /// How often a block is created. -pub const DEFAULT_BLOCK_INTERVAL: Duration = Duration::from_secs(3); +pub use miden_node_utils::DEFAULT_BLOCK_INTERVAL; /// How often a batch is created. pub const DEFAULT_BATCH_INTERVAL: Duration = Duration::from_secs(1); diff --git a/crates/rpc/src/server/api.rs b/crates/rpc/src/server/api.rs index 45dc2a79c..115fb9344 100644 --- a/crates/rpc/src/server/api.rs +++ b/crates/rpc/src/server/api.rs @@ -1209,6 +1209,9 @@ fn state_subscription_error_to_status(err: StateSubscriptionError) -> Status { "failed to load proof for block {block_num}: {}", source.as_report() )), + StateSubscriptionError::TooSlow => { + Status::resource_exhausted("subscriber is too slow to keep up with the chain") + }, } } diff --git a/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac b/crates/store/src/genesis/config/samples/02-with-account-files/bridge.mac index 6437e37b2a98f85a7cf5d6faa21d534ec0f49f01..8412cfe5cb431e2e9b554f126ab3e959d5d57864 100644 GIT binary patch delta 3925 zcmai%_dnE+1Av{86Fy32)*E+Zg>1*Cv*#gu#o3(59u;@?%(%13A$xPqiV!YYA(;&s zSvi>*rSJDY`27C7o}Zs5x{5NoiZY{)={R?|k+zA&Ke^Vnqt)5Fr8^A7RIw?sHolTmnrI+Y7v<7wvm^xl3%DN7W;CUVM^=HzCZl z9}NGV^vOG@sPVVcE+X|3N@}@K98kaHN6x>Zr6n;wMz3X${E3g&`9nZC6E#H6 zi9nE0MZHEG_?~zxM zx;F3wM0;&1=waqryj;S4$z#4jcl%Q%;ttSoz`6$06@m)lmQuuO|5emc$NwykV{&7Qw*{OK`Y3=>Wi>9dri8%uqq;*qz%!;xsLRyPGyMDoZa_X+fA?E;G)n;1rQAd<=9pzLAkzm43mcV$gciP>0^SI$#(8%9pbF^uNS|i%S zN~;L(V5~=k+}IX4kX_QG<2#2x1SNrM;~Ng@Y{oZ6^xTq6op*OhIEDg}-*!DuVft`n zA{Zq)Qu^JI^@e>?s+^FkXfSC$Rb)!m67rp2#SqrvERfLm;QTalUqpPLyiNr7^p)!K$Y}EhILqw%eowdlce6*?TzQ`_@nF~Ai>2DA+lUDC z#-yF!Wx4Yccm479SX74ix!lN`a1f+(sn4>1lgZNOGuGU81j9|5S1h;W#|tK-5?96vl*gFKp?RM!A`=M9i{xF%O03ML;ow-a>po z0T<>0hac{4RSj|lTo@~zQu;Yjaoi>kjXCTjf}25KwHhA=Xt{)bi00sx5A!Hc%FQgi zcTMi?{z1RWS*+5Ld`Zvh!W!<(0|4N}8UZ;ReG3!kJihbU#UR}|FoWp2)oO=*(Px|r z#Nre^0f0_u<8>OSC2PA?e08H!&Y?S)oMhlPaL)DH*a^b=}0eIlTzCfyIDiOq1 zs8FwEl~w94a~@~mSg)ru%FkhXb^`)QaCvE7Ybh)8f+l3BX;q`w_qM{XzqAc&zL+k= z>NVWL|0O!f&LEAGtOTl0`Ryy850e;pR-RMU>s9$FcJDNR`Mbjr;s+j^;PaJZMiOc~ z7a7h~gxC5g&xVN(4s6i+DTh<^0+2o*#4EK=jyVhpyv)+rvV|_Dlw#yhHLS;ODXL2# z5hV^~gq{QL+d45}Yme2VHG?ZXvldh5L9QS*=GGDwX%WrZqS3JUaI=l8&q3Q3gIse% zY%2`wm64MH+*NlY_GECKUMoNg?d;pccgNfdtr~VnlJH%V*vg%qE*IW06TY4g56zWu zov0P0ZOk7T@^r-Y5y^^iqL1!}vShA1rc{{ZYv%ur z_vNa;fx}7{j&9d+NdyG-GL6buWW#Fje6H$?`R9|}qN9f5PN7|?FR|Y@;+w)v+?jU? zTII{hfl>c0MWfKb(t>VV>GjXk0phr858>RJ;5mCK>XqL*95APR>SQXN!khp(oETeO zh9(#LR<&QnOPP0D1rRZD1YNPPZkvMGbt(%^s{S@Veazf5=&yx<`chvwjnqo<$Lt;Q zw#4^EGH(s7zL7QawFd{0%C&{{zKyv^eq$xkJc({dnm&GQ-|_S~^~%x)eE(OX`ZtTm zc<#xDOn-u+4*&)-V}gE1!J17|Qs!5>)7+wF20bb0Icey>dk+F*Bp4wrDIQIQyPLkC68aLxVI+SeMe?=s$?nmY=M#Swymj zf;C!i`=8JXi?}pq~%}Hk-7UIalHu%hWr_fPp+xVIkd_CjbF{{HXEX8ND4XV zhHTT>?~QDIxZA3BW6qnxE-ECpM8=A_j)?&m77f6&qFg3H3I*e=^rNTg$YIR3cyQn` zBt09t=I$m2g8hkJe9yekP$#feqqWd9tsocd3j)V2L<30eBPc2| zc@_-;rgoK}NR?IBL_ z%1kE0iy%d&WRu_uV)b$MTVh4{o1x9ELiHsVX{{`BaG+#fxEIbk20*)L^OZZK9N4hK z-mo^=U&r!Eah0AJyDQT$`{)ldXrgM|F$|-W>6L3oPjhx@)>AbNQS={)KL%BG#hRRmS3?XwU^itzJn ze(IY)5Q4>x9eO4$<$3SsBf7;fuWJ%iGNQrI3F`_^r~GIQCW?Wjs@{@*{4*rhl3KX4 zU4gL0^9L>GEcj_o=QU;QD0(X0{cHWbWjiCD$A%TN5X`HW2odyYai6w=9&e?hzeP}H z=+E$U>Yd3alVnb-%4<_3~ZrGV+qhEC}cz!eT|zp5xtmpa@E# zmxK7|a#p!v4b&9a=zuLiQ^jm|VB_9nMB{!MgR}{1lQAfttB+S4*wkPNCuvqq2O?T(ADG=8k&t=5Ko>wOzAA zLFwQvg?YqgBcEnUj{l_lUQlG-Q_xc!M*@Hjyy7;8Q>*`4{>b1v_=(>|#gIFw)YOjO zS6VN!DJIQi_mx~mRiobKP}*Ehki(~>w2NTNU-z;Ih&VQ*lEqNT*$zc1^(4>9WozK} z;1ii=s4`$A-H%H6v@#9y$~pG)^c5(crCr;Dl*ifkt}M^wAB1MF1Cy)fPQwdDNt;5@ z9Ii6~K$59d?FI*Es+Lm^=0-VA+Ni=T#F&{^ClU@5uu~R}L#08{)_yYiAypmMC%)sG zhtgkQ{?_2iPv(ZEs~(<$hErN1qJAPz^T%2qNLHvMvfFmm60_gYQ>hf$>3Zb+I+`20dB>x` zHyjz}R5OzWtsTYh^V#=mP<&ZaL0wRbA^Fo{vOPj=`0IG%`^{^cH*3bV+su&@9d z3ljGmuao!^R7wtAQ&9EnP~k(_B!$o4@~gsL`p36@TwIQ z)s3LrK)Hu!5H%>x&3Zm_l^f81OmE18-|($^X~Zi~yVT+xuPwy7_wp-52HfDA?&_q| z;VD^ky<+e0i1tpu$nOg;>?smSqz9z91F4sO7QTA`EA=Ty(irkvR&r`mz2 z@NyE_HpYbNhFjk{{U0|oV`ssr(6ymOyXcx(ITmZvK9N_nUTz2L%6F2-%>27%AI-?G zY~=2rS>RK|rnO}J%E_|EF!|>alA-*hz;E|oXCHRP%C^uyj^rpX+8q5h)9E#)F^hrXhGggI(B zrX^UNTsLOTB#ot=zjEQf)(WSH)BhjA|8k=M delta 3925 zcmai%_dnE+1II~LU1s^nd^?<#aoHoC5E&Uo&d$ipJi_7Zv)6sB!dcmbGcw91dqqfA zX54Xg)){?2|H1e7=i~M3^Yx4(6rkZK*Pi ze25D$?xKU8?%Y%$z zY`SXedO!^=YjaOi4{jhnR_=jg0xV##RSmG7%%3?k!$z*$!qR(^Vz-A#iPEDBINR|j z^_{RTb&!R+-2K#BBsP7yW(Itkl`X?_&F{c(V+ic$$g$iZ+NmMDs(A2={SPj=|p4BS)I*- z*jm5H;tS}Z-QKfYuW8^8p;MN4F$jC)HD%C1BWZnQjmfqIU_aqiOaeZI4(gOS2_Y{z*uCn_8@Ek-$>4;hW0?`Mdk&MS%%4;P($_g#uZLvg2; z03E8ld5Q`D7TeDfqB(i9AeaB!JJZ1J^L_{-%*t1%X28Y9N0=14RUgT-v;d2+-ZXg@ zvr4<*utsllZZLJ7ht0|>+%`M~OdZ`{Dt##5BzIe^da3N!;oMI0P@LDytJuGX6>f83 zxvP=+E5RRarn@)0kOvtN%HJDbq}A}Q_m!g9?*D12{nwNJ?k4euyhoq)$}O%{gu#!B zyKLE1Uhk>x!AV@~YbKIQ zY;61NdTri-CvN{(+b%+ik{=m|m%4rw>etTHhg?Gu$|G%AVLzbl65Vo09b|aJZ%k zK)XGl*o@&W)5~Do*8Sg=FLfJS57qhL`Kj9eZC}X7Y+F_F?=bVP0ARpCPU7zRGW9o# z{+XXz2Q%vQ*R?MdMVLBmOl7{f8Q?MOPB7pR*d=C~E+v8RuR@E0p4zzGI>A}kp~4{~P-e`q`=xNt7sao#gezmeENpk_>q+sRCvNcy~E+>Q28 z$LS<&w|)$=vvXRj9T65*Z$d8Hi}1~0PMrXNz8Z}mHM#c$>x0mGhddZEA9fX3`c%TG zcJ9BOVBS)MQE&zT8ZDc|XKa&w!3AGZ4zL`mjP~m5QbGP)^@cE3cTk9KD-DV3$SrZcd1J?u{P`vYgE8bQ znc-t+wET5Z5zPF5BcKLpS|s-JxFlVr?{B-H{2l&OceI=DuQ{saEz{ zX*J+w!m<*|e>|`(A+I1?z>Zr%&~6x=V|Y);{<9O=G=+OP3Y__9fJZZL{hzUsK?&LAHL zBIeYs0OfHddv`fEFlQH(^#OL4qL?v0LHYqoHlg5v4cW%Sw*=?T>K51yy6b&UGFv-h zE<~i;h#5j&H1NfHiqBkVf@aNCPsUi?^Za;xUT$7nHfb;eJ&(on)D zJ>Y%zjXj<;(2S_CQtE@E&PL$^@B>KStYFPJpUs`au?V@R{uu|y;+N?hmtqWfMgGy7 zqP}g(=6)hb(i~3VH9!Y&Q|e#*Sjzqr&}mj)8uHAtwi)b`%X)Is$~@9ts3+r49NIOiQZi)>Pd-9FS=;If0msBNvL#K;60^4;e8MI}p8 znaCaG^d&ShiDceFHIi`N?DFr2;w80xzus}#y9%b1ucPRTr7_Ksx3fZsTi(o%&{i`; zj&q1NT|q8rA&KYS1E4!u;;Vgx{Iu{m>4935r4=S10)X`!Dy4e_drrA|s(gtmnaHkF z&}2wi@fGY*X^%F-N*I{3xi^2yMjk&SV)wDxg4UlIH`%{xmP`gV8KkWNms^HyUmDkAL?Zw{ytmG$>Tol8|8gt5a>*=lrfj_+VFA*=e& zwK_9=zmtxL&anIt)4q&WNtcKIRgZ33Cs=mvqBc^1K4JDj@eN~TTv9+g#4 z0!m#?M7DP32_U~wPUkwAo3p2cO8<4bCE|4vWg^~uaDsb~#uS~D_0C34+7j9yE8#ik z115+(j#+hUvbZnPtA9GUk*4L{mhz)y`SlmOke&l0pBMfg7V+=CB0iuz@zBL`-)j(r zcRlGG61uT{2>ucm`;+-tC7nrr?LA%ajXGLJApGU?QA-d5fwzbzh zu|4tH4tmPPLcIXH!^Z4apjQ_1pfK zT7o!3hl54-1%Y)twgRIN0{|~&`|g-ExE4qV@!3OVaCevjg@%0@{suA(sm6yeenMjc zVgS?_bON!S^VL?Tlr>JU;gd4Kx}cCE6M=O2{qyh0L&?}pid%E^YOhZUFQ%kw#>Je) zH&nE$G8dLCKmt!kj=?X@d>oY)!rCpBJrnQ|k-=?{$ZtpG13fw(^aUbqW>Pee7`8QG z@uS&KY5a53ac08wh8H(7Bff*i|297LS{kQR!my(O)H-bi=Iwb|zg|3l6~j>+cMh=) z$KovA4(<~8$(Q^iWZR1>LQrwWU!za%4Zd&l_=Yw8+ii)H65E$ zXiq|Wb-SQg8(+{G5f@NLc$Q@LZGVjcNGrtpr}R!zx35-x{-_#lKMObBye7k(j!TYm zG!R0Gzg zVEC6_4yZ^{r-IX(b$)&QXLh02&dg5IfAMj5?In{cz_1|vh9t4op3kGcOCw5FYfiG2 z#3{svQKB$U+f95z$kt<*f%e5&QYlS*#YGn7L>S%Wo-4ClQo|75mTQ)P(#|HAf5&38JUqE&xCaS;Oil826z{_?G@UwMjZ2q|4@aq$t;RR@l?Hbz0w z`fk{Wx$xMG_gE>6Sv-JB!yC&A_s=#et-q1gr{nkp#_nBk!VXKTnAqRfmTV)YTB}Y$ z8c+kD7X5|bvI2b(YklG$Uj3%^J>*$3%$<4SG*h~!F#It%tDNSrbek6wY#%OhQY^7yaQG@ zFDv29NLHc0ACRgsCRU4~w0SG-<%O~?%Ej(-=WRE`JCn(1+Fyi3!42*8{7(-aJ6s~V z@@GGNr>9-3zQH)jCvzise)Ez}YuT;sxX@6Kk3$j;l8~Z2JAI zt-D021Vm^Y%23o+^X z?o_yom7auIP`H z^qYI|)ZD=cmqnj;xxjlR*sECHXV_Bydt!D=;Nh!qQHk5mLU^MOjvpQ*A>1>C3NtD# zh5L>SskkCKG)_GjC*FH#x87)|VGVlpYCtshk4CBZQ8YW9mcp+D*5MA_C}KN$1qPol zEqCO>%_oLY!A8NVrkZk-32L5_sqV^4@3Qn>^|7MAC|GvgQ%PYUzfu+Sb1J(n, + tip_rx: watch::Receiver, state: Arc, tx: &mpsc::Sender>, ) -> Result<(), StateSubscriptionError> { - let mut next = from; - loop { - let mut tip = *tip_rx.borrow_and_update(); - while next <= tip { - let block = fetch_block(next, &cache, &state).await?; - tip = *tip_rx.borrow_and_update(); - if tx - .send(Ok(BlockSubscriptionEvent { block, committed_chain_tip: tip })) - .await - .is_err() - { - return Ok(()); - } - next = next.child(); - } - if tip_rx.changed().await.is_err() { - return Ok(()); - } - } + run_stream( + from, + tip_rx, + tx, + |block_num| { + let cache = cache.clone(); + let state = Arc::clone(&state); + async move { fetch_block(block_num, &cache, &state).await } + }, + |_, block, committed_chain_tip| BlockSubscriptionEvent { + block, + committed_chain_tip, + }, + ) + .await } async fn run_proof_stream( from: BlockNumber, cache: ProofCache, - mut tip_rx: watch::Receiver, + tip_rx: watch::Receiver, state: Arc, tx: &mpsc::Sender>, ) -> Result<(), StateSubscriptionError> { + run_stream( + from, + tip_rx, + tx, + |block_num| { + let cache = cache.clone(); + let state = Arc::clone(&state); + async move { fetch_proof(block_num, &cache, &state).await } + }, + |block_num, proof, proven_chain_tip| ProofSubscriptionEvent { + block_num, + proof, + proven_chain_tip, + }, + ) + .await +} + +/// Drives a generic subscription stream, replaying history then following live tip advances. +/// +/// Calls `fetch` for each block in sequence starting from `from`, builds an event with +/// `build_event(block_num, data, tip)`, and sends it to `tx`. Disconnects the subscriber +/// with [`StateSubscriptionError::TooSlow`] if sending blocks for [`MAX_SLOW_STRIKES`] +/// consecutive [`SEND_TIMEOUT`] intervals. +async fn run_stream( + from: BlockNumber, + mut tip_rx: watch::Receiver, + tx: &mpsc::Sender>, + fetch: F, + build_event: impl Fn(BlockNumber, Vec, BlockNumber) -> E, +) -> Result<(), StateSubscriptionError> +where + F: Fn(BlockNumber) -> Fut, + Fut: Future, StateSubscriptionError>>, +{ let mut next = from; + let mut slow_strikes = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { - let proof = fetch_proof(next, &cache, &state).await?; + let data = fetch(next).await?; tip = *tip_rx.borrow_and_update(); - if tx - .send(Ok(ProofSubscriptionEvent { - block_num: next, - proof, - proven_chain_tip: tip, - })) - .await - .is_err() - { - return Ok(()); - } + let permit = loop { + match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { + Ok(Ok(permit)) => { + slow_strikes = 0; + break permit; + } + Ok(Err(_)) => return Ok(()), + Err(_) => { + slow_strikes += 1; + if slow_strikes >= MAX_SLOW_STRIKES { + return Err(StateSubscriptionError::TooSlow); + } + } + } + }; + permit.send(Ok(build_event(next, data, tip))); next = next.child(); } if tip_rx.changed().await.is_err() { diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 96e81b49a..4b96787aa 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1,4 +1,9 @@ pub mod block_cache; + +/// The default block production interval. +/// +/// Used as a timing reference across crates (e.g. subscription send timeouts). +pub const DEFAULT_BLOCK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(3); pub mod clap; pub mod cors; pub mod crypto; From a042c4f2f347d7c3f8b3629ab5cb6a3d0a4356c0 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 14:13:35 +1200 Subject: [PATCH 02/15] Add const for capacity --- crates/store/src/state/subscription.rs | 6 ++++-- 1 file changed, 4 insertions(+), 2 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index d94d2e9fa..f0114e6f5 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -13,6 +13,8 @@ use tokio_stream::wrappers::ReceiverStream; use super::{BlockCache, ProofCache, State}; use crate::errors::DatabaseError; +/// Buffered messages per subscriber before back-pressure begins and slow-strike timeouts apply. +const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; /// How long to wait for a subscriber to accept a message before counting a strike. const SEND_TIMEOUT: Duration = DEFAULT_BLOCK_INTERVAL; /// Number of consecutive send timeouts before a subscriber is considered too slow and disconnected. @@ -95,7 +97,7 @@ fn build_block_stream( tip_rx: watch::Receiver, state: Arc, ) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(32); + let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); tokio::spawn(async move { if let Err(err) = run_block_stream(from, cache, tip_rx, state, &tx).await { let _ = tx.send(Err(err)).await; @@ -110,7 +112,7 @@ fn build_proof_stream( tip_rx: watch::Receiver, state: Arc, ) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(32); + let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); tokio::spawn(async move { if let Err(err) = run_proof_stream(from, cache, tip_rx, state, &tx).await { let _ = tx.send(Err(err)).await; From f002548204daa9667ea94ba15be7ea6ea28cdda8 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 14:18:29 +1200 Subject: [PATCH 03/15] Lint --- crates/store/src/state/subscription.rs | 9 +++------ 1 file changed, 3 insertions(+), 6 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index f0114e6f5..afd3258db 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -140,10 +140,7 @@ async fn run_block_stream( let state = Arc::clone(&state); async move { fetch_block(block_num, &cache, &state).await } }, - |_, block, committed_chain_tip| BlockSubscriptionEvent { - block, - committed_chain_tip, - }, + |_, block, committed_chain_tip| BlockSubscriptionEvent { block, committed_chain_tip }, ) .await } @@ -202,14 +199,14 @@ where Ok(Ok(permit)) => { slow_strikes = 0; break permit; - } + }, Ok(Err(_)) => return Ok(()), Err(_) => { slow_strikes += 1; if slow_strikes >= MAX_SLOW_STRIKES { return Err(StateSubscriptionError::TooSlow); } - } + }, } }; permit.send(Ok(build_event(next, data, tip))); From 15ab59c4f64ae0de272d840344b04d91d1179480 Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 4 Jun 2026 17:06:32 +1200 Subject: [PATCH 04/15] Add SubscriptionSource trait --- crates/store/src/state/subscription.rs | 200 +++++++++++-------------- 1 file changed, 85 insertions(+), 115 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index afd3258db..cf9e0599b 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -68,131 +68,131 @@ impl State { /// Streams committed blocks starting from `from`, replaying historical blocks first and then /// following live commits. pub fn block_subscription(self: &Arc, from: BlockNumber) -> BlockSubscriptionStream { - Box::pin(build_block_stream( + Box::pin(build_stream( from, - self.block_cache.clone(), self.subscribe_committed_tip(), - Arc::clone(self), + BlockSource { cache: self.block_cache.clone(), state: Arc::clone(self) }, )) } /// Streams block proofs starting from `from`, replaying historical proofs first and then /// following newly proven blocks. pub fn proof_subscription(self: &Arc, from: BlockNumber) -> ProofSubscriptionStream { - Box::pin(build_proof_stream( + Box::pin(build_stream( from, - self.proof_cache.clone(), self.subscribe_proven_tip(), - Arc::clone(self), + ProofSource { cache: self.proof_cache.clone(), state: Arc::clone(self) }, )) } } -// STREAM BUILDERS +// SUBSCRIPTION SOURCE // ================================================================================================ -fn build_block_stream( - from: BlockNumber, +trait SubscriptionSource: Send + Sync + 'static { + type Event: Send + 'static; + + fn fetch( + &self, + block_num: BlockNumber, + ) -> impl Future, StateSubscriptionError>> + Send + '_; + + fn build_event(&self, block_num: BlockNumber, data: Vec, tip: BlockNumber) -> Self::Event; +} + +struct BlockSource { cache: BlockCache, - tip_rx: watch::Receiver, state: Arc, -) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); - tokio::spawn(async move { - if let Err(err) = run_block_stream(from, cache, tip_rx, state, &tx).await { - let _ = tx.send(Err(err)).await; +} + +impl SubscriptionSource for BlockSource { + type Event = BlockSubscriptionEvent; + + async fn fetch(&self, block_num: BlockNumber) -> Result, StateSubscriptionError> { + if let Some(entry) = self.cache.get(block_num) { + return Ok(entry.block_bytes().to_vec()); } - }); - ReceiverStream::new(rx) + self.state + .load_block(block_num) + .await + .map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })? + .ok_or(StateSubscriptionError::BlockNotFound(block_num)) + } + + fn build_event( + &self, + _block_num: BlockNumber, + block: Vec, + committed_chain_tip: BlockNumber, + ) -> BlockSubscriptionEvent { + BlockSubscriptionEvent { block, committed_chain_tip } + } } -fn build_proof_stream( - from: BlockNumber, +struct ProofSource { cache: ProofCache, - tip_rx: watch::Receiver, state: Arc, -) -> impl Stream> + Send + 'static { - let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); - tokio::spawn(async move { - if let Err(err) = run_proof_stream(from, cache, tip_rx, state, &tx).await { - let _ = tx.send(Err(err)).await; - } - }); - ReceiverStream::new(rx) } -// STREAM TASKS -// ================================================================================================ +impl SubscriptionSource for ProofSource { + type Event = ProofSubscriptionEvent; -async fn run_block_stream( - from: BlockNumber, - cache: BlockCache, - tip_rx: watch::Receiver, - state: Arc, - tx: &mpsc::Sender>, -) -> Result<(), StateSubscriptionError> { - run_stream( - from, - tip_rx, - tx, - |block_num| { - let cache = cache.clone(); - let state = Arc::clone(&state); - async move { fetch_block(block_num, &cache, &state).await } - }, - |_, block, committed_chain_tip| BlockSubscriptionEvent { block, committed_chain_tip }, - ) - .await + async fn fetch(&self, block_num: BlockNumber) -> Result, StateSubscriptionError> { + if let Some(entry) = self.cache.get(block_num) { + return Ok(entry.proof_bytes().to_vec()); + } + self.state + .load_proof(block_num) + .await + .map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })? + .ok_or(StateSubscriptionError::ProofNotFound(block_num)) + } + + fn build_event( + &self, + block_num: BlockNumber, + proof: Vec, + proven_chain_tip: BlockNumber, + ) -> ProofSubscriptionEvent { + ProofSubscriptionEvent { block_num, proof, proven_chain_tip } + } } -async fn run_proof_stream( +// STREAM +// ================================================================================================ + +fn build_stream( from: BlockNumber, - cache: ProofCache, tip_rx: watch::Receiver, - state: Arc, - tx: &mpsc::Sender>, -) -> Result<(), StateSubscriptionError> { - run_stream( - from, - tip_rx, - tx, - |block_num| { - let cache = cache.clone(); - let state = Arc::clone(&state); - async move { fetch_proof(block_num, &cache, &state).await } - }, - |block_num, proof, proven_chain_tip| ProofSubscriptionEvent { - block_num, - proof, - proven_chain_tip, - }, - ) - .await + source: S, +) -> impl Stream> + Send + 'static { + let (tx, rx) = mpsc::channel(SUBSCRIBER_CHANNEL_CAPACITY); + tokio::spawn(async move { + if let Err(err) = run_stream(from, tip_rx, &tx, source).await { + let _ = tx.send(Err(err)).await; + } + }); + ReceiverStream::new(rx) } /// Drives a generic subscription stream, replaying history then following live tip advances. /// -/// Calls `fetch` for each block in sequence starting from `from`, builds an event with -/// `build_event(block_num, data, tip)`, and sends it to `tx`. Disconnects the subscriber -/// with [`StateSubscriptionError::TooSlow`] if sending blocks for [`MAX_SLOW_STRIKES`] -/// consecutive [`SEND_TIMEOUT`] intervals. -async fn run_stream( +/// Calls [`SubscriptionSource::fetch`] for each block in sequence starting from `from`, builds an +/// event with [`SubscriptionSource::build_event`], and sends it to `tx`. Disconnects the +/// subscriber with [`StateSubscriptionError::TooSlow`] if sending blocks for +/// [`MAX_SLOW_STRIKES`] consecutive [`SEND_TIMEOUT`] intervals. +async fn run_stream( from: BlockNumber, mut tip_rx: watch::Receiver, - tx: &mpsc::Sender>, - fetch: F, - build_event: impl Fn(BlockNumber, Vec, BlockNumber) -> E, -) -> Result<(), StateSubscriptionError> -where - F: Fn(BlockNumber) -> Fut, - Fut: Future, StateSubscriptionError>>, -{ + tx: &mpsc::Sender>, + source: S, +) -> Result<(), StateSubscriptionError> { let mut next = from; let mut slow_strikes = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { - let data = fetch(next).await?; + let data = source.fetch(next).await?; tip = *tip_rx.borrow_and_update(); let permit = loop { match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { @@ -209,7 +209,7 @@ where }, } }; - permit.send(Ok(build_event(next, data, tip))); + permit.send(Ok(source.build_event(next, data, tip))); next = next.child(); } if tip_rx.changed().await.is_err() { @@ -217,33 +217,3 @@ where } } } - -async fn fetch_block( - block_num: BlockNumber, - cache: &BlockCache, - state: &State, -) -> Result, StateSubscriptionError> { - if let Some(entry) = cache.get(block_num) { - return Ok(entry.block_bytes().to_vec()); - } - state - .load_block(block_num) - .await - .map_err(|source| StateSubscriptionError::BlockLoad { block_num, source })? - .ok_or(StateSubscriptionError::BlockNotFound(block_num)) -} - -async fn fetch_proof( - block_num: BlockNumber, - cache: &ProofCache, - state: &State, -) -> Result, StateSubscriptionError> { - if let Some(entry) = cache.get(block_num) { - return Ok(entry.proof_bytes().to_vec()); - } - state - .load_proof(block_num) - .await - .map_err(|source| StateSubscriptionError::ProofLoad { block_num, source })? - .ok_or(StateSubscriptionError::ProofNotFound(block_num)) -} From f5b443877ca57b25e999c7a716409f769469f51b Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 5 Jun 2026 12:29:54 +1200 Subject: [PATCH 05/15] Use block limit logic --- crates/block-producer/src/lib.rs | 2 +- crates/store/src/state/subscription.rs | 51 +++++++++++++------------- crates/utils/src/lib.rs | 4 -- 3 files changed, 27 insertions(+), 30 deletions(-) diff --git a/crates/block-producer/src/lib.rs b/crates/block-producer/src/lib.rs index c9f460b50..35691a6e6 100644 --- a/crates/block-producer/src/lib.rs +++ b/crates/block-producer/src/lib.rs @@ -64,7 +64,7 @@ const SERVER_MEMPOOL_EXPIRATION_SLACK: u32 = 2; const CACHED_MEMPOOL_STATS_UPDATE_INTERVAL: Duration = Duration::from_secs(5); /// How often a block is created. -pub use miden_node_utils::DEFAULT_BLOCK_INTERVAL; +pub const DEFAULT_BLOCK_INTERVAL: Duration = Duration::from_secs(3); /// How often a batch is created. pub const DEFAULT_BATCH_INTERVAL: Duration = Duration::from_secs(1); diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index cf9e0599b..cb078dfc4 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -3,7 +3,6 @@ use std::pin::Pin; use std::sync::Arc; use std::time::Duration; -use miden_node_utils::DEFAULT_BLOCK_INTERVAL; use miden_protocol::block::BlockNumber; use thiserror::Error; use tokio::sync::{mpsc, watch}; @@ -13,12 +12,12 @@ use tokio_stream::wrappers::ReceiverStream; use super::{BlockCache, ProofCache, State}; use crate::errors::DatabaseError; -/// Buffered messages per subscriber before back-pressure begins and slow-strike timeouts apply. +/// Buffered messages per subscriber before back-pressure begins. const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; -/// How long to wait for a subscriber to accept a message before counting a strike. -const SEND_TIMEOUT: Duration = DEFAULT_BLOCK_INTERVAL; -/// Number of consecutive send timeouts before a subscriber is considered too slow and disconnected. -const MAX_SLOW_STRIKES: u32 = 5; +/// Number of blocks beyond the smallest gap observed so far before a subscriber is disconnected. +const MAX_SLOW_GAP: u32 = 100; +/// Safety-net timeout for a single send when the client has stalled. +const SEND_TIMEOUT: Duration = Duration::from_secs(10); // SUBSCRIPTION EVENTS // ================================================================================================ @@ -71,7 +70,10 @@ impl State { Box::pin(build_stream( from, self.subscribe_committed_tip(), - BlockSource { cache: self.block_cache.clone(), state: Arc::clone(self) }, + BlockSource { + cache: self.block_cache.clone(), + state: Arc::clone(self), + }, )) } @@ -81,7 +83,10 @@ impl State { Box::pin(build_stream( from, self.subscribe_proven_tip(), - ProofSource { cache: self.proof_cache.clone(), state: Arc::clone(self) }, + ProofSource { + cache: self.proof_cache.clone(), + state: Arc::clone(self), + }, )) } } @@ -179,8 +184,9 @@ fn build_stream( /// /// Calls [`SubscriptionSource::fetch`] for each block in sequence starting from `from`, builds an /// event with [`SubscriptionSource::build_event`], and sends it to `tx`. Disconnects the -/// subscriber with [`StateSubscriptionError::TooSlow`] if sending blocks for -/// [`MAX_SLOW_STRIKES`] consecutive [`SEND_TIMEOUT`] intervals. +/// subscriber with [`StateSubscriptionError::TooSlow`] if the gap between the tip and the next +/// block to send exceeds the minimum gap ever observed plus [`MAX_SLOW_GAP`], or if a single send +/// blocks for longer than [`SEND_TIMEOUT`] (safety net for a stalled client). async fn run_stream( from: BlockNumber, mut tip_rx: watch::Receiver, @@ -188,26 +194,21 @@ async fn run_stream( source: S, ) -> Result<(), StateSubscriptionError> { let mut next = from; - let mut slow_strikes = 0u32; + let mut min_gap = u32::MAX; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { + let gap = tip.as_u32() - next.as_u32(); + min_gap = min_gap.min(gap); + if gap > min_gap + MAX_SLOW_GAP { + return Err(StateSubscriptionError::TooSlow); + } let data = source.fetch(next).await?; tip = *tip_rx.borrow_and_update(); - let permit = loop { - match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { - Ok(Ok(permit)) => { - slow_strikes = 0; - break permit; - }, - Ok(Err(_)) => return Ok(()), - Err(_) => { - slow_strikes += 1; - if slow_strikes >= MAX_SLOW_STRIKES { - return Err(StateSubscriptionError::TooSlow); - } - }, - } + let permit = match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { + Ok(Ok(permit)) => permit, + Ok(Err(_)) => return Ok(()), + Err(_) => return Err(StateSubscriptionError::TooSlow), }; permit.send(Ok(source.build_event(next, data, tip))); next = next.child(); diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 4b96787aa..840de3887 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1,9 +1,5 @@ pub mod block_cache; -/// The default block production interval. -/// -/// Used as a timing reference across crates (e.g. subscription send timeouts). -pub const DEFAULT_BLOCK_INTERVAL: std::time::Duration = std::time::Duration::from_secs(3); pub mod clap; pub mod cors; pub mod crypto; From 173419382cd45ca5406beafa7977492445a25fa5 Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 5 Jun 2026 12:34:59 +1200 Subject: [PATCH 06/15] RM whitespace --- crates/utils/src/lib.rs | 1 - 1 file changed, 1 deletion(-) diff --git a/crates/utils/src/lib.rs b/crates/utils/src/lib.rs index 840de3887..96e81b49a 100644 --- a/crates/utils/src/lib.rs +++ b/crates/utils/src/lib.rs @@ -1,5 +1,4 @@ pub mod block_cache; - pub mod clap; pub mod cors; pub mod crypto; From 409dd2d9a96a84bca5722cf6598e5d1cf8c45834 Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 5 Jun 2026 12:42:26 +1200 Subject: [PATCH 07/15] Changelog --- CHANGELOG.md | 1 + 1 file changed, 1 insertion(+) diff --git a/CHANGELOG.md b/CHANGELOG.md index b721b86b1..45018e2bb 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -57,6 +57,7 @@ - Persisted attachments of private output notes when applying a block, so they are now returned by `GetNotesById` ([#2172](https://github.com/0xMiden/node/pull/2172)). - [BREAKING] `miden-ntx-builder` now requires a remote transaction prover to be configured ([#2179](https://github.com/0xMiden/node/pull/2179)). - [BREAKING] Replaced `StoreStatus` with `chain_tip` field in `RpcStatus` ([#2187](https://github.com/0xMiden/node/pull/2187)). +- Added logic to disconnect slow block and proof stream clients ([#2196](https://github.com/0xMiden/node/pull/2196)). ## v0.14.11 (TBD) From ef7f63c3336d1ab5581a381b385440ad5d168125 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 8 Jun 2026 10:39:30 +1200 Subject: [PATCH 08/15] RM gap logic --- crates/store/src/state/subscription.rs | 13 ++----------- 1 file changed, 2 insertions(+), 11 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index cb078dfc4..700b605da 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -14,8 +14,6 @@ use crate::errors::DatabaseError; /// Buffered messages per subscriber before back-pressure begins. const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; -/// Number of blocks beyond the smallest gap observed so far before a subscriber is disconnected. -const MAX_SLOW_GAP: u32 = 100; /// Safety-net timeout for a single send when the client has stalled. const SEND_TIMEOUT: Duration = Duration::from_secs(10); @@ -184,9 +182,8 @@ fn build_stream( /// /// Calls [`SubscriptionSource::fetch`] for each block in sequence starting from `from`, builds an /// event with [`SubscriptionSource::build_event`], and sends it to `tx`. Disconnects the -/// subscriber with [`StateSubscriptionError::TooSlow`] if the gap between the tip and the next -/// block to send exceeds the minimum gap ever observed plus [`MAX_SLOW_GAP`], or if a single send -/// blocks for longer than [`SEND_TIMEOUT`] (safety net for a stalled client). +/// subscriber with [`StateSubscriptionError::TooSlow`] if a single send blocks for longer than [`SEND_TIMEOUT`] +/// which may occur only after the buffer has [`SUBSCRIBER_CHANNEL_CAPACITY`] blocks queued. async fn run_stream( from: BlockNumber, mut tip_rx: watch::Receiver, @@ -194,15 +191,9 @@ async fn run_stream( source: S, ) -> Result<(), StateSubscriptionError> { let mut next = from; - let mut min_gap = u32::MAX; loop { let mut tip = *tip_rx.borrow_and_update(); while next <= tip { - let gap = tip.as_u32() - next.as_u32(); - min_gap = min_gap.min(gap); - if gap > min_gap + MAX_SLOW_GAP { - return Err(StateSubscriptionError::TooSlow); - } let data = source.fetch(next).await?; tip = *tip_rx.borrow_and_update(); let permit = match tokio::time::timeout(SEND_TIMEOUT, tx.reserve()).await { From 7385ce58cdd2d23ed45b56e1c96baf72f2272f54 Mon Sep 17 00:00:00 2001 From: sergerad Date: Mon, 8 Jun 2026 14:10:50 +1200 Subject: [PATCH 09/15] Update docker cache line --- Dockerfile | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index 5059c4bc5..cc14b5d4e 100644 --- a/Dockerfile +++ b/Dockerfile @@ -34,13 +34,13 @@ COPY --from=planner /app/recipe.json recipe.json # caches are fragile when concurrent CI builds race or a build is interrupted. RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \ - --mount=type=cache,sharing=locked,target=/app/target \ + --mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \ cargo chef cook --release --recipe-path recipe.json # Build application COPY . . RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \ - --mount=type=cache,sharing=locked,target=/app/target \ + --mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \ cargo build --release --locked --bin ${BIN} && \ mkdir -p /app/bin && \ cp /app/target/release/${BIN} /app/bin/${BIN} From 5cd5a3260561185d91870dfd79d5e19b8aa4225e Mon Sep 17 00:00:00 2001 From: sergerad Date: Tue, 9 Jun 2026 13:39:10 +1200 Subject: [PATCH 10/15] Update Dockerfile --- Dockerfile | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/Dockerfile b/Dockerfile index d8156decf..54b7ea09c 100644 --- a/Dockerfile +++ b/Dockerfile @@ -27,6 +27,10 @@ RUN cargo chef prepare --recipe-path recipe.json FROM chef AS builder ARG BIN +# Disable incremental compilation: Docker normalises COPY timestamps, which +# breaks Rust's mtime-based fingerprinting and causes stale .rlib reuse. +# The /app/target cache still accelerates builds via pre-compiled dep .rlibs. +ENV CARGO_INCREMENTAL=0 COPY --from=planner /app/recipe.json recipe.json # Build dependencies while preserving Cargo artifacts across layer invalidations. # @@ -34,13 +38,13 @@ COPY --from=planner /app/recipe.json recipe.json # caches are fragile when concurrent CI builds race or a build is interrupted. RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \ - --mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \ + --mount=type=cache,sharing=locked,target=/app/target \ cargo chef cook --release --recipe-path recipe.json # Build application COPY . . RUN --mount=type=cache,sharing=locked,target=/usr/local/cargo/registry \ --mount=type=cache,sharing=locked,target=/usr/local/cargo/git/db \ - --mount=type=cache,id=cargo-target-v2,sharing=locked,target=/app/target \ + --mount=type=cache,sharing=locked,target=/app/target \ cargo build --release --locked --bin ${BIN} && \ mkdir -p /app/bin && \ cp /app/target/release/${BIN} /app/bin/${BIN} From ce66cd3c16cdee4b343925fc7f1f287b6405033c Mon Sep 17 00:00:00 2001 From: sergerad Date: Thu, 11 Jun 2026 12:10:47 +1200 Subject: [PATCH 11/15] Add running total gap logic --- crates/store/src/state/subscription.rs | 102 +++++++++++++++++++++++++ 1 file changed, 102 insertions(+) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 700b605da..ac625d5ef 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -16,6 +16,8 @@ use crate::errors::DatabaseError; const SUBSCRIBER_CHANNEL_CAPACITY: usize = 32; /// Safety-net timeout for a single send when the client has stalled. const SEND_TIMEOUT: Duration = Duration::from_secs(10); +/// Maximum running block-gap allowed before a subscriber is disconnected. +const MAX_RUNNING_GAP: u32 = 100u32; // SUBSCRIPTION EVENTS // ================================================================================================ @@ -191,8 +193,14 @@ async fn run_stream( source: S, ) -> Result<(), StateSubscriptionError> { let mut next = from; + let mut previous_gap = 0u32; + let mut running_gap = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); + + let current_gap = tip.saturating_sub(next.as_u32()).as_u32(); + (previous_gap, running_gap) = check_growing_gap(current_gap, previous_gap, running_gap)?; + while next <= tip { let data = source.fetch(next).await?; tip = *tip_rx.borrow_and_update(); @@ -209,3 +217,97 @@ async fn run_stream( } } } + +/// Tracks how many blocks a subscriber's gap to the tip has grown across consecutive checks. +/// +/// Tracks a running total of how far a subscriber's gap to the tip has grown. +/// +/// The total increases by the block-count delta each time the gap grows, and decreases by the +/// delta each time it shrinks (saturating at zero). Returns updated `(previous_gap, running_gap)` +/// on success, or [`StateSubscriptionError::TooSlow`] once the running total exceeds +/// [`MAX_RUNNING_GAP`]. +fn check_growing_gap( + current_gap: u32, + previous_gap: u32, + running_gap: u32, +) -> Result<(u32, u32), StateSubscriptionError> { + let running_gap = if current_gap > previous_gap { + running_gap + (current_gap - previous_gap) + } else { + running_gap.saturating_sub(previous_gap - current_gap) + }; + if running_gap > MAX_RUNNING_GAP { + return Err(StateSubscriptionError::TooSlow); + } + Ok((current_gap, running_gap)) +} + +// TESTS +// ================================================================================================ + +#[cfg(test)] +mod tests { + use super::*; + + fn run(gaps: &[u32]) -> Result<(), StateSubscriptionError> { + let mut previous_gap = 0u32; + let mut growth_run = 0u32; + for &gap in gaps { + (previous_gap, growth_run) = check_growing_gap(gap, previous_gap, growth_run)?; + } + Ok(()) + } + + #[test] + fn stable_gap_does_not_accumulate() { + // Gap stays constant — delta is always 0, growth_run never increments. + assert!(run(&[5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5, 5]).is_ok()); + } + + #[test] + fn zero_gap_throughout_is_ok() { + assert!(run(&[0, 0, 0, 0, 0]).is_ok()); + } + + #[test] + fn shrinking_gap_reduces_accumulation() { + // Accumulate close to the limit, then shrink — running total decreases, no error. + assert!(run(&[10, 20, MAX_RUNNING_GAP - 1, 5]).is_ok()); + } + + #[test] + fn exactly_max_growth_run_is_ok() { + // A single jump of exactly MAX_RUNNING_GAP is the boundary — still ok. + assert!(run(&[MAX_RUNNING_GAP]).is_ok()); + } + + #[test] + fn exceeding_max_growth_run_returns_too_slow() { + // One block past the limit triggers TooSlow, even in a single jump. + assert!(matches!(run(&[MAX_RUNNING_GAP + 1]), Err(StateSubscriptionError::TooSlow))); + } + + #[test] + fn growth_spread_across_windows_accumulates() { + // Many small growths that each stay below the limit still trigger TooSlow once they sum + // past MAX_RUNNING_GAP. + let step = MAX_RUNNING_GAP / 4; + let gaps: Vec = (1..=6).map(|i| i * step).collect(); // total growth = 5 * step + assert!(matches!(run(&gaps), Err(StateSubscriptionError::TooSlow))); + } + + #[test] + fn recovery_reduces_and_allows_fresh_accumula30tion() { + // Grow close to the limit, recover most of the way, then grow again — still ok. + let near_limit = MAX_RUNNING_GAP - 1; + assert!(run(&[near_limit, 1, near_limit]).is_ok()); + } + + #[test] + fn token_improvement_does_not_prevent_disconnection() { + // A client that grows by a large amount then shrinks by just one block on each cycle + // accumulates net growth and is eventually disconnected. + let gaps: Vec = (0u32..60).flat_map(|i| [50 + i, 49 + i]).collect(); + assert!(matches!(run(&gaps), Err(StateSubscriptionError::TooSlow))); + } +} From fdfdd8fb83250c7f350c43b53dc33a4c6a58c2de Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 12 Jun 2026 12:49:49 +1200 Subject: [PATCH 12/15] Fix starting gap bug --- crates/store/src/state/subscription.rs | 46 +++++++++++++++++++------- 1 file changed, 34 insertions(+), 12 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index ac625d5ef..929eb38a4 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -130,7 +130,10 @@ impl SubscriptionSource for BlockSource { block: Vec, committed_chain_tip: BlockNumber, ) -> BlockSubscriptionEvent { - BlockSubscriptionEvent { block, committed_chain_tip } + BlockSubscriptionEvent { + block, + committed_chain_tip, + } } } @@ -159,7 +162,11 @@ impl SubscriptionSource for ProofSource { proof: Vec, proven_chain_tip: BlockNumber, ) -> ProofSubscriptionEvent { - ProofSubscriptionEvent { block_num, proof, proven_chain_tip } + ProofSubscriptionEvent { + block_num, + proof, + proven_chain_tip, + } } } @@ -193,13 +200,17 @@ async fn run_stream( source: S, ) -> Result<(), StateSubscriptionError> { let mut next = from; - let mut previous_gap = 0u32; + let mut previous_gap: Option = None; let mut running_gap = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); let current_gap = tip.saturating_sub(next.as_u32()).as_u32(); - (previous_gap, running_gap) = check_growing_gap(current_gap, previous_gap, running_gap)?; + (previous_gap, running_gap) = check_growing_gap( + current_gap, + previous_gap.unwrap_or(current_gap), + running_gap, + )?; while next <= tip { let data = source.fetch(next).await?; @@ -230,7 +241,7 @@ fn check_growing_gap( current_gap: u32, previous_gap: u32, running_gap: u32, -) -> Result<(u32, u32), StateSubscriptionError> { +) -> Result<(Option, u32), StateSubscriptionError> { let running_gap = if current_gap > previous_gap { running_gap + (current_gap - previous_gap) } else { @@ -239,7 +250,7 @@ fn check_growing_gap( if running_gap > MAX_RUNNING_GAP { return Err(StateSubscriptionError::TooSlow); } - Ok((current_gap, running_gap)) + Ok((Some(current_gap), running_gap)) } // TESTS @@ -250,10 +261,11 @@ mod tests { use super::*; fn run(gaps: &[u32]) -> Result<(), StateSubscriptionError> { - let mut previous_gap = 0u32; + let mut previous_gap: Option = None; let mut growth_run = 0u32; for &gap in gaps { - (previous_gap, growth_run) = check_growing_gap(gap, previous_gap, growth_run)?; + (previous_gap, growth_run) = + check_growing_gap(gap, previous_gap.unwrap_or(gap), growth_run)?; } Ok(()) } @@ -275,16 +287,24 @@ mod tests { assert!(run(&[10, 20, MAX_RUNNING_GAP - 1, 5]).is_ok()); } + #[test] + fn starting_above_max_growth_is_ok() { + assert!(run(&[MAX_RUNNING_GAP * 2]).is_ok()); + } + #[test] fn exactly_max_growth_run_is_ok() { // A single jump of exactly MAX_RUNNING_GAP is the boundary — still ok. - assert!(run(&[MAX_RUNNING_GAP]).is_ok()); + assert!(run(&[0, MAX_RUNNING_GAP]).is_ok()); } #[test] fn exceeding_max_growth_run_returns_too_slow() { // One block past the limit triggers TooSlow, even in a single jump. - assert!(matches!(run(&[MAX_RUNNING_GAP + 1]), Err(StateSubscriptionError::TooSlow))); + assert!(matches!( + run(&[0, MAX_RUNNING_GAP + 1]), + Err(StateSubscriptionError::TooSlow) + )); } #[test] @@ -297,7 +317,7 @@ mod tests { } #[test] - fn recovery_reduces_and_allows_fresh_accumula30tion() { + fn recovery_reduces_and_allows_fresh_accumulation() { // Grow close to the limit, recover most of the way, then grow again — still ok. let near_limit = MAX_RUNNING_GAP - 1; assert!(run(&[near_limit, 1, near_limit]).is_ok()); @@ -307,7 +327,9 @@ mod tests { fn token_improvement_does_not_prevent_disconnection() { // A client that grows by a large amount then shrinks by just one block on each cycle // accumulates net growth and is eventually disconnected. - let gaps: Vec = (0u32..60).flat_map(|i| [50 + i, 49 + i]).collect(); + let gaps: Vec = (0u32..MAX_RUNNING_GAP + 10) + .flat_map(|i| [50 + i, 49 + i]) + .collect(); assert!(matches!(run(&gaps), Err(StateSubscriptionError::TooSlow))); } } From 8e4af84385a0317694ef35e9ea54c3decfeba03e Mon Sep 17 00:00:00 2001 From: sergerad Date: Fri, 12 Jun 2026 13:10:58 +1200 Subject: [PATCH 13/15] lint --- crates/store/src/state/subscription.rs | 27 ++++++-------------------- 1 file changed, 6 insertions(+), 21 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 929eb38a4..002d3bb80 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -130,10 +130,7 @@ impl SubscriptionSource for BlockSource { block: Vec, committed_chain_tip: BlockNumber, ) -> BlockSubscriptionEvent { - BlockSubscriptionEvent { - block, - committed_chain_tip, - } + BlockSubscriptionEvent { block, committed_chain_tip } } } @@ -162,11 +159,7 @@ impl SubscriptionSource for ProofSource { proof: Vec, proven_chain_tip: BlockNumber, ) -> ProofSubscriptionEvent { - ProofSubscriptionEvent { - block_num, - proof, - proven_chain_tip, - } + ProofSubscriptionEvent { block_num, proof, proven_chain_tip } } } @@ -206,11 +199,8 @@ async fn run_stream( let mut tip = *tip_rx.borrow_and_update(); let current_gap = tip.saturating_sub(next.as_u32()).as_u32(); - (previous_gap, running_gap) = check_growing_gap( - current_gap, - previous_gap.unwrap_or(current_gap), - running_gap, - )?; + (previous_gap, running_gap) = + check_growing_gap(current_gap, previous_gap.unwrap_or(current_gap), running_gap)?; while next <= tip { let data = source.fetch(next).await?; @@ -301,10 +291,7 @@ mod tests { #[test] fn exceeding_max_growth_run_returns_too_slow() { // One block past the limit triggers TooSlow, even in a single jump. - assert!(matches!( - run(&[0, MAX_RUNNING_GAP + 1]), - Err(StateSubscriptionError::TooSlow) - )); + assert!(matches!(run(&[0, MAX_RUNNING_GAP + 1]), Err(StateSubscriptionError::TooSlow))); } #[test] @@ -327,9 +314,7 @@ mod tests { fn token_improvement_does_not_prevent_disconnection() { // A client that grows by a large amount then shrinks by just one block on each cycle // accumulates net growth and is eventually disconnected. - let gaps: Vec = (0u32..MAX_RUNNING_GAP + 10) - .flat_map(|i| [50 + i, 49 + i]) - .collect(); + let gaps: Vec = (0u32..MAX_RUNNING_GAP + 10).flat_map(|i| [50 + i, 49 + i]).collect(); assert!(matches!(run(&gaps), Err(StateSubscriptionError::TooSlow))); } } From 2086aab9e0dcc349a553a44217eb7eb81e2a0eb4 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Fri, 12 Jun 2026 07:40:49 +0200 Subject: [PATCH 14/15] Init gap instead of using optional --- crates/store/src/state/subscription.rs | 12 ++++++------ 1 file changed, 6 insertions(+), 6 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 002d3bb80..27ce24ecd 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -193,14 +193,14 @@ async fn run_stream( source: S, ) -> Result<(), StateSubscriptionError> { let mut next = from; - let mut previous_gap: Option = None; + let mut previous_gap = tip_rx.borrow().as_u32(); let mut running_gap = 0u32; loop { let mut tip = *tip_rx.borrow_and_update(); let current_gap = tip.saturating_sub(next.as_u32()).as_u32(); (previous_gap, running_gap) = - check_growing_gap(current_gap, previous_gap.unwrap_or(current_gap), running_gap)?; + check_growing_gap(current_gap, previous_gap, running_gap)?; while next <= tip { let data = source.fetch(next).await?; @@ -231,7 +231,7 @@ fn check_growing_gap( current_gap: u32, previous_gap: u32, running_gap: u32, -) -> Result<(Option, u32), StateSubscriptionError> { +) -> Result<(u32, u32), StateSubscriptionError> { let running_gap = if current_gap > previous_gap { running_gap + (current_gap - previous_gap) } else { @@ -240,7 +240,7 @@ fn check_growing_gap( if running_gap > MAX_RUNNING_GAP { return Err(StateSubscriptionError::TooSlow); } - Ok((Some(current_gap), running_gap)) + Ok((current_gap, running_gap)) } // TESTS @@ -251,11 +251,11 @@ mod tests { use super::*; fn run(gaps: &[u32]) -> Result<(), StateSubscriptionError> { - let mut previous_gap: Option = None; + let mut previous_gap = gaps.first().copied().unwrap_or(u32::MAX); let mut growth_run = 0u32; for &gap in gaps { (previous_gap, growth_run) = - check_growing_gap(gap, previous_gap.unwrap_or(gap), growth_run)?; + check_growing_gap(gap, previous_gap, growth_run)?; } Ok(()) } From b3971cfa9365be30d702b85f8025c8f258167a56 Mon Sep 17 00:00:00 2001 From: Mirko von Leipzig <48352201+Mirko-von-Leipzig@users.noreply.github.com> Date: Fri, 12 Jun 2026 07:48:39 +0200 Subject: [PATCH 15/15] Fmt --- crates/store/src/state/subscription.rs | 6 ++---- 1 file changed, 2 insertions(+), 4 deletions(-) diff --git a/crates/store/src/state/subscription.rs b/crates/store/src/state/subscription.rs index 27ce24ecd..cc44b756a 100644 --- a/crates/store/src/state/subscription.rs +++ b/crates/store/src/state/subscription.rs @@ -199,8 +199,7 @@ async fn run_stream( let mut tip = *tip_rx.borrow_and_update(); let current_gap = tip.saturating_sub(next.as_u32()).as_u32(); - (previous_gap, running_gap) = - check_growing_gap(current_gap, previous_gap, running_gap)?; + (previous_gap, running_gap) = check_growing_gap(current_gap, previous_gap, running_gap)?; while next <= tip { let data = source.fetch(next).await?; @@ -254,8 +253,7 @@ mod tests { let mut previous_gap = gaps.first().copied().unwrap_or(u32::MAX); let mut growth_run = 0u32; for &gap in gaps { - (previous_gap, growth_run) = - check_growing_gap(gap, previous_gap, growth_run)?; + (previous_gap, growth_run) = check_growing_gap(gap, previous_gap, growth_run)?; } Ok(()) }