feat: drain-on-Stop and NewTicker primitive #4
Open
klaidliadon wants to merge 17 commits into master from
Conversation
- Stop now waits for runFunc to return on its own when WithDrain is set, closing the channel returned by Stopping(ctx) instead of cancelling.
- Falls back to ctx cancel + wait once the drain timeout elapses, so stuck runFuncs still surface as DeadlineExceeded.
- No-op for existing callers — Stop semantics are unchanged when WithDrain is not used.
- Strengthen the existing stop-timeout test to assert ctx-cancel behavior when WithDrain is absent.
- NewTicker(interval, tick, opts...) wraps the standard time.NewTicker + select-loop pattern that periodic workers reimplement.
- Composes with WithDrain (in-flight tick finishes; loop exits without firing a new tick) and WithRecoverer (panics in tick are caught).
- Skips queued ticks that accumulated while a slow tick was running once Stop has been called, so shutdown isn't delayed by the stale t.C buffer.
- Adds examples/ticker-with-drain showing the full SIGTERM-safe shape (ticker + drain + recoverer + signal.NotifyContext).
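For reference, a minimal stdlib-only sketch of the select-loop shape this commit describes NewTicker as wrapping; the function name and signature here are illustrative, not the library's API:

```go
package sketch

import (
	"context"
	"time"
)

// runEvery is the hand-rolled pattern NewTicker is meant to absorb: a ticker,
// a select on cancellation vs. the next tick, and an error from the tick
// handler ending the loop.
func runEvery(ctx context.Context, interval time.Duration, tick func(context.Context) error) error {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-t.C:
			if err := tick(ctx); err != nil {
				return err
			}
		}
	}
}
```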
- examples/main.go: drop unreachable return after infinite loop; capture and defer the cancel from context.WithTimeout to fix the ctx leak.
- runnable_test.go, runnable_group_test.go: replace single-case selects with plain channel receives (S1000).
- with_recoverer_test.go: drop unreachable returns after panic.
- with_recoverer_test.go: drop the forced string type assertion in Report; use %v formatting so non-string panic values don't crash.
- examples/ticker-with-drain: silence errcheck on os.Stderr.Write.
Concurrent Stop callers raced to close the same stoppingChan, causing a "close of closed channel" panic on the second close. Pre-PR Stop was safe because context.CancelFunc tolerates repeat calls; WithDrain silently regressed that contract — and K8s does occasionally fire SIGTERM twice on shutdown.

Claim ownership under the mutex: the first Stop copies and nils stoppingChan and drives the drain. Subsequent Stops see nil, skip the close, and fall through to the existing runCancel + <-runStop wait (which is already idempotent). All callers observe the same outcome: runFunc returns, then Stop returns.

Adds TestWithDrain/Stop_is_concurrency_safe — 10 concurrent Stops, no panic, every caller returns nil or ErrNotRunning.
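A minimal sketch of the ownership hand-off described above, assuming a mutex-guarded struct; type and field names are illustrative stand-ins, not the library's:

```go
package sketch

import "sync"

type drainer struct {
	mu           sync.Mutex
	stoppingChan chan struct{} // closed to start the drain; nil once claimed
}

// stop lets exactly one caller own the close. Later callers observe nil,
// skip the close, and fall through to whatever shared wait follows, so a
// second Stop can never panic with "close of closed channel".
func (d *drainer) stop() {
	d.mu.Lock()
	ch := d.stoppingChan
	d.stoppingChan = nil
	d.mu.Unlock()

	if ch != nil {
		close(ch) // first caller drives the drain
	}
	// all callers then share the idempotent runCancel + wait path
}
```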
Stop's drain fall-through path was indistinguishable from a clean drain — both returned nil, so callers had no way to detect runFuncs that ignored Stopping(ctx) and only exited via ctx.Done(). Track the fall-through and return ErrDrainTimedOut once runFunc finally exits; ctx-cancel paths still return ctx.Err() unchanged. The runnable is fully stopped either way; the sentinel exists purely for observability.
- Stopping(ctx) godoc and README example: callers must select on both ctx.Done() and Stopping(ctx). A loop that observes only Stopping hangs on outer-ctx cancellation. The README example now shows the full three-case select (see the sketch below).
- The README also reflects ErrDrainTimedOut on the fall-through path.
- NewTicker godoc: composing with WithRetry resets the ticker cadence on every retry (a tick error bails the loop, WithRetry re-enters runFunc, fresh ticker). Document the behavior; users who need stable cadence should retry inside the tick handler.
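A sketch of the three-case select shape this documents; stopping is a plain channel standing in for the library's Stopping(ctx), and jobs is a placeholder work source:

```go
package sketch

import "context"

// work exits immediately on outer-ctx cancellation, exits cleanly when a
// drain is requested, and otherwise keeps processing. Observing only
// stopping would hang this loop if the outer ctx were cancelled instead.
func work(ctx context.Context, stopping <-chan struct{}, jobs <-chan func()) error {
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-stopping:
			return nil
		case job := <-jobs:
			job()
		}
	}
}
```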
context.WithTimeout(ctx, drainTimeout) derived the drain ctx from the caller's ctx, so a Stop deadline shorter than drainTimeout made <-ctx.Done() and the drain expiry race in the same select. The <-ctx.Done() branch returned ctx.Err() *without calling r.runCancel()* — leaving the runnable alive after Stop returned. The other branch could misreport ErrDrainTimedOut when the caller's deadline was the real cause.

Switch to time.NewTimer(drainTimeout) and let <-ctx.Done() in the drain select fall through to r.runCancel(), so the runnable always gets force-cancelled before Stop returns. drainTimedOut is only set when the standalone timer fires, so the sentinel is no longer spuriously returned on caller-ctx cancellation.

Adds TestWithDrain/Stop_forces_cancel_when_caller_ctx_expires_during_drain, which exercises a 100ms caller ctx against a 10s drain.
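A sketch of the drain wait after this change, using the names from the commit prose (runStop, runCancel, drainTimeout) as illustrative stand-ins rather than the actual Stop body:

```go
package sketch

import (
	"context"
	"time"
)

// drainWait returns true only when the standalone drain timer fired. Caller-ctx
// expiry no longer masquerades as a drain timeout, and both timeout paths
// force-cancel the run before returning.
func drainWait(ctx context.Context, runStop <-chan struct{}, runCancel context.CancelFunc, drainTimeout time.Duration) bool {
	timer := time.NewTimer(drainTimeout)
	defer timer.Stop()

	drainTimedOut := false
	select {
	case <-runStop:
		return false // clean drain: runFunc returned on its own
	case <-timer.C:
		drainTimedOut = true // only the standalone timer sets the sentinel
	case <-ctx.Done():
		// caller deadline expired mid-drain: fall through to the force cancel
	}
	runCancel()
	<-runStop // runFunc always returns before Stop does
	return drainTimedOut
}
```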
The example passed signal.NotifyContext's ctx to rc.Run. On SIGTERM that ctx cancels immediately — the ticker's runFunc observes <-ctx.Done() and exits before Stop ever closes Stopping(ctx), completely defeating WithDrain. Pass context.Background() to Run; use sigCtx only to detect when to call Stop. Stop's drain budget is now the sole driver of shutdown for the drain-enabled worker.
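A sketch of the corrected example shape: the worker runs on context.Background(), and the signal context only decides when to call Stop. The runner interface, serve helper, and 30-second Stop deadline are stand-ins, not the example's actual code:

```go
package sketch

import (
	"context"
	"os/signal"
	"syscall"
	"time"
)

type runner interface {
	Run(ctx context.Context) error
	Stop(ctx context.Context) error
}

// serve keeps the worker's runFunc alive through SIGTERM so the drain, not
// the signal, decides when it exits.
func serve(rc runner) error {
	sigCtx, stop := signal.NotifyContext(context.Background(), syscall.SIGTERM)
	defer stop()

	go func() { _ = rc.Run(context.Background()) }() // deliberately not sigCtx

	<-sigCtx.Done() // SIGTERM: hand shutdown over to Stop's drain budget

	stopCtx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
	defer cancel()
	return rc.Stop(stopCtx)
}
```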
The previous concurrency-safe fix avoided the double-close panic but introduced two regressions Codex caught on re-review:

1. Secondary Stop callers fell through to r.runCancel() unconditionally, hard-cancelling the runCtx mid-drain. The drain that the primary caller was honoring got bypassed — exactly the failure mode WithDrain exists to prevent.
2. After fixing (1) by making secondary callers wait on runStop, a later Stop with a shorter deadline could no longer enforce it: it would return DeadlineExceeded but never escalate, so the runnable kept draining until the primary caller's drainTimeout expired.

Restructure the secondary path: wait on runStop, but escalate to r.runCancel() if the caller's ctx expires first. The shortest deadline among concurrent callers wins, and the drain is only bypassed when some caller actively asks for it via deadline expiry.

Adds two tests:
- "concurrent Stop preserves drain semantics" — strengthened to assert runFunc observes Stopping(ctx), not ctx.Done()
- "secondary Stop with shorter deadline escalates runCancel" — new regression test for Codex's escalation finding
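A sketch of the restructured secondary path, again using the commit's names (runStop, runCancel) as illustrative stand-ins:

```go
package sketch

import "context"

// secondaryStop waits for the primary caller's drain, but escalates to a
// force cancel if this caller's own deadline expires first; the shortest
// deadline among concurrent callers wins.
func secondaryStop(ctx context.Context, runStop <-chan struct{}, runCancel context.CancelFunc) error {
	select {
	case <-runStop:
		return nil // primary caller's drain completed
	case <-ctx.Done():
		runCancel() // this caller asked to bypass the drain via its deadline
		<-runStop
		return ctx.Err()
	}
}
```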
main was reading runErr only inside the SIGTERM branch, so if the worker exited early (tick error, recovered panic) main would block on sigCtx forever. Select on either sigCtx or runErr; an early worker exit now propagates the failure without waiting for a signal.
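A sketch of the new wait, assuming runErr is the buffered channel that receives Run's result:

```go
package sketch

import (
	"context"
	"fmt"
)

// awaitShutdown unblocks on whichever comes first: a shutdown signal or the
// worker exiting on its own (tick error, recovered panic).
func awaitShutdown(sigCtx context.Context, runErr <-chan error) error {
	select {
	case <-sigCtx.Done():
		return nil // signal received: caller proceeds to Stop and drain
	case err := <-runErr:
		return fmt.Errorf("worker exited before shutdown was requested: %w", err)
	}
}
```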
Stop captured runStop and stoppingChan under the mutex but read r.runCancel via the field after waiting in a select. With drain enabled, Stop's drainTimer / ctx.Done() branches fall through to runCancel after a wait. If the runnable exited and Run was called again before that wait returned, r.runCancel pointed at the *new* run's cancel — and Stop tore down the freshly started worker.

Snapshot runCancel alongside the other fields so the Stop call operates on the runnable it observed under the lock, regardless of what the next Run does to the field.

Adds "late runCancel does not tear down a subsequent Run" — runs Stop followed by an immediate fresh Run and asserts the second run isn't prematurely cancelled. Also adds "WithRetry stops retrying after Stopping fires" as part of the same drain-correctness sweep — see follow-up commit.
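A sketch of the snapshot, with illustrative struct and field names: everything Stop will touch after its waits is copied while the lock is held.

```go
package sketch

import (
	"context"
	"sync"
)

type runnable struct {
	mu           sync.Mutex
	runStop      chan struct{}
	stoppingChan chan struct{}
	runCancel    context.CancelFunc
}

// stopSnapshot copies every field Stop will need while holding the lock. A
// Run that starts afterwards can overwrite the struct fields freely; the
// in-flight Stop keeps operating on the run it observed here.
func (r *runnable) stopSnapshot() (runStop chan struct{}, stopping chan struct{}, cancel context.CancelFunc) {
	r.mu.Lock()
	defer r.mu.Unlock()
	return r.runStop, r.stoppingChan, r.runCancel
}
```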
WithRetry had no awareness of WithDrain. After Stop was called and Stopping(ctx) closed, a transient error from the in-flight attempt would still trigger a retry — re-entering runFunc and starting fresh work mid-shutdown, defeating drain semantics. Add a non-blocking Stopping(ctx) check between attempts. When drain is not configured, Stopping returns nil and the default branch runs unchanged.
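A sketch of the non-blocking check, with stopping standing in for the channel returned by Stopping(ctx); a nil channel never receives, so the default branch runs unchanged when drain isn't configured:

```go
package sketch

// shouldRetry is consulted between attempts: once the drain channel is
// closed, no new attempt is started. With drain unconfigured, stopping is
// nil, the receive can never fire, and the default branch always runs.
func shouldRetry(stopping <-chan struct{}) bool {
	select {
	case <-stopping:
		return false // shutdown in progress: don't start fresh work
	default:
		return true
	}
}
```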
The previous "late runCancel" test created two separate *runnable instances for round 1 and round 2. They share no state, so a stale runCancel from round 1's Stop could not affect round 2 — the test passed even with the snapshot fix reverted. Replace with a single-runnable test: Run, then a primary Stop + secondary Stop with already-cancelled ctx (exercising the runCancel escalation path), then re-Run on the *same* runnable. Round 2 must run undisturbed until explicitly Stopped. The snapshot fix prevents round 1's secondary runCancel from cancelling round 2's runCtx via the field overwrite, so this is the closest deterministic regression shape available without runtime hooks.
A reviewer reverted the runCancel-snapshot fix and ran the test 20×; all 20 runs passed. The test gates round 2 strictly after both round-1 Stops have returned, so by the time round 2 starts the field-overwrite vs. snapshot distinction is no longer observable. Rename to "runnable can be re-Run after a concurrent-Stop lifecycle" and document that it's a lifecycle smoke test, not a snapshot-fix regression. Deterministic coverage of that race needs testing/synctest (Go 1.25+) or a runtime hook — both are out of scope. The snapshot fix is verified by inspection.
The lastTime field on withRetry persisted across Run cycles. After a Stop and re-Run, the first iteration of the new cycle compared against the prior cycle's last attempt — usually far in the past — which always triggered the i=0 reset. The reset was a no-op (i was already 0), so behavior didn't visibly change, but the field carried state across what callers reasonably treat as independent invocations. Move lastTime to a function-local variable inside the closure so each Run cycle gets a fresh timer; the struct field is removed. Adds "retry budget is per-Run-cycle" — runs the same WithRetry runnable twice and asserts both cycles exhaust the same retry budget.
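A sketch of the scoping change under assumed retry semantics (a long quiet period resets the attempt budget); the names, signature, and budget logic are illustrative, not the library's WithRetry:

```go
package sketch

import (
	"context"
	"time"
)

// withRetry returns a runFunc whose retry clock lives inside the closure, so
// every Run cycle starts with a fresh lastTime instead of inheriting the
// previous cycle's.
func withRetry(run func(context.Context) error, maxAttempts int, window time.Duration) func(context.Context) error {
	return func(ctx context.Context) error {
		lastTime := time.Now() // per-Run-cycle; no longer a struct field
		var err error
		for i := 0; i < maxAttempts; i++ {
			if time.Since(lastTime) > window {
				i = 0 // enough quiet time has passed: reset the budget
			}
			lastTime = time.Now()
			if err = run(ctx); err == nil {
				return nil
			}
		}
		return err
	}
}
```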
A reviewer reverted the lastTime-scoping fix and ran the test against the buggy code: it passed. The bug is purely code hygiene (state on a struct that shouldn't have it) — observable behavior is unchanged because the i=0 reset is a no-op when i is already 0. There's nothing to assert. The commit message on the fix accurately describes what changed; a test that lies about its coverage is worse than no test.
Summary
Adds two opt-in primitives for shutdown-safe periodic workers, motivated by the OMSX Frontegg reconciler (and a queue of similar workers across Polygon services).
- `WithDrain(timeout)` switches `Stop` from "cancel runFunc's ctx" to "close `Stopping(ctx)` and wait up to timeout for runFunc to return on its own." Lets workers drain in-flight external calls (e.g. an HTTP request that creates remote state) instead of aborting them mid-request, which today causes orphan-state bugs at SIGTERM. Falls back to ctx-cancel + wait once the drain timeout elapses, so stuck runFuncs still surface as `DeadlineExceeded`.
- `NewTicker(interval, tick, opts...)` wraps the `time.NewTicker` + select-loop pattern that periodic workers reimplement (~25 LoC × N workers). Composes with `WithDrain` (in-flight tick finishes; loop exits without firing a new tick) and `WithRecoverer` (panics in tick are caught).

Both are additive — `Stop` semantics for existing callers (8+ sequence repos, `0xPolygon/bpn-api`) are unchanged when neither option is used.

The third commit clears pre-existing `go vet` and `staticcheck` warnings (unreachable returns, a context leak in the example, single-case selects) so the repo is clean from CI's perspective.

Test plan

- `go test -race ./...` passes
- `go test -race -count=5 -run 'TestNewTicker|TestWithDrain' ./...` stable across 5 runs
- `go vet ./...` clean
- `with_drain_test.go` and `ticker_test.go`
- `runnable` "stop timeout" test strengthened to assert ctx-cancel under no-drain
- `examples/ticker-with-drain/main.go` builds (full SIGTERM-safe shape)

Notes for review

- `tmp/drain-and-ticker/2026-05-01-plan.md`: both of the following are required to honor the plan's prose contract:
  - `ticker.go` adds an inner non-blocking re-check of `<-stopping` / `<-ctx.Done()` after `<-t.C` fires. Without it, when a tick takes longer than the interval, queued ticks on `t.C` race against `<-stopping` in Go's random select and fire extra ticks after Stop. The plan said "loop exits without firing a new tick" — this enforces it (see the sketch after this list).
  - `Stop`'s drain branch explicitly cancels the drain context on every exit path (no `defer drainCancel()`) to avoid `vet`'s lostcancel warning.
- `gh search` across dependents intentionally skipped — additive API, no existing callers depend on the new exported names.
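A stdlib-only sketch of the ticker loop with the inner re-check described in the first note; stopping stands in for the drain channel, and all names are illustrative rather than the contents of `ticker.go`:

```go
package sketch

import (
	"context"
	"time"
)

// tickLoop re-checks shutdown after <-t.C fires so a tick queued behind a
// slow handler cannot run once Stop has been requested, even if Go's random
// select picked the t.C case.
func tickLoop(ctx context.Context, stopping <-chan struct{}, interval time.Duration, tick func(context.Context) error) error {
	t := time.NewTicker(interval)
	defer t.Stop()
	for {
		select {
		case <-ctx.Done():
			return ctx.Err()
		case <-stopping:
			return nil
		case <-t.C:
			select { // non-blocking re-check before running the tick
			case <-ctx.Done():
				return ctx.Err()
			case <-stopping:
				return nil
			default:
			}
			if err := tick(ctx); err != nil {
				return err
			}
		}
	}
}
```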