
feat: drain-on-Stop and NewTicker primitive#4

Open
klaidliadon wants to merge 17 commits into master from feat/drain-and-ticker

Conversation

@klaidliadon

Summary

Adds two opt-in primitives for shutdown-safe periodic workers, motivated by the OMSX Frontegg reconciler (and a queue of similar workers across Polygon services).

  • WithDrain(timeout) switches Stop from "cancel runFunc's ctx" to "close Stopping(ctx) and wait up to timeout for runFunc to return on its own." Lets workers drain in-flight external calls (e.g. an HTTP request that creates remote state) instead of aborting them mid-request, which today causes orphan-state bugs at SIGTERM. Falls back to ctx-cancel + wait once the drain timeout elapses, so stuck runFuncs still surface as DeadlineExceeded.
  • NewTicker(interval, tick, opts...) wraps the time.NewTicker + select-loop pattern that periodic workers reimplement (~25 LoC × N workers). Composes with WithDrain (in-flight tick finishes; loop exits without firing a new tick) and WithRecoverer (panics in tick are caught).

Both are additive — Stop semantics for existing callers (8+ sequence repos, 0xPolygon/bpn-api) are unchanged when neither option is used.

The third commit clears pre-existing go vet and staticcheck warnings (unreachable returns, a context-leak in the example, single-case selects) so the repo is clean from CI's perspective.

Test plan

  • go test -race ./... passes
  • go test -race -count=5 -run 'TestNewTicker|TestWithDrain' ./... stable across 5 runs
  • go vet ./... clean
  • 11 new test scenarios across with_drain_test.go and ticker_test.go
  • Existing runnable stop-timeout test strengthened to assert ctx-cancel behavior when WithDrain is absent
  • examples/ticker-with-drain/main.go builds (full SIGTERM-safe shape)

Notes for review

  • Two small deviations from the original plan in tmp/drain-and-ticker/2026-05-01-plan.md, both required to honor the plan's prose contract:
    1. ticker.go adds an inner non-blocking re-check of <-stopping/<-ctx.Done() after <-t.C fires. Without it, when a tick takes longer than the interval, queued ticks on t.C race against <-stopping in Go's random select and fire extra ticks after Stop. The plan said "loop exits without firing a new tick" — this enforces it.
    2. Stop's drain branch explicitly cancels the drain context on every exit path (no defer drainCancel()) to avoid vet's lostcancel warning.
  • Pre-flight check 2 from the plan (gh search across dependents) intentionally skipped — additive API, no existing callers depend on the new exported names.

klaidliadon added 17 commits May 1, 2026 23:27
- Stop now waits for runFunc to return on its own when WithDrain is set,
  closing the channel returned by Stopping(ctx) instead of cancelling.
- Falls back to ctx cancel + wait once the drain timeout elapses, so
  stuck runFuncs still surface as DeadlineExceeded.
- No-op for existing callers — Stop semantics are unchanged when
  WithDrain is not used.
- Strengthen the existing stop-timeout test to assert ctx-cancel
  behavior when WithDrain is absent.
- NewTicker(interval, tick, opts...) wraps the standard
  time.NewTicker + select-loop pattern that periodic workers reimplement.
- Composes with WithDrain (in-flight tick finishes; loop exits without
  firing a new tick) and WithRecoverer (panics in tick are caught).
- Skip queued ticks accumulated while a slow tick was running once Stop
  has been called, so shutdown isn't delayed by stale t.C buffer.
- Add examples/ticker-with-drain showing the full SIGTERM-safe shape
  (ticker + drain + recoverer + signal.NotifyContext).
- examples/main.go: drop unreachable return after infinite loop;
  capture and defer the cancel from context.WithTimeout to fix the
  ctx leak.
- runnable_test.go, runnable_group_test.go: replace single-case
  selects with plain channel receives (S1000).
- with_recoverer_test.go: drop unreachable returns after panic.
- with_recoverer_test.go: drop forced string type assertion in
  Report; use %v formatting so non-string panic values don't crash.
- examples/ticker-with-drain: silence errcheck on os.Stderr.Write.

Concurrent Stop callers raced to close the same stoppingChan, causing
"close of closed channel" panic on the second close. Pre-PR Stop was
safe because context.CancelFunc tolerates repeat calls; WithDrain
silently regressed that contract — and K8s does occasionally fire
SIGTERM twice on shutdown.

Claim ownership under the mutex: the first Stop copies and nils
stoppingChan, drives the drain. Subsequent Stops see nil, skip the
close, and fall through to the existing runCancel + <-runStop wait
(which is already idempotent). All callers observe the same outcome:
runFunc returns, then Stop returns.

Adds TestWithDrain/Stop_is_concurrency_safe — 10 concurrent Stops,
no panic, every caller returns nil or ErrNotRunning.

Stop's drain fall-through path was indistinguishable from a clean
drain — both returned nil. Callers had no way to detect runFuncs that
ignored Stopping(ctx) and only exited via ctx.Done().

Track the fall-through and return ErrDrainTimedOut once runFunc
finally exits. ctx-cancel paths still return ctx.Err() unchanged.
The runnable is fully stopped either way; the sentinel is observability.

- Stopping(ctx) godoc and README example: callers must select on
  both ctx.Done() and Stopping(ctx). A loop that observes only
  Stopping hangs on outer-ctx cancellation. README example now
  shows the full three-case select.
- README also reflects ErrDrainTimedOut on the fall-through path.
- NewTicker godoc: composing with WithRetry resets the ticker
  cadence on every retry (tick error bails the loop, WithRetry
  re-enters runFunc, fresh ticker). Document the behavior; users
  who need stable cadence should retry inside the tick handler.

