diff --git a/Cargo.lock b/Cargo.lock index 900761b1..d43c2c95 100644 --- a/Cargo.lock +++ b/Cargo.lock @@ -1400,7 +1400,7 @@ checksum = "e04d7f318608d35d4b61ddd75cbdaee86b023ebe2bd5a66ee0915f0bf93095a9" dependencies = [ "hermit-abi", "libc", - "windows-sys 0.59.0", + "windows-sys 0.52.0", ] [[package]] @@ -2535,9 +2535,7 @@ dependencies = [ [[package]] name = "sentry_protos" -version = "0.8.13" -source = "registry+https://github.com/rust-lang/crates.io-index" -checksum = "60dfb8c1b03c3f6e800a91eca7daea05205dd87f63b8d70b50b7e2211a2e0be2" +version = "0.8.17" dependencies = [ "prost", "prost-types", @@ -3006,6 +3004,7 @@ dependencies = [ "tower", "tracing", "tracing-subscriber", + "tracing-test", "uuid", ] @@ -3397,6 +3396,27 @@ dependencies = [ "tracing-serde", ] +[[package]] +name = "tracing-test" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "19a4c448db514d4f24c5ddb9f73f2ee71bfb24c526cf0c570ba142d1119e0051" +dependencies = [ + "tracing-core", + "tracing-subscriber", + "tracing-test-macro", +] + +[[package]] +name = "tracing-test-macro" +version = "0.2.6" +source = "registry+https://github.com/rust-lang/crates.io-index" +checksum = "ad06847b7afb65c7866a36664b75c40b895e318cea4f71299f013fb22965329d" +dependencies = [ + "quote", + "syn", +] + [[package]] name = "try-lock" version = "0.2.5" diff --git a/Cargo.toml b/Cargo.toml index 1ad2b992..73b627ad 100644 --- a/Cargo.toml +++ b/Cargo.toml @@ -64,6 +64,7 @@ uuid = { version = "1.11.0", features = ["v4"] } [dev-dependencies] criterion = { version = "0.5.1", features = ["async_tokio"] } rstest = "0.23" +tracing-test = "0.2" [[bench]] name = "store_bench" diff --git a/clients/python/src/taskbroker_client/app.py b/clients/python/src/taskbroker_client/app.py index 39374c9f..9ec0da99 100644 --- a/clients/python/src/taskbroker_client/app.py +++ b/clients/python/src/taskbroker_client/app.py @@ -12,7 +12,7 @@ from taskbroker_client.retry import Retry from taskbroker_client.router import TaskRouter from taskbroker_client.task import Task -from taskbroker_client.types import AtMostOnceStore, ContextHook, ProducerFactory +from taskbroker_client.types import AtMostOnceStore, ContextHook, ErrorHook, ProducerFactory class TaskbrokerApp: @@ -28,10 +28,12 @@ def __init__( metrics_class: str | MetricsBackend = "taskbroker_client.metrics.NoOpMetricsBackend", at_most_once_store: AtMostOnceStore | None = None, context_hooks: list[ContextHook] | None = None, + error_hook: ErrorHook | None = None, ) -> None: self.name = name self.metrics = self._build_metrics(metrics_class) self.context_hooks: list[ContextHook] = context_hooks if context_hooks is not None else [] + self.error_hook = error_hook self._config = { "rpc_secret": None, "grpc_config": None, diff --git a/clients/python/src/taskbroker_client/types.py b/clients/python/src/taskbroker_client/types.py index c4a3783f..f0fd54eb 100644 --- a/clients/python/src/taskbroker_client/types.py +++ b/clients/python/src/taskbroker_client/types.py @@ -6,7 +6,7 @@ from arroyo.backends.abstract import ProducerFuture from arroyo.backends.kafka import KafkaPayload from arroyo.types import BrokerValue, Topic -from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus +from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus, TaskError class ContextHook(Protocol): @@ -23,6 +23,16 @@ def on_dispatch(self, headers: MutableMapping[str, Any]) -> None: ... def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManager[None]: ... +class ErrorHook(Protocol): + """Hook for capturing task execution exceptions before status reporting.""" + + def on_exception( + self, + task_meta: "InflightTaskActivation", + exc: BaseException, + ) -> TaskError | None: ... + + class AtMostOnceStore(Protocol): """ Interface for the at_most_once store used for idempotent task execution. @@ -65,3 +75,4 @@ class ProcessingResult: status: TaskActivationStatus.ValueType host: str receive_timestamp: float + error: TaskError | None = None diff --git a/clients/python/src/taskbroker_client/worker/client.py b/clients/python/src/taskbroker_client/worker/client.py index 2a1c56b2..4140a79c 100644 --- a/clients/python/src/taskbroker_client/worker/client.py +++ b/clients/python/src/taskbroker_client/worker/client.py @@ -445,6 +445,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=fetch_next_task, + error=processing_result.error, ) try: @@ -566,6 +567,7 @@ def update_task( id=processing_result.task_id, status=processing_result.status, fetch_next_task=None, + error=processing_result.error, ) retries = 0 diff --git a/clients/python/src/taskbroker_client/worker/workerchild.py b/clients/python/src/taskbroker_client/worker/workerchild.py index cf8162a9..bdd3d0ad 100644 --- a/clients/python/src/taskbroker_client/worker/workerchild.py +++ b/clients/python/src/taskbroker_client/worker/workerchild.py @@ -22,6 +22,7 @@ TASK_ACTIVATION_STATUS_RETRY, TaskActivation, TaskActivationStatus, + TaskError, ) from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS from sentry_sdk.crons import MonitorStatus, capture_checkin @@ -160,6 +161,18 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: f"execution deadline of {deadline} seconds exceeded by {taskname}" ) + def _get_task_error_from_hook( + inflight_activation: InflightTaskActivation, + exc: BaseException, + ) -> TaskError | None: + if app.error_hook is None: + return None + try: + return app.error_hook.on_exception(inflight_activation, exc) + except Exception: + logger.exception("taskbroker_client.error_hook_failed") + return None + while not shutdown_event.is_set(): if max_task_count and processed_task_count >= max_task_count: metrics.incr( @@ -234,6 +247,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: set_current_task(inflight.activation) next_state = TASK_ACTIVATION_STATUS_FAILURE + task_error = None # Use time.time() so we can measure against activation.received_at execution_start_time = time.time() try: @@ -263,6 +277,9 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: next_state = TASK_ACTIVATION_STATUS_RETRY else: next_state = TASK_ACTIVATION_STATUS_FAILURE + + task_error = _get_task_error_from_hook(inflight, err) + except Exception as err: retry = task_func.retry captured_error = False @@ -296,6 +313,8 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: if not captured_error and next_state != TASK_ACTIVATION_STATUS_RETRY: sentry_sdk.capture_exception(err) + task_error = _get_task_error_from_hook(inflight, err) + clear_current_task() processed_task_count += 1 @@ -313,6 +332,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None: status=next_state, host=inflight.host, receive_timestamp=inflight.receive_timestamp, + error=task_error, ) ) diff --git a/src/grpc/server.rs b/src/grpc/server.rs index cf4a6bdd..d26a4050 100644 --- a/src/grpc/server.rs +++ b/src/grpc/server.rs @@ -6,13 +6,13 @@ use prost::Message; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; use sentry_protos::taskbroker::v1::{ FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse, - TaskActivation, TaskActivationStatus, + TaskActivation, TaskActivationStatus, TaskError, }; use tonic::{Request, Response, Status}; use tracing::{error, instrument, warn}; use crate::config::{Config, DeliveryMode}; -use crate::store::activation::InflightActivationStatus; +use crate::store::activation::{InflightActivation, InflightActivationStatus}; use crate::store::traits::InflightActivationStore; pub struct TaskbrokerServer { @@ -97,17 +97,22 @@ impl ConsumerService for TaskbrokerServer { "Invalid status, expects 3 (Failure), 4 (Retry), or 5 (Complete), but got: {status:?}" ))); } + if status == InflightActivationStatus::Failure { metrics::counter!("grpc_server.set_status.failure").increment(1); } match self.store.set_status(&id, status).await { - Ok(Some(_)) => metrics::counter!( - "grpc_server.set_status", - "result" => "ok", - "status" => status.to_string() - ) - .increment(1), + Ok(Some(row)) => { + metrics::counter!( + "grpc_server.set_status", + "result" => "ok", + "status" => status.to_string() + ) + .increment(1); + + log_failure_context(&row, status, request.get_ref().error.as_ref()); + } Ok(None) => metrics::counter!( "grpc_server.set_status", @@ -167,7 +172,8 @@ impl ConsumerService for TaskbrokerServer { Ok(None) => { warn!("No pending activations"); - // If we return an error, the worker will place the result back in its internal queue and send the update again in the future, which is not desired + // If we return an error, the worker will place the result back in its internal queue + // and send the update again in the future, which is not desired. Ok(Response::new(SetTaskStatusResponse { task: None })) } @@ -190,7 +196,43 @@ impl ConsumerService for TaskbrokerServer { })) } }; + metrics::histogram!("grpc_server.fetch_next.duration").record(start_time.elapsed()); res } } + +/// Emit a structured ERROR log on task conclusions that reflect a caught worker +/// exception. Called only after the store update succeeded. +/// +/// Logs on Failure always, and on Retry only when an error envelope is present +/// (bare retries triggered by retry_task() with no exception stay quiet). +/// +/// Intentionally does not touch metrics — taskname/exception_type are too +/// high-cardinality to tag counters with. +fn log_failure_context( + row: &InflightActivation, + status: InflightActivationStatus, + error: Option<&TaskError>, +) { + let should_log = match status { + InflightActivationStatus::Failure => true, + InflightActivationStatus::Retry => error.is_some(), + _ => false, + }; + + if !should_log { + return; + } + + error!( + task_id = %row.id, + taskname = %row.taskname, + namespace = %row.namespace, + status = %status, + attempts = row.processing_attempts, + exception_type = error.map(|e| e.exception_type.as_str()).unwrap_or(""), + exception_message = error.map(|e| e.exception_message.as_str()).unwrap_or(""), + "task reported failure", + ); +} diff --git a/src/grpc/server_tests.rs b/src/grpc/server_tests.rs index 2b986d66..ebb59d00 100644 --- a/src/grpc/server_tests.rs +++ b/src/grpc/server_tests.rs @@ -4,14 +4,17 @@ use prost::Message; use rstest::rstest; use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService; use sentry_protos::taskbroker::v1::{ - FetchNextTask, GetTaskRequest, SetTaskStatusRequest, TaskActivation, + FetchNextTask, GetTaskRequest, SetTaskStatusRequest, TaskActivation, TaskActivationStatus, + TaskError, }; use tonic::{Code, Request}; use crate::config::{Config, DeliveryMode}; use crate::grpc::server::TaskbrokerServer; use crate::store::activation::InflightActivationStatus; -use crate::test_utils::{create_config, create_test_store, make_activations}; +use crate::test_utils::{ + create_config, create_test_store, make_activations, make_failing_store, seed_inflight, +}; #[tokio::test] async fn test_get_task_push_mode_returns_permission_denied() { @@ -68,6 +71,7 @@ async fn test_set_task_status(#[case] adapter: &str) { id: "test_task".to_string(), status: 5, // Complete fetch_next_task: None, + error: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -89,6 +93,7 @@ async fn test_set_task_status_invalid(#[case] adapter: &str) { id: "test_task".to_string(), status: 1, // Invalid fetch_next_task: None, + error: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_err()); @@ -125,7 +130,7 @@ async fn test_get_task_success(#[case] adapter: &str) { let resp = response.unwrap(); assert!(resp.get_ref().task.is_some()); let task = resp.get_ref().task.as_ref().unwrap(); - assert!(task.id == "id_0"); + assert_eq!(task.id, "id_0"); let row = store.get_by_id("id_0").await.unwrap().expect("claimed row"); assert_eq!(row.status, InflightActivationStatus::Processing); @@ -212,7 +217,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) { let resp = response.unwrap(); assert!(resp.get_ref().task.is_some()); let task = resp.get_ref().task.as_ref().unwrap(); - assert!(task.id == "id_0"); + assert_eq!(task.id, "id_0"); let request = SetTaskStatusRequest { id: "id_0".to_string(), @@ -221,6 +226,7 @@ async fn test_set_task_status_success(#[case] adapter: &str) { namespace: None, application: None, }), + error: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -256,6 +262,7 @@ async fn test_set_task_status_with_application(#[case] adapter: &str) { application: Some("hammers".into()), namespace: None, }), + error: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -296,6 +303,7 @@ async fn test_set_task_status_with_application_no_match(#[case] adapter: &str) { application: Some("no-matches".into()), namespace: None, }), + error: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -324,6 +332,7 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte application: None, namespace: Some(namespace), }), + error: None, }; let response = service.set_task_status(Request::new(request)).await; assert!(response.is_ok()); @@ -332,3 +341,192 @@ async fn test_set_task_status_with_namespace_requires_application(#[case] adapte "namespace without application yields no next task in response" ); } + +#[tokio::test] +async fn set_task_status_failure_with_error_logs_after_persist() { + use tracing_test::traced_test; + + #[traced_test] + async fn inner() { + let store = create_test_store("sqlite").await; + let config = create_config(); + let id = seed_inflight(&store, "sentry.tasks.store.save_event", "ingest.errors").await; + + let server = TaskbrokerServer { + store: store.clone(), + config, + }; + + server + .set_task_status(Request::new(SetTaskStatusRequest { + id: id.clone(), + status: TaskActivationStatus::Failure as i32, + fetch_next_task: None, + error: Some(TaskError { + exception_type: "django.db.utils.OperationalError".into(), + exception_message: r#"could not access file "$libdir/btree_gist""#.into(), + traceback: Some("Traceback ...".into()), + }), + })) + .await + .unwrap(); + + assert!(logs_contain("task reported failure")); + assert!(logs_contain("sentry.tasks.store.save_event")); + assert!(logs_contain("django.db.utils.OperationalError")); + assert!(logs_contain("btree_gist")); + assert_eq!( + store.get_by_id(&id).await.unwrap().unwrap().status, + InflightActivationStatus::Failure, + ); + } + + inner().await; +} + +#[tokio::test] +async fn set_task_status_retry_with_error_logs() { + use tracing_test::traced_test; + + #[traced_test] + async fn inner() { + let store = create_test_store("sqlite").await; + let config = create_config(); + let id = seed_inflight(&store, "x", "y").await; + let server = TaskbrokerServer { + store: store.clone(), + config, + }; + + server + .set_task_status(Request::new(SetTaskStatusRequest { + id, + status: TaskActivationStatus::Retry as i32, + fetch_next_task: None, + error: Some(TaskError { + exception_type: "requests.exceptions.ConnectionError".into(), + exception_message: "connection reset".into(), + traceback: None, + }), + })) + .await + .unwrap(); + + assert!(logs_contain("task reported failure")); + assert!(logs_contain("requests.exceptions.ConnectionError")); + } + + inner().await; +} + +#[tokio::test] +async fn set_task_status_retry_without_error_is_silent() { + use tracing_test::traced_test; + + #[traced_test] + async fn inner() { + let store = create_test_store("sqlite").await; + let config = create_config(); + let id = seed_inflight(&store, "x", "y").await; + let server = TaskbrokerServer { store, config }; + + server + .set_task_status(Request::new(SetTaskStatusRequest { + id, + status: TaskActivationStatus::Retry as i32, + fetch_next_task: None, + error: None, + })) + .await + .unwrap(); + + assert!(!logs_contain("task reported failure")); + } + + inner().await; +} + +#[tokio::test] +async fn set_task_status_failure_without_error_still_logs_task_context() { + use tracing_test::traced_test; + + #[traced_test] + async fn inner() { + let store = create_test_store("sqlite").await; + let config = create_config(); + let id = seed_inflight(&store, "some.task", "some.namespace").await; + let server = TaskbrokerServer { store, config }; + + server + .set_task_status(Request::new(SetTaskStatusRequest { + id: id.clone(), + status: TaskActivationStatus::Failure as i32, + fetch_next_task: None, + error: None, + })) + .await + .unwrap(); + + assert!(logs_contain("task reported failure")); + assert!(logs_contain("some.task")); + assert!(logs_contain("some.namespace")); + } + + inner().await; +} + +#[tokio::test] +async fn set_task_status_does_not_log_when_store_update_fails() { + use tracing_test::traced_test; + + #[traced_test] + async fn inner() { + let store = make_failing_store().await; + let config = create_config(); + let server = TaskbrokerServer { store, config }; + + let _ = server + .set_task_status(Request::new(SetTaskStatusRequest { + id: "abc".into(), + status: TaskActivationStatus::Failure as i32, + fetch_next_task: None, + error: Some(TaskError { + exception_type: "X".into(), + exception_message: "Y".into(), + traceback: None, + }), + })) + .await; + + assert!(!logs_contain("task reported failure")); + } + + inner().await; +} + +#[tokio::test] +async fn set_task_status_complete_does_not_log_failure() { + use tracing_test::traced_test; + + #[traced_test] + async fn inner() { + let store = create_test_store("sqlite").await; + let config = create_config(); + let id = seed_inflight(&store, "x", "y").await; + let server = TaskbrokerServer { store, config }; + + server + .set_task_status(Request::new(SetTaskStatusRequest { + id, + status: TaskActivationStatus::Complete as i32, + fetch_next_task: None, + error: None, + })) + .await + .unwrap(); + + assert!(!logs_contain("task reported failure")); + } + + inner().await; +} diff --git a/src/test_utils.rs b/src/test_utils.rs index 7dd6d776..fa1ca7b1 100644 --- a/src/test_utils.rs +++ b/src/test_utils.rs @@ -3,12 +3,14 @@ use std::env::var; use std::sync::Arc; use std::time::SystemTime; +use anyhow::{Error, anyhow}; +use async_trait::async_trait; use rdkafka::Message; use rdkafka::admin::{AdminClient, AdminOptions, NewTopic, TopicReplication}; use rdkafka::consumer::{CommitMode, Consumer, StreamConsumer}; use rdkafka::producer::FutureProducer; -use chrono::Utc; +use chrono::{DateTime, Utc}; use futures::StreamExt; use prost::Message as ProstMessage; use prost_types::Timestamp; @@ -22,6 +24,7 @@ use crate::store::activation::{ use crate::store::adapters::postgres::{PostgresActivationStore, PostgresActivationStoreConfig}; use crate::store::adapters::sqlite::{InflightActivationStoreConfig, SqliteActivationStore}; use crate::store::traits::InflightActivationStore; +use crate::store::types::{BucketRange, FailedTasksForwarder}; /// Builder for `TaskActivation`. We cannot generate a builder automatically because `TaskActivation` is defined in `sentry-protos`. pub struct TaskActivationBuilder { @@ -297,6 +300,135 @@ pub async fn create_test_store(adapter: &str) -> Arc, +} + +#[async_trait] +impl InflightActivationStore for FailingSetStatusStore { + async fn store(&self, batch: Vec) -> Result { + self.inner.store(batch).await + } + + fn assign_partitions(&self, partitions: Vec) -> Result<(), Error> { + self.inner.assign_partitions(partitions) + } + + async fn claim_activations( + &self, + application: Option<&str>, + namespaces: Option<&[String]>, + limit: Option, + bucket: Option, + mark_processing: bool, + ) -> Result, Error> { + self.inner + .claim_activations(application, namespaces, limit, bucket, mark_processing) + .await + } + + async fn mark_activation_processing(&self, id: &str) -> Result<(), Error> { + self.inner.mark_activation_processing(id).await + } + + async fn set_status( + &self, + _id: &str, + _status: InflightActivationStatus, + ) -> Result, Error> { + Err(anyhow!("injected set_status failure for test")) + } + + async fn pending_activation_max_lag(&self, now: &DateTime) -> f64 { + self.inner.pending_activation_max_lag(now).await + } + + async fn count_by_status(&self, status: InflightActivationStatus) -> Result { + self.inner.count_by_status(status).await + } + + async fn count(&self) -> Result { + self.inner.count().await + } + + async fn get_by_id(&self, id: &str) -> Result, Error> { + self.inner.get_by_id(id).await + } + + async fn set_processing_deadline( + &self, + id: &str, + deadline: Option>, + ) -> Result<(), Error> { + self.inner.set_processing_deadline(id, deadline).await + } + + async fn delete_activation(&self, id: &str) -> Result<(), Error> { + self.inner.delete_activation(id).await + } + + async fn vacuum_db(&self) -> Result<(), Error> { + self.inner.vacuum_db().await + } + + async fn full_vacuum_db(&self) -> Result<(), Error> { + self.inner.full_vacuum_db().await + } + + async fn db_size(&self) -> Result { + self.inner.db_size().await + } + + async fn get_retry_activations(&self) -> Result, Error> { + self.inner.get_retry_activations().await + } + + async fn handle_claim_expiration(&self) -> Result { + self.inner.handle_claim_expiration().await + } + + async fn handle_processing_deadline(&self) -> Result { + self.inner.handle_processing_deadline().await + } + + async fn handle_processing_attempts(&self) -> Result { + self.inner.handle_processing_attempts().await + } + + async fn handle_expires_at(&self) -> Result { + self.inner.handle_expires_at().await + } + + async fn handle_delay_until(&self) -> Result { + self.inner.handle_delay_until().await + } + + async fn handle_failed_tasks(&self) -> Result { + self.inner.handle_failed_tasks().await + } + + async fn mark_completed(&self, ids: Vec) -> Result { + self.inner.mark_completed(ids).await + } + + async fn remove_completed(&self) -> Result { + self.inner.remove_completed().await + } + + async fn remove_killswitched(&self, killswitched_tasks: Vec) -> Result { + self.inner.remove_killswitched(killswitched_tasks).await + } + + async fn clear(&self) -> Result<(), Error> { + self.inner.clear().await + } +} + +pub async fn make_failing_store() -> Arc { + let inner = create_test_store("sqlite").await; + Arc::new(FailingSetStatusStore { inner }) as Arc +} + /// Create a Config instance that uses a testing topic /// and earliest auto_offset_reset. This is intended to be combined /// with [`reset_topic`] @@ -336,8 +468,8 @@ pub fn create_integration_config_with_ssl() -> Arc { Arc::new(config) } -pub fn create_integration_config_with_topic(topic: String) -> Config { - Config { +pub fn create_integration_config_with_topic(topic: String) -> Arc { + let config = Config { pg_host: get_pg_host(), pg_port: get_pg_port(), pg_username: get_pg_username(), @@ -347,7 +479,9 @@ pub fn create_integration_config_with_topic(topic: String) -> Config { kafka_topic: topic, kafka_auto_offset_reset: "earliest".into(), ..Config::default() - } + }; + + Arc::new(config) } /// Create a kafka producer for a given config @@ -516,3 +650,28 @@ pub async fn assert_counts(expected: StatusCount, store: &dyn InflightActivation "difference in failure count", ); } + +/// Store a single inflight activation with a caller-provided task name and +/// namespace, then return its task id. +/// +/// This helper is used by gRPC server tests that need a real inflight row in +/// the store before calling `set_task_status(...)`. +pub async fn seed_inflight( + store: &Arc, + taskname: &str, + namespace: &str, +) -> String { + let mut activations = make_activations(1); + + let mut payload = TaskActivation::decode(&activations[0].activation as &[u8]).unwrap(); + payload.taskname = taskname.to_string(); + payload.namespace = namespace.to_string(); + + activations[0].taskname = taskname.to_string(); + activations[0].namespace = namespace.to_string(); + activations[0].activation = payload.encode_to_vec(); + + let id = activations[0].id.clone(); + store.store(activations).await.unwrap(); + id +}