From 6196b672de385833d01d83df6a18b92f11928298 Mon Sep 17 00:00:00 2001 From: MasterPtato Date: Wed, 24 Jun 2026 15:50:19 -0700 Subject: [PATCH] [SLOP(claude-opus-4-8-high)] feat(universaldb): postgres leader-resolver driver overhaul --- Cargo.lock | 17 + Cargo.toml | 4 + .../packages/test-deps-docker/src/database.rs | 2 +- engine/packages/universaldb/Cargo.toml | 5 + ...onflict_tracker.rs => conflict_tracker.rs} | 20 +- .../universaldb/src/driver/postgres/codec.rs | 167 +++++++++ .../universaldb/src/driver/postgres/commit.rs | 185 ++++++++++ .../src/driver/postgres/database.rs | 241 +++++++------ .../src/driver/postgres/listener.rs | 228 ++++++++++++ .../universaldb/src/driver/postgres/mod.rs | 5 + .../src/driver/postgres/resolver/apply.rs | 139 ++++++++ .../src/driver/postgres/resolver/lease.rs | 82 +++++ .../src/driver/postgres/resolver/mod.rs | 336 ++++++++++++++++++ .../universaldb/src/driver/postgres/shared.rs | 161 +++++++++ .../src/driver/postgres/transaction.rs | 22 +- .../src/driver/postgres/transaction_task.rs | 283 +++------------ .../src/driver/rocksdb/database.rs | 6 +- .../universaldb/src/driver/rocksdb/mod.rs | 1 - .../src/driver/rocksdb/transaction.rs | 7 +- .../src/driver/rocksdb/transaction_task.rs | 6 +- engine/packages/universaldb/src/lib.rs | 1 + .../packages/universaldb/tests/integration.rs | 10 +- engine/sdks/rust/depot-protocol/Cargo.toml | 1 + .../sdks/rust/universaldb-commit/Cargo.toml | 16 + engine/sdks/rust/universaldb-commit/build.rs | 64 ++++ .../rust/universaldb-commit/src/generated.rs | 1 + .../sdks/rust/universaldb-commit/src/lib.rs | 6 + .../rust/universaldb-commit/src/versioned.rs | 38 ++ .../sdks/schemas/universaldb-commit/v1.bare | 70 ++++ scripts/run/postgres.sh | 2 +- scripts/run/restore-postgres.sh | 2 +- self-host/compose/dev-host/docker-compose.yml | 2 +- .../dev-multidc-multinode/docker-compose.yml | 6 +- .../compose/dev-multidc/docker-compose.yml | 6 +- .../compose/dev-multinode/docker-compose.yml | 2 +- 
self-host/compose/dev/docker-compose.yml | 2 +- .../compose/template/src/docker-compose.ts | 2 +- .../k8s/engine/12-postgres-statefulset.yaml | 2 +- .../docs/self-hosting/docker-compose.mdx | 2 +- .../docs/self-hosting/docker-container.mdx | 2 +- 40 files changed, 1747 insertions(+), 407 deletions(-) rename engine/packages/universaldb/src/{driver/rocksdb/transaction_conflict_tracker.rs => conflict_tracker.rs} (69%) create mode 100644 engine/packages/universaldb/src/driver/postgres/codec.rs create mode 100644 engine/packages/universaldb/src/driver/postgres/commit.rs create mode 100644 engine/packages/universaldb/src/driver/postgres/listener.rs create mode 100644 engine/packages/universaldb/src/driver/postgres/resolver/apply.rs create mode 100644 engine/packages/universaldb/src/driver/postgres/resolver/lease.rs create mode 100644 engine/packages/universaldb/src/driver/postgres/resolver/mod.rs create mode 100644 engine/packages/universaldb/src/driver/postgres/shared.rs create mode 100644 engine/sdks/rust/universaldb-commit/Cargo.toml create mode 100644 engine/sdks/rust/universaldb-commit/build.rs create mode 100644 engine/sdks/rust/universaldb-commit/src/generated.rs create mode 100644 engine/sdks/rust/universaldb-commit/src/lib.rs create mode 100644 engine/sdks/rust/universaldb-commit/src/versioned.rs create mode 100644 engine/sdks/schemas/universaldb-commit/v1.bare diff --git a/Cargo.lock b/Cargo.lock index fa28937016..f2f1a190d1 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -5323,6 +5323,7 @@ name = "rivet-depot-protocol" version = "2.3.2" dependencies = [ "anyhow", + "rivet-util", "rivet-vbare-compiler", "serde", "serde_bare", @@ -5884,6 +5885,17 @@ dependencies = [ "vbare", ] +[[package]] +name = "rivet-universaldb-commit" +version = "2.3.2" +dependencies = [ + "anyhow", + "rivet-vbare-compiler", + "serde", + "serde_bare", + "vbare", +] + [[package]] name = "rivet-ups-broadcast" version = "0.1.0" @@ -8186,6 +8198,7 @@ version = "2.3.2" dependencies = [ "anyhow", 
"async-trait", + "base64 0.22.1", "deadpool-postgres", "foundationdb-tuple", "futures-util", @@ -8199,17 +8212,21 @@ dependencies = [ "rivet-postgres-util", "rivet-test-deps-docker", "rivet-tracing-utils", + "rivet-universaldb-commit", "rocksdb", + "scc", "serde", "tempfile", "thiserror 1.0.69", "tokio", "tokio-postgres", "tokio-postgres-rustls", + "tokio-util", "tracing", "tracing-subscriber", "url", "uuid", + "vbare", ] [[package]] diff --git a/Cargo.toml b/Cargo.toml index fdcad3f310..2f9d8ef621 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -66,6 +66,7 @@ members = [ "engine/sdks/rust/depot-protocol", "engine/sdks/rust/test-envoy", "engine/sdks/rust/ups-protocol", + "engine/sdks/rust/universaldb-commit", "rivetkit-rust/packages/actor-persist", "rivetkit-rust/packages/client", "rivetkit-rust/packages/engine-process", @@ -638,6 +639,9 @@ members = [ [workspace.dependencies.rivet-ups-protocol] path = "engine/sdks/rust/ups-protocol" + [workspace.dependencies.rivet-universaldb-commit] + path = "engine/sdks/rust/universaldb-commit" + [profile.dev] overflow-checks = false # "line-tables-only" produces just the line-number DWARF needed for stack diff --git a/engine/packages/test-deps-docker/src/database.rs b/engine/packages/test-deps-docker/src/database.rs index 11532ebf25..955e4d7897 100644 --- a/engine/packages/test-deps-docker/src/database.rs +++ b/engine/packages/test-deps-docker/src/database.rs @@ -61,7 +61,7 @@ impl TestDatabase { }); let docker_config = DockerRunConfig { - image: "postgres:17".to_string(), + image: "postgres:18".to_string(), container_name: container_name.clone(), port_mapping: (port, 5432), env_vars: vec![ diff --git a/engine/packages/universaldb/Cargo.toml b/engine/packages/universaldb/Cargo.toml index 1fdc9b1c99..b2f6b5f045 100644 --- a/engine/packages/universaldb/Cargo.toml +++ b/engine/packages/universaldb/Cargo.toml @@ -9,6 +9,7 @@ edition.workspace = true [dependencies] anyhow.workspace = true async-trait.workspace = true +base64.workspace = 
true deadpool-postgres.workspace = true foundationdb-tuple.workspace = true futures-util.workspace = true @@ -18,16 +19,20 @@ rand.workspace = true rivet-metrics.workspace = true rivet-postgres-util.workspace = true rivet-tracing-utils.workspace = true +rivet-universaldb-commit.workspace = true rocksdb.workspace = true +scc.workspace = true serde.workspace = true tempfile.workspace = true thiserror.workspace = true tokio-postgres-rustls.workspace = true tokio-postgres.workspace = true +tokio-util.workspace = true tokio.workspace = true tracing.workspace = true url.workspace = true uuid.workspace = true +vbare.workspace = true [dev-dependencies] rivet-config.workspace = true diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs b/engine/packages/universaldb/src/conflict_tracker.rs similarity index 69% rename from engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs rename to engine/packages/universaldb/src/conflict_tracker.rs index 370240760a..127e2bf615 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction_conflict_tracker.rs +++ b/engine/packages/universaldb/src/conflict_tracker.rs @@ -11,7 +11,7 @@ use tokio::sync::Mutex; use crate::options::ConflictRangeType; // Transactions cannot live longer than 5 seconds so we don't need to store transaction conflicts longer than -// that +// that. const TXN_CONFLICT_TTL: Duration = Duration::from_secs(10); #[derive(Debug)] @@ -22,6 +22,15 @@ struct PreviousTransaction { conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, } +/// In-process FoundationDB-style resolver. Holds the last `TXN_CONFLICT_TTL` of committed +/// transactions and rejects a committing transaction if any retained transaction has both an +/// overlapping version window and an overlapping conflict range of a differing type. +/// +/// Used by the rocksdb driver (single process) and by the postgres leader-resolver. 
The two +/// differ only in where the commit version comes from: rocksdb generates it from the in-process +/// `global_version` counter, while the postgres leader assigns it from the durable +/// `udb_version_seq` so it survives leader failover and matches the versionstamp. For that reason +/// `check_and_insert` takes the commit version from the caller instead of generating it. #[derive(Clone)] pub struct TransactionConflictTracker { // NOTE: We use a mutex because we need to lock reads across all active txns. This could be optimized to @@ -40,18 +49,23 @@ impl TransactionConflictTracker { } } - /// Each number returned is unique. + /// Each number returned is unique. Used by the in-process rocksdb driver to assign both start + /// and commit versions. The postgres leader does not use this; it assigns versions from the + /// durable Postgres sequence. pub fn next_global_version(&self) -> u64 { self.global_version.fetch_add(1, Ordering::SeqCst) } + /// Returns `true` on conflict (same polarity as the original rocksdb tracker). The caller + /// supplies `commit_version` (e.g. `nextval('udb_version_seq')` on the postgres leader, or + /// `next_global_version()` on rocksdb) so version assignment stays the caller's responsibility. 
pub async fn check_and_insert( &self, txn1_start_version: u64, + txn1_commit_version: u64, txn1_conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, ) -> bool { let mut txns = self.txns.lock().await; - let txn1_commit_version = self.next_global_version(); // Prune old entries txns.retain(|txn| txn.insert_instant.elapsed() < TXN_CONFLICT_TTL); diff --git a/engine/packages/universaldb/src/driver/postgres/codec.rs b/engine/packages/universaldb/src/driver/postgres/codec.rs new file mode 100644 index 0000000000..432ede2f34 --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/codec.rs @@ -0,0 +1,167 @@ +use anyhow::Result; +use rivet_universaldb_commit::{self as proto, versioned}; +use vbare::OwnedVersionedData; + +use crate::{ + options::{ConflictRangeType, MutationType}, + tx_ops::Operation, +}; + +/// Decoded form of a `udb_commit_requests.payload` blob. +/// +/// `read_version` is intentionally omitted: it is also denormalized into the `read_version` column, +/// which is what the leader's drain reads, so decoding it here would be dead. +pub struct DecodedCommit { + pub conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, + pub operations: Vec, +} + +/// Encode a follower's commit request to the versioned BARE wire format with an embedded version +/// header so a leader running older or newer code can still decode it during a rolling deploy. 
+pub fn encode_commit_request( + read_version: u64, + conflict_ranges: &[(Vec, Vec, ConflictRangeType)], + operations: &[Operation], +) -> Result> { + let request = proto::CommitRequest { + read_version, + conflict_ranges: conflict_ranges + .iter() + .map(|(begin, end, kind)| proto::ConflictRange { + begin: begin.clone(), + end: end.clone(), + kind: conflict_range_type_to_proto(*kind), + }) + .collect(), + operations: operations.iter().map(operation_to_proto).collect(), + }; + + versioned::CommitRequest::wrap_latest(request) + .serialize_with_embedded_version(proto::PROTOCOL_VERSION) +} + +/// Decode a `udb_commit_requests.payload` blob produced by [`encode_commit_request`]. +pub fn decode_commit_request(payload: &[u8]) -> Result { + let request = versioned::CommitRequest::deserialize_with_embedded_version(payload)?; + + let conflict_ranges = request + .conflict_ranges + .into_iter() + .map(|range| { + ( + range.begin, + range.end, + conflict_range_type_from_proto(range.kind), + ) + }) + .collect(); + + let operations = request + .operations + .into_iter() + .map(operation_from_proto) + .collect(); + + Ok(DecodedCommit { + conflict_ranges, + operations, + }) +} + +fn conflict_range_type_to_proto(kind: ConflictRangeType) -> proto::ConflictRangeType { + match kind { + ConflictRangeType::Read => proto::ConflictRangeType::Read, + ConflictRangeType::Write => proto::ConflictRangeType::Write, + } +} + +fn conflict_range_type_from_proto(kind: proto::ConflictRangeType) -> ConflictRangeType { + match kind { + proto::ConflictRangeType::Read => ConflictRangeType::Read, + proto::ConflictRangeType::Write => ConflictRangeType::Write, + } +} + +fn operation_to_proto(op: &Operation) -> proto::Operation { + match op { + Operation::SetValue { key, value } => proto::Operation::SetValue(proto::SetValue { + key: key.clone(), + value: value.clone(), + }), + Operation::Clear { key } => proto::Operation::Clear(proto::Clear { key: key.clone() }), + Operation::ClearRange { begin, end } => 
proto::Operation::ClearRange(proto::ClearRange { + begin: begin.clone(), + end: end.clone(), + }), + Operation::AtomicOp { + key, + param, + op_type, + } => proto::Operation::AtomicOp(proto::AtomicOp { + key: key.clone(), + param: param.clone(), + op_type: mutation_type_to_proto(*op_type), + }), + } +} + +fn operation_from_proto(op: proto::Operation) -> Operation { + match op { + proto::Operation::SetValue(proto::SetValue { key, value }) => { + Operation::SetValue { key, value } + } + proto::Operation::Clear(proto::Clear { key }) => Operation::Clear { key }, + proto::Operation::ClearRange(proto::ClearRange { begin, end }) => { + Operation::ClearRange { begin, end } + } + proto::Operation::AtomicOp(proto::AtomicOp { + key, + param, + op_type, + }) => Operation::AtomicOp { + key, + param, + op_type: mutation_type_from_proto(op_type), + }, + } +} + +fn mutation_type_to_proto(op_type: MutationType) -> proto::MutationType { + match op_type { + MutationType::Add => proto::MutationType::Add, + MutationType::And => proto::MutationType::And, + MutationType::BitAnd => proto::MutationType::BitAnd, + MutationType::Or => proto::MutationType::Or, + MutationType::BitOr => proto::MutationType::BitOr, + MutationType::Xor => proto::MutationType::Xor, + MutationType::BitXor => proto::MutationType::BitXor, + MutationType::AppendIfFits => proto::MutationType::AppendIfFits, + MutationType::Max => proto::MutationType::Max, + MutationType::Min => proto::MutationType::Min, + MutationType::SetVersionstampedKey => proto::MutationType::SetVersionstampedKey, + MutationType::SetVersionstampedValue => proto::MutationType::SetVersionstampedValue, + MutationType::ByteMin => proto::MutationType::ByteMin, + MutationType::ByteMax => proto::MutationType::ByteMax, + MutationType::CompareAndClear => proto::MutationType::CompareAndClear, + } +} + +fn mutation_type_from_proto(op_type: proto::MutationType) -> MutationType { + match op_type { + proto::MutationType::Add => MutationType::Add, + 
proto::MutationType::And => MutationType::And, + proto::MutationType::BitAnd => MutationType::BitAnd, + proto::MutationType::Or => MutationType::Or, + proto::MutationType::BitOr => MutationType::BitOr, + proto::MutationType::Xor => MutationType::Xor, + proto::MutationType::BitXor => MutationType::BitXor, + proto::MutationType::AppendIfFits => MutationType::AppendIfFits, + proto::MutationType::Max => MutationType::Max, + proto::MutationType::Min => MutationType::Min, + proto::MutationType::SetVersionstampedKey => MutationType::SetVersionstampedKey, + proto::MutationType::SetVersionstampedValue => MutationType::SetVersionstampedValue, + proto::MutationType::ByteMin => MutationType::ByteMin, + proto::MutationType::ByteMax => MutationType::ByteMax, + proto::MutationType::CompareAndClear => MutationType::CompareAndClear, + } +} diff --git a/engine/packages/universaldb/src/driver/postgres/commit.rs b/engine/packages/universaldb/src/driver/postgres/commit.rs new file mode 100644 index 0000000000..e61714dd0e --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/commit.rs @@ -0,0 +1,185 @@ +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use anyhow::{Context, Result}; + +use crate::{error::DatabaseError, options::ConflictRangeType, tx_ops::Operation}; + +use super::{ + codec, + shared::{LeaseInfo, PostgresShared, commit_channel, reply_channel}, +}; + +/// How long to wait for a leader to be elected before giving up a submit as retryable. +const LEADER_WAIT_TIMEOUT: Duration = Duration::from_secs(5); +/// Backstop poll cadence while waiting for a commit result, in case a reply NOTIFY is missed. +const RESULT_POLL_INTERVAL: Duration = Duration::from_millis(250); + +/// Submit a follower transaction's commit to the leader and await the result. +/// +/// `read_version` is the watermark captured when this transaction opened its read snapshot. A pure +/// snapshot read-only transaction (no operations and no read conflict ranges) submits nothing. 
+pub async fn submit( + shared: &Arc, + read_version: i64, + operations: Vec, + conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, +) -> Result<()> { + // A transaction with no writes and no serializable read ranges has nothing to order or + // validate; it never needs the leader. + if operations.is_empty() + && conflict_ranges + .iter() + .all(|(_, _, kind)| matches!(kind, ConflictRangeType::Write)) + { + return Ok(()); + } + + let lease = wait_for_leader(shared).await?; + let payload = + codec::encode_commit_request(read_version.max(0) as u64, &conflict_ranges, &operations) + .context("failed to encode commit request")?; + let reply_channel = reply_channel(&shared.node_id); + + // Subscribe to our reply channel before inserting so we cannot miss the leader's NOTIFY. + let mut reply_rx = shared.listener.listen(&reply_channel).await; + + let conn = shared + .pool + .get() + .await + .context("failed to get connection for commit submit")?; + + let id: i64 = conn + .query_one( + "INSERT INTO udb_commit_requests (epoch, read_version, payload, reply_channel) + VALUES ($1, $2, $3, $4) + RETURNING id", + &[&lease.epoch, &read_version, &payload, &reply_channel], + ) + .await + .context("failed to enqueue commit request")? + .get(0); + + // Wake the leader's drain loop. + if let Err(err) = conn + .execute( + "SELECT pg_notify($1, $2)", + &[&commit_channel(&lease.leader_addr), &id.to_string()], + ) + .await + { + tracing::debug!( + ?err, + "failed to notify leader; relying on its poll backstop" + ); + } + + // Release the connection before waiting so a long wait does not pin a pool slot. The request + // row is durable, so await_result re-acquires a connection per poll. + drop(conn); + + await_result(shared, id, lease.epoch, &mut reply_rx).await +} + +/// Wait for a known leader, returning a retryable error if none is elected in time. 
+async fn wait_for_leader(shared: &Arc) -> Result { + let deadline = Instant::now() + LEADER_WAIT_TIMEOUT; + loop { + if let Some(lease) = shared.current_lease() { + return Ok(lease); + } + if Instant::now() >= deadline { + return Err(DatabaseError::NotCommitted.into()); + } + tokio::time::sleep(RESULT_POLL_INTERVAL).await; + } +} + +/// Poll the request row until it reaches a terminal status, woken by reply NOTIFYs with a polling +/// backstop. Bails as retryable if the leader epoch advances (our request is now orphaned and will +/// never be applied, so it is definitively not committed). +async fn await_result( + shared: &Arc, + id: i64, + submit_epoch: i64, + reply_rx: &mut tokio::sync::broadcast::Receiver, +) -> Result<()> { + loop { + // Re-acquire a connection per poll: the request row is durable, so a transient pool/query + // error just means we retry the poll rather than failing a possibly-applied commit. + match read_status(shared, id).await { + Ok(Some(Status::Committed)) => return Ok(()), + Ok(Some(Status::Conflict)) => return Err(DatabaseError::NotCommitted.into()), + Ok(Some(Status::Pending)) => {} + Ok(None) => { + // The row was GC'd before we observed a terminal status. Treat as not committed + // and let the retry loop resubmit. + return Err(DatabaseError::NotCommitted.into()); + } + Err(err) => { + tracing::debug!(?err, "transient error polling commit status, retrying"); + } + } + + // If a new leader took over, our old-epoch request will never be claimed. + if let Some(current) = shared.current_lease() { + if current.epoch != submit_epoch { + return Err(DatabaseError::NotCommitted.into()); + } + } + + tokio::select! 
{ + res = reply_rx.recv() => { + match res { + Ok(_) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + *reply_rx = shared + .listener + .listen(&reply_channel(&shared.node_id)) + .await; + } + } + } + _ = tokio::time::sleep(RESULT_POLL_INTERVAL) => {} + } + } +} + +enum Status { + Pending, + Committed, + Conflict, +} + +/// Read the current status of a commit request. `Ok(None)` means the row no longer exists. +async fn read_status(shared: &Arc, id: i64) -> Result> { + let conn = shared + .pool + .get() + .await + .context("failed to get connection for commit status poll")?; + + let row = conn + .query_opt( + "SELECT status FROM udb_commit_requests WHERE id = $1", + &[&id], + ) + .await + .context("failed to read commit request status")?; + + let Some(row) = row else { + return Ok(None); + }; + + let status: String = row.get(0); + let status = match status.as_str() { + "committed" => Status::Committed, + "conflict" => Status::Conflict, + // 'pending' or any in-flight state. 
+ _ => Status::Pending, + }; + Ok(Some(status)) +} diff --git a/engine/packages/universaldb/src/driver/postgres/database.rs b/engine/packages/universaldb/src/driver/postgres/database.rs index 4d50d08ab3..6e24cf647d 100644 --- a/engine/packages/universaldb/src/driver/postgres/database.rs +++ b/engine/packages/universaldb/src/driver/postgres/database.rs @@ -13,6 +13,7 @@ use rivet_postgres_util::build_tls_config; use tokio::task::JoinHandle; use tokio_postgres_rustls::MakeRustlsConnect; use url::Url; +use uuid::Uuid; use crate::{ RetryableTransaction, Transaction, @@ -22,9 +23,15 @@ use crate::{ utils::{MaybeCommitted, calculate_tx_retry_backoff}, }; -use super::transaction::PostgresTransactionDriver; +use super::{ + listener::PgListener, resolver, shared::PostgresShared, transaction::PostgresTransactionDriver, +}; -const GC_INTERVAL: Duration = Duration::from_secs(5); +const GC_INTERVAL: Duration = Duration::from_secs(30); +/// Terminal and orphaned commit-request rows older than this are garbage collected. Must be well +/// beyond the longest a follower could spend awaiting a result, so a result is never deleted before +/// it is observed. +const COMMIT_ROW_MAX_AGE_SECS: i64 = 60; #[derive(Clone, Debug)] pub struct PostgresConfig { @@ -50,7 +57,7 @@ impl PostgresConfig { } pub struct PostgresDatabaseDriver { - pool: Pool, + shared: Arc, max_retries: AtomicI32, gc_handle: JoinHandle<()>, } @@ -63,7 +70,60 @@ impl PostgresDatabaseDriver { "creating PostgresDatabaseDriver" ); - // Create deadpool config from connection string + let ssl_disabled = if let Ok(url) = Url::parse(&config.connection_string) { + url.query_pairs() + .any(|(k, v)| k == "sslmode" && v == "disable") + } else { + false + }; + + let pool = Self::build_pool(&config, ssl_disabled)?; + + // Initialize the schema (idempotent). 
+ { + let conn = pool + .get() + .await + .context("failed to get connection from postgres pool")?; + Self::init_schema(&conn).await?; + } + + // Unique per-process node id (no hyphens) used to name this node's NOTIFY channels. Kept + // short so `udb_commit_` stays within Postgres's 63-byte identifier limit. + let node_id = Uuid::new_v4().simple().to_string(); + + let listener = PgListener::new( + config.connection_string.clone(), + ssl_disabled, + config + .ssl_config + .as_ref() + .and_then(|c| c.ssl_root_cert_path.clone()), + config + .ssl_config + .as_ref() + .and_then(|c| c.ssl_client_cert_path.clone()), + config + .ssl_config + .as_ref() + .and_then(|c| c.ssl_client_key_path.clone()), + ); + + let shared = PostgresShared::new(pool, node_id, listener); + + // Every node runs the resolver; only the elected leader drains the commit queue. + resolver::spawn(shared.clone()); + + let gc_handle = Self::spawn_gc(shared.clone()); + + Ok(PostgresDatabaseDriver { + shared, + max_retries: AtomicI32::new(100), + gc_handle, + }) + } + + fn build_pool(config: &PostgresConfig, ssl_disabled: bool) -> Result { let mut pool_config = Config::new(); pool_config.url = Some(config.connection_string.clone()); pool_config.pool = Some(PoolConfig { @@ -74,21 +134,10 @@ impl PostgresDatabaseDriver { recycling_method: RecyclingMethod::Fast, }); - tracing::debug!("creating Postgres pool"); - - let ssl_disabled = if let Ok(url) = Url::parse(&config.connection_string) { - url.query_pairs() - .any(|(k, v)| k == "sslmode" && v == "disable") - } else { - false - }; - - let pool = if ssl_disabled { - let tls = tokio_postgres::NoTls; - + if ssl_disabled { pool_config - .create_pool(Some(Runtime::Tokio1), tls) - .context("failed to create postgres connection pool")? 
+ .create_pool(Some(Runtime::Tokio1), tokio_postgres::NoTls) + .context("failed to create postgres connection pool") } else { let tls_config = build_tls_config( config @@ -104,139 +153,87 @@ impl PostgresDatabaseDriver { .as_ref() .and_then(|c| c.ssl_client_key_path.as_ref()), )?; - let tls = MakeRustlsConnect::new(tls_config); - pool_config - .create_pool(Some(Runtime::Tokio1), tls) - .context("failed to create postgres connection pool")? - }; - - tracing::debug!("Getting Postgres connection from pool"); - // Get a connection from the pool to create the table - let conn = pool - .get() - .await - .context("failed to get connection from postgres pool")?; - - // Enable btree gist - conn.execute("CREATE EXTENSION IF NOT EXISTS btree_gist", &[]) - .await - .context("failed to create btree_gist extension")?; - - conn.execute("CREATE UNLOGGED SEQUENCE IF NOT EXISTS global_version_seq START WITH 1 INCREMENT BY 1 MINVALUE 1", &[]) - .await - .context("failed to create global version sequence")?; + .create_pool(Some(Runtime::Tokio1), MakeRustlsConnect::new(tls_config)) + .context("failed to create postgres connection pool") + } + } - // Create the KV table if it doesn't exist - conn.execute( + async fn init_schema(conn: &deadpool_postgres::Client) -> Result<()> { + // Durable latest-value store. 
+ conn.batch_execute( "CREATE TABLE IF NOT EXISTS kv ( key BYTEA PRIMARY KEY, value BYTEA NOT NULL - )", - &[], - ) - .await - .context("failed to create kv table")?; - - // Create range_type type if it doesn't exist - conn.execute( - "DO $$ BEGIN - CREATE TYPE range_type AS ENUM ('read', 'write'); - EXCEPTION - WHEN duplicate_object THEN null; - END $$", - &[], - ) - .await - .context("failed to create range_type enum")?; - - // Create bytearange type if it doesn't exist - conn.execute( - "DO $$ BEGIN - CREATE TYPE bytearange AS RANGE ( - SUBTYPE = bytea, - SUBTYPE_OPCLASS = bytea_ops - ); - EXCEPTION - WHEN duplicate_object THEN null; - END $$", - &[], + ); + + CREATE TABLE IF NOT EXISTS udb_lease ( + id INT PRIMARY KEY DEFAULT 1 CHECK (id = 1), + epoch BIGINT NOT NULL, + leader_addr TEXT NOT NULL, + durable_version BIGINT NOT NULL DEFAULT 0, + expires_at TIMESTAMPTZ NOT NULL + ); + + CREATE SEQUENCE IF NOT EXISTS udb_version_seq AS BIGINT + START WITH 1 INCREMENT BY 1 MINVALUE 1; + + CREATE TABLE IF NOT EXISTS udb_commit_requests ( + id BIGSERIAL PRIMARY KEY, + epoch BIGINT NOT NULL, + read_version BIGINT NOT NULL, + payload BYTEA NOT NULL, + reply_channel TEXT NOT NULL, + status TEXT NOT NULL DEFAULT 'pending', + commit_version BIGINT, + created_at TIMESTAMPTZ NOT NULL DEFAULT now() + ); + + CREATE INDEX IF NOT EXISTS udb_commit_requests_pending + ON udb_commit_requests (id) WHERE status = 'pending';", ) .await - .context("failed to create bytearange type")?; - - // Create the conflict ranges table for non-snapshot reads - // This enforces consistent reads for ranges by preventing overlapping conflict ranges - conn.execute( - "CREATE UNLOGGED TABLE IF NOT EXISTS conflict_ranges ( - range_data BYTEARANGE NOT NULL, - conflict_type range_type NOT NULL, - start_version BIGINT NOT NULL, - commit_version BIGINT NOT NULL, - ts timestamp NOT NULL DEFAULT now(), - - EXCLUDE USING gist ( - -- Conflict if byte range overlaps... 
- range_data WITH &&, - -- And if conflict types are different... - conflict_type WITH <>, - -- And if the txn versions overlap... - int8range(start_version, commit_version, '[]') WITH &&, - -- But not if the start_version is the same (from the same txn) - start_version WITH <> - ) - )", - &[], - ) - .await - .context("failed to create conflict_ranges table")?; + .context("failed to initialize postgres schema")?; - // Create index on ts column for efficient garbage collection - conn.execute( - "CREATE INDEX IF NOT EXISTS idx_conflict_ranges_ts ON conflict_ranges (ts)", - &[], - ) - .await - .context("failed to create index on conflict_ranges ts column")?; + Ok(()) + } - let pool2 = pool.clone(); - let gc_handle = tokio::spawn(async move { + fn spawn_gc(shared: Arc) -> JoinHandle<()> { + tokio::spawn(async move { let mut interval = tokio::time::interval(GC_INTERVAL); interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); loop { interval.tick().await; - tracing::debug!(status=?pool2.status(), "postgres pool status"); + let conn = match shared.pool.get().await { + Ok(conn) => conn, + Err(err) => { + tracing::debug!(?err, "failed to get connection for commit gc"); + continue; + } + }; - // NOTE: Transactions have a max limit of 5 seconds, we delete after 10 seconds for extra padding - // Delete old conflict ranges if let Err(err) = conn .execute( - "DELETE FROM conflict_ranges where ts < now() - interval '10 seconds'", - &[], + "DELETE FROM udb_commit_requests + WHERE created_at < now() - ($1::bigint * interval '1 second')", + &[&COMMIT_ROW_MAX_AGE_SECS], ) .await { - tracing::error!(?err, "failed postgres gc task"); + tracing::error!(?err, "failed postgres commit-queue gc"); } } - }); - - Ok(PostgresDatabaseDriver { - pool, - max_retries: AtomicI32::new(100), - gc_handle, }) } } impl DatabaseDriver for PostgresDatabaseDriver { fn create_txn(&self) -> Result { - // Pass the connection pool and config to the transaction driver - 
Ok(Transaction::new(Arc::new( - PostgresTransactionDriver::with_config(self.pool.clone()), - ))) + Ok(Transaction::new(Arc::new(PostgresTransactionDriver::new( + self.shared.clone(), + )))) } fn run<'a>( diff --git a/engine/packages/universaldb/src/driver/postgres/listener.rs b/engine/packages/universaldb/src/driver/postgres/listener.rs new file mode 100644 index 0000000000..e5a7919dfe --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/listener.rs @@ -0,0 +1,228 @@ +use std::{path::PathBuf, sync::Arc, time::Duration}; + +use futures_util::future::poll_fn; +use rivet_postgres_util::build_tls_config; +use scc::HashMap; +use tokio::{ + io::{AsyncRead, AsyncWrite}, + sync::{Mutex, broadcast}, +}; +use tokio_postgres::AsyncMessage; +use tokio_postgres_rustls::MakeRustlsConnect; + +/// How long to wait between reconnect attempts for the dedicated LISTEN connection. +const RECONNECT_BACKOFF: Duration = Duration::from_secs(1); +/// Capacity of each channel's broadcast buffer. Notifications are wakeup signals with a polling +/// backstop, so a lagged receiver only delays a wake, never drops a durable commit. +const BROADCAST_CAPACITY: usize = 1024; + +struct Subscription { + tx: broadcast::Sender, +} + +/// Owns a single dedicated Postgres connection used exclusively for `LISTEN`. Demultiplexes +/// incoming `NOTIFY` payloads to per-channel broadcast senders and re-`LISTEN`s every registered +/// channel after a reconnect. +/// +/// This is separate from the deadpool pool because deadpool recycles connections and drops the +/// async notification stream; LISTEN requires owning the connection's message stream directly. 
+pub struct PgListener { + conn_str: String, + ssl_disabled: bool, + ssl_root_cert_path: Option, + ssl_client_cert_path: Option, + ssl_client_key_path: Option, + channels: Arc>, + client: Arc>>, +} + +impl PgListener { + pub fn new( + conn_str: String, + ssl_disabled: bool, + ssl_root_cert_path: Option, + ssl_client_cert_path: Option, + ssl_client_key_path: Option, + ) -> Self { + let channels: Arc> = Arc::new(HashMap::new()); + let client: Arc>> = Arc::new(Mutex::new(None)); + + tokio::spawn(Self::connection_lifecycle( + conn_str.clone(), + ssl_disabled, + ssl_root_cert_path.clone(), + ssl_client_cert_path.clone(), + ssl_client_key_path.clone(), + channels.clone(), + client.clone(), + )); + + Self { + conn_str, + ssl_disabled, + ssl_root_cert_path, + ssl_client_cert_path, + ssl_client_key_path, + channels, + client, + } + } + + /// Subscribe to a channel, registering a `LISTEN` if this is the first subscriber. Returns a + /// broadcast receiver of notification payloads. Idempotent per channel. + pub async fn listen(&self, channel: &str) -> broadcast::Receiver { + match self.channels.entry_async(channel.to_string()).await { + scc::hash_map::Entry::Occupied(entry) => entry.get().tx.subscribe(), + scc::hash_map::Entry::Vacant(entry) => { + let (tx, rx) = broadcast::channel(BROADCAST_CAPACITY); + entry.insert_entry(Subscription { tx }); + + // Best-effort immediate LISTEN; the lifecycle task re-LISTENs on reconnect. 
+ if let Some(client) = &*self.client.lock().await { + if let Err(err) = client.execute(&format!("LISTEN \"{channel}\""), &[]).await { + tracing::warn!(?err, %channel, "failed to LISTEN, will retry on reconnect"); + } + } + + rx + } + } + } + + async fn connection_lifecycle( + conn_str: String, + ssl_disabled: bool, + ssl_root_cert_path: Option, + ssl_client_cert_path: Option, + ssl_client_key_path: Option, + channels: Arc>, + client: Arc>>, + ) { + loop { + let connected = if ssl_disabled { + Self::connect_and_run(&conn_str, tokio_postgres::NoTls, &channels, &client).await + } else { + match build_tls_config( + ssl_root_cert_path.as_ref(), + ssl_client_cert_path.as_ref(), + ssl_client_key_path.as_ref(), + ) { + Ok(tls_config) => { + Self::connect_and_run( + &conn_str, + MakeRustlsConnect::new(tls_config), + &channels, + &client, + ) + .await + } + Err(err) => { + tracing::error!(?err, "failed to build listener TLS config"); + false + } + } + }; + + if !connected { + tokio::time::sleep(RECONNECT_BACKOFF).await; + } + } + } + + /// Connects, re-LISTENs all channels, then drives the notification poll loop until the + /// connection closes. Returns `true` if a connection was successfully established (so the caller + /// can skip the reconnect backoff). + async fn connect_and_run( + conn_str: &str, + tls: T, + channels: &Arc>, + client: &Arc>>, + ) -> bool + where + T: tokio_postgres::tls::MakeTlsConnect, + T::Stream: AsyncRead + AsyncWrite + Unpin + Send + 'static, + T::TlsConnect: Send, + >::Future: Send, + { + let (new_client, connection) = match tokio_postgres::connect(conn_str, tls).await { + Ok(pair) => pair, + Err(err) => { + tracing::error!(?err, "failed to connect postgres listener"); + return false; + } + }; + + let channels_poll = channels.clone(); + let poll_handle = + tokio::spawn(async move { Self::poll_connection(connection, channels_poll).await }); + + // Re-LISTEN all registered channels on the fresh connection. 
+ let mut registered = Vec::new(); + channels + .iter_async(|k, _| { + registered.push(k.clone()); + true + }) + .await; + for channel in &registered { + if let Err(err) = new_client + .execute(&format!("LISTEN \"{channel}\""), &[]) + .await + { + tracing::error!(?err, %channel, "failed to re-LISTEN channel after reconnect"); + } + } + + *client.lock().await = Some(new_client); + + // Block until the poll loop ends (connection closed or errored). + let _ = poll_handle.await; + + *client.lock().await = None; + + true + } + + async fn poll_connection<S, T>( + mut connection: tokio_postgres::Connection<S, T>, + channels: Arc<HashMap<String, Subscription>>, + ) where + S: AsyncRead + AsyncWrite + Unpin, + T: AsyncRead + AsyncWrite + Unpin, + { + loop { + match poll_fn(|cx| connection.poll_message(cx)).await { + Some(Ok(AsyncMessage::Notification(note))) => { + if let Some(sub) = channels.get_async(note.channel()).await { + // Ignore send errors: no active receiver just means no one is waiting + // right now; the polling backstop covers them.
+ let _ = sub.tx.send(note.payload().to_string()); + } + } + Some(Ok(_)) => {} + Some(Err(err)) => { + tracing::warn!(?err, "postgres listener connection error"); + break; + } + None => { + tracing::warn!("postgres listener connection closed"); + break; + } + } + } + } +} + +impl Clone for PgListener { + fn clone(&self) -> Self { + Self { + conn_str: self.conn_str.clone(), + ssl_disabled: self.ssl_disabled, + ssl_root_cert_path: self.ssl_root_cert_path.clone(), + ssl_client_cert_path: self.ssl_client_cert_path.clone(), + ssl_client_key_path: self.ssl_client_key_path.clone(), + channels: self.channels.clone(), + client: self.client.clone(), + } + } +} diff --git a/engine/packages/universaldb/src/driver/postgres/mod.rs b/engine/packages/universaldb/src/driver/postgres/mod.rs index 1c24f9cb94..64f4bbd1bf 100644 --- a/engine/packages/universaldb/src/driver/postgres/mod.rs +++ b/engine/packages/universaldb/src/driver/postgres/mod.rs @@ -1,4 +1,9 @@ +mod codec; +mod commit; mod database; +mod listener; +mod resolver; +mod shared; mod transaction; mod transaction_task; diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/apply.rs b/engine/packages/universaldb/src/driver/postgres/resolver/apply.rs new file mode 100644 index 0000000000..d9729de884 --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/resolver/apply.rs @@ -0,0 +1,139 @@ +use anyhow::{Context, Result}; +use deadpool_postgres::Transaction; + +use crate::{ + atomic::apply_atomic_op, options::MutationType, tuple::Versionstamp, tx_ops::Operation, + versionstamp::substitute_raw_versionstamp, +}; + +/// Apply a winning transaction's operations to `kv` inside the leader's batch txn. +/// +/// `commit_version` is the Postgres-resolved version assigned to this commit (`nextval`). It is +/// substituted into the 8-byte committed-version slot of any versionstamped key/value so +/// versionstamps are globally monotonic with commit order across all follower processes. 
+pub async fn apply( + txn: &Transaction<'_>, + operations: Vec<Operation>, + commit_version: u64, +) -> Result<()> { + // Distinguishes multiple versionstamped operations within a single commit so their 10-byte + // stamps stay unique (8-byte version shared, 2-byte counter incremented). + let mut versionstamp_counter: u16 = 0; + + for op in operations { + match op { + Operation::SetValue { key, value } => { + upsert(txn, &key, &value).await?; + } + Operation::Clear { key } => { + txn.execute("DELETE FROM kv WHERE key = $1", &[&key]) + .await + .context("failed to clear key")?; + } + Operation::ClearRange { begin, end } => { + txn.execute( + "DELETE FROM kv WHERE key >= $1 AND key < $2", + &[&begin, &end], + ) + .await + .context("failed to clear range")?; + } + Operation::AtomicOp { + key, + param, + op_type, + } => { + apply_atomic( + txn, + key, + param, + op_type, + commit_version, + &mut versionstamp_counter, + ) + .await?; + } + } + } + + Ok(()) +} + +async fn apply_atomic( + txn: &Transaction<'_>, + key: Vec<u8>, + param: Vec<u8>, + op_type: MutationType, + commit_version: u64, + versionstamp_counter: &mut u16, +) -> Result<()> { + match op_type { + MutationType::SetVersionstampedKey => { + let versionstamp = build_versionstamp(commit_version, versionstamp_counter); + let key = substitute_raw_versionstamp(key, &versionstamp) + .map_err(anyhow::Error::msg) + .context("failed substituting versionstamped key")?; + upsert(txn, &key, &param).await?; + } + MutationType::SetVersionstampedValue => { + let versionstamp = build_versionstamp(commit_version, versionstamp_counter); + let value = substitute_raw_versionstamp(param, &versionstamp) + .map_err(anyhow::Error::msg) + .context("failed substituting versionstamped value")?; + upsert(txn, &key, &value).await?; + } + // Read-modify-write atomics: the leader is the single writer, so reading the live value + // inside the apply txn and writing the result is serializable with no lost update.
+ MutationType::Add + | MutationType::And + | MutationType::BitAnd + | MutationType::Or + | MutationType::BitOr + | MutationType::Xor + | MutationType::BitXor + | MutationType::AppendIfFits + | MutationType::Max + | MutationType::Min + | MutationType::ByteMin + | MutationType::ByteMax + | MutationType::CompareAndClear => { + let current = txn + .query_opt("SELECT value FROM kv WHERE key = $1", &[&key]) + .await + .context("failed to read current value for atomic op")? + .map(|row| row.get::<_, Vec<u8>>(0)); + + let new_value = apply_atomic_op(current.as_deref(), &param, op_type); + + if let Some(new_value) = new_value { + upsert(txn, &key, &new_value).await?; + } else { + txn.execute("DELETE FROM kv WHERE key = $1", &[&key]) + .await + .context("failed to clear key after atomic op")?; + } + } + } + + Ok(()) +} + +async fn upsert(txn: &Transaction<'_>, key: &[u8], value: &[u8]) -> Result<()> { + txn.execute( + "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2", + &[&key, &value], + ) + .await + .context("failed to upsert kv")?; + Ok(()) +} + +/// Build a 10-byte versionstamp (plus the 2 user-version bytes the substitution helper ignores) +/// from the Postgres-resolved commit version and a per-commit counter. +fn build_versionstamp(commit_version: u64, counter: &mut u16) -> Versionstamp { + let mut bytes = [0u8; 12]; + bytes[0..8].copy_from_slice(&commit_version.to_be_bytes()); + bytes[8..10].copy_from_slice(&counter.to_be_bytes()); + *counter = counter.wrapping_add(1); + Versionstamp::from(bytes) +} diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/lease.rs b/engine/packages/universaldb/src/driver/postgres/resolver/lease.rs new file mode 100644 index 0000000000..a17b6c7759 --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/resolver/lease.rs @@ -0,0 +1,82 @@ +use anyhow::{Context, Result}; +use deadpool_postgres::Pool; + +use crate::driver::postgres::shared::LEASE_ID; + +/// Lease time-to-live.
A leader renews well within this; a candidate may take over only after it +/// expires. +pub const LEASE_TTL_SECS: i64 = 10; + +/// Outcome of a leadership acquisition attempt. +pub struct Acquired { + pub epoch: i64, +} + +/// Attempt to acquire or take over the leader lease via an epoch CAS. Succeeds if there is no lease +/// row yet, or the existing lease has expired. Bumps `epoch` on every successful acquisition so a +/// superseded old leader is fenced out. +pub async fn try_acquire(pool: &Pool, node_id: &str) -> Result> { + let conn = pool + .get() + .await + .context("failed to get connection for lease acquire")?; + + // Take over an expired (or absent) lease. The INSERT seeds the singleton row on first ever + // election; thereafter the UPDATE path runs. + let row = conn + .query_opt( + "INSERT INTO udb_lease (id, epoch, leader_addr, durable_version, expires_at) + VALUES ($1, 1, $2, 0, now() + ($3 || ' seconds')::interval) + ON CONFLICT (id) DO UPDATE + SET epoch = udb_lease.epoch + 1, + leader_addr = EXCLUDED.leader_addr, + expires_at = now() + ($3 || ' seconds')::interval + WHERE udb_lease.expires_at < now() + RETURNING epoch", + &[&LEASE_ID, &node_id, &LEASE_TTL_SECS.to_string()], + ) + .await + .context("failed to run lease acquire query")?; + + Ok(row.map(|row| Acquired { epoch: row.get(0) })) +} + +/// Renew the lease, fenced on this leader's epoch. Returns `false` if the lease was lost (another +/// node took over, bumping the epoch), in which case the caller must step down. 
+pub async fn renew(pool: &Pool, node_id: &str, epoch: i64) -> Result { + let conn = pool + .get() + .await + .context("failed to get connection for lease renew")?; + + let updated = conn + .execute( + "UPDATE udb_lease + SET expires_at = now() + ($3 || ' seconds')::interval + WHERE id = $1 AND epoch = $2 AND leader_addr = $4", + &[&LEASE_ID, &epoch, &LEASE_TTL_SECS.to_string(), &node_id], + ) + .await + .context("failed to renew lease")?; + + Ok(updated == 1) +} + +/// Read the current durable version (`udb_lease.durable_version`). Used by a freshly elected leader +/// to learn the watermark floor it must continue from. +pub async fn current_durable_version(pool: &Pool) -> Result { + let conn = pool + .get() + .await + .context("failed to get connection for durable version read")?; + + let row = conn + .query_opt( + "SELECT durable_version FROM udb_lease WHERE id = $1", + &[&LEASE_ID], + ) + .await + .context("failed to read durable version")?; + + Ok(row.map(|row| row.get::<_, i64>(0)).unwrap_or(0)) +} diff --git a/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs b/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs new file mode 100644 index 0000000000..79f1f1f48b --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/resolver/mod.rs @@ -0,0 +1,336 @@ +mod apply; +mod lease; + +use std::{ + sync::Arc, + time::{Duration, Instant}, +}; + +use anyhow::{Context, Result}; + +use crate::{conflict_tracker::TransactionConflictTracker, transaction::TXN_TIMEOUT}; + +use super::shared::{LEASE_ID, LeaseInfo, PostgresShared, WATERMARK_CHANNEL, commit_channel}; + +/// Max commits resolved+applied per batch (group commit). Amortizes the resolver, Postgres +/// round-trips, and fsync across the batch. +const DRAIN_BATCH_SIZE: i64 = 256; + +/// How often a leader renews its lease. Must be comfortably under `LEASE_TTL_SECS`. 
+const RENEW_INTERVAL: Duration = Duration::from_secs(3); + +/// Backstop poll cadence so a missed `udb_commit` NOTIFY cannot stall the drain indefinitely. +const POLL_BACKSTOP: Duration = Duration::from_millis(50); + +/// How long a candidate waits before retrying election when another node holds the lease. +const ELECTION_RETRY: Duration = Duration::from_secs(2); + +enum DrainOutcome { + /// Processed zero or more requests; still leader. + Drained, + /// Lost the lease (epoch bumped by a new leader). Step down. + LostLease, +} + +/// Spawn the per-process resolver task. Every node runs this; only the elected leader drains the +/// commit queue. +pub fn spawn(shared: Arc) { + tokio::spawn(run(shared)); +} + +async fn run(shared: Arc) { + loop { + match lease::try_acquire(&shared.pool, &shared.node_id).await { + Ok(Some(acquired)) => { + tracing::info!(epoch = acquired.epoch, node_id = %shared.node_id, "acquired udb leader lease"); + if let Err(err) = lead(&shared, acquired.epoch).await { + tracing::error!(?err, "udb leader loop errored, stepping down"); + } + tracing::info!(epoch = acquired.epoch, "stepped down from udb leader"); + } + Ok(None) => { + tokio::time::sleep(ELECTION_RETRY).await; + } + Err(err) => { + tracing::warn!(?err, "failed udb lease acquire attempt"); + tokio::time::sleep(ELECTION_RETRY).await; + } + } + } +} + +/// Leader main loop: hold the lease, drain the commit queue on wake or poll, and renew the lease. +async fn lead(shared: &Arc, epoch: i64) -> Result<()> { + // Publish our own lease into the cache immediately so our local commits route to us. + shared.set_lease(LeaseInfo { + epoch, + leader_addr: shared.node_id.clone(), + }); + + // The recovery floor: a freshly elected leader has a cold conflict window, so reject commits + // whose read_version predates the floor until the window warms (one TXN_TIMEOUT), forcing + // those followers to take a fresh read_version. 
+ let recovery_version = recovery_floor(shared).await?; + let recovery_deadline = Instant::now() + TXN_TIMEOUT; + + let tracker = TransactionConflictTracker::new(); + + let mut wake_rx = shared + .listener + .listen(&commit_channel(&shared.node_id)) + .await; + + let mut renew_interval = tokio::time::interval(RENEW_INTERVAL); + renew_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + let mut poll_interval = tokio::time::interval(POLL_BACKSTOP); + poll_interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + // Drain anything already queued before our first wake. + if matches!( + drain(shared, epoch, &tracker, recovery_version, recovery_deadline).await?, + DrainOutcome::LostLease + ) { + return Ok(()); + } + + loop { + tokio::select! { + _ = renew_interval.tick() => { + if !lease::renew(&shared.pool, &shared.node_id, epoch).await? { + tracing::warn!(epoch, "lost udb lease on renew"); + return Ok(()); + } + } + res = wake_rx.recv() => { + match res { + Ok(_) | Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + wake_rx = shared.listener.listen(&commit_channel(&shared.node_id)).await; + } + } + if matches!( + drain(shared, epoch, &tracker, recovery_version, recovery_deadline).await?, + DrainOutcome::LostLease + ) { + return Ok(()); + } + } + _ = poll_interval.tick() => { + if matches!( + drain(shared, epoch, &tracker, recovery_version, recovery_deadline).await?, + DrainOutcome::LostLease + ) { + return Ok(()); + } + } + } + } +} + +/// The version floor a freshly elected leader continues from: the higher of the durable watermark +/// and the sequence high-water. The LOGGED `udb_version_seq` is crash-safe, so this never regresses. 
+async fn recovery_floor(shared: &Arc) -> Result { + let durable = lease::current_durable_version(&shared.pool).await?; + + let conn = shared + .pool + .get() + .await + .context("failed to get connection for recovery floor")?; + let seq_high: i64 = conn + .query_one("SELECT last_value FROM udb_version_seq", &[]) + .await + .context("failed to read sequence high water")? + .get(0); + + Ok(durable.max(seq_high).max(0) as u64) +} + +/// Drain pending commit requests in id-ordered batches until none remain. Each batch resolves and +/// applies inside a single Postgres transaction (group commit), fenced on the leader's epoch. +async fn drain( + shared: &Arc, + epoch: i64, + tracker: &TransactionConflictTracker, + recovery_version: u64, + recovery_deadline: Instant, +) -> Result { + loop { + match drain_batch(shared, epoch, tracker, recovery_version, recovery_deadline).await? { + BatchOutcome::Empty => return Ok(DrainOutcome::Drained), + BatchOutcome::Processed => {} + BatchOutcome::LostLease => return Ok(DrainOutcome::LostLease), + } + } +} + +enum BatchOutcome { + Empty, + Processed, + LostLease, +} + +struct Reply { + channel: String, + id: i64, +} + +async fn drain_batch( + shared: &Arc, + epoch: i64, + tracker: &TransactionConflictTracker, + recovery_version: u64, + recovery_deadline: Instant, +) -> Result { + let mut conn = shared + .pool + .get() + .await + .context("failed to get connection for drain batch")?; + let txn = conn + .build_transaction() + .start() + .await + .context("failed to start drain batch txn")?; + + // Claim a batch in id order. FOR UPDATE SKIP LOCKED holds the rows for this txn so they are + // stamped terminal on COMMIT with no intermediate 'claimed' state to clean up. 
+ let rows = txn + .query( + "SELECT id, read_version, payload, reply_channel + FROM udb_commit_requests + WHERE status = 'pending' AND epoch = $1 + ORDER BY id + LIMIT $2 + FOR UPDATE SKIP LOCKED", + &[&epoch, &DRAIN_BATCH_SIZE], + ) + .await + .context("failed to claim commit batch")?; + + if rows.is_empty() { + txn.rollback().await.ok(); + return Ok(BatchOutcome::Empty); + } + + let cold_window = Instant::now() < recovery_deadline; + let mut max_winner_cv: i64 = 0; + let mut replies = Vec::with_capacity(rows.len()); + + for row in &rows { + let id: i64 = row.get(0); + let read_version: i64 = row.get(1); + let payload: Vec = row.get(2); + let reply_channel: String = row.get(3); + + let decoded = super::codec::decode_commit_request(&payload) + .context("failed to decode commit payload")?; + + let commit_version: i64 = txn + .query_one("SELECT nextval('udb_version_seq')", &[]) + .await + .context("failed to get next commit version")? + .get(0); + + let start_version = read_version.max(0) as u64; + + // Cold-window guard: a commit whose read_version predates the recovery floor cannot be + // safely resolved against this leader's empty window. Reject it as retryable. 
+ let conflicted = if cold_window && start_version < recovery_version { + true + } else { + tracker + .check_and_insert( + start_version, + commit_version.max(0) as u64, + decoded.conflict_ranges, + ) + .await + }; + + if conflicted { + txn.execute( + "UPDATE udb_commit_requests SET status = 'conflict' WHERE id = $1", + &[&id], + ) + .await + .context("failed to stamp conflict")?; + } else { + apply::apply(&txn, decoded.operations, commit_version.max(0) as u64) + .await + .context("failed to apply commit")?; + txn.execute( + "UPDATE udb_commit_requests SET status = 'committed', commit_version = $1 WHERE id = $2", + &[&commit_version, &id], + ) + .await + .context("failed to stamp committed")?; + max_winner_cv = max_winner_cv.max(commit_version); + } + + replies.push(Reply { + channel: reply_channel, + id, + }); + } + + // Advance the watermark, fenced on our epoch. A zombie old leader whose epoch was bumped sees + // zero rows updated and must step down before any of its writes become visible. + let new_durable: i64 = match txn + .query_opt( + "UPDATE udb_lease + SET durable_version = GREATEST(durable_version, $1) + WHERE id = $2 AND epoch = $3 + RETURNING durable_version", + &[&max_winner_cv, &LEASE_ID, &epoch], + ) + .await + .context("failed to advance watermark")? + { + Some(row) => row.get(0), + None => { + txn.rollback().await.ok(); + return Ok(BatchOutcome::LostLease); + } + }; + + txn.commit().await.context("failed to commit drain batch")?; + + // Watermark advances strictly after the apply txn is durably committed and visible, so a + // reader handed this read_version can never miss a write with commit_version <= read_version. + shared.advance_durable_version(new_durable); + + notify_after_commit(&conn, new_durable, &replies).await; + + Ok(BatchOutcome::Processed) +} + +/// Wake watermark listeners and the followers waiting on each processed request. 
Best-effort: a +/// missed NOTIFY is covered by the follower's polling backstop and the watermark refresh timer. +async fn notify_after_commit( + conn: &deadpool_postgres::Client, + new_durable: i64, + replies: &[Reply], +) { + if let Err(err) = conn + .execute( + "SELECT pg_notify($1, $2)", + &[&WATERMARK_CHANNEL, &new_durable.to_string()], + ) + .await + { + tracing::debug!(?err, "failed to notify watermark"); + } + + let channels: Vec<&str> = replies.iter().map(|r| r.channel.as_str()).collect(); + let ids: Vec = replies.iter().map(|r| r.id.to_string()).collect(); + if let Err(err) = conn + .execute( + "SELECT pg_notify(c, p) FROM unnest($1::text[], $2::text[]) AS t(c, p)", + &[&channels, &ids], + ) + .await + { + tracing::debug!(?err, "failed to notify commit replies"); + } +} diff --git a/engine/packages/universaldb/src/driver/postgres/shared.rs b/engine/packages/universaldb/src/driver/postgres/shared.rs new file mode 100644 index 0000000000..0ab2963158 --- /dev/null +++ b/engine/packages/universaldb/src/driver/postgres/shared.rs @@ -0,0 +1,161 @@ +use std::{ + sync::{ + Arc, + atomic::{AtomicI64, Ordering}, + }, + time::Duration, +}; + +use deadpool_postgres::Pool; +use tokio::sync::{Notify, watch}; + +use super::listener::PgListener; + +/// The singleton row id of `udb_lease`. +pub const LEASE_ID: i32 = 1; + +/// How often the follower refreshes its cached lease row (epoch, leader channel, watermark) as a +/// backstop to the `udb_watermark` NOTIFY. A stale-but-older watermark only widens the conflict +/// window, so this can be loose. +const LEASE_REFRESH_INTERVAL: Duration = Duration::from_millis(500); + +/// Channel a follower NOTIFYs (and the leader LISTENs) to wake the leader's drain loop. +pub fn commit_channel(node_id: &str) -> String { + format!("udb_commit_{node_id}") +} + +/// Channel the leader NOTIFYs (and a follower LISTENs) to deliver a commit result. 
+pub fn reply_channel(node_id: &str) -> String { + format!("udb_reply_{node_id}") +} + +/// Channel the leader NOTIFYs on every watermark advance; all nodes LISTEN. +pub const WATERMARK_CHANNEL: &str = "udb_watermark"; + +/// Cached view of the current leader lease, as seen by a follower. +#[derive(Clone, Debug)] +pub struct LeaseInfo { + pub epoch: i64, + /// Node id of the current leader, used to build its commit channel. + pub leader_addr: String, +} + +/// Process-wide state shared by the follower transaction tasks and the leader resolver. Every node +/// is both a follower (it submits its own commits) and a candidate leader. +pub struct PostgresShared { + pub pool: Pool, + /// Unique per-process id used to name this node's NOTIFY channels. + pub node_id: String, + pub listener: PgListener, + /// Highest durable commit version (`udb_lease.durable_version`); the follower read version. + durable_version: AtomicI64, + /// Pinged whenever `durable_version` advances. + watermark_notify: Notify, + lease_tx: watch::Sender>, + lease_rx: watch::Receiver>, +} + +impl PostgresShared { + pub fn new(pool: Pool, node_id: String, listener: PgListener) -> Arc { + let (lease_tx, lease_rx) = watch::channel(None); + let shared = Arc::new(Self { + pool, + node_id, + listener, + durable_version: AtomicI64::new(0), + watermark_notify: Notify::new(), + lease_tx, + lease_rx, + }); + + tokio::spawn(Self::cache_refresh_task(shared.clone())); + + shared + } + + /// The cached follower read version (`durable_version`). + pub fn read_version(&self) -> i64 { + self.durable_version.load(Ordering::SeqCst) + } + + /// Advance the cached watermark monotonically and wake any waiters. + pub fn advance_durable_version(&self, version: i64) { + let prev = self.durable_version.fetch_max(version, Ordering::SeqCst); + if version > prev { + self.watermark_notify.notify_waiters(); + } + } + + /// Current cached lease, if known. 
+ pub fn current_lease(&self) -> Option { + self.lease_rx.borrow().clone() + } + + /// Publish a freshly observed/elected lease into the cache. + pub fn set_lease(&self, lease: LeaseInfo) { + let _ = self.lease_tx.send(Some(lease)); + } + + /// Background task: keep `durable_version` and the cached lease fresh via the `udb_watermark` + /// NOTIFY plus a periodic poll of `udb_lease`. + async fn cache_refresh_task(shared: Arc) { + let mut watermark_rx = shared.listener.listen(WATERMARK_CHANNEL).await; + let mut interval = tokio::time::interval(LEASE_REFRESH_INTERVAL); + interval.set_missed_tick_behavior(tokio::time::MissedTickBehavior::Skip); + + loop { + tokio::select! { + notify = watermark_rx.recv() => { + match notify { + Ok(payload) => { + if let Ok(version) = payload.parse::() { + shared.advance_durable_version(version); + } + } + Err(tokio::sync::broadcast::error::RecvError::Lagged(_)) => {} + Err(tokio::sync::broadcast::error::RecvError::Closed) => { + // Re-subscribe; the listener recreates the channel on reconnect. + watermark_rx = shared.listener.listen(WATERMARK_CHANNEL).await; + } + } + } + _ = interval.tick() => { + shared.refresh_lease_row().await; + } + } + } + } + + async fn refresh_lease_row(&self) { + let conn = match self.pool.get().await { + Ok(conn) => conn, + Err(err) => { + tracing::debug!(?err, "failed to get connection for lease refresh"); + return; + } + }; + + let row = conn + .query_opt( + "SELECT epoch, leader_addr, durable_version FROM udb_lease WHERE id = $1", + &[&LEASE_ID], + ) + .await; + + match row { + Ok(Some(row)) => { + let epoch: i64 = row.get(0); + let leader_addr: String = row.get(1); + let durable_version: i64 = row.get(2); + self.advance_durable_version(durable_version); + self.set_lease(LeaseInfo { epoch, leader_addr }); + } + Ok(None) => { + // No lease row yet; no leader elected. 
+ } + Err(err) => { + tracing::debug!(?err, "failed to refresh lease row"); + } + } + } +} diff --git a/engine/packages/universaldb/src/driver/postgres/transaction.rs b/engine/packages/universaldb/src/driver/postgres/transaction.rs index f80cbba393..cc315feba4 100644 --- a/engine/packages/universaldb/src/driver/postgres/transaction.rs +++ b/engine/packages/universaldb/src/driver/postgres/transaction.rs @@ -1,11 +1,13 @@ use std::{ future::Future, pin::Pin, - sync::atomic::{AtomicBool, Ordering}, + sync::{ + Arc, + atomic::{AtomicBool, Ordering}, + }, }; use anyhow::{Context, Result}; -use deadpool_postgres::Pool; use tokio::sync::{OnceCell, mpsc, oneshot}; use crate::{ @@ -18,33 +20,35 @@ use crate::{ value::{Slice, Value, Values}, }; -use super::transaction_task::{TransactionCommand, TransactionTask}; +use super::{ + shared::PostgresShared, + transaction_task::{TransactionCommand, TransactionTask}, +}; pub struct PostgresTransactionDriver { - pool: Pool, + shared: Arc, operations: TransactionOperations, committed: AtomicBool, tx_sender: OnceCell>, } impl PostgresTransactionDriver { - pub fn with_config(pool: Pool) -> Self { + pub fn new(shared: Arc) -> Self { PostgresTransactionDriver { - pool, + shared, operations: TransactionOperations::default(), committed: AtomicBool::new(false), tx_sender: OnceCell::new(), } } - /// Get or create the transaction task + /// Get or create the transaction task that owns this transaction's read snapshot. 
async fn ensure_transaction(&self) -> Result<&mpsc::UnboundedSender> { self.tx_sender .get_or_try_init(|| async { let (sender, receiver) = mpsc::unbounded_channel(); - // Spawn the transaction task with serializable isolation - let task = TransactionTask::new(self.pool.clone(), receiver); + let task = TransactionTask::new(self.shared.clone(), receiver); tokio::spawn(task.run()); anyhow::Ok(sender) diff --git a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs index 297751f608..c4d3e3f9f6 100644 --- a/engine/packages/universaldb/src/driver/postgres/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/postgres/transaction_task.rs @@ -1,17 +1,18 @@ -use anyhow::{Context, Result, anyhow, bail}; -use deadpool_postgres::{Pool, Transaction}; +use std::sync::Arc; + +use anyhow::{Result, anyhow, bail}; +use deadpool_postgres::Transaction; use tokio::sync::{mpsc, oneshot}; use tokio_postgres::IsolationLevel; use crate::{ - atomic::apply_atomic_op, - error::DatabaseError, - options::{ConflictRangeType, MutationType}, + options::ConflictRangeType, tx_ops::Operation, value::{KeyValue, Slice, Values}, - versionstamp::{generate_versionstamp, substitute_raw_versionstamp}, }; +use super::{commit, shared::PostgresShared}; + pub enum TransactionCommand { // Read operations Get { @@ -48,70 +49,57 @@ pub enum TransactionCommand { }, } -/// TransactionTask runs in a separate tokio task to manage a PostgreSQL transaction. -/// -/// This design is necessary because PostgreSQL transactions have lifetime constraints -/// that don't work well with the FoundationDB-style API. 
Specifically: -/// - The transaction must outlive all references to it -/// - We can't store the transaction in a mutex due to lifetime issues with the connection +/// TransactionTask runs in a separate tokio task to own a single pinned PostgreSQL `REPEATABLE READ` +/// snapshot connection for the lifetime of a follower transaction. /// -/// By running in a separate task and communicating via channels, we avoid these lifetime -/// issues while maintaining a single serializable transaction for all operations. +/// Reads go directly against this snapshot (they never involve the leader). Commits delegate to +/// [`commit::submit`], which enqueues the request on the leader and awaits the result. The +/// `read_version` is captured from the cached watermark before the snapshot is opened, so no write +/// with `commit_version <= read_version` can be invisible to the snapshot. pub struct TransactionTask { - pool: Pool, + shared: Arc, receiver: mpsc::UnboundedReceiver, } impl TransactionTask { - pub fn new(pool: Pool, receiver: mpsc::UnboundedReceiver) -> Self { - Self { pool, receiver } + pub fn new( + shared: Arc, + receiver: mpsc::UnboundedReceiver, + ) -> Self { + Self { shared, receiver } } pub async fn run(mut self) { - // Get connection from pool - let mut conn = match self.pool.get().await { + // Capture the read version BEFORE opening the snapshot so the snapshot reflects every write + // with commit_version <= read_version. 
+ let read_version = self.shared.read_version(); + + let mut conn = match self.shared.pool.get().await { Ok(conn) => conn, Err(_) => { - // If we can't get a connection, respond to all pending commands with errors self.fail_receiver().await; return; } }; - // Start the read transaction let tx = match conn .build_transaction() .isolation_level(IsolationLevel::RepeatableRead) + .read_only(true) .start() .await { Ok(tx) => tx, Err(_) => { - // If we can't start a transaction, respond to all pending commands with errors self.fail_receiver().await; return; } }; - // TODO: Parallelize future - let start_version = match tx - .query_one("SELECT nextval('global_version_seq')", &[]) - .await - { - Ok(row) => row.get::<_, i64>(0), - Err(err) => { - tracing::error!(?err, "failed to get postgres txn start_version"); - self.fail_receiver().await; - return; - } - }; - - // Process commands while let Some(cmd) = self.receiver.recv().await { match cmd { TransactionCommand::Get { key, response } => { let result = self.handle_get(&tx, &key).await; - let _ = response.send(result); } TransactionCommand::GetKey { @@ -121,7 +109,6 @@ impl TransactionTask { response, } => { let result = self.handle_get_key(&tx, &key, or_equal, offset).await; - let _ = response.send(result); } TransactionCommand::GetRange { @@ -148,7 +135,6 @@ impl TransactionTask { reverse, ) .await; - let _ = response.send(result); } TransactionCommand::Commit { @@ -156,14 +142,12 @@ impl TransactionTask { conflict_ranges, response, } => { - let (_, result) = tokio::join!( - // Read-only txn, we don't care about the result - tx.commit(), - self.handle_commit(start_version, operations, conflict_ranges), - ); - + // The read snapshot is read-only; release it and submit the commit to the leader. 
+ let _ = tx.commit().await; + let result = + commit::submit(&self.shared, read_version, operations, conflict_ranges) + .await; let _ = response.send(result); - // Exit after commit return; } TransactionCommand::GetEstimatedRangeSize { @@ -174,13 +158,12 @@ impl TransactionTask { let result = self .handle_get_estimated_range_size(&tx, &begin, &end) .await; - let _ = response.send(result); } } } - // If the channel is closed, the transaction will be rolled back when dropped + // If the channel is closed, the snapshot transaction is rolled back when dropped. } async fn handle_get(&mut self, tx: &Transaction<'_>, key: &[u8]) -> Result> { @@ -234,27 +217,18 @@ impl TransactionTask { reverse: bool, ) -> Result { // Determine SQL operators based on key selector types - // For begin selector: - // first_greater_or_equal: or_equal = false, offset = 1 -> ">=" - // first_greater_than: or_equal = true, offset = 1 -> ">" let begin_op = if begin_offset == 1 { if begin_or_equal { ">" } else { ">=" } } else { - // This shouldn't happen for begin in range queries ">=" }; - // For end selector: - // first_greater_than: or_equal = true, offset = 1 -> "<=" - // first_greater_or_equal: or_equal = false, offset = 1 -> "<" let end_op = if end_offset == 1 { if end_or_equal { "<=" } else { "<" } } else { - // This shouldn't happen for end in range queries "<" }; - // Build query with CTE that adds conflict range let query = if reverse { if let Some(limit) = limit { format!( @@ -301,22 +275,22 @@ impl TransactionTask { begin: &[u8], end: &[u8], ) -> Result { - // Sample's 1% of the range + // Sample 1% of the range. 
let query = " WITH range_stats AS ( - SELECT + SELECT COUNT(*) as estimated_count, COALESCE(SUM(pg_column_size(key) + pg_column_size(value)), 0) as sample_size - FROM kv TABLESAMPLE SYSTEM(1) + FROM kv TABLESAMPLE SYSTEM(1) WHERE key >= $1 AND key < $2 ), table_stats AS ( - SELECT reltuples::bigint as total_rows - FROM pg_class + SELECT reltuples::bigint as total_rows + FROM pg_class WHERE relname = 'kv' AND relkind = 'r' ) - SELECT - CASE + SELECT + CASE WHEN r.estimated_count = 0 THEN 0 ELSE (r.sample_size * 100)::bigint END as estimated_size @@ -329,165 +303,6 @@ impl TransactionTask { .map_err(map_postgres_error) } - async fn handle_commit( - &mut self, - start_version: i64, - operations: Vec, - conflict_ranges: Vec<(Vec, Vec, ConflictRangeType)>, - ) -> Result<()> { - // Get connection from pool - let mut conn = self.pool.get().await?; - - // Start write transaction - let tx = conn - .build_transaction() - .isolation_level(IsolationLevel::ReadCommitted) - .start() - .await - .context("failed to start write txn")?; - - let mut begins = Vec::with_capacity(conflict_ranges.len()); - let mut ends = Vec::with_capacity(conflict_ranges.len()); - let mut conflict_types = Vec::with_capacity(conflict_ranges.len()); - - for (begin, end, conflict_type) in conflict_ranges { - let conflict_type = match conflict_type { - ConflictRangeType::Read => "read", - ConflictRangeType::Write => "write", - }; - - begins.push(begin); - ends.push(end); - conflict_types.push(conflict_type); - } - - let query = " - WITH data AS ( - SELECT nextval('global_version_seq') AS commit_version - ) - INSERT INTO conflict_ranges (range_data, conflict_type, start_version, commit_version) - SELECT - bytearange(begin_key, end_key, '[)'), - conflict_type::range_type, - $4, - data.commit_version - FROM UNNEST($1::bytea[], $2::bytea[], $3::text[]) AS t(begin_key, end_key, conflict_type), data"; - let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; - - // Insert all conflict ranges at 
once - tx.execute(&stmt, &[&begins, &ends, &conflict_types, &start_version]) - .await - .map_err(map_postgres_error)?; - - let transaction_versionstamp = generate_versionstamp(0); - - for op in operations { - match op { - Operation::SetValue { key, value } => { - let query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2"; - let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; - - tx.execute(&stmt, &[&key, &value]) - .await - .map_err(map_postgres_error)?; - } - Operation::Clear { key } => { - let query = "DELETE FROM kv WHERE key = $1"; - let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; - - tx.execute(&stmt, &[&key]) - .await - .map_err(map_postgres_error)?; - } - Operation::ClearRange { begin, end } => { - let query = "DELETE FROM kv WHERE key >= $1 AND key < $2"; - let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; - - tx.execute(&stmt, &[&begin, &end]) - .await - .map_err(map_postgres_error)?; - } - Operation::AtomicOp { - key, - param, - op_type, - } => { - if matches!(op_type, MutationType::SetVersionstampedKey) { - let key = substitute_raw_versionstamp(key, &transaction_versionstamp) - .map_err(anyhow::Error::msg) - .context("failed substituting versionstamped key")?; - let query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2"; - let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; - - tx.execute(&stmt, &[&key, ¶m]) - .await - .map_err(map_postgres_error)?; - continue; - } - - if matches!(op_type, MutationType::SetVersionstampedValue) { - let value = substitute_raw_versionstamp(param, &transaction_versionstamp) - .map_err(anyhow::Error::msg) - .context("failed substituting versionstamped value")?; - let query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2"; - let stmt = tx.prepare_cached(query).await.map_err(map_postgres_error)?; - - tx.execute(&stmt, 
&[&key, &value]) - .await - .map_err(map_postgres_error)?; - continue; - } - - // TODO: All operations need to be done on the sql side, not in rust - - // Get current value from database - let current_query = "SELECT value FROM kv WHERE key = $1"; - let stmt = tx - .prepare_cached(current_query) - .await - .map_err(map_postgres_error)?; - - let current_row = tx - .query_opt(&stmt, &[&key]) - .await - .map_err(map_postgres_error)?; - - // Extract current value or use None if key doesn't exist - let current_value = current_row.map(|row| row.get::<_, Vec>(0)); - let current_slice = current_value.as_deref(); - - // Apply atomic operation - let new_value = apply_atomic_op(current_slice, ¶m, op_type); - - // Store the result - if let Some(new_value) = new_value { - let update_query = "INSERT INTO kv (key, value) VALUES ($1, $2) ON CONFLICT (key) DO UPDATE SET value = $2"; - let stmt = tx - .prepare_cached(update_query) - .await - .map_err(map_postgres_error)?; - - tx.execute(&stmt, &[&key, &new_value]) - .await - .map_err(map_postgres_error)?; - } else { - let update_query = "DELETE FROM kv WHERE key = $1"; - let stmt = tx - .prepare_cached(update_query) - .await - .map_err(map_postgres_error)?; - - tx.execute(&stmt, &[&key]) - .await - .map_err(map_postgres_error)?; - } - } - } - } - - tx.commit().await.map_err(map_postgres_error) - } - async fn fail_receiver(&mut self) { while let Some(cmd) = self.receiver.recv().await { match cmd { @@ -511,31 +326,19 @@ impl TransactionTask { } } -/// Maps PostgreSQL error to DatabaseError +/// Maps a PostgreSQL error from the read path to a `DatabaseError` where appropriate. 
fn map_postgres_error(err: tokio_postgres::Error) -> anyhow::Error { - let error_str = if let Some(err) = err.as_db_error() { - err.to_string() - } else { - err.to_string() - }; + let error_str = err.to_string(); - if error_str.contains("exclusion_violation") - || error_str.contains("violates exclusion constraint") - { - // Retryable - another transaction has a conflicting range - DatabaseError::NotCommitted.into() - } else if error_str.contains("serialization failure") + if error_str.contains("serialization failure") || error_str.contains("could not serialize") || error_str.contains("deadlock detected") { - // Retryable - transaction conflict - DatabaseError::NotCommitted.into() + crate::error::DatabaseError::NotCommitted.into() } else if error_str.contains("current transaction is aborted") { - // Returned by the rest of the commands in a txn if it failed for exclusion reasons - DatabaseError::NotCommitted.into() + crate::error::DatabaseError::NotCommitted.into() } else { tracing::error!(%err, "postgres error"); - // Non-retryable error anyhow::Error::new(err) } } diff --git a/engine/packages/universaldb/src/driver/rocksdb/database.rs b/engine/packages/universaldb/src/driver/rocksdb/database.rs index ec0175ee5a..ba186d872e 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/database.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/database.rs @@ -17,9 +17,9 @@ use crate::{ utils::{MaybeCommitted, calculate_tx_retry_backoff}, }; -use super::{ - transaction::RocksDbTransactionDriver, transaction_conflict_tracker::TransactionConflictTracker, -}; +use crate::conflict_tracker::TransactionConflictTracker; + +use super::transaction::RocksDbTransactionDriver; pub struct RocksDbDatabaseDriver { db: Arc, diff --git a/engine/packages/universaldb/src/driver/rocksdb/mod.rs b/engine/packages/universaldb/src/driver/rocksdb/mod.rs index a24bd72603..b18e28edca 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/mod.rs +++ 
b/engine/packages/universaldb/src/driver/rocksdb/mod.rs @@ -1,6 +1,5 @@ mod database; mod transaction; -mod transaction_conflict_tracker; mod transaction_task; pub use database::RocksDbDatabaseDriver; diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction.rs index e62c3eda79..85cf5edf5c 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction.rs @@ -21,10 +21,9 @@ use crate::{ value::{Slice, Value, Values}, }; -use super::{ - transaction_conflict_tracker::TransactionConflictTracker, - transaction_task::{TransactionCommand, TransactionTask}, -}; +use crate::conflict_tracker::TransactionConflictTracker; + +use super::transaction_task::{TransactionCommand, TransactionTask}; pub struct RocksDbTransactionDriver { db: Arc, diff --git a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs index a92f351fcc..aedbfcbe93 100644 --- a/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs +++ b/engine/packages/universaldb/src/driver/rocksdb/transaction_task.rs @@ -6,9 +6,9 @@ use rocksdb::{ }; use tokio::sync::{mpsc, oneshot}; -use super::transaction_conflict_tracker::TransactionConflictTracker; use crate::{ atomic::apply_atomic_op, + conflict_tracker::TransactionConflictTracker, error::DatabaseError, key_selector::KeySelector, options::{ConflictRangeType, MutationType}, @@ -389,9 +389,11 @@ impl TransactionTask { } } + // rocksdb generates both start and commit versions from the in-process counter. 
+ let commit_version = self.txn_conflict_tracker.next_global_version(); if self .txn_conflict_tracker - .check_and_insert(start_version, conflict_ranges) + .check_and_insert(start_version, commit_version, conflict_ranges) .await { return Err(DatabaseError::NotCommitted.into()); diff --git a/engine/packages/universaldb/src/lib.rs b/engine/packages/universaldb/src/lib.rs index 96260a177d..1b6fb78b87 100644 --- a/engine/packages/universaldb/src/lib.rs +++ b/engine/packages/universaldb/src/lib.rs @@ -1,4 +1,5 @@ pub(crate) mod atomic; +pub(crate) mod conflict_tracker; mod database; pub mod driver; pub mod error; diff --git a/engine/packages/universaldb/tests/integration.rs b/engine/packages/universaldb/tests/integration.rs index 52fa703116..45a5b3bdf7 100644 --- a/engine/packages/universaldb/tests/integration.rs +++ b/engine/packages/universaldb/tests/integration.rs @@ -137,11 +137,9 @@ async fn test_database_options(db: &Database) { use std::sync::Arc; use std::sync::atomic::{AtomicU32, Ordering}; use universaldb::error::DatabaseError; - use universaldb::options::DatabaseOption; // Test setting transaction retry limit - db.set_option(DatabaseOption::TransactionRetryLimit(5)) - .unwrap(); + db.txn_retry_limit(5).unwrap(); // Test that retry limit is respected by forcing conflicts let conflict_counter = Arc::new(AtomicU32::new(0)); @@ -172,8 +170,7 @@ async fn test_database_options(db: &Database) { assert_eq!(final_attempts, 3, "Should have taken 3 attempts"); // Now set a very low retry limit and verify it fails - db.set_option(DatabaseOption::TransactionRetryLimit(1)) - .unwrap(); + db.txn_retry_limit(1).unwrap(); let conflict_counter2 = Arc::new(AtomicU32::new(0)); let counter_clone2 = conflict_counter2.clone(); @@ -204,8 +201,7 @@ async fn test_database_options(db: &Database) { assert!(attempts <= 2, "Should not retry more than limit + 1"); // Reset to a reasonable retry limit - db.set_option(DatabaseOption::TransactionRetryLimit(100)) - .unwrap(); + 
db.txn_retry_limit(100).unwrap(); } async fn clear_test_namespace(db: &Database) -> Result<()> { diff --git a/engine/sdks/rust/depot-protocol/Cargo.toml b/engine/sdks/rust/depot-protocol/Cargo.toml index 274836220a..99d8117349 100644 --- a/engine/sdks/rust/depot-protocol/Cargo.toml +++ b/engine/sdks/rust/depot-protocol/Cargo.toml @@ -8,6 +8,7 @@ edition.workspace = true [dependencies] anyhow.workspace = true +rivet-util.workspace = true serde_bare.workspace = true serde.workspace = true vbare.workspace = true diff --git a/engine/sdks/rust/universaldb-commit/Cargo.toml b/engine/sdks/rust/universaldb-commit/Cargo.toml new file mode 100644 index 0000000000..126c5cb35e --- /dev/null +++ b/engine/sdks/rust/universaldb-commit/Cargo.toml @@ -0,0 +1,16 @@ +[package] +name = "rivet-universaldb-commit" +publish = false +version.workspace = true +authors.workspace = true +license.workspace = true +edition.workspace = true + +[dependencies] +anyhow.workspace = true +serde_bare.workspace = true +serde.workspace = true +vbare.workspace = true + +[build-dependencies] +vbare-compiler.workspace = true diff --git a/engine/sdks/rust/universaldb-commit/build.rs b/engine/sdks/rust/universaldb-commit/build.rs new file mode 100644 index 0000000000..6400be6f2b --- /dev/null +++ b/engine/sdks/rust/universaldb-commit/build.rs @@ -0,0 +1,64 @@ +use std::{ + fs, + path::{Path, PathBuf}, +}; + +fn main() -> Result<(), Box> { + let manifest_dir = std::env::var("CARGO_MANIFEST_DIR")?; + let out_dir = PathBuf::from(std::env::var("OUT_DIR")?); + let workspace_root = Path::new(&manifest_dir) + .parent() + .and_then(|p| p.parent()) + .and_then(|p| p.parent()) + .ok_or("Failed to find workspace root")?; + + let schema_dir = workspace_root + .join("sdks") + .join("schemas") + .join("universaldb-commit"); + println!("cargo:rerun-if-changed={}", schema_dir.display()); + + let (highest_version, _) = find_highest_version(&schema_dir); + + let cfg = vbare_compiler::Config::default(); + 
vbare_compiler::process_schemas_with_config(&schema_dir, &cfg)?; + + // Append protocol version constant to generated file + let combined_imports_path = out_dir.join("combined_imports.rs"); + let mut combined = fs::read_to_string(&combined_imports_path)?; + combined.push_str(&format!( + "\npub const PROTOCOL_VERSION: u16 = {};\n", + highest_version + )); + fs::write(combined_imports_path, combined)?; + + Ok(()) +} + +fn find_highest_version(schema_dir: &Path) -> (u32, PathBuf) { + let mut highest_version = 0; + let mut highest_version_path = PathBuf::new(); + + for entry in fs::read_dir(schema_dir).unwrap().flatten() { + if !entry.path().is_dir() { + let path = entry.path(); + let bare_name = path + .file_name() + .unwrap() + .to_str() + .unwrap() + .split_once('.') + .unwrap() + .0; + + if let Ok(version) = bare_name[1..].parse::() { + if version > highest_version { + highest_version = version; + highest_version_path = path; + } + } + } + } + + (highest_version, highest_version_path) +} diff --git a/engine/sdks/rust/universaldb-commit/src/generated.rs b/engine/sdks/rust/universaldb-commit/src/generated.rs new file mode 100644 index 0000000000..84801af8dc --- /dev/null +++ b/engine/sdks/rust/universaldb-commit/src/generated.rs @@ -0,0 +1 @@ +include!(concat!(env!("OUT_DIR"), "/combined_imports.rs")); diff --git a/engine/sdks/rust/universaldb-commit/src/lib.rs b/engine/sdks/rust/universaldb-commit/src/lib.rs new file mode 100644 index 0000000000..41954bbe75 --- /dev/null +++ b/engine/sdks/rust/universaldb-commit/src/lib.rs @@ -0,0 +1,6 @@ +pub mod generated; +pub mod versioned; + +// Re-export latest +pub use generated::PROTOCOL_VERSION; +pub use generated::v1::*; diff --git a/engine/sdks/rust/universaldb-commit/src/versioned.rs b/engine/sdks/rust/universaldb-commit/src/versioned.rs new file mode 100644 index 0000000000..a9d637aa81 --- /dev/null +++ b/engine/sdks/rust/universaldb-commit/src/versioned.rs @@ -0,0 +1,38 @@ +use anyhow::{Ok, Result, bail}; +use 
vbare::OwnedVersionedData; + +use crate::generated::v1; + +// Only v1 exists today. When adding v2+, generate converters with +// `scripts/vbare-gen-converters` (see the envoy-protocol package for the +// resulting `versioned/` module layout) and wire them in here. +pub enum CommitRequest { + V1(v1::CommitRequest), +} + +impl OwnedVersionedData for CommitRequest { + type Latest = v1::CommitRequest; + + fn wrap_latest(latest: v1::CommitRequest) -> Self { + CommitRequest::V1(latest) + } + + fn unwrap_latest(self) -> Result { + match self { + CommitRequest::V1(data) => Ok(data), + } + } + + fn deserialize_version(payload: &[u8], version: u16) -> Result { + match version { + 1 => Ok(CommitRequest::V1(serde_bare::from_slice(payload)?)), + _ => bail!("invalid version: {version}"), + } + } + + fn serialize_version(self, _version: u16) -> Result> { + match self { + CommitRequest::V1(data) => serde_bare::to_vec(&data).map_err(Into::into), + } + } +} diff --git a/engine/sdks/schemas/universaldb-commit/v1.bare b/engine/sdks/schemas/universaldb-commit/v1.bare new file mode 100644 index 0000000000..2377312f84 --- /dev/null +++ b/engine/sdks/schemas/universaldb-commit/v1.bare @@ -0,0 +1,70 @@ +# Commit-queue wire format for the Postgres leader-resolver UDB driver. +# +# Followers encode a CommitRequest into the `payload` column of +# `udb_commit_requests`; the leader decodes it to resolve and apply. +# Rust-only (never leaves the engine), but versioned so rolling deploys can +# skew follower vs leader code. + +type ConflictRangeType enum { + READ + WRITE +} + +type ConflictRange struct { + begin: data + end: data + kind: ConflictRangeType +} + +# Order MUST match universaldb::options::MutationType declaration order so the +# enum tag round-trips. 15 variants today; never reorder, only append. 
+type MutationType enum { + ADD + AND + BIT_AND + OR + BIT_OR + XOR + BIT_XOR + APPEND_IF_FITS + MAX + MIN + SET_VERSIONSTAMPED_KEY + SET_VERSIONSTAMPED_VALUE + BYTE_MIN + BYTE_MAX + COMPARE_AND_CLEAR +} + +type SetValue struct { + key: data + value: data +} + +type Clear struct { + key: data +} + +type ClearRange struct { + begin: data + end: data +} + +type AtomicOp struct { + key: data + param: data + opType: MutationType +} + +type Operation union { + SetValue | + Clear | + ClearRange | + AtomicOp +} + +type CommitRequest struct { + readVersion: u64 + conflictRanges: list + operations: list +} diff --git a/scripts/run/postgres.sh b/scripts/run/postgres.sh index 30acd498f1..2ac817b598 100755 --- a/scripts/run/postgres.sh +++ b/scripts/run/postgres.sh @@ -2,7 +2,7 @@ set -euo pipefail CONTAINER_NAME="rivet-engine-postgres" -POSTGRES_IMAGE="postgres:17" +POSTGRES_IMAGE="postgres:18" if docker ps --all --format '{{.Names}}' | grep -qw "${CONTAINER_NAME}"; then if docker ps --format '{{.Names}}' | grep -qw "${CONTAINER_NAME}"; then diff --git a/scripts/run/restore-postgres.sh b/scripts/run/restore-postgres.sh index bd2b50e0b4..0a82410608 100755 --- a/scripts/run/restore-postgres.sh +++ b/scripts/run/restore-postgres.sh @@ -2,7 +2,7 @@ set -euo pipefail CONTAINER_NAME="rivet-engine-postgres" -POSTGRES_IMAGE="postgres:17" +POSTGRES_IMAGE="postgres:18" if [ $# -ne 1 ]; then echo "Usage: $0 " diff --git a/self-host/compose/dev-host/docker-compose.yml b/self-host/compose/dev-host/docker-compose.yml index 3b54702d63..96db5a57d0 100644 --- a/self-host/compose/dev-host/docker-compose.yml +++ b/self-host/compose/dev-host/docker-compose.yml @@ -72,7 +72,7 @@ services: network_mode: host postgres: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres diff --git a/self-host/compose/dev-multidc-multinode/docker-compose.yml b/self-host/compose/dev-multidc-multinode/docker-compose.yml 
index 75c500850b..7e421144c8 100644 --- a/self-host/compose/dev-multidc-multinode/docker-compose.yml +++ b/self-host/compose/dev-multidc-multinode/docker-compose.yml @@ -85,7 +85,7 @@ services: condition: service_healthy postgres-dc-a: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres @@ -355,7 +355,7 @@ services: - rivet-network-dc-a postgres-dc-b: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres @@ -617,7 +617,7 @@ services: - rivet-network-dc-b postgres-dc-c: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres diff --git a/self-host/compose/dev-multidc/docker-compose.yml b/self-host/compose/dev-multidc/docker-compose.yml index f20db41ff3..79cb619e0a 100644 --- a/self-host/compose/dev-multidc/docker-compose.yml +++ b/self-host/compose/dev-multidc/docker-compose.yml @@ -85,7 +85,7 @@ services: condition: service_healthy postgres-dc-a: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres @@ -235,7 +235,7 @@ services: - rivet-network-dc-a postgres-dc-b: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres @@ -377,7 +377,7 @@ services: - rivet-network-dc-b postgres-dc-c: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres diff --git a/self-host/compose/dev-multinode/docker-compose.yml b/self-host/compose/dev-multinode/docker-compose.yml index 381ece396d..a432743057 100644 --- a/self-host/compose/dev-multinode/docker-compose.yml +++ b/self-host/compose/dev-multinode/docker-compose.yml @@ -81,7 +81,7 @@ 
services: condition: service_healthy postgres: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres diff --git a/self-host/compose/dev/docker-compose.yml b/self-host/compose/dev/docker-compose.yml index 6c5dcc0ef2..91f82dce45 100644 --- a/self-host/compose/dev/docker-compose.yml +++ b/self-host/compose/dev/docker-compose.yml @@ -81,7 +81,7 @@ services: condition: service_healthy postgres: restart: unless-stopped - image: postgres:17-alpine + image: postgres:18-alpine environment: - POSTGRES_USER=postgres - POSTGRES_PASSWORD=postgres diff --git a/self-host/compose/template/src/docker-compose.ts b/self-host/compose/template/src/docker-compose.ts index ca2f6b1fc8..6db4a8ab9a 100644 --- a/self-host/compose/template/src/docker-compose.ts +++ b/self-host/compose/template/src/docker-compose.ts @@ -171,7 +171,7 @@ export function generateDockerCompose(context: TemplateContext) { ); services[postgresServiceName] = { restart: "unless-stopped", - image: "postgres:17-alpine", + image: "postgres:18-alpine", environment: [ "POSTGRES_USER=postgres", "POSTGRES_PASSWORD=postgres", diff --git a/self-host/k8s/engine/12-postgres-statefulset.yaml b/self-host/k8s/engine/12-postgres-statefulset.yaml index e1e14a4b75..c035d26611 100644 --- a/self-host/k8s/engine/12-postgres-statefulset.yaml +++ b/self-host/k8s/engine/12-postgres-statefulset.yaml @@ -20,7 +20,7 @@ spec: spec: containers: - name: postgres - image: postgres:17 + image: postgres:18 args: - postgres - -c diff --git a/website/src/content/docs/self-hosting/docker-compose.mdx b/website/src/content/docs/self-hosting/docker-compose.mdx index e031abc118..5d77eae8af 100644 --- a/website/src/content/docs/self-hosting/docker-compose.mdx +++ b/website/src/content/docs/self-hosting/docker-compose.mdx @@ -188,7 +188,7 @@ PostgreSQL is the recommended backend for multi-node self-hosted deployments tod ```yaml services: postgres: - image: 
postgres:15 + image: postgres:18 environment: POSTGRES_DB: rivet POSTGRES_USER: rivet diff --git a/website/src/content/docs/self-hosting/docker-container.mdx b/website/src/content/docs/self-hosting/docker-container.mdx index 5cc474ef4a..cbbd39cb6b 100644 --- a/website/src/content/docs/self-hosting/docker-container.mdx +++ b/website/src/content/docs/self-hosting/docker-container.mdx @@ -157,7 +157,7 @@ docker run -d \ -e POSTGRES_USER=rivet \ -e POSTGRES_PASSWORD=rivet_password \ -v postgres-data:/var/lib/postgresql/data \ - postgres:15 + postgres:18 # Run Rivet Engine docker run -d \