context.WithTimeout(ctx, drainTimeout) derived the drain ctx from the
caller's ctx, so a Stop deadline shorter than drainTimeout made
<-ctx.Done() and the drain expiry race in the same select. The
<-ctx.Done() branch returned ctx.Err() *without calling r.runCancel()*
— leaving the runnable alive after Stop returned. The other branch
could misreport ErrDrainTimedOut when the caller's deadline was the
real cause.

Switch to time.NewTimer(drainTimeout) and let <-ctx.Done() in the
drain select fall through to r.runCancel() so the runnable always
gets force-cancelled before Stop returns. drainTimedOut is only set
when the standalone timer fires, so the sentinel is no longer
spuriously returned on caller-ctx cancellation.

Adds TestWithDrain/Stop_forces_cancel_when_caller_ctx_expires_during_drain
which exercises a 100ms caller ctx against a 10s drain.

The example passed signal.NotifyContext's ctx to rc.Run. On SIGTERM
that ctx cancels immediately — the ticker's runFunc observes
<-ctx.Done() and exits before Stop ever closes Stopping(ctx),
completely defeating WithDrain.

Pass context.Background() to Run; use sigCtx only to detect when to
call Stop. Stop's drain budget is now the sole driver of shutdown for
the drain-enabled worker.

The previous concurrency-safe fix avoided the double-close panic but
introduced two regressions Codex caught on re-review:

1. Secondary Stop callers fell through to r.runCancel() unconditionally,
   hard-cancelling the runCtx mid-drain. The drain that the primary
   caller was honoring got bypassed — exactly the failure mode WithDrain
   exists to prevent.
2. After fixing (1) by making secondary callers wait on runStop, a
   later Stop with a shorter deadline could no longer enforce it: it
   would return DeadlineExceeded but never escalate, so the runnable
   kept draining until the primary caller's drainTimeout expired.

Restructure the secondary path: wait on runStop, but escalate to
r.runCancel() if the caller's ctx expires first. The shortest deadline
among concurrent callers wins, and the drain is only bypassed when
some caller actively asks for it via deadline expiry.

Adds two tests:
- "concurrent Stop preserves drain semantics" — strengthened to assert
  runFunc observes Stopping(ctx), not ctx.Done()
- "secondary Stop with shorter deadline escalates runCancel" — new
  regression for Codex's escalation finding

main was reading runErr only inside the SIGTERM branch, so if the
worker exited early (tick error, recovered panic) main would block
on sigCtx forever. Select on either sigCtx or runErr; an early
worker exit now propagates the failure without waiting for a signal.

Stop captured runStop and stoppingChan under the mutex but read
r.runCancel via the field after waiting in a select. With drain
enabled, Stop's drainTimer / ctx.Done() branches fall through to
runCancel after a wait. If the runnable exited and Run was called
again before that wait returned, r.runCancel pointed at the *new*
run's cancel — and Stop tore down the freshly started worker.

Snapshot runCancel alongside the other fields so the Stop call
operates on the runnable it observed under the lock, regardless of
what the next Run does to the field. Adds
"late runCancel does not tear down a subsequent Run" — runs Stop
followed by an immediate fresh Run and asserts the second run isn't
prematurely cancelled.

Also adds "WithRetry stops retrying after Stopping fires" as part of
the same drain-correctness sweep — see follow-up commit.

WithRetry had no awareness of WithDrain. After Stop was called and
Stopping(ctx) closed, a transient error from the in-flight attempt
would still trigger a retry — re-entering runFunc and starting fresh
work mid-shutdown, defeating drain semantics.

Add a non-blocking Stopping(ctx) check between attempts. When drain
is not configured, Stopping returns nil and the default branch runs
unchanged.

The previous "late runCancel" test created two separate *runnable
instances for round 1 and round 2. They share no state, so a stale
runCancel from round 1's Stop could not affect round 2 — the test
passed even with the snapshot fix reverted.

Replace with a single-runnable test: Run, then a primary Stop +
secondary Stop with already-cancelled ctx (exercising the runCancel
escalation path), then re-Run on the *same* runnable. Round 2 must
run undisturbed until explicitly Stopped. The snapshot fix prevents
round 1's secondary runCancel from cancelling round 2's runCtx via
the field overwrite, so this is the closest deterministic regression
shape available without runtime hooks.

A reviewer empirically reverted the runCancel-snapshot fix and ran
the test 20×; all 20 passed. The test gates round 2 strictly after
both round-1 Stops have returned, so by the time round 2 starts the
field-overwrite vs. snapshot distinction is no longer observable.

Rename to "runnable can be re-Run after a concurrent-Stop lifecycle"
and document that it's a lifecycle smoke test, not a snapshot-fix
regression. Deterministic coverage of that race needs testing/synctest
(Go 1.25+) or a runtime hook — both out of scope. The snapshot fix is
verified by inspection.

The lastTime field on withRetry persisted across Run cycles. After a
Stop and re-Run, the first iteration of the new cycle compared
against the prior cycle's last attempt — usually far in the past —
which always triggered the i=0 reset. The reset was a no-op (i was
already 0), so behavior didn't visibly change, but the field carried
state across what callers reasonably treat as independent invocations.

Move lastTime to a function-local variable inside the closure so
each Run cycle gets a fresh timer. Field on the struct is removed.

Adds "retry budget is per-Run-cycle" — runs the same WithRetry
runnable twice and asserts both cycles exhaust the same retry budget.

Reviewer reverted the lastTime-scoping fix and ran the test against
the buggy code: it passed. The bug is purely code-hygiene (state on
a struct that shouldn't have it) — observable behavior is unchanged
because the i=0 reset is a no-op when i is already 0. There's nothing
to assert. The commit message on the fix accurately describes what
changed; a test that lies about its coverage is worse than no test.
