From e53b72cbfb7305c3e862ed306561820536383ccb Mon Sep 17 00:00:00 2001 From: dicethedev Date: Tue, 28 Apr 2026 05:29:13 +0100 Subject: [PATCH 1/5] fix: stop genservers gracefully --- bin/ethlambda/src/main.rs | 34 +++++++++++++++++++++++++++++----- crates/net/rpc/src/lib.rs | 14 +++++++++++--- 2 files changed, 40 insertions(+), 8 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 3c3f816..a52edab 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -16,6 +16,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; +use tokio::sync::Notify; use clap::Parser; use ethlambda_blockchain::key_manager::ValidatorKeyPair; @@ -204,13 +205,23 @@ async fn main() -> eyre::Result<()> { }) .inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?; - tokio::spawn(async move { - let _ = ethlambda_rpc::start_metrics_server(metrics_socket) + let shutdown_notify = Arc::new(Notify::new()); + let metrics_shutdown = shutdown_notify.clone(); + let api_shutdown = shutdown_notify.clone(); + + let metrics_handle = tokio::spawn(async move { + let shutdown_future = async move { + metrics_shutdown.notified().await; + }; + let _ = ethlambda_rpc::start_metrics_server(metrics_socket, shutdown_future) .await .inspect_err(|err| error!(%err, "Metrics server failed")); }); - tokio::spawn(async move { - let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator) + let api_handle = tokio::spawn(async move { + let shutdown_future = async move { + api_shutdown.notified().await; + }; + let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, shutdown_future) .await .inspect_err(|err| error!(%err, "API server failed")); }); @@ -218,7 +229,20 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); tokio::signal::ctrl_c().await.ok(); - println!("Shutting down..."); + info!("Shutdown signal received, stopping actors and servers..."); + + let blockchain_ref = blockchain.actor_ref().clone(); + let p2p_ref = p2p.actor_ref().clone(); + blockchain_ref.context().stop(); + p2p_ref.context().stop(); + shutdown_notify.notify_waiters(); + + blockchain_ref.join().await; + p2p_ref.join().await; + let _ = api_handle.await; + let _ = metrics_handle.await; + + info!("Shutdown complete"); Ok(()) } diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index acec7fa..21f031b 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -20,23 +20,31 @@ pub async fn start_api_server( address: SocketAddr, store: Store, aggregator: AggregatorController, + shutdown: impl std::future::Future + Send + 'static, ) -> Result<(), std::io::Error> { let api_router = build_api_router(store).layer(Extension(aggregator)); let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, api_router).await?; + axum::serve(listener, api_router) + .with_graceful_shutdown(shutdown) + .await?; Ok(()) } -pub async fn start_metrics_server(address: SocketAddr) -> Result<(), std::io::Error> { +pub async fn start_metrics_server( + address: SocketAddr, + shutdown: impl std::future::Future + Send + 'static, +) -> Result<(), std::io::Error> { let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); let app = Router::new().merge(metrics_router).merge(debug_router); let listener = tokio::net::TcpListener::bind(address).await?; - axum::serve(listener, app).await?; + axum::serve(listener, app) + .with_graceful_shutdown(shutdown) + .await?; Ok(()) } From 8a7d098202ecc4ddc727108c82a3f116ebb530af Mon Sep 17 00:00:00 2001 From: dicethedev Date: Wed, 29 Apr 2026 13:39:34 +0100 Subject: [PATCH 2/5] fix: use CancellationToken for graceful shutdown --- Cargo.lock | 2 ++ Cargo.toml | 1 + bin/ethlambda/Cargo.toml | 1 + bin/ethlambda/src/main.rs | 20 +++++++------------- crates/net/rpc/Cargo.toml | 1 + crates/net/rpc/src/lib.rs | 12 ++++++++---- 6 files changed, 20 insertions(+), 17 deletions(-) diff --git a/Cargo.lock b/Cargo.lock index ba59e2a..26718db 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -2056,6 +2056,7 @@ dependencies = [ "thiserror 2.0.18", "tikv-jemallocator", "tokio", + "tokio-util", "tracing", "tracing-subscriber", "vergen-git2", @@ -2165,6 +2166,7 @@ dependencies = [ "serde", "serde_json", "tokio", + "tokio-util", "tower", "tracing", ] diff --git a/Cargo.toml b/Cargo.toml index d0a415e..c407f5e 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -49,6 +49,7 @@ hex = "0.4" spawned-concurrency = "0.5.0" spawned-rt = "0.5.0" tokio = "1.0" +tokio-util = "0.7" prometheus = "0.14" diff --git a/bin/ethlambda/Cargo.toml b/bin/ethlambda/Cargo.toml index 799a27b..e5e22ee 100644 --- a/bin/ethlambda/Cargo.toml +++ b/bin/ethlambda/Cargo.toml @@ -17,6 +17,7 @@ libssz.workspace = true libssz-types.workspace = true tokio.workspace = true +tokio-util.workspace = true tracing.workspace = true tracing-subscriber = "0.3" diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index a52edab..e64391f 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -16,7 +16,7 @@ use std::{ path::{Path, PathBuf}, sync::Arc, }; -use tokio::sync::Notify; +use tokio_util::sync::CancellationToken; use clap::Parser; use ethlambda_blockchain::key_manager::ValidatorKeyPair; @@ -205,23 +205,17 @@ async fn main() -> eyre::Result<()> { }) .inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?; - let shutdown_notify = Arc::new(Notify::new()); - let metrics_shutdown = shutdown_notify.clone(); - let api_shutdown = shutdown_notify.clone(); + let shutdown_token = CancellationToken::new(); + let metrics_shutdown = shutdown_token.clone(); + let api_shutdown = shutdown_token.clone(); let metrics_handle = tokio::spawn(async move { - let shutdown_future = async move { - metrics_shutdown.notified().await; - }; - let _ = ethlambda_rpc::start_metrics_server(metrics_socket, shutdown_future) + let _ = ethlambda_rpc::start_metrics_server(metrics_socket, metrics_shutdown) .await .inspect_err(|err| error!(%err, "Metrics server failed")); }); let api_handle = tokio::spawn(async move { - let shutdown_future = async move { - api_shutdown.notified().await; - }; - let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, shutdown_future) + let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, api_shutdown) .await .inspect_err(|err| error!(%err, "API server failed")); }); @@ -235,7 +229,7 @@ async fn main() -> eyre::Result<()> { let p2p_ref = p2p.actor_ref().clone(); blockchain_ref.context().stop(); p2p_ref.context().stop(); - shutdown_notify.notify_waiters(); + shutdown_token.cancel(); blockchain_ref.join().await; p2p_ref.join().await; diff --git a/crates/net/rpc/Cargo.toml b/crates/net/rpc/Cargo.toml index c05e9a2..df7e23c 100644 --- a/crates/net/rpc/Cargo.toml +++ b/crates/net/rpc/Cargo.toml @@ -12,6 +12,7 @@ version.workspace = true [dependencies] axum = "0.8.1" tokio.workspace = true +tokio-util.workspace = true ethlambda-fork-choice.workspace = true ethlambda-metrics.workspace = true tracing.workspace = true diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index 21f031b..f64b3f2 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -20,13 +20,15 @@ pub async fn start_api_server( address: SocketAddr, store: Store, aggregator: AggregatorController, - shutdown: impl std::future::Future + Send + 'static, + shutdown: tokio_util::sync::CancellationToken, ) -> Result<(), std::io::Error> { let api_router = build_api_router(store).layer(Extension(aggregator)); let listener = tokio::net::TcpListener::bind(address).await?; axum::serve(listener, api_router) - .with_graceful_shutdown(shutdown) + .with_graceful_shutdown(async move { + shutdown.cancelled().await; + }) .await?; Ok(()) @@ -34,7 +36,7 @@ pub async fn start_api_server( pub async fn start_metrics_server( address: SocketAddr, - shutdown: impl std::future::Future + Send + 'static, + shutdown: tokio_util::sync::CancellationToken, ) -> Result<(), std::io::Error> { let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); @@ -43,7 +45,9 @@ pub async fn start_metrics_server( let listener = tokio::net::TcpListener::bind(address).await?; axum::serve(listener, app) - .with_graceful_shutdown(shutdown) + .with_graceful_shutdown(async move { + shutdown.cancelled().await; + }) .await?; Ok(()) From 28a7f6f99e0bc10c180b6cc8f9bcfb72ee1c6eda Mon Sep 17 00:00:00 2001 From: dicethedev Date: Wed, 29 Apr 2026 13:45:44 +0100 Subject: [PATCH 3/5] feat: allow force shutdown on second Ctrl+C --- bin/ethlambda/src/main.rs | 31 ++++++++++++++++++++++++++++--- 1 file changed, 28 insertions(+), 3 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index e64391f..5339056 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -15,6 +15,7 @@ use std::{ net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, sync::Arc, + sync::atomic::{AtomicU32, Ordering}, }; use tokio_util::sync::CancellationToken; @@ -30,7 +31,7 @@ use ethlambda_types::{ state::{State, ValidatorPubkeyBytes}, }; use serde::Deserialize; -use tracing::{error, info}; +use tracing::{error, info, warn}; use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt}; use ethlambda_blockchain::BlockChain; @@ -222,8 +223,32 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); - tokio::signal::ctrl_c().await.ok(); - info!("Shutdown signal received, stopping actors and servers..."); + let signal_count = Arc::new(AtomicU32::new(0)); + let signal_count_clone = signal_count.clone(); + + // Handle multiple Ctrl+C signals: + // 1st Ctrl+C: graceful shutdown + // 2nd Ctrl+C: force exit immediately + tokio::spawn(async move { + loop { + tokio::signal::ctrl_c().await.ok(); + let count = signal_count_clone.fetch_add(1, Ordering::SeqCst) + 1; + if count == 1 { + info!("Shutdown signal received, stopping actors and servers..."); + } else { + warn!("Force shutdown requested, exiting immediately"); + std::process::exit(1); + } + } + }); + + // Wait for first signal + loop { + if signal_count.load(Ordering::SeqCst) > 0 { + break; + } + tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; + } let blockchain_ref = blockchain.actor_ref().clone(); let p2p_ref = p2p.actor_ref().clone(); From fa7ba58d80de2e501b5d42efcc05a6119158e851 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Wed, 29 Apr 2026 23:16:49 +0100 Subject: [PATCH 4/5] refactor: simplify shutdown signal handling with force-exit prompts --- bin/ethlambda/src/main.rs | 36 ++++++++++++------------------------ crates/net/rpc/src/lib.rs | 5 +++-- 2 files changed, 15 insertions(+), 26 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 5339056..2821489 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -15,7 +15,6 @@ use std::{ net::{IpAddr, SocketAddr}, path::{Path, PathBuf}, sync::Arc, - sync::atomic::{AtomicU32, Ordering}, }; use tokio_util::sync::CancellationToken; @@ -223,33 +222,22 @@ async fn main() -> eyre::Result<()> { info!("Node initialized"); - let signal_count = Arc::new(AtomicU32::new(0)); - let signal_count_clone = signal_count.clone(); + // 1st ctrl+c: start graceful shutdown + tokio::signal::ctrl_c().await.ok(); + + info!("Shutdown signal received, stopping actors and servers..."); - // Handle multiple Ctrl+C signals: - // 1st Ctrl+C: graceful shutdown - // 2nd Ctrl+C: force exit immediately tokio::spawn(async move { - loop { - tokio::signal::ctrl_c().await.ok(); - let count = signal_count_clone.fetch_add(1, Ordering::SeqCst) + 1; - if count == 1 { - info!("Shutdown signal received, stopping actors and servers..."); - } else { - warn!("Force shutdown requested, exiting immediately"); - std::process::exit(1); - } - } + // This can be turned into a loop + tokio::signal::ctrl_c().await.ok(); + warn!("Graceful shutdown in progress. Press ctrl+C 2 more times to force ungraceful shutdown"); + tokio::signal::ctrl_c().await.ok(); + warn!("Graceful shutdown in progress. Press ctrl+C 1 more times to force ungraceful shutdown"); + tokio::signal::ctrl_c().await.ok(); + info!("Forced ungraceful shutdown..."); + std::process::exit(1); }); - // Wait for first signal - loop { - if signal_count.load(Ordering::SeqCst) > 0 { - break; - } - tokio::time::sleep(tokio::time::Duration::from_millis(100)).await; - } - let blockchain_ref = blockchain.actor_ref().clone(); let p2p_ref = p2p.actor_ref().clone(); blockchain_ref.context().stop(); diff --git a/crates/net/rpc/src/lib.rs b/crates/net/rpc/src/lib.rs index f64b3f2..e756e2f 100644 --- a/crates/net/rpc/src/lib.rs +++ b/crates/net/rpc/src/lib.rs @@ -7,6 +7,7 @@ use ethlambda_storage::Store; use ethlambda_types::aggregator::AggregatorController; use ethlambda_types::primitives::H256; use libssz::SszEncode; +use tokio_util::sync::CancellationToken; pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8"; pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream"; @@ -20,7 +21,7 @@ pub async fn start_api_server( address: SocketAddr, store: Store, aggregator: AggregatorController, - shutdown: tokio_util::sync::CancellationToken, + shutdown: CancellationToken, ) -> Result<(), std::io::Error> { let api_router = build_api_router(store).layer(Extension(aggregator)); @@ -36,7 +37,7 @@ pub async fn start_api_server( pub async fn start_metrics_server( address: SocketAddr, - shutdown: tokio_util::sync::CancellationToken, + shutdown: CancellationToken, ) -> Result<(), std::io::Error> { let metrics_router = metrics::start_prometheus_metrics_api(); let debug_router = build_debug_router(); From 308bd016389d1dd8ac4f4779a665aa5f0d8e4635 Mon Sep 17 00:00:00 2001 From: dicethedev Date: Thu, 30 Apr 2026 17:32:32 +0100 Subject: [PATCH 5/5] format files --- bin/ethlambda/src/main.rs | 8 ++++++-- 1 file changed, 6 insertions(+), 2 deletions(-) diff --git a/bin/ethlambda/src/main.rs b/bin/ethlambda/src/main.rs index 2821489..83c768d 100644 --- a/bin/ethlambda/src/main.rs +++ b/bin/ethlambda/src/main.rs @@ -230,9 +230,13 @@ async fn main() -> eyre::Result<()> { tokio::spawn(async move { // This can be turned into a loop tokio::signal::ctrl_c().await.ok(); - warn!("Graceful shutdown in progress. Press ctrl+C 2 more times to force ungraceful shutdown"); + warn!( + "Graceful shutdown in progress. Press ctrl+C 2 more times to force ungraceful shutdown" + ); tokio::signal::ctrl_c().await.ok(); - warn!("Graceful shutdown in progress. Press ctrl+C 1 more times to force ungraceful shutdown"); + warn!( + "Graceful shutdown in progress. Press ctrl+C 1 more times to force ungraceful shutdown" + ); tokio::signal::ctrl_c().await.ok(); info!("Forced ungraceful shutdown..."); std::process::exit(1);