perf(stream): project SSE frames once per role, not per subscriber #353
EricAndrechek wants to merge 5 commits
Move the broadcast hub into internal/stream as `Hub`: subscribers register under (topic, role), and Broadcast decodes each event once, applies each subscribed role's column policy once, builds one SSE frame per role, and fans it to every member of that role's Bucket. Previously every connection re-ran unmarshal -> Evaluate -> filter -> marshal (plus a second unmarshal for the id: timestamp) on the same event, so the work scaled with subscribers, not distinct output shapes: the ~2270 deliveries/s ceiling from #294.

The (topic, role) key is claims-independent: column visibility derives only from the role+table policy entry, and the stream path applies no row-level filter (documented invariant).

The handler's two select cases collapse into one byte-pump over a single Subscriber.Frames() queue of typed Frames; the queue grows from cap 1 to 64 so live events buffer while the handler is mid-write.

Gap-fill replay and Last-Event-ID/?since= resumption are unchanged (replay stays per-connection via the shared stream.ReplayFrame; live frames carry the same id: <received_timestamp>).

Slow-consumer drops now increment wavehouse_sse_dropped_frames_total; an inert Subscriber.Evicted() seam is wired for the eviction follow-up. The per-delivery OTel span (another #294 item) was already removed in #346. Also drops the now-orphaned, test-only internal/api/transform.go (transformForClient had no non-test caller).

First PR of the #294 epic. Deferred to follow-ups: active slow-consumer eviction (#94) and right-sizing the subscriber buffer + lock cost (#152).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF
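The project-once-per-role fan-out described in this commit message can be sketched roughly as follows. This is a minimal illustration, not the PR's code: the `Hub` fields, the `allowed` map standing in for the policy store, the plain byte channels standing in for `Subscriber` queues, and the `dropped` counter standing in for `wavehouse_sse_dropped_frames_total` are all assumptions made for the example.

```go
package main

import (
	"encoding/json"
	"fmt"
	"sync"
	"sync/atomic"
)

// key identifies a fan-out bucket. Claims are deliberately absent.
type key struct{ topic, role string }

// Hub is a hypothetical, simplified stand-in for the hub described above.
type Hub struct {
	mu      sync.RWMutex
	buckets map[key][]chan []byte
	allowed map[string]map[string]bool // role -> visible columns (assumed policy shape)
	dropped atomic.Int64               // stands in for the dropped-frames metric
}

func NewHub(allowed map[string]map[string]bool) *Hub {
	return &Hub{buckets: make(map[key][]chan []byte), allowed: allowed}
}

// Subscribe registers a buffered queue under (topic, role).
func (h *Hub) Subscribe(topic, role string, buf int) <-chan []byte {
	ch := make(chan []byte, buf)
	h.mu.Lock()
	k := key{topic, role}
	h.buckets[k] = append(h.buckets[k], ch)
	h.mu.Unlock()
	return ch
}

// Broadcast decodes once, projects once per subscribed role, and fans out.
func (h *Hub) Broadcast(topic string, payload []byte) error {
	h.mu.RLock()
	defer h.mu.RUnlock()

	var row map[string]any
	decoded := false
	for k, subs := range h.buckets {
		if k.topic != topic || len(subs) == 0 {
			continue
		}
		if !decoded { // lazy: no decode at all when nobody listens on this topic
			if err := json.Unmarshal(payload, &row); err != nil {
				return err
			}
			decoded = true
		}
		cols := h.allowed[k.role]
		out := make(map[string]any, len(row))
		for name, v := range row {
			if cols[name] {
				out[name] = v
			}
		}
		data, err := json.Marshal(out)
		if err != nil {
			return err
		}
		frame := []byte(fmt.Sprintf("data: %s\n\n", data))
		for _, ch := range subs {
			select {
			case ch <- frame: // every member of the bucket shares these bytes
			default:
				h.dropped.Add(1) // slow consumer: drop rather than block
			}
		}
	}
	return nil
}

func main() {
	h := NewHub(map[string]map[string]bool{
		"public": {"id": true},
		"staff":  {"id": true, "email": true},
	})
	a := h.Subscribe("orders", "public", 64)
	b := h.Subscribe("orders", "public", 64)
	c := h.Subscribe("orders", "staff", 64)
	if err := h.Broadcast("orders", []byte(`{"id":1,"email":"x@example.com"}`)); err != nil {
		panic(err)
	}
	fmt.Printf("%s%s%s", <-a, <-b, <-c)
}
```

In this sketch the marshal cost grows with the number of distinct roles on a topic rather than with the number of connections, which is the scaling change the commit message describes. Unlike the real hub, the sketch sends an empty object to a role with no policy entry instead of denying it.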
Estimated code review effort: 4 (Complex) | ~60 minutes
📚 Docs preview is live → https://d1825c66-wavehouse-docs.wave-rf.workers.dev
Actionable comments posted: 6
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: d4ab6da2-7dcf-4d42-b5f8-704077280b08
📒 Files selected for processing (25)
- AGENTS.md
- CHANGELOG.md
- cmd/wavehouse/main.go
- docs/src/content/docs/architecture.md
- internal/api/errors_test.go
- internal/api/hub.go
- internal/api/hub_test.go
- internal/api/router_test.go
- internal/api/stream.go
- internal/api/stream_test.go
- internal/api/transform.go
- internal/api/transform_test.go
- internal/policy/policy.go
- internal/stream/bucket.go
- internal/stream/bucket_test.go
- internal/stream/doc.go
- internal/stream/filter_test.go
- internal/stream/heartbeat.go
- internal/stream/heartbeat_test.go
- internal/stream/hub.go
- internal/stream/hub_test.go
- internal/stream/metrics.go
- internal/stream/subscriber.go
- internal/stream/subscriber_test.go
- tests/integration/setup_test.go
💤 Files with no reviewable changes (4)
- internal/api/transform.go
- internal/api/hub.go
- internal/api/transform_test.go
- internal/api/hub_test.go
📜 Review details
⏰ Context from checks skipped due to timeout. (2)
- GitHub Check: Docs preview
- GitHub Check: Lint
⚠️ CI failures not shown inline (2)
GitHub Actions: PR housekeeping / 0_PR housekeeping.txt: perf(stream): project SSE frames once per role, not per subscriber
Conclusion: failure
##[group]Run
# Single source of truth for the rule: scripts/lint-pr-title.sh, the
# SAME validator the local agent gate runs (.claude/hooks/agent-bash-gate.sh),
# so CI and local can't drift. The checkout above is ref: main, so this is
# always the default-branch script. Dependabot's grouped-update titles
# routinely exceed the 72-char subject cap and the format isn't
# configurable, so Dependabot PRs are exempt from the length check
# (the format check still applies).
if [[ "$PR_AUTHOR" == "dependabot[bot]" || "$PR_AUTHOR" == "app/dependabot" ]]; then
  export PR_TITLE_SKIP_LENGTH=1
fi

if reason=$(bash scripts/lint-pr-title.sh "$PR_TITLE" 2>&1); then
  echo "passed=true" >> "$GITHUB_OUTPUT"
  echo "PR title OK: $PR_TITLE"
else
  echo "passed=false" >> "$GITHUB_OUTPUT"
  printf '%s\n' "$reason"
  echo "::error::$(printf '%s' "$reason" | head -1)"
GitHub Actions: PR housekeeping / PR housekeeping: perf(stream): project SSE frames once per role, not per subscriber
Conclusion: failure
🧰 Additional context used
📓 Path-based instructions (7)
internal/stream/**
📄 CodeRabbit inference engine (AGENTS.md)
Structured-query and live-stream reads must share the same per-column decision function so column visibility cannot drift.
Files:
- internal/stream/doc.go
- internal/stream/filter_test.go
- internal/stream/subscriber_test.go
- internal/stream/heartbeat.go
- internal/stream/metrics.go
- internal/stream/heartbeat_test.go
- internal/stream/bucket.go
- internal/stream/subscriber.go
- internal/stream/bucket_test.go
- internal/stream/hub.go
- internal/stream/hub_test.go
**/*_test.go
📄 CodeRabbit inference engine (AGENTS.md)
Use table-driven tests with `t.Run(tt.name, ...)`, and add corresponding test cases for every new function.
Files:
- internal/stream/filter_test.go
- internal/stream/subscriber_test.go
- internal/api/errors_test.go
- tests/integration/setup_test.go
- internal/stream/heartbeat_test.go
- internal/stream/bucket_test.go
- internal/api/router_test.go
- internal/stream/hub_test.go
- internal/api/stream_test.go
internal/policy/**
📄 CodeRabbit inference engine (AGENTS.md)
Hasura-style access control must fail closed: `policy.IsAdmin` is the single admin check, empty or absent roles match nothing, `Validate` rejects empty role keys, `nil` policy denies everyone, and `default_role == admin_role` is dev-only and loudly warned.
Files:
internal/policy/policy.go
internal/api/**
📄 CodeRabbit inference engine (AGENTS.md)
Handler error responses must stay in sync with the API docs error tables.
Files:
- internal/api/errors_test.go
- internal/api/router_test.go
- internal/api/stream_test.go
- internal/api/stream.go
CHANGELOG.md
📄 CodeRabbit inference engine (AGENTS.md)
Record any notable change under `[Unreleased]` in `CHANGELOG.md`.
Files:
CHANGELOG.md
docs/src/content/docs/architecture.md
📄 CodeRabbit inference engine (AGENTS.md)
When changing core architecture or adding a package, update `docs/src/content/docs/architecture.md` and `AGENTS.md`.
Files:
docs/src/content/docs/architecture.md
docs/src/content/docs/**/*.md
📄 CodeRabbit inference engine (AGENTS.md)
Author Mermaid diagrams vertically by default, avoid wide side-by-side diagrams, and keep node labels short so diagrams remain legible in the docs column width.
Files:
docs/src/content/docs/architecture.md
🧠 Learnings (5)
📓 Common learnings
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Validate locally before every push by running `make ci` the documented way; do not use CI as the first feedback loop.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: On PR-branch pushes, run the required pre-push reviewers via `/prepush` until each applicable reviewer returns `ship_it`, and never hardcode the reviewer set.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Every code change must update its corresponding documentation and `CHANGELOG.md` in the same PR.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Address and resolve every review finding; do not silently drop findings, and either fix them or track them in an issue.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Agents must create draft PRs only, and PR titles must pass the Conventional Commits gate and stay within 72 characters.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Never force-push or rebase a PR branch; merge `origin/main` instead when syncing with upstream.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-06-26T22:13:01.491Z
Learning: Never hand-write markers or use `--no-verify`; use the documented gates and marker tooling instead.
📚 Learning: 2026-06-26T12:23:22.696Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 346
File: internal/stream/subscriber_test.go:9-28
Timestamp: 2026-06-26T12:23:22.696Z
Learning: In this Go repository, prefer table-driven tests (e.g., `[]struct{...}` with `t.Run(...)`) only for tests that cover multiple scenarios/inputs and can be cleanly enumerated. Do not artificially rewrite a clear single-scenario sequential behavioral-flow test into a table-driven form just to fit the pattern; if there’s only one meaningful scenario, keep the test as a straightforward linear flow (as in `TestSubscriber_SendDeliversThenDropsWhenFull`).
Applied to files:
- internal/stream/filter_test.go
- internal/stream/subscriber_test.go
- internal/api/errors_test.go
- tests/integration/setup_test.go
- internal/stream/heartbeat_test.go
- internal/stream/bucket_test.go
- internal/api/router_test.go
- internal/stream/hub_test.go
- internal/api/stream_test.go
📚 Learning: 2026-05-20T01:02:00.784Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 164
File: internal/api/router_test.go:289-350
Timestamp: 2026-05-20T01:02:00.784Z
Learning: In WaveHouse’s internal API tests (files matching internal/api/**/*_test.go), follow the existing separation-of-concerns convention for testing the RequireRole middleware: inject `ContextKeyRole` directly into the request `context.Context` instead of using `testutil.MakeJWT`/JWT-driven flows. Do not refactor role-gate tests to use JWT tokens—JWT parsing and token handling are covered separately in `middleware_test.go` (the dedicated JWT parsing tests), and mixing those concerns would expand the failure surface and reduce isolation.
Applied to files:
- internal/api/errors_test.go
- internal/api/router_test.go
- internal/api/stream_test.go
📚 Learning: 2026-05-23T01:23:59.268Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 174
File: internal/api/ingest_test.go:111-111
Timestamp: 2026-05-23T01:23:59.268Z
Learning: In WaveHouse Go tests in internal/api/**/*_test.go, use internal/testutil.AssertJSONErrorResponse(t, w) for HTTP error-path JSON assertions. Do not use (or reintroduce) package-local assertJSONErrorResponse helpers. AssertJSONErrorResponse verifies the response Content-Type is application/json, includes the X-Content-Type-Options: nosniff header, and that the JSON body contains an "error" field.
Applied to files:
- internal/api/errors_test.go
- internal/api/router_test.go
- internal/api/stream_test.go
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.
Applied to files:
- CHANGELOG.md
- docs/src/content/docs/architecture.md
- AGENTS.md
🪛 GitHub Check: CodeQL
internal/stream/hub.go
[failure] 232-232: Size computation for allocation may overflow
This operation, which is used in an allocation, involves a potentially large value and might overflow.
🔇 Additional comments (30)
AGENTS.md (5)
31-31: LGTM!
44-44: LGTM!
390-390: LGTM!
403-403: LGTM!
61-61: No change needed: `internal/stream/hub.go` defines `filterColumns`, so the `stream.filterColumns` reference is correct.
docs/src/content/docs/architecture.md (6)
31-31: LGTM!
52-52: LGTM!
65-65: LGTM!
79-83: LGTM!
84-92: LGTM!
253-262: LGTM!
internal/policy/policy.go (2)
282-282: LGTM! (pending verification that `filterColumns` exists in `internal/stream/`; see prior request)
326-326: LGTM! (pending verification that `filterColumns` exists in `internal/stream/`; see prior request)
internal/stream/doc.go (1)
3-11: LGTM!
CHANGELOG.md (1)
22-23: No changelog structure fix needed. The file already has an `## Unreleased` section above `### Changed`; the bracketed `## [Unreleased]` form isn't used here.
> Likely an incorrect or invalid review comment.
internal/stream/hub_test.go (2)
20-38: LGTM! Also applies to: 54-64, 71-125, 148-200, 224-266
44-44: 🎯 Functional Correctness. No issue here: the repo targets Go 1.26.4, which supports both `strings.SplitSeq` and `for i := range 8`.
> Likely an incorrect or invalid review comment.
internal/api/stream.go (1)
18-24: LGTM! Also applies to: 46-50, 80-85, 105-110, 125-147
cmd/wavehouse/main.go (1)
283-287: LGTM! Also applies to: 311-315, 357-359
internal/api/errors_test.go (1)
17-17: LGTM! Also applies to: 197-197
internal/api/router_test.go (1)
13-13: LGTM! Also applies to: 256-256, 312-312, 384-384, 445-445, 496-496, 521-521, 618-618
internal/api/stream_test.go (1)
18-25: LGTM! Also applies to: 53-53, 78-78, 117-117
tests/integration/setup_test.go (1)
36-36: LGTM! Also applies to: 314-327
internal/stream/subscriber.go (1)
3-68: LGTM!
internal/stream/bucket.go (1)
5-14: LGTM! Also applies to: 47-64
internal/stream/heartbeat.go (1)
112-112: LGTM!
internal/stream/bucket_test.go (1)
34-36: LGTM! Also applies to: 56-61, 70-75, 83-95
internal/stream/heartbeat_test.go (1)
69-69: LGTM! Also applies to: 110-110
internal/stream/metrics.go (1)
27-27: LGTM! Also applies to: 43-45, 75-83
internal/stream/subscriber_test.go (1)
11-12: LGTM! Also applies to: 30-41
…mes)

- project: fail closed when a policy store is wired but the payload isn't a valid EventMessage, so malformed JSON on ingest.<table> can't bypass column filtering (passthrough now only when no store is configured).
- wireFrame: build by append (no payload-derived allocation size, which clears the CodeQL go/allocation-size-overflow alert) and emit a "data:" prefix per newline-split line so a multi-line payload stays valid SSE.
- ReplayFrame becomes a *Hub method using the Hub's own policy store, so replay can't project against a different (or nil) policy than the live fan-out; drop the now-redundant StreamHandler.PolicyStore field and its main.go wiring.
- tests: table-driven filterColumns / passthrough+fail-closed / ReplayFrame, guard both frame buffers before indexing, and a wireFrame multi-line test.

Addresses CodeRabbit + CodeQL feedback on #353.

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF
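The first two bullets of this commit can be illustrated with a small sketch. It is written from the commit message alone, so the signatures of `wireFrame` and `project`, the `allowed` map standing in for the policy store, and the `errInvalidPayload` name are assumptions rather than the PR's actual code. It uses `bytes.Split` so that it also runs on toolchains older than the one the repo targets.

```go
package main

import (
	"bytes"
	"encoding/json"
	"errors"
	"fmt"
)

// wireFrame builds one SSE frame by appending, so no allocation size is
// computed from the payload length. Each payload line gets its own "data: "
// prefix, which keeps a multi-line payload valid SSE.
func wireFrame(id string, payload []byte) []byte {
	var buf []byte
	buf = append(buf, "id: "...)
	buf = append(buf, id...)
	buf = append(buf, '\n')
	for _, line := range bytes.Split(payload, []byte("\n")) {
		buf = append(buf, "data: "...)
		buf = append(buf, line...)
		buf = append(buf, '\n')
	}
	return append(buf, '\n') // a blank line terminates the event
}

// errInvalidPayload is a hypothetical sentinel for the fail-closed path.
var errInvalidPayload = errors.New("payload is not a valid event")

// project filters columns when a policy is configured and fails closed on
// undecodable input; passthrough happens only when no policy is wired (nil).
func project(payload []byte, allowed map[string]bool) ([]byte, error) {
	if allowed == nil {
		return payload, nil
	}
	var row map[string]any
	if err := json.Unmarshal(payload, &row); err != nil {
		return nil, errInvalidPayload
	}
	for name := range row {
		if !allowed[name] {
			delete(row, name)
		}
	}
	return json.Marshal(row)
}

func main() {
	fmt.Printf("%q\n", wireFrame("42", []byte("a\nb")))
	_, err := project([]byte("not json"), map[string]bool{"id": true})
	fmt.Println(err)
}
```

The point of the fail-closed branch is that a malformed payload yields an error and no frame, rather than the raw bytes, whenever a policy is in play.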
Code Coverage Overview
Languages: Go
The overall coverage in the branch remains at 90%, unchanged from the branch.
Add TestHub_ProjectsPerRole_DistinctRolesGetDistinctFrames: two *allowed* roles on one topic receive different column projections with distinct backing arrays. The existing tests proved same-role subscribers share bytes and a denied role gets nothing; this locks in that the frame is serialized once PER ROLE, not once globally.

Also trims a defensive sentence from the wireFrame doc comment and modernizes its bytes.Split → bytes.SplitSeq (matching hub_test.go).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF
Actionable comments posted: 1
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: ASSERTIVE
Plan: Pro Plus
Run ID: dd82b859-869b-4d1c-86e5-429ea93f16d8
📒 Files selected for processing (7)
- CHANGELOG.md
- cmd/wavehouse/main.go
- internal/api/stream.go
- internal/policy/policy.go
- internal/stream/filter_test.go
- internal/stream/hub.go
- internal/stream/hub_test.go
💤 Files with no reviewable changes (1)
- cmd/wavehouse/main.go
📜 Review details
⏰ Context from checks skipped due to timeout. (4)
- GitHub Check: Coverage
- GitHub Check: E2E tests
- GitHub Check: Docs build
- GitHub Check: Lint
⚠️ CI failures not shown inline (2)
GitHub Actions: PR housekeeping / 0_PR housekeeping.txt: perf(stream): project SSE frames once per role, not per subscriber
Conclusion: failure
GitHub Actions: PR housekeeping / PR housekeeping: perf(stream): project SSE frames once per role, not per subscriber
Conclusion: failure
🧰 Additional context used
📓 Path-based instructions (3)
internal/policy/**
📄 CodeRabbit inference engine (AGENTS.md)
Hasura-style access control must remain fail-closed, including admin checks, empty-role handling, and deny-by-default behavior.
Files:
internal/policy/policy.go
CHANGELOG.md
📄 CodeRabbit inference engine (AGENTS.md)
Record any notable change under `[Unreleased]` in the changelog.
Files:
CHANGELOG.md
internal/api/*.go
📄 CodeRabbit inference engine (AGENTS.md)
Handler error responses must match the documented API error tables.
Files:
internal/api/stream.go
🧠 Learnings (3)
📓 Common learnings
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-07-01T18:31:43.145Z
Learning: Validate locally before every push by running `make ci` the documented way; do not use CI as the first feedback loop.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-07-01T18:31:43.145Z
Learning: For PR-branch pushes, run `/prepush` and ensure every required pre-push reviewer reaches `ship_it` (or is explicitly skipped on the record) before pushing.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-07-01T18:31:43.145Z
Learning: Every code change must update the corresponding docs and `CHANGELOG.md` in the same PR.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-07-01T18:31:43.145Z
Learning: Address every review finding substantively: fix it or track it in an issue, reply appropriately, `@-mention` the bot when required, and resolve the thread before merge.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-07-01T18:31:43.145Z
Learning: Agents must create draft PRs only, and PR titles must pass the Conventional Commits lint check and be 72 characters or fewer.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-07-01T18:31:43.145Z
Learning: Never force-push or rebase a PR branch; absorb upstream changes with `git merge origin/main` instead.
Learnt from: CR
Repo: Wave-RF/WaveHouse
Timestamp: 2026-07-01T18:31:43.145Z
Learning: Never hand-write markers or bypass gates with `--no-verify`; use the documented tooling instead.
📚 Learning: 2026-06-10T15:01:09.027Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 312
File: docs/src/content/docs/development.md:0-0
Timestamp: 2026-06-10T15:01:09.027Z
Learning: In this repo’s Markdown review (all .md files), do not flag capitalization/style issues for literal paths starting with ".github/" (or any substring that is a path beginning with ".github/"). Treat ".github" as the correct lowercase dotfile directory name, even when it appears inside prose or code spans; automated checks such as LanguageTool’s "(GITHUB)" rule commonly produce false positives for this literal filesystem path.
Applied to files:
CHANGELOG.md
📚 Learning: 2026-06-26T12:23:22.696Z
Learnt from: EricAndrechek
Repo: Wave-RF/WaveHouse PR: 346
File: internal/stream/subscriber_test.go:9-28
Timestamp: 2026-06-26T12:23:22.696Z
Learning: In this Go repository, prefer table-driven tests (e.g., `[]struct{...}` with `t.Run(...)`) only for tests that cover multiple scenarios/inputs and can be cleanly enumerated. Do not artificially rewrite a clear single-scenario sequential behavioral-flow test into a table-driven form just to fit the pattern; if there’s only one meaningful scenario, keep the test as a straightforward linear flow (as in `TestSubscriber_SendDeliversThenDropsWhenFull`).
Applied to files:
- internal/stream/filter_test.go
- internal/stream/hub_test.go
🔇 Additional comments (7)
CHANGELOG.md (1)
32-43: LGTM!
internal/policy/policy.go (1)
279-283: LGTM! Also applies to: 323-327, 386-404
internal/api/stream.go (1)
18-18: LGTM! Also applies to: 44-48, 78-83, 99-108, 123-125, 132-146
internal/stream/hub.go (2)
153-208: LGTM!
235-240: 🎯 Functional Correctness. No action needed: `bytes.SplitSeq` matches the configured Go version. The repo targets Go 1.26.4 in `go.mod`, and CI reads that file for `setup-go`.
> Likely an incorrect or invalid review comment.
internal/stream/filter_test.go (1)
10-47: LGTM!
internal/stream/hub_test.go (1)
67-72: LGTM! Also applies to: 112-149, 168-213, 269-344
TestFilterColumns_DoesNotMutateOriginal only checked the input map's length, so an in-place value rewrite or key swap that preserved cardinality would slip through. Snapshot the map before the call and compare the whole thing (CodeRabbit review, PR #353).

Co-Authored-By: Claude Opus 4.8 (1M context) <noreply@anthropic.com>
Claude-Session: https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF
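The snapshot-and-compare approach this commit describes could be sketched as below. Both `filterColumns` (here a stand-in with an assumed signature) and the `unchangedBy` helper are hypothetical; the snapshot is shallow, which is enough to catch the key swaps and value rewrites a length check misses.

```go
package main

import (
	"fmt"
	"maps"
	"reflect"
)

// filterColumns is a hypothetical stand-in: it returns a new map holding only
// the allowed keys and is expected to leave its input untouched.
func filterColumns(row map[string]any, allowed map[string]bool) map[string]any {
	out := make(map[string]any, len(row))
	for k, v := range row {
		if allowed[k] {
			out[k] = v
		}
	}
	return out
}

// unchangedBy runs fn and reports whether row is identical afterwards.
func unchangedBy(row map[string]any, fn func(map[string]any)) bool {
	before := maps.Clone(row) // shallow snapshot taken before the call
	fn(row)
	return reflect.DeepEqual(before, row)
}

func main() {
	row := map[string]any{"id": 1, "email": "x@example.com"}
	ok := unchangedBy(row, func(m map[string]any) {
		filterColumns(m, map[string]bool{"id": true})
	})
	// Same length, different value: invisible to a len() check.
	rewritten := unchangedBy(row, func(m map[string]any) { m["id"] = 2 })
	fmt.Println(ok, rewritten) // true false
}
```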
}
id := extractEventTimestamp(out)
n, err := fmt.Fprintf(w, "id: %s\ndata: %s\n\n", id, out)
case <-sub.Evicted():
Right now the loop can only observe Evicted() while it's in this select, but the w.Write below has no deadline. So a client that stops reading blocks that write indefinitely, and the goroutine never returns here to see the signal. Do we need some sort of write deadline? I'm assuming this belongs in the follow-up; if so, I'm just making a note here.
What
First PR of the #294 SSE delivery-path throughput epic: move the broadcast hub into `internal/stream` and project/serialize each live event once per `(topic, role)` instead of once per subscriber, building on the `internal/stream` primitives from #346.

Today every connection's read loop independently runs `json.Unmarshal → policy.Evaluate → filterEventColumns → json.Marshal` (plus a second unmarshal just to read the `id:` timestamp) on the same event: byte-identical work repeated N times. For a single-role audience (the public dashboard, every viewer `public`) that's the measured ~2270 deliveries/s ceiling. This collapses N re-projections to 1.

How
- `internal/stream/hub.go` (new): `Hub`. Subscribers register under `(topic, role)`. `Broadcast` decodes the event once, snapshots the policy once, then per subscribed role applies column policy once, builds one SSE frame, and fans it to that role's `Bucket`. Skips the decode entirely when nobody is listening.
- `(topic, role)` is the whole key; claims don't matter. The stream path's only per-subscriber transform is column filtering (`policy.IsColumnAllowed`), which derives solely from the role+table policy entry. Claims feed only the row-level `WHERE`/`CHECK`, which the stream path never applies, so the projection is byte-identical for every subscriber of a `(role, table)`. Documented as an invariant in code: if row-level filtering is ever added to streaming, the key must take claims into account.
- The handler's two `select` cases (keepalive vs. per-subscriber event) collapse into one pump over a single `Subscriber.Frames()` queue of typed `Frame{Kind, Data}`. The queue grows from cap 1 (keepalive-only) to 64 so live events buffer while the handler is mid-write.
- Gap-fill replay stays per-connection via the shared `stream.ReplayFrame`; live frames carry the same `id: <received_timestamp>`, so `Last-Event-ID`/`?since=` resumption and the SSE wire format are byte-for-byte identical (no SDK change).
- Slow-consumer drops now increment `wavehouse_sse_dropped_frames_total` (silent before). An inert `Subscriber.Evicted()` seam is wired for the eviction follow-up.
- Drops the now-orphaned, test-only `internal/api/transform.go` (`transformForClient` had no non-test caller).

Scope / staging
This PR is the architecture shift only. Already done by #346: the per-delivery OTel span (another #294 checkbox). Deferred to follow-ups:
- Active slow-consumer eviction (#94): `Evicted()` → handler disconnects → client reconnects + gap-fills.
- Right-sizing the subscriber buffer + lock cost (#152).

Testing
- `internal/stream/{hub,filter,subscriber,bucket,heartbeat}_test.go`: project-once-per-role (with a shared-backing-array assertion proving a single serialization), column-filter + table-denial, topic isolation, passthrough/invalid payloads, bucket/topic GC, the drop-metric increment, `ReplayFrame`, and a concurrent add/remove/broadcast race.
- `make ci` green: unit 89.5% / integration / e2e (67 passed) + every coverage gate, all under `-race`.

Docs
`architecture.md` (diagram, package tree, `stream/` section, Streaming Path), `AGENTS.md`, `CHANGELOG.md`, and `internal/stream/doc.go` are in sync. SSE wire format / event payload (`api.md`) is unchanged.

Part of #294 (epic, not auto-closed).
🤖 Generated with Claude Code
https://claude.ai/code/session_0131uzPDJtg8As2RnyU5nhUF