Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

9 changes: 7 additions & 2 deletions core/binary_protocol/src/responses/topics/create_topic.rs
Original file line number Diff line number Diff line change
Expand Up @@ -15,5 +15,10 @@
// specific language governing permissions and limitations
// under the License.

/// `CreateTopic` response is empty.
pub type CreateTopicResponse = super::EmptyResponse;
/// `CreateTopic` reply ships the freshly-created topic.
///
/// Same `[TopicHeader][PartitionResponse]*` layout as `GetTopicResponse`,
/// so the SDK reuses one decoder for both calls. Legacy server's
/// `create_topic_handler` builds this shape directly; server-ng's metadata
/// STM emits the same bytes from `apply`.
pub type CreateTopicResponse = super::GetTopicResponse;
30 changes: 30 additions & 0 deletions core/configs/src/server_config/sharding.rs
Original file line number Diff line number Diff line change
Expand Up @@ -77,6 +77,21 @@ pub const SHUTDOWN_DRAIN_TIMEOUT_MAX: Duration = Duration::from_secs(600);
/// at 5s so Ctrl-C latency stays bounded regardless of config.
pub const SHUTDOWN_POLL_INTERVAL_MAX: Duration = Duration::from_secs(5);

/// Default safety-tick cadence for the partition reconciliation loop.
/// The reconciler also wakes on every `LifecycleFrame::MetadataCommitTick`
/// broadcast by shard 0; this fallback covers dropped wake-ups (the wake
/// channel is intentionally capacity-1) and the initial post-bootstrap
/// convergence window before shard 0's first tick. One second is
/// invisible to operators yet keeps idle clusters from burning CPU
/// re-reading the same target snapshot.
pub const DEFAULT_RECONCILE_PERIODIC_INTERVAL: Duration = Duration::from_secs(1);

/// Hard upper bound on `reconcile_periodic_interval`. A tick longer
/// than ~30s makes post-failure recovery latency operator-visible; the
/// cap reins in pathological typos without disturbing reasonable
/// production values.
pub const RECONCILE_PERIODIC_INTERVAL_MAX: Duration = Duration::from_secs(30);

const fn default_inbox_capacity() -> usize {
DEFAULT_INBOX_CAPACITY
}
Expand All @@ -89,6 +104,10 @@ fn default_shutdown_poll_interval() -> IggyDuration {
IggyDuration::new(DEFAULT_SHUTDOWN_POLL_INTERVAL)
}

fn default_reconcile_periodic_interval() -> IggyDuration {
IggyDuration::new(DEFAULT_RECONCILE_PERIODIC_INTERVAL)
}

#[serde_as]
#[derive(Debug, Deserialize, Serialize, ConfigEnv)]
pub struct ShardingConfig {
Expand Down Expand Up @@ -136,6 +155,16 @@ pub struct ShardingConfig {
#[serde_as(as = "DisplayFromStr")]
#[config_env(leaf)]
pub shutdown_poll_interval: IggyDuration,
/// Safety-tick cadence for the partition reconciliation loop; the
/// reconciler also wakes immediately on every
/// `LifecycleFrame::MetadataCommitTick` from shard 0. See
/// [`DEFAULT_RECONCILE_PERIODIC_INTERVAL`] for the rationale; values
/// above [`RECONCILE_PERIODIC_INTERVAL_MAX`] are rejected by the
/// validator.
#[serde(default = "default_reconcile_periodic_interval")]
#[serde_as(as = "DisplayFromStr")]
#[config_env(leaf)]
pub reconcile_periodic_interval: IggyDuration,
}

impl Default for ShardingConfig {
Expand All @@ -145,6 +174,7 @@ impl Default for ShardingConfig {
inbox_capacity: DEFAULT_INBOX_CAPACITY,
shutdown_drain_timeout: default_shutdown_drain_timeout(),
shutdown_poll_interval: default_shutdown_poll_interval(),
reconcile_periodic_interval: default_reconcile_periodic_interval(),
}
}
}
Expand Down
23 changes: 21 additions & 2 deletions core/configs/src/server_config/validators.rs
Original file line number Diff line number Diff line change
Expand Up @@ -22,8 +22,8 @@ use super::server::{
};
use super::server::{MemoryPoolConfig, PersonalAccessTokenConfig, ServerConfig};
use super::sharding::{
CpuAllocation, INBOX_CAPACITY_MAX, SHUTDOWN_DRAIN_TIMEOUT_MAX, SHUTDOWN_POLL_INTERVAL_MAX,
ShardingConfig,
CpuAllocation, INBOX_CAPACITY_MAX, RECONCILE_PERIODIC_INTERVAL_MAX, SHUTDOWN_DRAIN_TIMEOUT_MAX,
SHUTDOWN_POLL_INTERVAL_MAX, ShardingConfig,
};
use super::system::SegmentConfig;
use super::system::{CompressionConfig, LoggingConfig, PartitionConfig};
Expand Down Expand Up @@ -440,6 +440,25 @@ impl Validatable<ConfigurationError> for ShardingConfig {
return Err(ConfigurationError::InvalidConfigurationValue);
}

