diff --git a/lib/sentry/test.ex b/lib/sentry/test.ex index 7c98dcd9..2c11e3ab 100644 --- a/lib/sentry/test.ex +++ b/lib/sentry/test.ex @@ -52,7 +52,7 @@ defmodule Sentry.Test do @moduledoc since: "10.2.0" - @compile {:no_warn_undefined, [Bypass, Plug.Conn]} + @compile {:no_warn_undefined, [Bypass, Plug.Conn, :telemetry, Broadway]} @ownership_server Sentry.Test.OwnershipServer @@ -75,8 +75,12 @@ defmodule Sentry.Test do ## Options - Any extra Sentry config options (e.g., `dedup_events: false`, `traces_sample_rate: 1.0`) - will be forwarded to the test config. + * `:allowance` - a list of integration module atoms (currently `Oban` + and `Broadway`) to enable automatic `Sentry.Test.allow_sentry_reports/2` + wiring for. See the "Oban tests" and "Broadway tests" sections below. + + Any other key is forwarded to the per-test Sentry config (e.g., + `dedup_events: false`, `traces_sample_rate: 1.0`). The reserved `:telemetry_processor` option is *not* forwarded to the test config. Instead, its value (a keyword list) is passed to the per-test @@ -94,6 +98,67 @@ defmodule Sentry.Test do This collapses the common `bypass = setup_sentry(...); ref = setup_bypass_envelope_collector(bypass)` two-step into one call. + ## Oban tests + + When you run Oban in `:inline` or `:manual` mode (per the + [Oban testing guide](https://hexdocs.pm/oban/testing.html)), jobs + execute synchronously in the calling process and `Sentry.Test` + captures their events automatically — no `:allowance` option needed. + + Use `allowance: [Oban]` when your test exercises a real Oban + supervisor with worker processes (the production-like setup that + issue #1052 was filed for). The option installs telemetry handlers + that tag jobs at insert time and route the worker's captured events + back to the inserting test: + + setup do + Sentry.Test.setup_sentry(allowance: [Oban]) + end + + test "captures events from a real Oban worker" do + {:ok, _} = Oban.insert(MyWorker.new(%{})) + # ... wait for the worker to run ... + assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports() + end + + Jobs inserted by other processes (cron plugins, jobs scheduling + jobs) are not auto-tagged and require manual + `Sentry.Test.allow_sentry_reports/2`. + + ## Broadway tests + + To route events from a Broadway processor or batch-processor back + to your test, pass `:sentry_test_owner` in the message metadata + when injecting messages via `Broadway.test_message/3` or + `Broadway.test_batch/3`: + + setup do + Sentry.Test.setup_sentry(allowance: [Broadway]) + start_supervised!(MyPipeline) + :ok + end + + test "captures events from the processor" do + ref = + Broadway.test_message(MyPipeline, payload, + metadata: %{sentry_test_owner: self()} + ) + + assert_receive {:ack, ^ref, [_succeeded], []} + + assert [%Sentry.Event{}] = Sentry.Test.pop_sentry_reports() + end + + This mirrors the [Ecto sandbox pattern documented in + Broadway](https://hexdocs.pm/broadway/Broadway.html#module-testing-with-ecto): + the test owner travels with the message itself, so two `async: true` + tests racing through the same pipeline are routed independently. + + Messages submitted without the `:sentry_test_owner` metadata are not + auto-allowed — the handler silently skips them. For production + producers (Kafka, SQS, etc.) that need the same routing, attach the + same key to the messages they emit; the handler reads it regardless + of source. ## Examples @@ -135,6 +200,7 @@ defmodule Sentry.Test do {tp_opts, extra_config} = Keyword.pop(extra_config, :telemetry_processor, []) {collect_envelopes, extra_config} = Keyword.pop(extra_config, :collect_envelopes, false) + {allowance, extra_config} = Keyword.pop(extra_config, :allowance, []) # Open a per-test Bypass and stub the envelope endpoint bypass = Bypass.open() @@ -151,6 +217,8 @@ defmodule Sentry.Test do bypass_config = [dsn: "http://public:secret@localhost:#{bypass.port}/1"] setup_collector(bypass_config ++ extra_config) + attach_allowance_handlers(allowance, self()) + case collect_envelopes do false -> %{bypass: bypass, telemetry_processor: processor_name} @@ -833,6 +901,191 @@ defmodule Sentry.Test do end end + defp ensure_telemetry_loaded! do + unless Code.ensure_loaded?(:telemetry) do + raise """ + `:telemetry` is required for the `:allowance` option of Sentry.Test + but is not available. Add it to your test dependencies: + + {:telemetry, "~> 1.0", only: [:test]} + """ + end + end + + # ── :allowance plumbing ── + # + # Each integration atom (e.g. Oban, Broadway) is mapped by + # allowance_handlers!/1 to one or more {telemetry_event, {module, fun}} + # pairs. Commit 1 ships only the catch-all clause; commits 2 and 3 add + # the integration-specific clauses. + + defp attach_allowance_handlers([], _owner_pid), do: :ok + + defp attach_allowance_handlers(modules, owner_pid) when is_list(modules) do + ensure_telemetry_loaded!() + Enum.each(modules, &attach_allowance_handler(&1, owner_pid)) + end + + defp attach_allowance_handler(module, owner_pid) do + case allowance_handlers(module) do + :unknown -> + raise ArgumentError, + "unknown :allowance entry #{inspect(module)}. Supported integrations: " <> + "(none built-in yet — Oban and Broadway land in follow-up commits)" + + pairs when is_list(pairs) -> + Enum.each(pairs, fn {event, handler_fun} -> + __attach_allowance__(event, handler_fun, %{owner_pid: owner_pid}) + end) + end + end + + @doc false + @spec __attach_allowance__([atom()], {module(), atom()}, map()) :: :ok + def __attach_allowance__(event, {module, function}, config) + when is_list(event) and is_atom(module) and is_atom(function) and is_map(config) do + ref = System.unique_integer([:positive]) + handler_id = {:sentry_test_allowance, ref} + + :ok = + :telemetry.attach( + handler_id, + event, + Function.capture(module, function, 4), + config + ) + + ExUnit.Callbacks.on_exit(fn -> :telemetry.detach(handler_id) end) + :ok + end + + # Generic "allow whatever fired this event for owner_pid" handler. Used + # by the foundation unit tests and available for ad-hoc telemetry + # routing; the Oban / Broadway dispatches use their own handlers that + # consult metadata rather than blindly allowing the emitting pid. + @doc false + def __handle_allowance_event__(_event, _measurements, _metadata, %{owner_pid: owner_pid}) do + allow_sentry_reports(owner_pid, self()) + rescue + ArgumentError -> :ok + end + + # Returns the list of `{event_path, {module, function}}` handler pairs for + # a given integration atom, or `:unknown` for unsupported entries (the + # caller turns that into an `ArgumentError`). + defp allowance_handlers(Oban) do + [ + {[:oban, :engine, :insert_job, :stop], {__MODULE__, :__handle_oban_insert_job__}}, + {[:oban, :engine, :insert_all_jobs, :stop], {__MODULE__, :__handle_oban_insert_all_jobs__}}, + {[:oban, :job, :start], {__MODULE__, :__handle_oban_job_start__}}, + {[:oban, :job, :stop], {__MODULE__, :__handle_oban_job_finish__}}, + {[:oban, :job, :exception], {__MODULE__, :__handle_oban_job_finish__}} + ] + end + + defp allowance_handlers(Broadway) do + # Both events fire once per worker invocation (per batch) in the + # processor / batch-processor pid, with metadata.messages giving + # the full batch. Reading the owner from message metadata is the + # documented Broadway pattern (same shape as `ecto_sandbox`). + [ + {[:broadway, :processor, :start], {__MODULE__, :__handle_broadway_batch_start__}}, + {[:broadway, :batch_processor, :start], {__MODULE__, :__handle_broadway_batch_start__}} + ] + end + + defp allowance_handlers(_other), do: :unknown + + # ── Oban allowance handlers ── + # + # Guards on `is_integer(id)` so synthetic jobs from `:inline` mode or + # ad-hoc telemetry simulations (no persisted id) are silently skipped — + # keeps the handlers safe to install in any test config. + + @doc false + def __handle_oban_insert_job__(_event, _measurements, %{job: %{id: id}}, _config) + when is_integer(id) do + Sentry.Test.Registry.tag_oban_job(id, self()) + end + + def __handle_oban_insert_job__(_event, _measurements, _metadata, _config), do: :ok + + @doc false + def __handle_oban_insert_all_jobs__(_event, _measurements, %{jobs: jobs}, _config) + when is_list(jobs) do + pid = self() + + Enum.each(jobs, fn + %{id: id} when is_integer(id) -> Sentry.Test.Registry.tag_oban_job(id, pid) + _ -> :ok + end) + end + + def __handle_oban_insert_all_jobs__(_event, _measurements, _metadata, _config), do: :ok + + @doc false + def __handle_oban_job_start__(_event, _measurements, %{job: %{id: id}}, _config) + when is_integer(id) do + case Sentry.Test.Registry.lookup_oban_job(id) do + nil -> :ok + test_pid -> safe_allow(test_pid, self()) + end + end + + def __handle_oban_job_start__(_event, _measurements, _metadata, _config), do: :ok + + @doc false + def __handle_oban_job_finish__(_event, _measurements, %{job: %{id: id}}, _config) + when is_integer(id) do + Sentry.Test.Registry.untag_oban_job(id) + end + + def __handle_oban_job_finish__(_event, _measurements, _metadata, _config), do: :ok + + # ── Broadway allowance handler ── + # + # The handler walks the batch's messages looking for a + # `:sentry_test_owner` metadata entry — the documented Broadway test + # pattern, identical in shape to the `:ecto_sandbox` example in the + # Broadway testing guide. Tests submit messages via + # `Broadway.test_message/3` with + # `metadata: %{sentry_test_owner: self()}` (or any custom producer + # that propagates `:metadata` onto `%Broadway.Message{}`). + @doc false + def __handle_broadway_batch_start__(_event, _measurements, %{messages: messages}, _config) + when is_list(messages) do + case find_broadway_owner(messages) do + nil -> :ok + owner_pid -> safe_allow(owner_pid, self()) + end + end + + def __handle_broadway_batch_start__(_event, _measurements, _metadata, _config), do: :ok + + defp find_broadway_owner(messages) do + Enum.find_value(messages, fn + %{metadata: %{sentry_test_owner: pid}} when is_pid(pid) -> pid + _ -> nil + end) + end + + # Best-effort allow used by the Oban / Broadway dispatch handlers. + # Swallows the `ArgumentError` that `allow_sentry_reports/2` raises + # when: + # + # * the would-be worker pid is already allowed by another live test + # (concurrent tests racing on a shared worker — first wins); + # * the worker pid is the test pid itself (e.g. Oban :manual mode + # with `drain_queue/2` — `$callers` already routes the events); + # * the owner pid is no longer collecting (test exited between + # insert and start). + defp safe_allow(owner_pid, allowed_pid) + when is_pid(owner_pid) and is_pid(allowed_pid) do + allow_sentry_reports(owner_pid, allowed_pid) + rescue + ArgumentError -> :ok + end + # Sets up collection infrastructure (ETS table, before_send wrapping, config) # without opening a new Bypass. When no :dsn is provided in extra_config, # falls back to the default Bypass DSN from Registry. @@ -897,7 +1150,11 @@ defmodule Sentry.Test do # cleans up the key and allowances automatically when this test exits. # Drop any worker→processor routing rows that point at this test's # processor so a test that exits before its allowed pids do not - # leave stale rows pointing at a stopped per-test processor. + # leave stale rows pointing at a stopped per-test processor. Also + # defensively drop any Oban job tags owned by this test in case a + # job crashed before emitting a `:stop` / `:exception` event — leaves a + # stale row pointing at a dead pid otherwise. + test_pid = self() processor_name = Process.get(:sentry_telemetry_processor) ExUnit.Callbacks.on_exit(fn -> @@ -908,6 +1165,8 @@ defmodule Sentry.Test do if is_atom(processor_name) and not is_nil(processor_name) do Sentry.Test.Registry.drop_processor_routing_for(processor_name) end + + Sentry.Test.Registry.drop_oban_tags_for(test_pid) end) :ok diff --git a/lib/sentry/test/registry.ex b/lib/sentry/test/registry.ex index 4de239d5..9eea2480 100644 --- a/lib/sentry/test/registry.ex +++ b/lib/sentry/test/registry.ex @@ -20,6 +20,10 @@ defmodule Sentry.Test.Registry do # `tag_processor_for/2` sets it. @routing_table :sentry_test_pid_routing + # Separate ETS table tagging Oban job ids to the test pid that + # scheduled them, used by the Oban auto-allowance integration. + @oban_jobs_table :sentry_test_oban_job_tags + @spec start_link(keyword()) :: GenServer.on_start() def start_link([] = _opts) do GenServer.start_link(__MODULE__, nil, name: __MODULE__) @@ -236,9 +240,77 @@ defmodule Sentry.Test.Registry do :ok end + @doc """ + Tags an inserted Oban job with the pid of the process that scheduled + it. Used by the `allowance: [Oban]` telemetry handlers in + `Sentry.Test` to route a worker's captured events back to the + inserting test under `async: true`. + + Direct ETS write — atomic, no GenServer round-trip. + """ + @spec tag_oban_job(integer(), pid()) :: :ok + def tag_oban_job(job_id, owner_pid) + when is_integer(job_id) and is_pid(owner_pid) do + if :ets.whereis(@oban_jobs_table) != :undefined do + :ets.insert(@oban_jobs_table, {job_id, owner_pid}) + end + + :ok + end + + @doc """ + Returns the pid that tagged `job_id`, or `nil` if the tag is missing + or the tagging pid is no longer alive. + """ + @spec lookup_oban_job(integer()) :: pid() | nil + def lookup_oban_job(job_id) when is_integer(job_id) do + case :ets.whereis(@oban_jobs_table) do + :undefined -> + nil + + _ -> + case :ets.lookup(@oban_jobs_table, job_id) do + [{^job_id, pid}] when is_pid(pid) -> + if Process.alive?(pid), do: pid, else: nil + + [] -> + nil + end + end + end + + @doc """ + Removes the tag for `job_id`. Called from the `:oban, :job, :stop` + and `:oban, :job, :exception` handlers. + """ + @spec untag_oban_job(integer()) :: :ok + def untag_oban_job(job_id) when is_integer(job_id) do + if :ets.whereis(@oban_jobs_table) != :undefined do + :ets.delete(@oban_jobs_table, job_id) + end + + :ok + end + + @doc """ + Removes every tag whose owner is `owner_pid`. Used by + `setup_collector/1`'s `on_exit/1` cleanup so jobs that crashed + before emitting a `:stop`/`:exception` event don't leave stale tags + behind. + """ + @spec drop_oban_tags_for(pid()) :: :ok + def drop_oban_tags_for(owner_pid) when is_pid(owner_pid) do + if :ets.whereis(@oban_jobs_table) != :undefined do + :ets.match_delete(@oban_jobs_table, {:_, owner_pid}) + end + + :ok + end + @impl true def init(nil) do _routing_table = :ets.new(@routing_table, [:named_table, :public, :set]) + _oban_jobs_table = :ets.new(@oban_jobs_table, [:named_table, :public, :set]) maybe_start_default_bypass() {:ok, %{owner_monitors: %{}}} end diff --git a/test/sentry/test_test.exs b/test/sentry/test_test.exs index 79c065b6..cc06de8d 100644 --- a/test/sentry/test_test.exs +++ b/test/sentry/test_test.exs @@ -375,6 +375,312 @@ defmodule Sentry.TestTest do end end + describe "setup_sentry/1 with :allowance (foundation)" do + test "empty allowance list is a no-op" do + assert %{bypass: _, telemetry_processor: _} = + SentryTest.setup_sentry(allowance: []) + end + + test "raises a clear error for unknown allowance entries" do + assert_raise ArgumentError, ~r/unknown :allowance entry/, fn -> + SentryTest.setup_sentry(allowance: [SomeUnknownThing]) + end + end + + test "__attach_allowance__/3 routes worker events back to the owner" do + SentryTest.setup_sentry() + test_pid = self() + + SentryTest.__attach_allowance__( + [:sentry_test_allowance, :synthetic, :start], + {SentryTest, :__handle_allowance_event__}, + %{owner_pid: test_pid} + ) + + worker_done = make_ref() + + {:ok, _worker} = + Task.start(fn -> + :telemetry.execute([:sentry_test_allowance, :synthetic, :start], %{}, %{}) + + assert {:ok, _} = + Sentry.capture_message("hello from synthetic worker", result: :sync) + + send(test_pid, worker_done) + end) + + assert_receive ^worker_done, 5_000 + + assert [%Sentry.Event{message: %{formatted: "hello from synthetic worker"}}] = + SentryTest.pop_sentry_reports() + end + end + + describe "setup_sentry/1 with allowance: [Oban] (synthetic events)" do + setup do + SentryTest.setup_sentry(allowance: [Oban]) + end + + test "tags the job at insert time and routes the worker on start" do + test_pid = self() + job = %{id: System.unique_integer([:positive])} + + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job}) + assert Sentry.Test.Registry.lookup_oban_job(job.id) == test_pid + + worker_done = make_ref() + + # Raw spawn/1 — does NOT propagate $callers, so the worker has no + # caller-chain link back to the test. The tag store is the only + # path that can route this worker's events to the test's collector. + worker = + spawn(fn -> + :telemetry.execute([:oban, :job, :start], %{}, %{job: job}) + + captured = + case Sentry.capture_message("oban hello", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "oban hello"}}] = + SentryTest.pop_sentry_reports() + end + + test "ignores jobs that were not tagged at insert time" do + test_pid = self() + job = %{id: System.unique_integer([:positive])} + worker_done = make_ref() + + worker = + spawn(fn -> + :telemetry.execute([:oban, :job, :start], %{}, %{job: job}) + + captured = + case Sentry.capture_message("untagged", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [] == SentryTest.pop_sentry_reports() + end + + test "untags the job on :stop" do + job = %{id: System.unique_integer([:positive])} + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job}) + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + :telemetry.execute([:oban, :job, :stop], %{}, %{job: job}) + refute Sentry.Test.Registry.lookup_oban_job(job.id) + end + + test "untags the job on :exception" do + job = %{id: System.unique_integer([:positive])} + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job}) + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + :telemetry.execute([:oban, :job, :exception], %{}, %{job: job}) + refute Sentry.Test.Registry.lookup_oban_job(job.id) + end + + test "insert_all_jobs tags every job in the batch" do + jobs = [ + %{id: System.unique_integer([:positive])}, + %{id: System.unique_integer([:positive])} + ] + + :telemetry.execute([:oban, :engine, :insert_all_jobs, :stop], %{}, %{jobs: jobs}) + + for job <- jobs do + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + end + end + + test "silently ignores synthetic jobs without an integer id" do + # :inline mode jobs / ad-hoc telemetry simulations may carry no id. + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: %{id: nil}}) + :telemetry.execute([:oban, :job, :start], %{}, %{job: %{id: nil}}) + :ok + end + + test "two concurrent test scopes are routed independently" do + test_pid = self() + job_for_me = %{id: System.unique_integer([:positive])} + job_for_peer = %{id: System.unique_integer([:positive])} + + # Spawn a peer that acts as a separate live owner via NimbleOwnership, + # tags its own Oban job, and reports back when ready. + peer = + spawn(fn -> + {:ok, _} = + NimbleOwnership.get_and_update( + Sentry.Test.OwnershipServer, + self(), + :sentry_test_collector, + fn _ -> {:ok, :peer_table} end + ) + + Sentry.Test.Registry.tag_oban_job(job_for_peer.id, self()) + send(test_pid, :claimed) + + receive do + :exit -> :ok + end + end) + + on_exit(fn -> Process.exit(peer, :kill) end) + assert_receive :claimed, 5_000 + + # Tag my own job. + :telemetry.execute([:oban, :engine, :insert_job, :stop], %{}, %{job: job_for_me}) + + assert Sentry.Test.Registry.lookup_oban_job(job_for_me.id) == test_pid + assert Sentry.Test.Registry.lookup_oban_job(job_for_peer.id) == peer + end + end + + describe "setup_sentry/1 with allowance: [Broadway] (synthetic events)" do + setup do + SentryTest.setup_sentry(allowance: [Broadway]) + end + + test "processor batch start with a tagged message routes the worker" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [%{metadata: %{sentry_test_owner: test_pid}}] + + :telemetry.execute( + [:broadway, :processor, :start], + %{}, + %{messages: messages} + ) + + captured = + case Sentry.capture_message("from broadway processor", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "from broadway processor"}}] = + SentryTest.pop_sentry_reports() + end + + test "batch_processor start with a tagged message routes the worker" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [%{metadata: %{sentry_test_owner: test_pid}}] + + :telemetry.execute( + [:broadway, :batch_processor, :start], + %{}, + %{messages: messages} + ) + + captured = + case Sentry.capture_message("from broadway batch", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "from broadway batch"}}] = + SentryTest.pop_sentry_reports() + end + + test "messages without :sentry_test_owner metadata are not auto-allowed" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [%{metadata: %{some_other_key: :foo}}] + + :telemetry.execute( + [:broadway, :processor, :start], + %{}, + %{messages: messages} + ) + + captured = + case Sentry.capture_message("untagged broadway", result: :sync) do + {:ok, _} -> :captured + other -> {:unexpected, other} + end + + send(test_pid, {worker_done, captured}) + end) + + ref = Process.monitor(worker) + assert_receive {^worker_done, :captured}, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [] == SentryTest.pop_sentry_reports() + end + + test "uses the first tagged message in a mixed batch" do + test_pid = self() + worker_done = make_ref() + + worker = + spawn(fn -> + messages = [ + %{metadata: %{some_other_key: :foo}}, + %{metadata: %{sentry_test_owner: test_pid}}, + %{metadata: %{}} + ] + + :telemetry.execute( + [:broadway, :processor, :start], + %{}, + %{messages: messages} + ) + + Sentry.capture_message("mixed batch", result: :sync) + send(test_pid, worker_done) + end) + + ref = Process.monitor(worker) + assert_receive ^worker_done, 5_000 + assert_receive {:DOWN, ^ref, :process, ^worker, _}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "mixed batch"}}] = + SentryTest.pop_sentry_reports() + end + end + describe "before_send wrapping" do test "wraps existing before_send callback" do test_pid = self() diff --git a/test_integrations/phoenix_app/mix.exs b/test_integrations/phoenix_app/mix.exs index 8eccda4a..1f9e77fe 100644 --- a/test_integrations/phoenix_app/mix.exs +++ b/test_integrations/phoenix_app/mix.exs @@ -75,7 +75,8 @@ defmodule PhoenixApp.MixProject do {:opentelemetry_ecto, "~> 1.2"}, {:opentelemetry_logger_metadata, "~> 0.2.0"}, {:hackney, "~> 1.18"}, - {:oban, "~> 2.10"} + {:oban, "~> 2.10"}, + {:broadway, "~> 1.0", only: [:test]} ] end diff --git a/test_integrations/phoenix_app/mix.lock b/test_integrations/phoenix_app/mix.lock index b44b46cc..8259a7d1 100644 --- a/test_integrations/phoenix_app/mix.lock +++ b/test_integrations/phoenix_app/mix.lock @@ -1,6 +1,7 @@ %{ "acceptor_pool": {:hex, :acceptor_pool, "1.0.1", "d88c2e8a0be9216cf513fbcd3e5a4beb36bee3ff4168e85d6152c6f899359cdb", [:rebar3], [], "hexpm", "f172f3d74513e8edd445c257d596fc84dbdd56d2c6fa287434269648ae5a421e"}, "bandit": {:hex, :bandit, "1.10.3", "1e5d168fa79ec8de2860d1b4d878d97d4fbbe2fdbe7b0a7d9315a4359d1d4bb9", [:mix], [{:hpax, "~> 1.0", [hex: :hpax, repo: "hexpm", optional: false]}, {:plug, "~> 1.18", [hex: :plug, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}, {:thousand_island, "~> 1.0", [hex: :thousand_island, repo: "hexpm", optional: false]}, {:websock, "~> 0.5", [hex: :websock, repo: "hexpm", optional: false]}], "hexpm", "99a52d909c48db65ca598e1962797659e3c0f1d06e825a50c3d75b74a5e2db18"}, + "broadway": {:hex, :broadway, "1.3.0", "f75f6376159b74f55c5ba2629dac613e4fd79d9e71148ab5fbac8fdd7c999d2a", [:mix], [{:gen_stage, "~> 1.0", [hex: :gen_stage, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.3.7 or ~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4.3 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "bef3b4c5512d0072917b70239cbecf8f76a2587465a5b7c3e2b9ae18b4bc405b"}, "bypass": {:hex, :bypass, "2.1.0", "909782781bf8e20ee86a9cabde36b259d44af8b9f38756173e8f5e2e1fabb9b1", [:mix], [{:plug, "~> 1.7", [hex: :plug, repo: "hexpm", optional: false]}, {:plug_cowboy, "~> 2.0", [hex: :plug_cowboy, repo: "hexpm", optional: false]}, {:ranch, "~> 1.3", [hex: :ranch, repo: "hexpm", optional: false]}], "hexpm", "d9b5df8fa5b7a6efa08384e9bbecfe4ce61c77d28a4282f79e02f1ef78d96b80"}, "castore": {:hex, :castore, "1.0.17", "4f9770d2d45fbd91dcf6bd404cf64e7e58fed04fadda0923dc32acca0badffa2", [:mix], [], "hexpm", "12d24b9d80b910dd3953e165636d68f147a31db945d2dcb9365e441f8b5351e5"}, "cc_precompiler": {:hex, :cc_precompiler, "0.1.11", "8c844d0b9fb98a3edea067f94f616b3f6b29b959b6b3bf25fee94ffe34364768", [:mix], [{:elixir_make, "~> 0.7", [hex: :elixir_make, repo: "hexpm", optional: false]}], "hexpm", "3427232caf0835f94680e5bcf082408a70b48ad68a5f5c0b02a3bea9f3a075b9"}, @@ -24,6 +25,7 @@ "finch": {:hex, :finch, "0.21.0", "b1c3b2d48af02d0c66d2a9ebfb5622be5c5ecd62937cf79a88a7f98d48a8290c", [:mix], [{:mime, "~> 1.0 or ~> 2.0", [hex: :mime, repo: "hexpm", optional: false]}, {:mint, "~> 1.6.2 or ~> 1.7", [hex: :mint, repo: "hexpm", optional: false]}, {:nimble_options, "~> 0.4 or ~> 1.0", [hex: :nimble_options, repo: "hexpm", optional: false]}, {:nimble_pool, "~> 1.1", [hex: :nimble_pool, repo: "hexpm", optional: false]}, {:telemetry, "~> 0.4 or ~> 1.0", [hex: :telemetry, repo: "hexpm", optional: false]}], "hexpm", "87dc6e169794cb2570f75841a19da99cfde834249568f2a5b121b809588a4377"}, "fine": {:hex, :fine, "0.1.4", "b19a89c1476c7c57afb5f9314aed5960b5bc95d5277de4cb5ee8e1d1616ce379", [:mix], [], "hexpm", "be3324cc454a42d80951cf6023b9954e9ff27c6daa255483b3e8d608670303f5"}, "floki": {:hex, :floki, "0.38.0", "62b642386fa3f2f90713f6e231da0fa3256e41ef1089f83b6ceac7a3fd3abf33", [:mix], [], "hexpm", "a5943ee91e93fb2d635b612caf5508e36d37548e84928463ef9dd986f0d1abd9"}, + "gen_stage": {:hex, :gen_stage, "1.3.2", "7c77e5d1e97de2c6c2f78f306f463bca64bf2f4c3cdd606affc0100b89743b7b", [:mix], [], "hexpm", "0ffae547fa777b3ed889a6b9e1e64566217413d018cabd825f786e843ffe63e7"}, "gettext": {:hex, :gettext, "0.26.2", "5978aa7b21fada6deabf1f6341ddba50bc69c999e812211903b169799208f2a8", [:mix], [{:expo, "~> 0.5.1 or ~> 1.0", [hex: :expo, repo: "hexpm", optional: false]}], "hexpm", "aa978504bcf76511efdc22d580ba08e2279caab1066b76bb9aa81c4a1e0a32a5"}, "gproc": {:hex, :gproc, "0.9.1", "f1df0364423539cf0b80e8201c8b1839e229e5f9b3ccb944c5834626998f5b8c", [:rebar3], [], "hexpm", "905088e32e72127ed9466f0bac0d8e65704ca5e73ee5a62cb073c3117916d507"}, "grpcbox": {:hex, :grpcbox, "0.17.1", "6e040ab3ef16fe699ffb513b0ef8e2e896da7b18931a1ef817143037c454bcce", [:rebar3], [{:acceptor_pool, "~> 1.0.0", [hex: :acceptor_pool, repo: "hexpm", optional: false]}, {:chatterbox, "~> 0.15.1", [hex: :ts_chatterbox, repo: "hexpm", optional: false]}, {:ctx, "~> 0.6.0", [hex: :ctx, repo: "hexpm", optional: false]}, {:gproc, "~> 0.9.1", [hex: :gproc, repo: "hexpm", optional: false]}], "hexpm", "4a3b5d7111daabc569dc9cbd9b202a3237d81c80bf97212fbc676832cb0ceb17"}, diff --git a/test_integrations/phoenix_app/test/phoenix_app/broadway_test.exs b/test_integrations/phoenix_app/test/phoenix_app/broadway_test.exs new file mode 100644 index 00000000..b674b034 --- /dev/null +++ b/test_integrations/phoenix_app/test/phoenix_app/broadway_test.exs @@ -0,0 +1,53 @@ +defmodule PhoenixApp.BroadwayTest do + # async: true — the auto-allowance design routes per-message via the + # :sentry_test_owner metadata, which Broadway propagates onto the + # %Broadway.Message{} struct. Two of these tests racing against each + # other on the same shared pipeline still produce the right per-test + # results because each message carries its origin test pid. + use ExUnit.Case, async: true + + describe "setup_sentry/1 with allowance: [Broadway]" do + setup do + Sentry.Test.setup_sentry(allowance: [Broadway]) + start_supervised!(PhoenixApp.TestBroadway) + :ok + end + + test "events from a Broadway processor are captured when tagged via metadata" do + ref = + Broadway.test_message(PhoenixApp.TestBroadway, :capture, + metadata: %{sentry_test_owner: self()} + ) + + assert_receive {:ack, ^ref, [_succeeded], []}, 5_000 + + assert [%Sentry.Event{message: %{formatted: "from broadway"}}] = + Sentry.Test.pop_sentry_reports() + end + + test "raw Broadway.test_message without :sentry_test_owner is not auto-allowed" do + ref = Broadway.test_message(PhoenixApp.TestBroadway, :capture) + + assert_receive {:ack, ^ref, [_succeeded], []}, 5_000 + assert [] == Sentry.Test.pop_sentry_reports() + end + end + + describe "without allowance" do + setup do + Sentry.Test.setup_sentry() + start_supervised!(PhoenixApp.TestBroadway) + :ok + end + + test "tagged messages are still dropped without allowance: [Broadway]" do + ref = + Broadway.test_message(PhoenixApp.TestBroadway, :capture, + metadata: %{sentry_test_owner: self()} + ) + + assert_receive {:ack, ^ref, [_succeeded], []}, 5_000 + assert [] == Sentry.Test.pop_sentry_reports() + end + end +end diff --git a/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs b/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs index 92e032ca..423af688 100644 --- a/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs +++ b/test_integrations/phoenix_app/test/phoenix_app/oban_test.exs @@ -419,6 +419,133 @@ defmodule Sentry.Integrations.Phoenix.ObanTest do end end + describe "setup_sentry/1 with allowance: [Oban]" do + setup do + Sentry.Test.setup_sentry(allowance: [Oban]) + Sentry.Integrations.Oban.ErrorReporter.attach() + on_exit(fn -> :telemetry.detach(Sentry.Integrations.Oban.ErrorReporter) end) + end + + test "captures events from a real Oban worker with no manual telemetry plumbing" do + job = %Oban.Job{ + id: System.unique_integer([:positive]), + args: %{"should_fail" => true}, + worker: "Sentry.Integrations.Phoenix.ObanTest.FailingWorker", + queue: "background", + attempt: 1, + max_attempts: 1, + meta: %{}, + inserted_at: DateTime.utc_now(), + scheduled_at: DateTime.utc_now(), + attempted_at: DateTime.utc_now() + } + + # Phase 1: simulate the insert-time engine event in the test pid. + # The allowance handler tags this job's id with self() so the + # detached worker can be routed back here on :start. + :telemetry.execute( + [:oban, :engine, :insert_job, :stop], + %{}, + %{job: job, conf: %{name: Oban}} + ) + + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + # Phase 2: run the worker in a detached process — no `$callers` + # link to this test, so the only path that can route the worker's + # captured events back is the tag established above. + run_job_in_detached_process(job) + + # The FailingWorker raises; the Oban error reporter turns it into + # an event, and the allowance routing delivers it to this test. + assert [%Sentry.Event{} = event] = Sentry.Test.pop_sentry_reports() + + assert [%Sentry.Interfaces.Exception{} = exception] = event.exception + assert exception.value == "intentional failure for testing" + end + + test "captures Sentry.Metrics.count/3 from a real Oban worker" do + job = %Oban.Job{ + id: System.unique_integer([:positive]), + args: %{}, + worker: "Sentry.Integrations.Phoenix.ObanTest.FailingWorker", + queue: "background", + attempt: 1, + max_attempts: 1, + meta: %{}, + inserted_at: DateTime.utc_now(), + scheduled_at: DateTime.utc_now(), + attempted_at: DateTime.utc_now() + } + + :telemetry.execute( + [:oban, :engine, :insert_job, :stop], + %{}, + %{job: job, conf: %{name: Oban}} + ) + + assert Sentry.Test.Registry.lookup_oban_job(job.id) == self() + + emit_metric_in_detached_process(job, fn -> + Sentry.Metrics.count("oban.allowance.metric.test", 1) + end) + + Sentry.TelemetryProcessor.flush() + Sentry.TelemetryProcessor.flush(Sentry.TelemetryProcessor) + + metrics = Sentry.Test.pop_sentry_metrics() + + assert Enum.any?(metrics, &(&1.name == "oban.allowance.metric.test")), + "expected metric emitted from the Oban worker process to land in the " <> + "test collector, got: #{inspect(metrics)}" + end + end + + defp run_job_in_detached_process(job) do + parent = self() + ref = make_ref() + + spawn(fn -> + start_metadata = %{job: job, conf: %{name: Oban}} + + :telemetry.execute( + [:oban, :job, :start], + %{system_time: System.system_time()}, + start_metadata + ) + + {kind, reason, stacktrace} = + try do + FailingWorker.perform(job) + {:ok, nil, []} + catch + kind, reason -> {kind, reason, __STACKTRACE__} + end + + :telemetry.execute( + [:oban, :job, :exception], + %{duration: 0}, + Map.merge(start_metadata, %{ + kind: kind, + reason: reason, + error: reason, + stacktrace: stacktrace, + state: :failure + }) + ) + + send(parent, {ref, :done}) + end) + + receive do + {^ref, :done} -> :ok + after + 5_000 -> flunk("worker process did not finish in time") + end + + Process.sleep(50) + end + defp run_failing_worker_in_detached_process(opts) do parent = self() ref = make_ref() @@ -440,7 +567,12 @@ defmodule Sentry.Integrations.Phoenix.ObanTest do } start_metadata = %{job: job, conf: %{name: Oban}} - :telemetry.execute([:oban, :job, :start], %{system_time: System.system_time()}, start_metadata) + + :telemetry.execute( + [:oban, :job, :start], + %{system_time: System.system_time()}, + start_metadata + ) {kind, reason, stacktrace} = try do @@ -476,4 +608,37 @@ defmodule Sentry.Integrations.Phoenix.ObanTest do Process.sleep(50) end + + defp emit_metric_in_detached_process(job, fun) do + parent = self() + ref = make_ref() + + spawn(fn -> + start_metadata = %{job: job, conf: %{name: Oban}} + + :telemetry.execute( + [:oban, :job, :start], + %{system_time: System.system_time()}, + start_metadata + ) + + fun.() + + :telemetry.execute( + [:oban, :job, :stop], + %{duration: 0}, + Map.merge(start_metadata, %{state: :success, result: :ok}) + ) + + send(parent, {ref, :done}) + end) + + receive do + {^ref, :done} -> :ok + after + 5_000 -> flunk("worker process did not finish in time") + end + + Process.sleep(50) + end end diff --git a/test_integrations/phoenix_app/test/support/test_broadway.ex b/test_integrations/phoenix_app/test/support/test_broadway.ex new file mode 100644 index 00000000..211d7bf9 --- /dev/null +++ b/test_integrations/phoenix_app/test/support/test_broadway.ex @@ -0,0 +1,29 @@ +defmodule PhoenixApp.TestBroadway do + @moduledoc false + + use Broadway + + def start_link(_opts \\ []) do + Broadway.start_link(__MODULE__, + name: __MODULE__, + producer: [module: {Broadway.DummyProducer, []}], + processors: [default: [concurrency: 1]] + ) + end + + @impl true + def handle_message(_processor, %Broadway.Message{data: data} = message, _context) do + case data do + :capture -> + Sentry.capture_message("from broadway", result: :sync) + + :raise -> + raise "broadway boom" + + _ -> + :ok + end + + message + end +end