Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion engine/packages/api-peer/src/internal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ pub async fn set_tracing_config(
body: SetTracingConfigRequest,
) -> Result<SetTracingConfigResponse> {
// Broadcast message to all services via UPS
let message = serde_json::to_vec(&body)?;
let message = rivet_util::serde::json_to_vec!(&body)?;

ctx.ups()?
.publish(TracingConfigSubject, &message, PublishOpts::broadcast())
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/cache/src/req_config.rs
Original file line number Diff line number Diff line change
Expand Up @@ -365,7 +365,7 @@ impl RequestConfig {
keys: cache_keys.clone(),
};

let payload = serde_json::to_vec(&message)?;
let payload = rivet_util::serde::json_to_vec!(&message)?;

if let Err(err) = ups
.publish(
Expand Down Expand Up @@ -495,12 +495,12 @@ impl RequestConfig {
keys,
getter,
|value: &Value| -> Result<Vec<u8>> {
serde_json::to_vec(&value)
rivet_util::serde::json_to_vec!(&value)
.map_err(Error::SerdeEncode)
.map_err(Into::into)
},
|value: &[u8]| -> Result<Value> {
serde_json::from_slice(value)
rivet_util::serde::json_from_slice!(value)
.map_err(Error::SerdeDecode)
.map_err(Into::into)
},
Expand Down
6 changes: 5 additions & 1 deletion engine/packages/cli/src/commands/logs.rs
Original file line number Diff line number Diff line change
Expand Up @@ -177,7 +177,11 @@ fn print_line(entry: &LogEntry, json: bool, color: bool) -> Result<()> {
}

let severity = if color {
format!("\x1b[{}m{}\x1b[0m", severity_color(&entry.severity), entry.severity)
format!(
"\x1b[{}m{}\x1b[0m",
severity_color(&entry.severity),
entry.severity
)
} else {
entry.severity.clone()
};
Expand Down
6 changes: 3 additions & 3 deletions engine/packages/depot/Cargo.toml
Original file line number Diff line number Diff line change
Expand Up @@ -30,19 +30,19 @@ rivet-error.workspace = true
rivet-metrics.workspace = true
rivet-pools.workspace = true
rivet-runtime.workspace = true
rivet-util.workspace = true
rusqlite.workspace = true
scc.workspace = true
serde.workspace = true
serde_bare.workspace = true
serde_json.workspace = true
serde.workspace = true
sha2.workspace = true
rusqlite.workspace = true
tempfile.workspace = true
tokio.workspace = true
tokio-util.workspace = true
tracing.workspace = true
universaldb.workspace = true
universalpubsub.workspace = true
util.workspace = true
uuid.workspace = true
vbare.workspace = true

Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline-macros/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -403,7 +403,7 @@ pub fn signal(attr: TokenStream, item: TokenStream) -> TokenStream {
}

fn parse(_name: &str, body: &serde_json::value::RawValue) -> gas::prelude::WorkflowResult<Self> {
serde_json::from_str(body.get()).map_err(WorkflowError::DeserializeSignalBody)
rivet_util::serde::json_from_str!(body.get()).map_err(WorkflowError::DeserializeSignalBody)
}
}
};
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/builder/common/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -129,7 +129,7 @@ impl<T: Signal + Serialize> SignalBuilder<T> {
tracing::Span::current().record("signal_id", signal_id.to_string());

// Serialize input
let input_val = serde_json::value::to_raw_value(&self.body)
let input_val = rivet_util::serde::json_to_raw_value!(&self.body)
.map_err(WorkflowError::SerializeSignalBody)?;

match (
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/builder/common/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -81,7 +81,7 @@ where
return self;
}

match serde_json::to_value(&v) {
match rivet_util::serde::json_to_value!(&v) {
Ok(v) => {
self.tags.insert(k.to_string(), v);
}
Expand Down Expand Up @@ -125,7 +125,7 @@ where
}

// Serialize input
let input_val = serde_json::value::to_raw_value(&input)
let input_val = rivet_util::serde::json_to_raw_value!(&input)
.map_err(WorkflowError::SerializeWorkflowInput)?;

let actual_workflow_id = self
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/gasoline/src/builder/workflow/lupe.rs
Original file line number Diff line number Diff line change
Expand Up @@ -72,7 +72,7 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {

(loop_event.iteration, state, output, None)
} else {
let state_val = serde_json::value::to_raw_value(&state)
let state_val = rivet_util::serde::json_to_raw_value!(&state)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Clone data to move into future
Expand Down Expand Up @@ -219,7 +219,7 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
if iteration % commit_interval.unwrap_or(DEFAULT_LOOP_COMMIT_INTERVAL)
== 0
{
let state_val = serde_json::value::to_raw_value(&state)
let state_val = rivet_util::serde::json_to_raw_value!(&state)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Clone data to move into future
Expand Down Expand Up @@ -251,9 +251,9 @@ impl<'a, S: Serialize + DeserializeOwned> LoopBuilder<'a, S> {
Loop::Break(res) => {
iteration += 1;

let state_val = serde_json::value::to_raw_value(&state)
let state_val = rivet_util::serde::json_to_raw_value!(&state)
.map_err(WorkflowError::SerializeLoopOutput)?;
let output_val = serde_json::value::to_raw_value(&res)
let output_val = rivet_util::serde::json_to_raw_value!(&res)
.map_err(WorkflowError::SerializeLoopOutput)?;

// Commit loop output and final state to db. Note that we don't defer this because
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/builder/workflow/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -76,7 +76,7 @@ impl<'a, M: Message> MessageBuilder<'a, M> {
let start_instant = Instant::now();

// Serialize body
let body_val = serde_json::value::to_raw_value(&self.body)
let body_val = rivet_util::serde::json_to_raw_value!(&self.body)
.map_err(WorkflowError::SerializeMessageBody)?;
let topic = self.topic.unwrap_or_else(|| "*".to_string());
let tags = serde_json::Value::Object(
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/builder/workflow/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -145,7 +145,7 @@ impl<'a, T: Signal + Serialize> SignalBuilder<'a, T> {
let db_write_duration;

// Serialize input
let input_val = serde_json::value::to_raw_value(&self.body)
let input_val = rivet_util::serde::json_to_raw_value!(&self.body)
.map_err(WorkflowError::SerializeSignalBody)?;

match (
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/builder/workflow/sub_workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -155,7 +155,7 @@ where
}

// Serialize input
let input_val = serde_json::value::to_raw_value(input)
let input_val = rivet_util::serde::json_to_raw_value!(input)
.map_err(WorkflowError::SerializeWorkflowOutput)?;

let actual_sub_workflow_id = ctx
Expand Down Expand Up @@ -228,7 +228,7 @@ where
// Err for version mismatch
self.ctx.compare_version("sub workflow", self.version)?;

let input_val = serde_json::value::to_raw_value(&input)
let input_val = rivet_util::serde::json_to_raw_value!(&input)
.map_err(WorkflowError::SerializeWorkflowInput)?;
let mut branch = self
.ctx
Expand Down
7 changes: 4 additions & 3 deletions engine/packages/gasoline/src/ctx/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -96,8 +96,8 @@ impl MessageCtx {
let ts = duration_since_epoch.as_millis() as i64;

// Serialize the body
let body_buf =
serde_json::to_string(&message_body).map_err(WorkflowError::SerializeMessage)?;
let body_buf = rivet_util::serde::json_to_string!(&message_body)
.map_err(WorkflowError::SerializeMessage)?;
let body_buf_len = body_buf.len();
let body_buf = serde_json::value::RawValue::from_string(body_buf)
.map_err(WorkflowError::SerializeMessage)?;
Expand All @@ -111,7 +111,8 @@ impl MessageCtx {
ts,
body: &body_buf,
};
let message_buf = serde_json::to_vec(&message).map_err(WorkflowError::SerializeMessage)?;
let message_buf =
rivet_util::serde::json_to_vec!(&message).map_err(WorkflowError::SerializeMessage)?;

tracing::debug!(
%subject,
Expand Down
8 changes: 4 additions & 4 deletions engine/packages/gasoline/src/ctx/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -302,9 +302,9 @@ impl WorkflowCtx {
tracing::debug!("activity success");

// Write output
let input_val = serde_json::value::to_raw_value(input)
let input_val = rivet_util::serde::json_to_raw_value!(input)
.map_err(WorkflowError::SerializeActivityInput)?;
let output_val = serde_json::value::to_raw_value(&output)
let output_val = rivet_util::serde::json_to_raw_value!(&output)
.map_err(WorkflowError::SerializeActivityOutput)?;

tokio::try_join!(
Expand Down Expand Up @@ -346,7 +346,7 @@ impl WorkflowCtx {
tracing::error!(?err, "activity error");

let err_str = err.to_string();
let input_val = serde_json::value::to_raw_value(input)
let input_val = rivet_util::serde::json_to_raw_value!(input)
.map_err(WorkflowError::SerializeActivityInput)?;

// Write error (failed state)
Expand Down Expand Up @@ -380,7 +380,7 @@ impl WorkflowCtx {
tracing::debug!("activity timeout");

let err_str = err.to_string();
let input_val = serde_json::value::to_raw_value(input)
let input_val = rivet_util::serde::json_to_raw_value!(input)
.map_err(WorkflowError::SerializeActivityInput)?;

self.db
Expand Down
8 changes: 5 additions & 3 deletions engine/packages/gasoline/src/db/mod.rs
Original file line number Diff line number Diff line change
Expand Up @@ -310,17 +310,19 @@ pub struct WorkflowData {

impl WorkflowData {
pub fn parse_input<W: Workflow>(&self) -> WorkflowResult<W::Input> {
serde_json::from_str(self.input.get()).map_err(WorkflowError::DeserializeWorkflowInput)
rivet_util::serde::json_from_str!(self.input.get())
.map_err(WorkflowError::DeserializeWorkflowInput)
}

pub fn parse_state<T: DeserializeOwned>(&self) -> WorkflowResult<T> {
serde_json::from_str(self.state.get()).map_err(WorkflowError::DeserializeWorkflowState)
rivet_util::serde::json_from_str!(self.state.get())
.map_err(WorkflowError::DeserializeWorkflowState)
}

pub fn parse_output<W: Workflow>(&self) -> WorkflowResult<Option<W::Output>> {
self.output
.as_ref()
.map(|x| serde_json::from_str(x.get()))
.map(|x| rivet_util::serde::json_from_str!(x.get()))
.transpose()
.map_err(WorkflowError::DeserializeWorkflowOutput)
}
Expand Down
7 changes: 4 additions & 3 deletions engine/packages/gasoline/src/history/event.rs
Original file line number Diff line number Diff line change
Expand Up @@ -133,7 +133,7 @@ impl ActivityEvent {
pub fn parse_output<O: DeserializeOwned>(&self) -> WorkflowResult<Option<O>> {
self.output
.as_ref()
.map(|x| serde_json::from_str(x.get()))
.map(|x| rivet_util::serde::json_from_str!(x.get()))
.transpose()
.map_err(WorkflowError::DeserializeActivityOutput)
}
Expand Down Expand Up @@ -166,13 +166,14 @@ pub struct LoopEvent {

impl LoopEvent {
pub fn parse_state<S: DeserializeOwned>(&self) -> WorkflowResult<S> {
serde_json::from_str(self.state.get()).map_err(WorkflowError::DeserializeLoopState)
rivet_util::serde::json_from_str!(self.state.get())
.map_err(WorkflowError::DeserializeLoopState)
}

pub fn parse_output<O: DeserializeOwned>(&self) -> WorkflowResult<Option<O>> {
self.output
.as_ref()
.map(|x| serde_json::from_str(x.get()))
.map(|x| rivet_util::serde::json_from_str!(x.get()))
.transpose()
.map_err(WorkflowError::DeserializeLoopOutput)
}
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/message.rs
Original file line number Diff line number Diff line change
Expand Up @@ -35,7 +35,7 @@ where
wrapper: PubsubMessageWrapper<'_>,
) -> WorkflowResult<Self> {
// Deserialize the body
let body = serde_json::from_str(wrapper.body.get())
let body = rivet_util::serde::json_from_str!(wrapper.body.get())
.map_err(WorkflowError::DeserializeMessageBody)?;

Ok(PubsubMessage {
Expand All @@ -51,7 +51,7 @@ where
pub(crate) fn deserialize_wrapper<'a>(
buf: &'a [u8],
) -> WorkflowResult<PubsubMessageWrapper<'a>> {
serde_json::from_slice(buf).map_err(WorkflowError::DeserializeMessage)
rivet_util::serde::json_from_slice!(buf).map_err(WorkflowError::DeserializeMessage)
}
}

Expand Down
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/registry.rs
Original file line number Diff line number Diff line change
Expand Up @@ -62,7 +62,7 @@ impl Registry {
run: |ctx| {
async move {
// Deserialize input
let input = serde_json::from_str(ctx.input().get())
let input = rivet_util::serde::json_from_str!(ctx.input().get())
.map_err(WorkflowError::DeserializeWorkflowInput)?;

// Run workflow
Expand All @@ -79,7 +79,7 @@ impl Registry {
};

// Serialize output
let output_val = serde_json::value::to_raw_value(&output)
let output_val = rivet_util::serde::json_to_raw_value!(&output)
.map_err(WorkflowError::SerializeWorkflowOutput)?;

Ok(output_val)
Expand Down
2 changes: 1 addition & 1 deletion engine/packages/gasoline/src/signal.rs
Original file line number Diff line number Diff line change
Expand Up @@ -73,7 +73,7 @@ macro_rules! join_signal {
if name == <$types as gas::signal::Signal>::NAME {
std::result::Result::Ok(
Self::$names(
serde_json::from_str(body.get())
rivet_util::serde::json_from_str!(body.get())
.map_err(WorkflowError::DeserializeSignalBody)?
)
)
Expand Down
4 changes: 2 additions & 2 deletions engine/packages/gasoline/src/workflow.rs
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,7 @@ impl<'a, T: DeserializeOwned + Serialize> StateGuard<'a, T> {
pub(crate) fn new(
guard: MutexGuard<'a, (Box<serde_json::value::RawValue>, bool)>,
) -> Result<Self> {
let value = serde_json::from_str::<T>(guard.0.get())?;
let value = rivet_util::observe!(serde_json::from_str::<T>(guard.0.get())?);

Ok(Self {
guard,
Expand All @@ -60,7 +60,7 @@ impl<'a, T: DeserializeOwned + Serialize> std::ops::DerefMut for StateGuard<'a,
impl<'a, T: DeserializeOwned + Serialize> Drop for StateGuard<'a, T> {
fn drop(&mut self) {
// TODO: Somehow don't panic when committing state back into mutex
self.guard.0 = serde_json::value::to_raw_value(&self.inner).expect("bad state");
self.guard.0 = rivet_util::serde::json_to_raw_value!(&self.inner).expect("bad state");
}
}

Expand Down
16 changes: 1 addition & 15 deletions engine/packages/ups-broadcast/src/lib.rs
Original file line number Diff line number Diff line change
Expand Up @@ -5,8 +5,6 @@ use universalpubsub::NextOutput;
use universalpubsub::PublishOpts;
use universalpubsub::Subject;

mod sim;

pub const BROADCAST_TOPIC: &str = "rivet.ups.broadcast";

pub struct BroadcastSubject;
Expand All @@ -28,7 +26,7 @@ impl Subject for BroadcastSubject {
}

#[tracing::instrument(skip_all)]
pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
pub async fn start(_config: rivet_config::Config, pools: rivet_pools::Pools) -> Result<()> {
let ups = pools.ups()?;
let mut sub = ups.subscribe(BroadcastSubject).await?;

Expand All @@ -38,18 +36,6 @@ pub async fn start(config: rivet_config::Config, pools: rivet_pools::Pools) -> R
let handle =
tokio::spawn(async move { while let Ok(NextOutput::Message(_)) = sub.next().await {} });

if let Some(sim_config) = sim::Config::from_env()? {
let sim_udb = pools.udb().ok();
let sim_ups = sim::pubsub_for_sim(
&config,
&ups,
sim_config.force_driver,
sim_config.disable_memory_optimization,
)
.await?;
sim::spawn(sim_ups, sim_udb, sim_config);
}

loop {
if let Err(err) = ups
.publish(BroadcastSubject, &[], PublishOpts::broadcast())
Expand Down
Loading
Loading