Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions Cargo.lock

Some generated files are not rendered by default. Learn more about how customized files appear on GitHub.

4 changes: 4 additions & 0 deletions Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -66,6 +66,7 @@ members = [
"engine/sdks/rust/depot-protocol",
"engine/sdks/rust/test-envoy",
"engine/sdks/rust/ups-protocol",
"engine/sdks/rust/universaldb-commit",
"rivetkit-rust/packages/actor-persist",
"rivetkit-rust/packages/client",
"rivetkit-rust/packages/engine-process",
Expand Down Expand Up @@ -638,6 +639,9 @@ members = [
[workspace.dependencies.rivet-ups-protocol]
path = "engine/sdks/rust/ups-protocol"

[workspace.dependencies.rivet-universaldb-commit]
path = "engine/sdks/rust/universaldb-commit"

[profile.dev]
overflow-checks = false
# "line-tables-only" produces just the line-number DWARF needed for stack
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/test-deps-docker/src/database.rs
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ impl TestDatabase {
});

let docker_config = DockerRunConfig {
image: "postgres:17".to_string(),
image: "postgres:18".to_string(),
container_name: container_name.clone(),
port_mapping: (port, 5432),
env_vars: vec![
Expand Down
5 changes: 5 additions & 0 deletions engine/packages/universaldb/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@ edition.workspace = true
[dependencies]
anyhow.workspace = true
async-trait.workspace = true
base64.workspace = true
deadpool-postgres.workspace = true
foundationdb-tuple.workspace = true
futures-util.workspace = true
Expand All @@ -18,16 +19,20 @@ rand.workspace = true
rivet-metrics.workspace = true
rivet-postgres-util.workspace = true
rivet-tracing-utils.workspace = true
rivet-universaldb-commit.workspace = true
rocksdb.workspace = true
scc.workspace = true
serde.workspace = true
tempfile.workspace = true
thiserror.workspace = true
tokio-postgres-rustls.workspace = true
tokio-postgres.workspace = true
tokio-util.workspace = true
tokio.workspace = true
tracing.workspace = true
url.workspace = true
uuid.workspace = true
vbare.workspace = true

[dev-dependencies]
rivet-config.workspace = true
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,7 @@ use tokio::sync::Mutex;
use crate::options::ConflictRangeType;

// Transactions cannot live longer than 5 seconds so we don't need to store transaction conflicts longer than
// that
// that.
const TXN_CONFLICT_TTL: Duration = Duration::from_secs(10);

#[derive(Debug)]
Expand All @@ -22,6 +22,15 @@ struct PreviousTransaction {
conflict_ranges: Vec<(Vec<u8>, Vec<u8>, ConflictRangeType)>,
}

/// In-process FoundationDB-style resolver. Holds the last `TXN_CONFLICT_TTL` of committed
/// transactions and rejects a committing transaction if any retained transaction has both an
/// overlapping version window and an overlapping conflict range of a differing type.
///
/// Used by the rocksdb driver (single process) and by the postgres leader-resolver. The two
/// differ only in where the commit version comes from: rocksdb generates it from the in-process
/// `global_version` counter, while the postgres leader assigns it from the durable
/// `udb_version_seq` so it survives leader failover and matches the versionstamp. For that reason
/// `check_and_insert` takes the commit version from the caller instead of generating it.
#[derive(Clone)]
pub struct TransactionConflictTracker {
// NOTE: We use a mutex because we need to lock reads across all active txns. This could be optimized to
Expand All @@ -40,18 +49,23 @@ impl TransactionConflictTracker {
}
}

/// Each number returned is unique.
/// Each number returned is unique. Used by the in-process rocksdb driver to assign both start
/// and commit versions. The postgres leader does not use this; it assigns versions from the
/// durable Postgres sequence.
pub fn next_global_version(&self) -> u64 {
self.global_version.fetch_add(1, Ordering::SeqCst)
}

/// Returns `true` on conflict (same polarity as the original rocksdb tracker). The caller
/// supplies `commit_version` (e.g. `nextval('udb_version_seq')` on the postgres leader, or
/// `next_global_version()` on rocksdb) so version assignment stays the caller's responsibility.
pub async fn check_and_insert(
&self,
txn1_start_version: u64,
txn1_commit_version: u64,
txn1_conflict_ranges: Vec<(Vec<u8>, Vec<u8>, ConflictRangeType)>,
) -> bool {
let mut txns = self.txns.lock().await;
let txn1_commit_version = self.next_global_version();

// Prune old entries
txns.retain(|txn| txn.insert_instant.elapsed() < TXN_CONFLICT_TTL);
Expand Down
167 changes: 167 additions & 0 deletions engine/packages/universaldb/src/driver/postgres/codec.rs
Original file line number Diff line number Diff line change
@@ -0,0 +1,167 @@
use anyhow::Result;
use rivet_universaldb_commit::{self as proto, versioned};
use vbare::OwnedVersionedData;

use crate::{
options::{ConflictRangeType, MutationType},
tx_ops::Operation,
};

/// Decoded form of a `udb_commit_requests.payload` blob.
///
/// `read_version` is intentionally omitted: it is also denormalized into the `read_version` column,
/// which is what the leader's drain reads, so decoding it here would be dead.
pub struct DecodedCommit {
pub conflict_ranges: Vec<(Vec<u8>, Vec<u8>, ConflictRangeType)>,
pub operations: Vec<Operation>,
}

/// Encode a follower's commit request to the versioned BARE wire format with an embedded version
/// header so a leader running older or newer code can still decode it during a rolling deploy.
pub fn encode_commit_request(
read_version: u64,
conflict_ranges: &[(Vec<u8>, Vec<u8>, ConflictRangeType)],
operations: &[Operation],
) -> Result<Vec<u8>> {
let request = proto::CommitRequest {
read_version,
conflict_ranges: conflict_ranges
.iter()
.map(|(begin, end, kind)| proto::ConflictRange {
begin: begin.clone(),
end: end.clone(),
kind: conflict_range_type_to_proto(*kind),
})
.collect(),
operations: operations.iter().map(operation_to_proto).collect(),
};

versioned::CommitRequest::wrap_latest(request)
.serialize_with_embedded_version(proto::PROTOCOL_VERSION)
}

/// Decode a `udb_commit_requests.payload` blob produced by [`encode_commit_request`].
pub fn decode_commit_request(payload: &[u8]) -> Result<DecodedCommit> {
let request = versioned::CommitRequest::deserialize_with_embedded_version(payload)?;

let conflict_ranges = request
.conflict_ranges
.into_iter()
.map(|range| {
(
range.begin,
range.end,
conflict_range_type_from_proto(range.kind),
)
})
.collect();

let operations = request
.operations
.into_iter()
.map(operation_from_proto)
.collect();

Ok(DecodedCommit {
conflict_ranges,
operations,
})
}

fn conflict_range_type_to_proto(kind: ConflictRangeType) -> proto::ConflictRangeType {
match kind {
ConflictRangeType::Read => proto::ConflictRangeType::Read,
ConflictRangeType::Write => proto::ConflictRangeType::Write,
}
}

fn conflict_range_type_from_proto(kind: proto::ConflictRangeType) -> ConflictRangeType {
match kind {
proto::ConflictRangeType::Read => ConflictRangeType::Read,
proto::ConflictRangeType::Write => ConflictRangeType::Write,
}
}

fn operation_to_proto(op: &Operation) -> proto::Operation {
match op {
Operation::SetValue { key, value } => proto::Operation::SetValue(proto::SetValue {
key: key.clone(),
value: value.clone(),
}),
Operation::Clear { key } => proto::Operation::Clear(proto::Clear { key: key.clone() }),
Operation::ClearRange { begin, end } => proto::Operation::ClearRange(proto::ClearRange {
begin: begin.clone(),
end: end.clone(),
}),
Operation::AtomicOp {
key,
param,
op_type,
} => proto::Operation::AtomicOp(proto::AtomicOp {
key: key.clone(),
param: param.clone(),
op_type: mutation_type_to_proto(*op_type),
}),
}
}

fn operation_from_proto(op: proto::Operation) -> Operation {
match op {
proto::Operation::SetValue(proto::SetValue { key, value }) => {
Operation::SetValue { key, value }
}
proto::Operation::Clear(proto::Clear { key }) => Operation::Clear { key },
proto::Operation::ClearRange(proto::ClearRange { begin, end }) => {
Operation::ClearRange { begin, end }
}
proto::Operation::AtomicOp(proto::AtomicOp {
key,
param,
op_type,
}) => Operation::AtomicOp {
key,
param,
op_type: mutation_type_from_proto(op_type),
},
}
}

fn mutation_type_to_proto(op_type: MutationType) -> proto::MutationType {
match op_type {
MutationType::Add => proto::MutationType::Add,
MutationType::And => proto::MutationType::And,
MutationType::BitAnd => proto::MutationType::BitAnd,
MutationType::Or => proto::MutationType::Or,
MutationType::BitOr => proto::MutationType::BitOr,
MutationType::Xor => proto::MutationType::Xor,
MutationType::BitXor => proto::MutationType::BitXor,
MutationType::AppendIfFits => proto::MutationType::AppendIfFits,
MutationType::Max => proto::MutationType::Max,
MutationType::Min => proto::MutationType::Min,
MutationType::SetVersionstampedKey => proto::MutationType::SetVersionstampedKey,
MutationType::SetVersionstampedValue => proto::MutationType::SetVersionstampedValue,
MutationType::ByteMin => proto::MutationType::ByteMin,
MutationType::ByteMax => proto::MutationType::ByteMax,
MutationType::CompareAndClear => proto::MutationType::CompareAndClear,
}
}

fn mutation_type_from_proto(op_type: proto::MutationType) -> MutationType {
match op_type {
proto::MutationType::Add => MutationType::Add,
proto::MutationType::And => MutationType::And,
proto::MutationType::BitAnd => MutationType::BitAnd,
proto::MutationType::Or => MutationType::Or,
proto::MutationType::BitOr => MutationType::BitOr,
proto::MutationType::Xor => MutationType::Xor,
proto::MutationType::BitXor => MutationType::BitXor,
proto::MutationType::AppendIfFits => MutationType::AppendIfFits,
proto::MutationType::Max => MutationType::Max,
proto::MutationType::Min => MutationType::Min,
proto::MutationType::SetVersionstampedKey => MutationType::SetVersionstampedKey,
proto::MutationType::SetVersionstampedValue => MutationType::SetVersionstampedValue,
proto::MutationType::ByteMin => MutationType::ByteMin,
proto::MutationType::ByteMax => MutationType::ByteMax,
proto::MutationType::CompareAndClear => MutationType::CompareAndClear,
}
}
Loading
Loading