let reconcile = self.reconcile_periodic_interval.get_duration();
if reconcile.is_zero() {
eprintln!(
"Invalid sharding configuration: reconcile_periodic_interval resolves to zero. \
Note that \"0\", \"none\", \"unlimited\", and \"disabled\" all parse to zero. The \
periodic reconcile tick is a safety net for dropped commit-wakes and cannot be \
turned off; set a positive duration (default \"1s\", max {RECONCILE_PERIODIC_INTERVAL_MAX:?})."
);
return Err(ConfigurationError::InvalidConfigurationValue);
}
if reconcile > RECONCILE_PERIODIC_INTERVAL_MAX {
eprintln!(
"Invalid sharding configuration: reconcile_periodic_interval {:?} exceeds the \
{:?} cap (a long tick makes post-failure convergence latency operator-visible)",
reconcile, RECONCILE_PERIODIC_INTERVAL_MAX
);
return Err(ConfigurationError::InvalidConfigurationValue);
}

let available_cpus = available_parallelism()
.map_err(|_| {
eprintln!("Failed to detect available CPU cores");
Expand Down
2 changes: 2 additions & 0 deletions core/consensus/src/metadata_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -387,6 +387,8 @@ mod tests {

#[allow(clippy::future_not_send)]
impl MessageBus for ClientSpyBus {
fn track_background(&self, _handle: message_bus::JoinHandle<()>) {}

async fn send_to_client(
&self,
client_id: u128,
Expand Down
4 changes: 4 additions & 0 deletions core/consensus/src/plane_helpers.rs
Original file line number Diff line number Diff line change
Expand Up @@ -454,6 +454,8 @@ mod tests {
struct NoopBus;

impl MessageBus for NoopBus {
fn track_background(&self, _handle: message_bus::JoinHandle<()>) {}

async fn send_to_client(
&self,
_client_id: u128,
Expand Down Expand Up @@ -661,6 +663,8 @@ mod tests {

#[allow(clippy::future_not_send)]
impl MessageBus for SpyBus {
fn track_background(&self, _handle: message_bus::JoinHandle<()>) {}

async fn send_to_client(
&self,
_client_id: u128,
Expand Down
18 changes: 17 additions & 1 deletion core/message_bus/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -99,7 +99,7 @@ pub use lifecycle::{
};
pub use transports::tls::TlsServerCredentials;

use compio::runtime::JoinHandle;
pub use compio::runtime::JoinHandle;
use configs::server_ng::ServerNgConfig;
use iggy_binary_protocol::GenericHeader;
use server_common::{MESSAGE_ALIGN, Message, iobuf::Frozen};
Expand Down Expand Up @@ -507,6 +507,14 @@ pub trait MessageBus {
/// Panics on a second install on the same bus, same one-shot
/// `OnceCell` invariant as [`Self::set_replica_forward_fn`].
fn set_client_forward_fn(&self, f: ClientForwardFn);

/// Register a detached `compio::runtime::spawn` handle so
/// [`shutdown`](IggyMessageBus::shutdown) can await it before the
/// runtime drops. Production [`IggyMessageBus`] pushes onto a
/// `RefCell<Vec<JoinHandle>>` drained on shutdown. Required (no default)
/// so a new impl cannot silently drop detached handles by omission; a
/// stub that spawns nothing implements it as a no-op.
fn track_background(&self, handle: JoinHandle<()>);
}

/// Production message bus backed by real TCP connections.
Expand Down Expand Up @@ -1024,6 +1032,10 @@ impl<T: MessageBus + ?Sized> MessageBus for std::rc::Rc<T> {
fn set_client_forward_fn(&self, f: ClientForwardFn) {
(**self).set_client_forward_fn(f);
}

fn track_background(&self, handle: JoinHandle<()>) {
(**self).track_background(handle);
}
}

#[allow(clippy::future_not_send)]
Expand Down Expand Up @@ -1109,6 +1121,10 @@ impl MessageBus for IggyMessageBus {
fn set_client_forward_fn(&self, f: ClientForwardFn) {
Self::set_client_forward_fn(self, f);
}

fn track_background(&self, handle: JoinHandle<()>) {
Self::track_background(self, handle);
}
}

/// Extract the owning shard from a client id.
Expand Down
135 changes: 135 additions & 0 deletions core/metadata/src/impls/metadata.rs
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@ use server_common::Message;
use std::cell::RefCell;
use std::mem::size_of;
use std::path::Path;
use std::rc::Rc;
use tracing::{debug, error, warn};

fn freeze_client_reply(
Expand Down Expand Up @@ -335,6 +336,16 @@ fn require_shard_zero<'a, T>(
slot
}

/// Late-bound callback invoked after every successful
/// `mux_stm.update(prepare)` on shard 0's metadata commit path.
///
/// Wired by server-ng bootstrap once the metadata bundle has broadcast;
/// receives the committed [`Operation`] so the recipient can filter (the
/// partition reconciliation loop only cares about partition-shaped
/// events). Wrapped in [`RefCell`] for late binding; the per-shard
/// single-thread invariant keeps access safe without [`Sync`].
pub type CommitNotifier = std::rc::Rc<dyn Fn(Operation)>;

pub struct IggyMetadata<C, J, S, M> {
/// `Some` on shard 0, `None` on other shards. Server-ng bootstrap
/// holds the invariant: only shard 0 owns the metadata consensus
Expand All @@ -359,6 +370,12 @@ pub struct IggyMetadata<C, J, S, M> {
pub coordinator: Option<SnapshotCoordinator<M>>,
/// Per-client session state (sessions, dedup, eviction). Metadata-only.
pub client_table: RefCell<ClientTable>,
/// Late-bound post-commit notifier. Fires once per committed
/// operation after [`crate::stm::StateMachine::update`] succeeds in
/// both [`Plane::on_ack`] and [`Self::commit_journal`]. `None` until
/// [`Self::set_commit_notifier`] runs (server-ng bootstrap on shard
/// 0 sets it; peer shards and tests leave it `None`).
commit_notifier: RefCell<Option<CommitNotifier>>,
}

impl<C, J, S, M> IggyMetadata<C, J, S, M>
Expand Down Expand Up @@ -388,6 +405,26 @@ where
allocator,
coordinator,
client_table: RefCell::new(ClientTable::new(CLIENTS_TABLE_MAX)),
commit_notifier: RefCell::new(None),
}
}
}

impl<C, J, S, M> IggyMetadata<C, J, S, M> {
/// Install (or replace) the post-commit notifier. Passing `None`
/// removes any previous one. Server-ng bootstrap calls this on shard 0
/// only; peer shards never commit metadata locally.
pub fn set_commit_notifier(&self, notifier: Option<CommitNotifier>) {
*self.commit_notifier.borrow_mut() = notifier;
}

/// Fire post-commit notifier. Clones the `Rc` out under a short
/// borrow so a re-entrant `set_commit_notifier` from inside the
/// closure cannot panic on `borrow_mut`.
fn fire_commit_notifier(&self, operation: Operation) {
let notifier = self.commit_notifier.borrow().as_ref().map(Rc::clone);
if let Some(notifier) = notifier {
notifier(operation);
}
}
}
Expand Down Expand Up @@ -742,6 +779,10 @@ where
prepare_header.op
);
});
// Post-commit notifier (e.g. partition reconciler
// wake-up). Filtering by operation is the
// recipient's responsibility.
self.fire_commit_notifier(prepare_header.operation);
let reply = build_reply_message(&prepare_header, &response);
// Cache only if session exists. Client evicted between
// prepare and commit: skip cache (`commit_reply` no-ops),
Expand Down Expand Up @@ -1484,6 +1525,11 @@ where
let response = self.mux_stm.update(prepare).unwrap_or_else(|err| {
panic!("commit_journal: committed metadata op={op} failed to apply: {err}");
});
// Post-commit notifier (e.g. partition reconciler
// wake-up). Same hook fires on backups so reconcilers
// converge after replicated commits, not only quorum-acked
// ones reached via `on_ack` on the primary.
self.fire_commit_notifier(header.operation);
let reply = build_reply_message(&header, &response);
// Cache only if session still exists. WAL replay may carry a
// reply for a later-evicted client; `commit_reply` no-ops.
Expand Down Expand Up @@ -1671,3 +1717,92 @@ where

prepare
}

#[cfg(test)]
mod tests {
use super::*;
use crate::stm::consumer_group::ConsumerGroups;
use crate::stm::stream::Streams;
use crate::stm::user::Users;
use iggy_common::variadic;
use std::cell::RefCell;
use std::rc::Rc;

type TestMux = MuxStateMachine<variadic!(Users, Streams, ConsumerGroups)>;

/// Build a peer-shard-style `IggyMetadata` with `consensus`,
/// `journal`, and `snapshot` all `None`. Enough to test the
/// commit-notifier slot without standing up VSR / WAL infrastructure:
/// the test picks `()` for `C` / `J` / `S` since no notifier code path
/// touches their methods.
fn peer_metadata() -> IggyMetadata<(), (), (), TestMux> {
IggyMetadata::new(None, None, None, TestMux::default(), None)
}

#[test]
fn commit_notifier_fires_with_received_operation() {
let md = peer_metadata();
let captured: Rc<RefCell<Vec<Operation>>> = Rc::new(RefCell::new(Vec::new()));

let observer = Rc::clone(&captured);
md.set_commit_notifier(Some(Rc::new(move |op| {
observer.borrow_mut().push(op);
})));

md.fire_commit_notifier(Operation::CreateTopicWithAssignments);
md.fire_commit_notifier(Operation::DeletePartitions);
md.fire_commit_notifier(Operation::DeleteStream);

let seen = captured.borrow();
assert_eq!(
seen.as_slice(),
&[
Operation::CreateTopicWithAssignments,
Operation::DeletePartitions,
Operation::DeleteStream,
],
"notifier must observe every fired operation in order"
);
}

#[test]
fn commit_notifier_is_no_op_when_unset() {
// No notifier installed: firing must not panic, must not allocate.
// Mirrors the production-side guarantee that peer shards (no
// notifier) take the same commit path as shard 0 (with notifier).
let md = peer_metadata();
md.fire_commit_notifier(Operation::CreateStream);
}

#[test]
fn commit_notifier_can_be_replaced_and_cleared() {
let md = peer_metadata();
let first_count: Rc<RefCell<usize>> = Rc::new(RefCell::new(0));
let second_count: Rc<RefCell<usize>> = Rc::new(RefCell::new(0));

let first_observer = Rc::clone(&first_count);
md.set_commit_notifier(Some(Rc::new(move |_op| {
*first_observer.borrow_mut() += 1;
})));
md.fire_commit_notifier(Operation::CreateStream);
assert_eq!(*first_count.borrow(), 1);

// Replace: the first closure must no longer run.
let second_observer = Rc::clone(&second_count);
md.set_commit_notifier(Some(Rc::new(move |_op| {
*second_observer.borrow_mut() += 1;
})));
md.fire_commit_notifier(Operation::DeleteStream);
assert_eq!(*first_count.borrow(), 1, "old notifier must be detached");
assert_eq!(*second_count.borrow(), 1, "new notifier must take over");

// Clear: subsequent fires must be no-ops.
md.set_commit_notifier(None);
md.fire_commit_notifier(Operation::DeleteTopic);
assert_eq!(
*second_count.borrow(),
1,
"cleared notifier must stay quiet"
);
}
}
Loading
Loading