28 changes: 24 additions & 4 deletions Cargo.lock

Some generated files are not rendered by default.

1 change: 1 addition & 0 deletions Cargo.toml
@@ -64,6 +64,7 @@ uuid = { version = "1.11.0", features = ["v4"] }
[dev-dependencies]
criterion = { version = "0.5.1", features = ["async_tokio"] }
rstest = "0.23"
tracing-test = "0.2"

[[bench]]
name = "store_bench"
4 changes: 3 additions & 1 deletion clients/python/src/taskbroker_client/app.py
@@ -12,7 +12,7 @@
from taskbroker_client.retry import Retry
from taskbroker_client.router import TaskRouter
from taskbroker_client.task import Task
from taskbroker_client.types import AtMostOnceStore, ContextHook, ProducerFactory
from taskbroker_client.types import AtMostOnceStore, ContextHook, ErrorHook, ProducerFactory


class TaskbrokerApp:
@@ -28,10 +28,12 @@ def __init__(
metrics_class: str | MetricsBackend = "taskbroker_client.metrics.NoOpMetricsBackend",
at_most_once_store: AtMostOnceStore | None = None,
context_hooks: list[ContextHook] | None = None,
error_hook: ErrorHook | None = None,
) -> None:
self.name = name
self.metrics = self._build_metrics(metrics_class)
self.context_hooks: list[ContextHook] = context_hooks if context_hooks is not None else []
self.error_hook = error_hook
self._config = {
"rpc_secret": None,
"grpc_config": None,
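For reference, wiring the new `error_hook` parameter at app construction could look like the sketch below. The hook class, logger, and app name are illustrative and assume `name` is the only other required constructor argument; only the `name` and `error_hook` parameters come from the diff above.

```python
import logging

from taskbroker_client.app import TaskbrokerApp

logger = logging.getLogger(__name__)


class LoggingErrorHook:
    """Illustrative hook: log the exception and attach no error envelope."""

    def on_exception(self, task_meta, exc):
        logger.warning("task failed", exc_info=exc)
        return None  # None means the status update carries no TaskError


app = TaskbrokerApp(
    name="example-app",              # illustrative name
    error_hook=LoggingErrorHook(),   # optional; omitting it keeps existing behavior
)
```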
13 changes: 12 additions & 1 deletion clients/python/src/taskbroker_client/types.py
@@ -6,7 +6,7 @@
from arroyo.backends.abstract import ProducerFuture
from arroyo.backends.kafka import KafkaPayload
from arroyo.types import BrokerValue, Topic
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskActivation, TaskActivationStatus, TaskError


class ContextHook(Protocol):
@@ -23,6 +23,16 @@ def on_dispatch(self, headers: MutableMapping[str, Any]) -> None: ...
def on_execute(self, headers: dict[str, str]) -> contextlib.AbstractContextManager[None]: ...


class ErrorHook(Protocol):
"""Hook for capturing task execution exceptions before status reporting."""

def on_exception(
self,
task_meta: "InflightTaskActivation",
exc: BaseException,
) -> TaskError | None: ...


class AtMostOnceStore(Protocol):
"""
Interface for the at_most_once store used for idempotent task execution.
@@ -65,3 +75,4 @@ class ProcessingResult:
status: TaskActivationStatus.ValueType
host: str
receive_timestamp: float
error: TaskError | None = None
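A sketch of an `ErrorHook` implementation that fills the new `TaskError` envelope. The class name is illustrative, and the `exception_type`/`exception_message` field names are inferred from how `log_failure_context` in `src/grpc/server.rs` reads them further down this diff.

```python
from sentry_protos.taskbroker.v1.taskbroker_pb2 import TaskError


class BasicErrorHook:
    """Illustrative ErrorHook: convert a caught exception into a TaskError."""

    def on_exception(self, task_meta, exc: BaseException) -> TaskError | None:
        # Field names assumed from their use in server.rs; adjust to the actual proto.
        return TaskError(
            exception_type=type(exc).__name__,
            exception_message=str(exc),
        )
```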
2 changes: 2 additions & 0 deletions clients/python/src/taskbroker_client/worker/client.py
@@ -445,6 +445,7 @@ def update_task(
id=processing_result.task_id,
status=processing_result.status,
fetch_next_task=fetch_next_task,
error=processing_result.error,
)

try:
@@ -566,6 +567,7 @@ def update_task(
id=processing_result.task_id,
status=processing_result.status,
fetch_next_task=None,
error=processing_result.error,
)

retries = 0
20 changes: 20 additions & 0 deletions clients/python/src/taskbroker_client/worker/workerchild.py
@@ -22,6 +22,7 @@
TASK_ACTIVATION_STATUS_RETRY,
TaskActivation,
TaskActivationStatus,
TaskError,
)
from sentry_sdk.consts import OP, SPANDATA, SPANSTATUS
from sentry_sdk.crons import MonitorStatus, capture_checkin
@@ -160,6 +161,18 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
f"execution deadline of {deadline} seconds exceeded by {taskname}"
)

def _get_task_error_from_hook(
inflight_activation: InflightTaskActivation,
exc: BaseException,
) -> TaskError | None:
if app.error_hook is None:
return None
try:
return app.error_hook.on_exception(inflight_activation, exc)
except Exception:
logger.exception("taskbroker_client.error_hook_failed")
return None

while not shutdown_event.is_set():
if max_task_count and processed_task_count >= max_task_count:
metrics.incr(
@@ -234,6 +247,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
set_current_task(inflight.activation)

next_state = TASK_ACTIVATION_STATUS_FAILURE
task_error = None
# Use time.time() so we can measure against activation.received_at
execution_start_time = time.time()
try:
@@ -263,6 +277,9 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
next_state = TASK_ACTIVATION_STATUS_RETRY
else:
next_state = TASK_ACTIVATION_STATUS_FAILURE

task_error = _get_task_error_from_hook(inflight, err)

except Exception as err:
retry = task_func.retry
captured_error = False
@@ -296,6 +313,8 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
if not captured_error and next_state != TASK_ACTIVATION_STATUS_RETRY:
sentry_sdk.capture_exception(err)

task_error = _get_task_error_from_hook(inflight, err)

clear_current_task()
processed_task_count += 1

@@ -313,6 +332,7 @@ def handle_alarm(signum: int, frame: FrameType | None) -> None:
status=next_state,
host=inflight.host,
receive_timestamp=inflight.receive_timestamp,
error=task_error,
)
)

60 changes: 51 additions & 9 deletions src/grpc/server.rs
@@ -6,13 +6,13 @@ use prost::Message;
use sentry_protos::taskbroker::v1::consumer_service_server::ConsumerService;
use sentry_protos::taskbroker::v1::{
FetchNextTask, GetTaskRequest, GetTaskResponse, SetTaskStatusRequest, SetTaskStatusResponse,
TaskActivation, TaskActivationStatus,
TaskActivation, TaskActivationStatus, TaskError,
};
use tonic::{Request, Response, Status};
use tracing::{error, instrument, warn};

use crate::config::{Config, DeliveryMode};
use crate::store::activation::InflightActivationStatus;
use crate::store::activation::{InflightActivation, InflightActivationStatus};
use crate::store::traits::InflightActivationStore;

pub struct TaskbrokerServer {
@@ -97,17 +97,22 @@ impl ConsumerService for TaskbrokerServer {
"Invalid status, expects 3 (Failure), 4 (Retry), or 5 (Complete), but got: {status:?}"
)));
}

if status == InflightActivationStatus::Failure {
metrics::counter!("grpc_server.set_status.failure").increment(1);
}

match self.store.set_status(&id, status).await {
Ok(Some(_)) => metrics::counter!(
"grpc_server.set_status",
"result" => "ok",
"status" => status.to_string()
)
.increment(1),
Ok(Some(row)) => {
metrics::counter!(
"grpc_server.set_status",
"result" => "ok",
"status" => status.to_string()
)
.increment(1);

log_failure_context(&row, status, request.get_ref().error.as_ref());
}

Ok(None) => metrics::counter!(
"grpc_server.set_status",
@@ -167,7 +172,8 @@ impl ConsumerService for TaskbrokerServer {
Ok(None) => {
warn!("No pending activations");

// If we return an error, the worker will place the result back in its internal queue and send the update again in the future, which is not desired
// If we return an error, the worker will place the result back in its internal queue
// and send the update again in the future, which is not desired.
Ok(Response::new(SetTaskStatusResponse { task: None }))
}

@@ -190,7 +196,43 @@ impl ConsumerService for TaskbrokerServer {
}))
}
};

metrics::histogram!("grpc_server.fetch_next.duration").record(start_time.elapsed());
res
}
}

/// Emit a structured ERROR log on task conclusions that reflect a caught worker
/// exception. Called only after the store update succeeded.
///
/// Logs on Failure always, and on Retry only when an error envelope is present
/// (bare retries triggered by retry_task() with no exception stay quiet).
///
/// Intentionally does not touch metrics — taskname/exception_type are too
/// high-cardinality to tag counters with.
fn log_failure_context(
row: &InflightActivation,
status: InflightActivationStatus,
error: Option<&TaskError>,
) {
let should_log = match status {
InflightActivationStatus::Failure => true,
InflightActivationStatus::Retry => error.is_some(),
_ => false,
};

if !should_log {
return;
}

error!(
task_id = %row.id,
taskname = %row.taskname,
namespace = %row.namespace,
status = %status,
attempts = row.processing_attempts,
exception_type = error.map(|e| e.exception_type.as_str()).unwrap_or(""),
exception_message = error.map(|e| e.exception_message.as_str()).unwrap_or(""),
"task reported failure",
);
}