From b05aa4d5d13a01e7dedff749c35cbac68f77826b Mon Sep 17 00:00:00 2001 From: Peter Solnica Date: Wed, 13 May 2026 10:53:18 +0000 Subject: [PATCH 1/3] feat(tests): add :allowance option to Sentry.Test.setup_sentry/1 MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Introduces the mechanism that future commits will use to install per-test telemetry handlers for popular libraries (Oban, Broadway). This commit ships only the infrastructure: option parsing, a unique per-test handler ID, automatic detach on test exit, and a generic __handle_allowance_event__/4 handler that calls allow_sentry_reports/2 in response to the configured event. The internal allowance_handlers/1 dispatch is empty — any atom passed under :allowance currently raises a clear ArgumentError naming the unsupported entry. Oban and Broadway clauses land in follow-up commits. Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/sentry/test.ex | 89 +++++++++++++++++++++++++++++++++++++-- test/sentry/test_test.exs | 41 ++++++++++++++++++ 2 files changed, 127 insertions(+), 3 deletions(-) diff --git a/lib/sentry/test.ex b/lib/sentry/test.ex index 7c98dcd9..a77eb19f 100644 --- a/lib/sentry/test.ex +++ b/lib/sentry/test.ex @@ -52,7 +52,7 @@ defmodule Sentry.Test do @moduledoc since: "10.2.0" - @compile {:no_warn_undefined, [Bypass, Plug.Conn]} + @compile {:no_warn_undefined, [Bypass, Plug.Conn, :telemetry]} @ownership_server Sentry.Test.OwnershipServer @@ -75,8 +75,13 @@ defmodule Sentry.Test do ## Options - Any extra Sentry config options (e.g., `dedup_events: false`, `traces_sample_rate: 1.0`) - will be forwarded to the test config. + * `:allowance` - a list of integration module atoms to enable automatic + `Sentry.Test.allow_sentry_reports/2` wiring for. The integrations land + in follow-up commits; see the integration-specific sections below for + supported entries. + + Any other key is forwarded to the per-test Sentry config (e.g., + `dedup_events: false`, `traces_sample_rate: 1.0`). The reserved `:telemetry_processor` option is *not* forwarded to the test config. Instead, its value (a keyword list) is passed to the per-test @@ -135,6 +140,7 @@ defmodule Sentry.Test do {tp_opts, extra_config} = Keyword.pop(extra_config, :telemetry_processor, []) {collect_envelopes, extra_config} = Keyword.pop(extra_config, :collect_envelopes, false) + {allowance, extra_config} = Keyword.pop(extra_config, :allowance, []) # Open a per-test Bypass and stub the envelope endpoint bypass = Bypass.open() @@ -151,6 +157,8 @@ defmodule Sentry.Test do bypass_config = [dsn: "http://public:secret@localhost:#{bypass.port}/1"] setup_collector(bypass_config ++ extra_config) + attach_allowance_handlers(allowance, self()) + case collect_envelopes do false -> %{bypass: bypass, telemetry_processor: processor_name} @@ -833,6 +841,81 @@ defmodule Sentry.Test do end end + defp ensure_telemetry_loaded! do + unless Code.ensure_loaded?(:telemetry) do + raise """ + `:telemetry` is required for the `:allowance` option of Sentry.Test + but is not available. Add it to your test dependencies: + + {:telemetry, "~> 1.0", only: [:test]} + """ + end + end + + # ── :allowance plumbing ── + # + # Each integration atom (e.g. Oban, Broadway) is mapped by + # allowance_handlers!/1 to one or more {telemetry_event, {module, fun}} + # pairs. Commit 1 ships only the catch-all clause; commits 2 and 3 add + # the integration-specific clauses. + + defp attach_allowance_handlers([], _owner_pid), do: :ok + + defp attach_allowance_handlers(modules, owner_pid) when is_list(modules) do + ensure_telemetry_loaded!() + Enum.each(modules, &attach_allowance_handler(&1, owner_pid)) + end + + defp attach_allowance_handler(module, owner_pid) do + case allowance_handlers(module) do + :unknown -> + raise ArgumentError, + "unknown :allowance entry #{inspect(module)}. Supported integrations: " <> + "(none built-in yet — Oban and Broadway land in follow-up commits)" + + pairs when is_list(pairs) -> + Enum.each(pairs, fn {event, handler_fun} -> + __attach_allowance__(event, handler_fun, %{owner_pid: owner_pid}) + end) + end + end + + @doc false + @spec __attach_allowance__([atom()], {module(), atom()}, map()) :: :ok + def __attach_allowance__(event, {module, function}, config) + when is_list(event) and is_atom(module) and is_atom(function) and is_map(config) do + ref = System.unique_integer([:positive]) + handler_id = {:sentry_test_allowance, ref} + + :ok = + :telemetry.attach( + handler_id, + event, + Function.capture(module, function, 4), + config + ) + + ExUnit.Callbacks.on_exit(fn -> :telemetry.detach(handler_id) end) + :ok + end + + # Generic "allow whatever fired this event for owner_pid" handler. Used + # by the foundation unit tests and available for ad-hoc telemetry + # routing; the Oban / Broadway dispatches use their own handlers that + # consult metadata rather than blindly allowing the emitting pid. + @doc false + def __handle_allowance_event__(_event, _measurements, _metadata, %{owner_pid: owner_pid}) do + allow_sentry_reports(owner_pid, self()) + rescue + ArgumentError -> :ok + end + + # Returns the list of `{event_path, {module, function}}` handler pairs for + # a given integration atom, or `:unknown` for unsupported entries (the + # caller turns that into an `ArgumentError`). Commits 2 and 3 prepend + # clauses for `Oban` and `Broadway` respectively. + defp allowance_handlers(_other), do: :unknown + # Sets up collection infrastructure (ETS table, before_send wrapping, config) # without opening a new Bypass. When no :dsn is provided in extra_config, # falls back to the default Bypass DSN from Registry. diff --git a/test/sentry/test_test.exs b/test/sentry/test_test.exs index 79c065b6..ca542b23 100644 --- a/test/sentry/test_test.exs +++ b/test/sentry/test_test.exs @@ -375,6 +375,47 @@ defmodule Sentry.TestTest do end end + describe "setup_sentry/1 with :allowance (foundation)" do + test "empty allowance list is a no-op" do + assert %{bypass: _, telemetry_processor: _} = + SentryTest.setup_sentry(allowance: []) + end + + test "raises a clear error for unknown allowance entries" do + assert_raise ArgumentError, ~r/unknown :allowance entry/, fn -> + SentryTest.setup_sentry(allowance: [SomeUnknownThing]) + end + end + + test "__attach_allowance__/3 routes worker events back to the owner" do + SentryTest.setup_sentry() + test_pid = self() + + SentryTest.__attach_allowance__( + [:sentry_test_allowance, :synthetic, :start], + {SentryTest, :__handle_allowance_event__}, + %{owner_pid: test_pid} + ) + + worker_done = make_ref() + + {:ok, _worker} = + Task.start(fn -> + :telemetry.execute([:sentry_test_allowance, :synthetic, :start], %{}, %{}) + + assert {:ok, _} = + Sentry.capture_message("hello from synthetic worker", result: :sync) + + send(test_pid, worker_done) + end) + + assert_receive ^worker_done, 5_000 + + assert [%Sentry.Event{message: %{formatted: "hello from synthetic worker"}}] = + SentryTest.pop_sentry_reports() + end + end + describe "before_send wrapping" do test "wraps existing before_send callback" do test_pid = self() From e041f677013570088354ca5178d9061f26b6ec15 Mon Sep 17 00:00:00 2001 From: Peter Solnica Date: Wed, 13 May 2026 10:59:33 +0000 Subject: [PATCH 2/3] feat(tests): support allowance: [Oban] with async-safe job tagging MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Installs telemetry handlers that capture the test pid at Oban job insert time and route the worker's captured events back to that test on job start. The pairing makes auto-allowance safe under async: true even when multiple tests share an Oban supervisor, because every job is uniquely tied to the test that scheduled it (rather than to whichever telemetry handler happened to fire first). The tag store lives in Sentry.Test.Registry as a public ETS table (:sentry_test_oban_job_tags) alongside the existing scope-allow table. Tags are dropped on :stop/:exception and defensively on test exit so jobs that crash before completion don't leave stale entries behind. Handlers guard on `is_integer(job.id)` so synthetic jobs from inline mode or ad-hoc telemetry simulations (no persisted id) are skipped silently. The :inline / :manual Oban testing modes run jobs in the test pid anyway, so this auto-allowance is a no-op for them — it only adds capability for the production-like worker case from issue #1052. Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/sentry/test.ex | 117 +++++++++++- lib/sentry/test/registry.ex | 72 ++++++++ test/sentry/test_test.exs | 137 ++++++++++++++ .../test/phoenix_app/oban_test.exs | 167 +++++++++++++++++- 4 files changed, 485 insertions(+), 8 deletions(-) diff --git a/lib/sentry/test.ex b/lib/sentry/test.ex index a77eb19f..e8c55cb2 100644 --- a/lib/sentry/test.ex +++ b/lib/sentry/test.ex @@ -75,10 +75,9 @@ defmodule Sentry.Test do ## Options - * `:allowance` - a list of integration module atoms to enable automatic - `Sentry.Test.allow_sentry_reports/2` wiring for. The integrations land - in follow-up commits; see the integration-specific sections below for - supported entries. + * `:allowance` - a list of integration module atoms (currently `Oban`) + to enable automatic `Sentry.Test.allow_sentry_reports/2` wiring for. + See the "Oban tests" section below. Any other key is forwarded to the per-test Sentry config (e.g., `dedup_events: false`, `traces_sample_rate: 1.0`). @@ -99,6 +98,32 @@ defmodule Sentry.Test do This collapses the common `bypass = setup_sentry(...); ref = setup_bypass_envelope_collector(bypass)` two-step into one call. + ## Oban tests + + When you run Oban in `:inline` or `:manual` mode (per the + [Oban testing guide](https://hexdocs.pm/oban/testing.html)), jobs + execute synchronously in the calling process and `Sentry.Test` + captures their events automatically — no `:allowance` option needed. + + Use `allowance: [Oban]` when your test exercises a real Oban + supervisor with worker processes (the production-like setup that + issue #1052 was filed for). The option installs telemetry handlers + that tag jobs at insert time and route the worker's captured events + back to the inserting test: + + setup do + Sentry.Test.setup_sentry(allowance: [Oban]) + end + + test "captures events from a real Oban worker" do + {:ok, _} = Oban.insert(MyWorker.new(%{})) + # ... wait for the worker to run ... + assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports() + end + + Jobs inserted by other processes (cron plugins, jobs scheduling + jobs) are not auto-tagged and require manual + `Sentry.Test.allow_sentry_reports/2`. ## Examples @@ -912,10 +937,82 @@ defmodule Sentry.Test do # Returns the list of `{event_path, {module, function}}` handler pairs for # a given integration atom, or `:unknown` for unsupported entries (the - # caller turns that into an `ArgumentError`). Commits 2 and 3 prepend - # clauses for `Oban` and `Broadway` respectively. + # caller turns that into an `ArgumentError`). + defp allowance_handlers(Oban) do + [ + {[:oban, :engine, :insert_job, :stop], {__MODULE__, :__handle_oban_insert_job__}}, + {[:oban, :engine, :insert_all_jobs, :stop], {__MODULE__, :__handle_oban_insert_all_jobs__}}, + {[:oban, :job, :start], {__MODULE__, :__handle_oban_job_start__}}, + {[:oban, :job, :stop], {__MODULE__, :__handle_oban_job_finish__}}, + {[:oban, :job, :exception], {__MODULE__, :__handle_oban_job_finish__}} + ] + end + defp allowance_handlers(_other), do: :unknown + # ── Oban allowance handlers ── + # + # Guards on `is_integer(id)` so synthetic jobs from `:inline` mode or + # ad-hoc telemetry simulations (no persisted id) are silently skipped — + # keeps the handlers safe to install in any test config. + + @doc false + def __handle_oban_insert_job__(_event, _measurements, %{job: %{id: id}}, _config) + when is_integer(id) do + Sentry.Test.Registry.tag_oban_job(id, self()) + end + + def __handle_oban_insert_job__(_event, _measurements, _metadata, _config), do: :ok + + @doc false + def __handle_oban_insert_all_jobs__(_event, _measurements, %{jobs: jobs}, _config) + when is_list(jobs) do + pid = self() + + Enum.each(jobs, fn + %{id: id} when is_integer(id) -> Sentry.Test.Registry.tag_oban_job(id, pid) + _ -> :ok + end) + end + + def __handle_oban_insert_all_jobs__(_event, _measurements, _metadata, _config), do: :ok + + @doc false + def __handle_oban_job_start__(_event, _measurements, %{job: %{id: id}}, _config) + when is_integer(id) do + case Sentry.Test.Registry.lookup_oban_job(id) do + nil -> :ok + test_pid -> safe_allow(test_pid, self()) + end + end + + def __handle_oban_job_start__(_event, _measurements, _metadata, _config), do: :ok + + @doc false + def __handle_oban_job_finish__(_event, _measurements, %{job: %{id: id}}, _config) + when is_integer(id) do + Sentry.Test.Registry.untag_oban_job(id) + end + + def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok + + # Best-effort allow used by the Oban / Broadway dispatch handlers. + # Swallows the `ArgumentError` that `allow_sentry_reports/2` raises + # when: + # + # * the would-be worker pid is already allowed by another live test + # (concurrent tests racing on a shared worker — first wins); + # * the worker pid is the test pid itself (e.g. Oban :manual mode + # with `drain_queue/2` — `$callers` already routes the events); + # * the owner pid is no longer collecting (test exited between + # insert and start). + defp safe_allow(owner_pid, allowed_pid) + when is_pid(owner_pid) and is_pid(allowed_pid) do + allow_sentry_reports(owner_pid, allowed_pid) + rescue + ArgumentError -> :ok + end + # Sets up collection infrastructure (ETS table, before_send wrapping, config) # without opening a new Bypass. When no :dsn is provided in extra_config, # falls back to the default Bypass DSN from Registry. @@ -980,7 +1077,11 @@ defmodule Sentry.Test do # cleans up the key and allowances automatically when this test exits. # Drop any worker→processor routing rows that point at this test's # processor so a test that exits before its allowed pids do not - # leave stale rows pointing at a stopped per-test processor. + # leave stale rows pointing at a stopped per-test processor. Also + # defensively drop any Oban job tags owned by this test in case a + # job crashed before emitting a `:stop` / `:exception` event — leaves a + # stale row pointing at a dead pid otherwise. + test_pid = self() processor_name = Process.get(:sentry_telemetry_processor) ExUnit.Callbacks.on_exit(fn -> @@ -991,6 +1092,8 @@ defmodule Sentry.Test do if is_atom(processor_name) and not is_nil(processor_name) do Sentry.Test.Registry.drop_processor_routing_for(processor_name) end + + Sentry.Test.Registry.drop_oban_tags_for(test_pid) end) :ok diff --git a/lib/sentry/test/registry.ex b/lib/sentry/test/registry.ex index 4de239d5..9eea2480 100644 --- a/lib/sentry/test/registry.ex +++ b/lib/sentry/test/registry.ex @@ -20,6 +20,10 @@ defmodule Sentry.Test.Registry do # `tag_processor_for/2` sets it. @routing_table :sentry_test_pid_routing + # Separate ETS table tagging Oban job ids to the test pid that + # scheduled them, used by the Oban auto-allowance integration. + @oban_jobs_table :sentry_test_oban_job_tags + @spec start_link(keyword()) :: GenServer.on_start() def start_link([] = _opts) do GenServer.start_link(__MODULE__, nil, name: __MODULE__) @@ -236,9 +240,77 @@ defmodule Sentry.Test.Registry do :ok end + @doc """ + Tags an inserted Oban job with the pid of the process that scheduled + it. Used by the `allowance: [Oban]` telemetry handlers in + `Sentry.Test` to route a worker's captured events back to the + inserting test under `async: true`. + + Direct ETS write — atomic, no GenServer round-trip. + """ + @spec tag_oban_job(integer(), pid()) :: :ok + def tag_oban_job(job_id, owner_pid) + when is_integer(job_id) and is_pid(owner_pid) do + if :ets.whereis(@oban_jobs_table) != :undefined do + :ets.insert(@oban_jobs_table, {job_id, owner_pid}) + end + + :ok + end + + @doc """ + Returns the pid that tagged `job_id`, or `nil` if the tag is missing + or the tagging pid is no longer alive. + """ + @spec lookup_oban_job(integer()) :: pid() | nil + def lookup_oban_job(job_id) when is_integer(job_id) do + case :ets.whereis(@oban_jobs_table) do + :undefined -> + nil + + _ -> + case :ets.lookup(@oban_jobs_table, job_id) do + [{^job_id, pid}] when is_pid(pid) -> + if Process.alive?(pid), do: pid, else: nil + + [] -> + nil + end + end + end + + @doc """ + Removes the tag for `job_id`. Called from the `:oban, :job, :stop` + and `:oban, :job, :exception` handlers. + """ + @spec untag_oban_job(integer()) :: :ok + def untag_oban_job(job_id) when is_integer(job_id) do + if :ets.whereis(@oban_jobs_table) != :undefined do + :ets.delete(@oban_jobs_table, job_id) + end + + :ok + end + + @doc """ + Removes every tag whose owner is `owner_pid`. Used by + `setup_collector/1`'s `on_exit/1` cleanup so jobs that crashed + before emitting a `:stop`/`:exception` event don't leave stale tags + behind. + """ + @spec drop_oban_tags_for(pid()) :: :ok + def drop_oban_tags_for(owner_pid) when is_pid(owner_pid) do + if :ets.whereis(@oban_jobs_table) != :undefined do + :ets.match_delete(@oban_jobs_table, {:_, owner_pid}) + end + + :ok + end + @impl true def init(nil) do _routing_table = :ets.new(@routing_table, [:named_table, :public, :set]) + _oban_jobs_table = :ets.new(@oban_jobs_table, [:named_table, :public, :set]) maybe_start_default_bypass() {:ok, %{owner_monitors: %{}}} end diff --git a/test/sentry/test_test.exs b/test/sentry/test_test.exs index ca542b23..cc9a717a 100644 --- a/test/sentry/test_test.exs +++ b/test/sentry/test_test.exs @@ -416,6 +416,143 @@ defmodule Sentry.TestTest do end end + describe "setup_sentry/1 with allowance: [Oban] (synthetic events)" do + setup do + SentryTest.setup_sentry(allowance: [Oban]) + end + + test "tags the job at insert time and routes the worker on start" do + test_pid = self() + job = %{id: System.unique_integer([:positive])} + + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job}) + assert Sentry.Test.Registry.lookup_oban_job(job.id) == test_pid + + worker_done = make_ref() + + # Raw spawn/1 — does NOT propagate $callers, so the worker has no + # caller-chain link back to the test. The tag store is the only + # path that can route this worker's events to the test's collector. + worker = + spawn(fn -> + :telemetry.execute([:oban, :job, :start], %{}, %{job: job}) + + captured = + case Sentry.capture_message("oban hello", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "oban hello"}}] = + SentryTest.pop_sentry_reports() + end + + test "ignores jobs that were not tagged at insert time" do + test_pid = self() + job = %{id: System.unique_integer([:positive])} + worker_done = make_ref() + + worker = + spawn(fn -> + :telemetry.execute([:oban, :job, :start], %{}, %{job: job}) + + captured = + case Sentry.capture_message("untagged", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [] == SentryTest.pop_sentry_reports() + end + + test "untags the job on :stop" do + job = %{id: System.unique_integer([:positive])} + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job}) + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + :telemetry.execute([:oban, :job, :stop], %{}, %{job: job}) + refute Sentry.Test.Registry.lookup_oban_job(job.id) + end + + test "untags the job on :exception" do + job = %{id: System.unique_integer([:positive])} + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job}) + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + :telemetry.execute([:oban, :job, :exception], %{}, %{job: job}) + refute Sentry.Test.Registry.lookup_oban_job(job.id) + end + + test "insert_all_jobs tags every job in the batch" do + jobs = [ + %{id: System.unique_integer([:positive])}, + %{id: System.unique_integer([:positive])} + ] + + :telemetry.execute([:oban, :engine, :insert_all_jobs, :stop], %{}, %{jobs: jobs}) + + for job <- jobs do + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + end + end + + test "silently ignores synthetic jobs without an integer id" do + # :inline mode jobs / ad-hoc telemetry simulations may carry no id. + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: %{id: nil}}) + :telemetry.execute([:oban, :job, :start], %{}, %{job: %{id: nil}}) + :ok + end + + test "two concurrent test scopes are routed independently" do + test_pid = self() + job_for_me = %{id: System.unique_integer([:positive])} + job_for_peer = %{id: System.unique_integer([:positive])} + + # Spawn a peer that acts as a separate live owner via NimbleOwnership, + # tags its own Oban job, and reports back when ready. + peer = + spawn(fn -> + {:ok, _} = + NimbleOwnership.get_and_update( + Sentry.Test.OwnershipServer, + self(), + :sentry_test_collector, + fn _ -> {:ok, :peer_table} end + ) + + Sentry.Test.Registry.tag_oban_job(job_for_peer.id, self()) + send(test_pid, :claimed) + + receive do + :exit -> :ok + end + end) + + on_exit(fn -> Process.exit(peer, :kill) end) + assert_receive :claimed, 5_000 + + # Tag my own job. + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job_for_me}) + + assert Sentry.Test.Registry.lookup_oban_job(job_for_me.id) == test_pid + assert Sentry.Test.Registry.lookup_oban_job(job_for_peer.id) == peer + end + end + describe "before_send wrapping" do test "wraps existing before_send callback" do test_pid = self() diff --git a/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs b/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs index 92e032ca..423af688 100644 --- a/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs +++ b/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs @@ -419,6 +419,133 @@ defmodule Sentry.Integrations.Phoenix.ObanTest do end end + describe "setup_sentry/1 with allowance: [Oban]" do + setup do + Sentry.Test.setup_sentry(allowance: [Oban]) + Sentry.Integrations.Oban.ErrorReporter.attach() + on_exit(fn -> :telemetry.detach(Sentry.Integrations.Oban.ErrorReporter) end) + end + + test "captures events from a real Oban worker with no manual telemetry plumbing" do + job = %Oban.Job{ + id: System.unique_integer([:positive]), + args: %{"should_fail" => true}, + worker: "Sentry.Integrations.Phoenix.ObanTest.FailingWorker", + queue: "background", + attempt: 1, + max_attempts: 1, + meta: %{}, + inserted_at: DateTime.utc_now(), + scheduled_at: DateTime.utc_now(), + attempted_at: DateTime.utc_now() + } + + # Phase 1: simulate the insert-time engine event in the test pid. + # The allowance handler tags this job's id with self() so the + # detached worker can be routed back here on :start. + :telemetry.execute( + [:oban, :engine, :insert_job, :stop], + %{}, + %{job: job, conf: %{name: Oban}} + ) + + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + # Phase 2: run the worker in a detached process — no `$callers` + # link to this test, so the only path that can route the worker's + # captured events back is the tag established above. + run_job_in_detached_process(job) + + # The FailingWorker raises; the Oban error reporter turns it into + # an event, and the allowance routing delivers it to this test. + assert [%Sentry.Event{} = event] = Sentry.Test.pop_sentry_reports() + + assert [%Sentry.Interfaces.Exception{} = exception] = event.exception + assert exception.value == "intentional failure for testing" + end + + test "captures Sentry.Metrics.count/3 from a real Oban worker" do + job = %Oban.Job{ + id: System.unique_integer([:positive]), + args: %{}, + worker: "Sentry.Integrations.Phoenix.ObanTest.FailingWorker", + queue: "background", + attempt: 1, + max_attempts: 1, + meta: %{}, + inserted_at: DateTime.utc_now(), + scheduled_at: DateTime.utc_now(), + attempted_at: DateTime.utc_now() + } + + :telemetry.execute( + [:oban, :engine, :insert_job, :stop], + %{}, + %{job: job, conf: %{name: Oban}} + ) + + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + emit_metric_in_detached_process(job, fn -> + Sentry.Metrics.count("oban.allowance.metric.test", 1) + end) + + Sentry.TelemetryProcessor.flush() + Sentry.TelemetryProcessor.flush(Sentry.TelemetryProcessor) + + metrics = Sentry.Test.pop_sentry_metrics() + + assert Enum.any?(metrics, &(&1.name == "oban.allowance.metric.test")), + "expected metric emitted from the Oban worker process to land in the " <> + "test collector, got: #{inspect(metrics)}" + end + end + + defp run_job_in_detached_process(job) do + parent = self() + ref = make_ref() + + spawn(fn -> + start_metadata = %{job: job, conf: %{name: Oban}} + + :telemetry.execute( + [:oban, :job, :start], + %{system_time: System.system_time()}, + start_metadata + ) + + {kind, reason, stacktrace} = + try do + FailingWorker.perform(job) + {:ok, nil, []} + catch + kind, reason -> {kind, reason, __STACKTRACE__} + end + + :telemetry.execute( + [:oban, :job, :exception], + %{duration: 0}, + Map.merge(start_metadata, %{ + kind: kind, + reason: reason, + error: reason, + stacktrace: stacktrace, + state: :failure + }) + ) + + send(parent, {ref, :done}) + end) + + receive do + {^ref, :done} -> :ok + after + 5_000 -> flunk("worker process did not finish in time") + end + + Process.sleep(50) + end + defp run_failing_worker_in_detached_process(opts) do parent = self() ref = make_ref() @@ -440,7 +567,12 @@ defmodule Sentry.Integrations.Phoenix.ObanTest do } start_metadata = %{job: job, conf: %{name: Oban}} - :telemetry.execute([:oban, :job, :start], %{system_time: System.system_time()}, start_metadata) + + :telemetry.execute( + [:oban, :job, :start], + %{system_time: System.system_time()}, + start_metadata + ) {kind, reason, stacktrace} = try do @@ -476,4 +608,37 @@ defmodule Sentry.Integrations.Phoenix.ObanTest do Process.sleep(50) end + + defp emit_metric_in_detached_process(job, fun) do + parent = self() + ref = make_ref() + + spawn(fn -> + start_metadata = %{job: job, conf: %{name: Oban}} + + :telemetry.execute( + [:oban, :job, :start], + %{system_time: System.system_time()}, + start_metadata + ) + + fun.() + + :telemetry.execute( + [:oban, :job, :stop], + %{duration: 0}, + Map.merge(start_metadata, %{state: :success, result: :ok}) + ) + + send(parent, {ref, :done}) + end) + + receive do + {^ref, :done} -> :ok + after + 5_000 -> flunk("worker process did not finish in time") + end + + Process.sleep(50) + end end From 545e3b654be8deee117113faf8ad6803862a7eb6 Mon Sep 17 00:00:00 2001 From: Peter Solnica Date: Wed, 13 May 2026 11:04:41 +0000 Subject: [PATCH 3/3] feat(tests): support allowance: [Broadway] with metadata-tagged routing MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Adds Broadway to the :allowance dispatch. The handler subscribes to [:broadway, :processor, :start] and [:broadway, :batch_processor, :start], reads the first :sentry_test_owner found in metadata.messages[*].metadata, and calls allow_sentry_reports/2 for that test pid. This follows the same shape Broadway documents for Ecto sandbox testing (metadata: %{ecto_sandbox: self()}) — no wrapper around Broadway.test_message/3, no hidden state, and async-safe by design because each message carries its origin test pid. Messages without :sentry_test_owner are silently skipped. Integration coverage adds a minimal Broadway pipeline (PhoenixApp.TestBroadway) and a 3-test broadway_test.exs in phoenix_app proving: (1) tagged + allowance captures, (2) untagged + allowance does not, (3) tagged + no allowance does not. async: true on the describe block to validate cross-test isolation through a shared pipeline. Co-Authored-By: Claude Opus 4.7 (1M context) --- lib/sentry/test.ex | 81 ++++++++++- test/sentry/test_test.exs | 128 ++++++++++++++++++ test_integrations/phoenix_app/mix.exs | 3 +- test_integrations/phoenix_app/mix.lock | 2 + .../test/phoenix_app/broadway_test.exs | 53 ++++++++ .../phoenix_app/test/support/test_broadway.ex | 29 ++++ 6 files changed, 291 insertions(+), 5 deletions(-) create mode 100644 test_integrations/phoenix_app/test/phoenix_app/broadway_test.exs create mode 100644 test_integrations/phoenix_app/test/support/test_broadway.ex diff --git a/lib/sentry/test.ex b/lib/sentry/test.ex index e8c55cb2..2c11e3ab 100644 --- a/lib/sentry/test.ex +++ b/lib/sentry/test.ex @@ -52,7 +52,7 @@ defmodule Sentry.Test do @moduledoc since: "10.2.0" - @compile {:no_warn_undefined, [Bypass, Plug.Conn, :telemetry]} + @compile {:no_warn_undefined, [Bypass, Plug.Conn, :telemetry, Broadway]} @ownership_server Sentry.Test.OwnershipServer @@ -75,9 +75,9 @@ defmodule Sentry.Test do ## Options - * `:allowance` - a list of integration module atoms (currently `Oban`) - to enable automatic `Sentry.Test.allow_sentry_reports/2` wiring for. - See the "Oban tests" section below. + * `:allowance` - a list of integration module atoms (currently `Oban` + and `Broadway`) to enable automatic `Sentry.Test.allow_sentry_reports/2` + wiring for. See the "Oban tests" and "Broadway tests" sections below. Any other key is forwarded to the per-test Sentry config (e.g., `dedup_events: false`, `traces_sample_rate: 1.0`). @@ -125,6 +125,41 @@ defmodule Sentry.Test do jobs) are not auto-tagged and require manual `Sentry.Test.allow_sentry_reports/2`. + ## Broadway tests + + To route events from a Broadway processor or batch-processor back + to your test, pass `:sentry_test_owner` in the message metadata + when injecting messages via `Broadway.test_message/3` or + `Broadway.test_batch/3`: + + setup do + Sentry.Test.setup_sentry(allowance: [Broadway]) + start_supervised!(MyPipeline) + :ok + end + + test "captures events from the processor" do + ref = + Broadway.test_message(MyPipeline, payload, + metadata: %{sentry_test_owner: self()} + ) + + assert_receive {:ack, ^ref, [_succeeded], []} + + assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports() + end + + This mirrors the [Ecto sandbox pattern documented in + Broadway](https://hexdocs.pm/broadway/Broadway.html#module-testing-with-ecto): + the test owner travels with the message itself, so two `async: true` + tests racing through the same pipeline are routed independently. + + Messages submitted without the `:sentry_test_owner` metadata are not + auto-allowed — the handler silently skips them. For production + producers (Kafka, SQS, etc.) that need the same routing, attach the + same key to the messages they emit; the handler reads it regardless + of source. + ## Examples setup do @@ -948,6 +983,17 @@ defmodule Sentry.Test do ] end + defp allowance_handlers(Broadway) do + # Both events fire once per worker invocation (per batch) in the + # processor / batch-processor pid, with metadata.messages giving + # the full batch. Reading the owner from message metadata is the + # documented Broadway pattern (same shape as `ecto_sandbox`). + [ + {[:broadway, :processor, :start], {__MODULE__, :__handle_broadway_batch_start__}}, + {[:broadway, :batch_processor, :start], {__MODULE__, :__handle_broadway_batch_start__}} + ] + end + defp allowance_handlers(_other), do: :unknown # ── Oban allowance handlers ── @@ -996,6 +1042,33 @@ defmodule Sentry.Test do def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok + # ── Broadway allowance handler ── + # + # The handler walks the batch's messages looking for a + # `:sentry_test_owner` metadata entry — the documented Broadway test + # pattern, identical in shape to the `:ecto_sandbox` example in the + # Broadway testing guide. Tests submit messages via + # `Broadway.test_message/3` with + # `metadata: %{sentry_test_owner: self()}` (or any custom producer + # that propagates `:metadata` onto `%Broadway.Message{}`). + @doc false + def __handle_broadway_batch_start__(_event, _measurements, %{messages: messages}, _config) + when is_list(messages) do + case find_broadway_owner(messages) do + nil -> :ok + owner_pid -> safe_allow(owner_pid, self()) + end + end + + def __handle_broadway_batch_start__(_event, _measurements, _metadata, _config), do: :ok + + defp find_broadway_owner(messages) do + Enum.find_value(messages, fn + %{metadata: %{sentry_test_owner: pid}} when is_pid(pid) -> pid + _ -> nil + end) + end + # Best-effort allow used by the Oban / Broadway dispatch handlers. # Swallows the `ArgumentError` that `allow_sentry_reports/2` raises # when: diff --git a/test/sentry/test_test.exs b/test/sentry/test_test.exs index cc9a717a..cc06de8d 100644 --- a/test/sentry/test_test.exs +++ b/test/sentry/test_test.exs @@ -553,6 +553,134 @@ defmodule Sentry.TestTest do end end + describe "setup_sentry/1 with allowance: [Broadway] (synthetic events)" do + setup do + SentryTest.setup_sentry(allowance: [Broadway]) + end + + test "processor batch start with a tagged message routes the worker" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [%{metadata: %{sentry_test_owner: test_pid}}] + + :telemetry.execute( + [:broadway, :processor, :start], + %{}, + %{messages: messages} + ) + + captured = + case Sentry.capture_message("from broadway processor", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "from broadway processor"}}] = + SentryTest.pop_sentry_reports() + end + + test "batch_processor start with a tagged message routes the worker" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [%{metadata: %{sentry_test_owner: test_pid}}] + + :telemetry.execute( + [:broadway, :batch_processor, :start], + %{}, + %{messages: messages} + ) + + captured = + case Sentry.capture_message("from broadway batch", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "from broadway batch"}}] = + SentryTest.pop_sentry_reports() + end + + test "messages without :sentry_test_owner metadata are not auto-allowed" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [%{metadata: %{some_other_key: :foo}}] + + :telemetry.execute( + [:broadway, :processor, :start], + %{}, + %{messages: messages} + ) + + captured = + case Sentry.capture_message("untagged broadway", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [] == SentryTest.pop_sentry_reports() + end + + test "uses the first tagged message in a mixed batch" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [ + %{metadata: %{some_other_key: :foo}}, + %{metadata: %{sentry_test_owner: test_pid}}, + %{metadata: %{}} + ] + + :telemetry.execute( + [:broadway, :processor, :start], + %{}, + %{messages: messages} + ) + + Sentry.capture_message("mixed batch", result: :sync) + send(test_pid, worker_done) + end) + + ref = Process.monitor(worker) + assert_receive ^worker_done, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "mixed batch"}}] = + SentryTest.pop_sentry_reports() + end + end + describe "before_send wrapping" do test "wraps existing before_send callback" do test_pid = self() diff --git a/test_integrations/phoenix_app/mix.exs b/test_integrations/phoenix_app/mix.exs index 8eccda4a..1f9e77fe 100644 --- a/test_integrations/phoenix_app/mix.exs +++ b/test_integrations/phoenix_app/mix.exs @@ -75,7 +75,8 @@ defmodule PhoenixApp.MixProject do {:opentelemetry_ecto, "~> 1.2"}, {:opentelemetry_logger_metadata, "~> 0.2.0"}, {:hackney, "~> 1.18"}, - {:oban, "~> 2.10"} + {:oban, "~> 2.10"}, + {:broadway, "~> 1.0", only: [:test]} ] end diff --git a/test_integrations/phoenix_app/mix.lock b/test_integrations/phoenix_app/mix.lock index b44b46cc..8259a7d1 100644 --- a/test_integrations/phoenix_app/mix.lock +++ b/test_integrations/phoenix_app/mix.lock @@ -1,6 +1,7 @@ %{ "acceptor_pool": {:hex, :acceptor_pool, "1.0.1", "d88c2e8a0be9216cf513fbcd3e5a4beb36bee3ff4168e85d6152c6f899359cdb", [:rebar3], [], "hexpm", "f172f3d74513e8edd445c257d596fc84dbdd56d2c6fa287434269648ae5a421e"}, "bandit": {:hex, :bandit, "1.10.3", "1e5d168fa79ec8de2860d1b4d878d97d4fbbe2fdbe7b0a7d9315a4359d1d4bb9", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "99a52d909c48db65ca598e1962797659e3c0f1d06e825a50c3d75b74a5e2db18"}, + "broadway": {:hex, :broadway, "1.3.0", "f75f6376159b74f55c5ba2629dac613e4fd79d9e71148ab5fbac8fdd7c999d2a", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bef3b4c5512d0072917b70239cbecf8f76a2587465a5b7c3e2b9ae18b4bc405b"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, "castore": {:hex, :castore, "1.0.17", "4f9770d2d45fbd91dcf6bd404cf64e7e58fed04fadda0923dc32acca0badffa2", [:mix], [], "hexpm", "12d24b9d80b910dd3953e165636d68f147a31db945d2dcb9365e441f8b5351e5"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"}, @@ -24,6 +25,7 @@ "finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"}, "fine": {:hex, :fine, "0.1.4", "b19a89c1476c7c57afb5f9314aed5960b5bc95d5277de4cb5ee8e1d1616ce379", [:mix], [], "hexpm", "be3324cc454a42d80951cf6023b9954e9ff27c6daa255483b3e8d608670303f5"}, "floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"}, + "gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"}, "gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, diff --git a/test_integrations/phoenix_app/test/phoenix_app/broadway_test.exs b/test_integrations/phoenix_app/test/phoenix_app/broadway_test.exs new file mode 100644 index 00000000..b674b034 --- /dev/null +++ b/test_integrations/phoenix_app/test/phoenix_app/broadway_test.exs @@ -0,0 +1,53 @@ +defmodule PhoenixApp.BroadwayTest do + # async: true — the auto-allowance design routes per-message via the + # :sentry_test_owner metadata, which Broadway propagates onto the + # %Broadway.Message{} struct. Two of these tests racing against each + # other on the same shared pipeline still produce the right per-test + # results because each message carries its origin test pid. + use ExUnit.Case, async: true + + describe "setup_sentry/1 with allowance: [Broadway]" do + setup do + Sentry.Test.setup_sentry(allowance: [Broadway]) + start_supervised!(PhoenixApp.TestBroadway) + :ok + end + + test "events from a Broadway processor are captured when tagged via metadata" do + ref = + Broadway.test_message(PhoenixApp.TestBroadway, :capture, + metadata: %{sentry_test_owner: self()} + ) + + assert_receive {:ack, ^ref, [_succeeded], []}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "from broadway"}}] = + Sentry.Test.pop_sentry_reports() + end + + test "raw Broadway.test_message without :sentry_test_owner is not auto-allowed" do + ref = Broadway.test_message(PhoenixApp.TestBroadway, :capture) + + assert_receive {:ack, ^ref, [_succeeded], []}, 5_000 + assert [] == Sentry.Test.pop_sentry_reports() + end + end + + describe "without allowance" do + setup do + Sentry.Test.setup_sentry() + start_supervised!(PhoenixApp.TestBroadway) + :ok + end + + test "tagged messages are still dropped without allowance: [Broadway]" do + ref = + Broadway.test_message(PhoenixApp.TestBroadway, :capture, + metadata: %{sentry_test_owner: self()} + ) + + assert_receive {:ack, ^ref, [_succeeded], []}, 5_000 + assert [] == Sentry.Test.pop_sentry_reports() + end + end +end diff --git a/test_integrations/phoenix_app/test/support/test_broadway.ex b/test_integrations/phoenix_app/test/support/test_broadway.ex new file mode 100644 index 00000000..211d7bf9 --- /dev/null +++ b/test_integrations/phoenix_app/test/support/test_broadway.ex @@ -0,0 +1,29 @@ +defmodule PhoenixApp.TestBroadway do + @moduledoc false + + use Broadway + + def start_link(_opts \\ []) do + Broadway.start_link(__MODULE__, + name: __MODULE__, + producer: [module: {Broadway.DummyProducer, []}], + processors: [default: [concurrency: 1]] + ) + end + + @impl true + def handle_message(_processor, %Broadway.Message{data: data} = message, _context) do + case data do + :capture -> + Sentry.capture_message("from broadway", result: :sync) + + :raise -> + raise "broadway boom" + + _ -> + :ok + end + + message + end +end