Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

1 change: 1 addition & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -49,6 +49,7 @@ hex = "0.4"
spawned-concurrency = "0.5.0"
spawned-rt = "0.5.0"
tokio = "1.0"
tokio-util = "0.7"

prometheus = "0.14"

Expand Down
1 change: 1 addition & 0 deletions bin/ethlambda/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ libssz.workspace = true
libssz-types.workspace = true

tokio.workspace = true
tokio-util.workspace = true

tracing.workspace = true
tracing-subscriber = "0.3"
Expand Down
47 changes: 41 additions & 6 deletions bin/ethlambda/src/main.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@ use std::{
path::{Path, PathBuf},
sync::Arc,
};
use tokio_util::sync::CancellationToken;

use clap::Parser;
use ethlambda_blockchain::key_manager::ValidatorKeyPair;
Expand All @@ -29,7 +30,7 @@ use ethlambda_types::{
state::{State, ValidatorPubkeyBytes},
};
use serde::Deserialize;
use tracing::{error, info};
use tracing::{error, info, warn};
use tracing_subscriber::{EnvFilter, Layer, Registry, layer::SubscriberExt};

use ethlambda_blockchain::BlockChain;
Expand Down Expand Up @@ -204,21 +205,55 @@ async fn main() -> eyre::Result<()> {
})
.inspect_err(|err| error!(%err, "Failed to send InitBlockChain — actors not wired"))?;

tokio::spawn(async move {
let _ = ethlambda_rpc::start_metrics_server(metrics_socket)
let shutdown_token = CancellationToken::new();
let metrics_shutdown = shutdown_token.clone();
let api_shutdown = shutdown_token.clone();

let metrics_handle = tokio::spawn(async move {
let _ = ethlambda_rpc::start_metrics_server(metrics_socket, metrics_shutdown)
.await
.inspect_err(|err| error!(%err, "Metrics server failed"));
});
tokio::spawn(async move {
let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator)
let api_handle = tokio::spawn(async move {
let _ = ethlambda_rpc::start_api_server(api_socket, store, aggregator, api_shutdown)
.await
.inspect_err(|err| error!(%err, "API server failed"));
});

info!("Node initialized");

// 1st ctrl+c: start graceful shutdown
tokio::signal::ctrl_c().await.ok();
println!("Shutting down...");

info!("Shutdown signal received, stopping actors and servers...");

tokio::spawn(async move {
// This can be turned into a loop
tokio::signal::ctrl_c().await.ok();
warn!(
"Graceful shutdown in progress. Press ctrl+C 2 more times to force ungraceful shutdown"
);
tokio::signal::ctrl_c().await.ok();
warn!(
"Graceful shutdown in progress. Press ctrl+C 1 more times to force ungraceful shutdown"
);
tokio::signal::ctrl_c().await.ok();
info!("Forced ungraceful shutdown...");
std::process::exit(1);
});

let blockchain_ref = blockchain.actor_ref().clone();
let p2p_ref = p2p.actor_ref().clone();
blockchain_ref.context().stop();
p2p_ref.context().stop();
shutdown_token.cancel();

blockchain_ref.join().await;
p2p_ref.join().await;
let _ = api_handle.await;
let _ = metrics_handle.await;
Comment thread
dicethedev marked this conversation as resolved.

info!("Shutdown complete");

Ok(())
}
Expand Down
1 change: 1 addition & 0 deletions crates/net/rpc/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -12,6 +12,7 @@ version.workspace = true
[dependencies]
axum = "0.8.1"
tokio.workspace = true
tokio-util.workspace = true
ethlambda-fork-choice.workspace = true
ethlambda-metrics.workspace = true
tracing.workspace = true
Expand Down
19 changes: 16 additions & 3 deletions crates/net/rpc/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ use ethlambda_storage::Store;
use ethlambda_types::aggregator::AggregatorController;
use ethlambda_types::primitives::H256;
use libssz::SszEncode;
use tokio_util::sync::CancellationToken;

pub(crate) const JSON_CONTENT_TYPE: &str = "application/json; charset=utf-8";
pub(crate) const SSZ_CONTENT_TYPE: &str = "application/octet-stream";
Expand All @@ -20,23 +21,35 @@ pub async fn start_api_server(
address: SocketAddr,
store: Store,
aggregator: AggregatorController,
shutdown: CancellationToken,
) -> Result<(), std::io::Error> {
let api_router = build_api_router(store).layer(Extension(aggregator));

let listener = tokio::net::TcpListener::bind(address).await?;
axum::serve(listener, api_router).await?;
axum::serve(listener, api_router)
.with_graceful_shutdown(async move {
shutdown.cancelled().await;
})
.await?;

Ok(())
}

pub async fn start_metrics_server(address: SocketAddr) -> Result<(), std::io::Error> {
pub async fn start_metrics_server(
address: SocketAddr,
shutdown: CancellationToken,
) -> Result<(), std::io::Error> {
let metrics_router = metrics::start_prometheus_metrics_api();
let debug_router = build_debug_router();

let app = Router::new().merge(metrics_router).merge(debug_router);

let listener = tokio::net::TcpListener::bind(address).await?;
axum::serve(listener, app).await?;
axum::serve(listener, app)
.with_graceful_shutdown(async move {
shutdown.cancelled().await;
})
.await?;

Ok(())
}
Expand Down