From aa7fc3ed18e3156fceb9a62d2965b371941f035e Mon Sep 17 00:00:00 2001 From: taitelee Date: Tue, 28 Apr 2026 09:54:53 -0400 Subject: [PATCH 1/4] KV pipes watcher --- cmd/wavehouse-api/main.go | 3 + cmd/wavehouse/main.go | 3 + internal/pipes/pipes.go | 43 ++++++++++++ internal/pipes/pipes_test.go | 131 +++++++++++++++++++++++++++++++++++ 4 files changed, 180 insertions(+) diff --git a/cmd/wavehouse-api/main.go b/cmd/wavehouse-api/main.go index 3d8a8e71..d02f9880 100644 --- a/cmd/wavehouse-api/main.go +++ b/cmd/wavehouse-api/main.go @@ -188,6 +188,9 @@ func run() int { // Start policy watch for cluster-wide updates. go policyStore.Watch(ctx) + // Start pipes watch + go pipesStore.Watch(ctx) + // Hub bridge: MQ → broadcast to connected clients. consumerName := "hub-bridge-" + uuid.New().String() if err := remoteMQ.Subscribe(ctx, "ingest.>", consumerName, func(msg *mq.Message) error { diff --git a/cmd/wavehouse/main.go b/cmd/wavehouse/main.go index b81c5812..cc9205f7 100644 --- a/cmd/wavehouse/main.go +++ b/cmd/wavehouse/main.go @@ -187,6 +187,9 @@ func run() int { // Start policy watch for cluster-wide updates. go policyStore.Watch(ctx) + // Start pipes watch + go pipesStore.Watch(ctx) + // Start batch consumer → ClickHouse. ingestStream, err := ingest.StartIngestWorker( ctx, diff --git a/internal/pipes/pipes.go b/internal/pipes/pipes.go index 803c10be..17bc4a4d 100644 --- a/internal/pipes/pipes.go +++ b/internal/pipes/pipes.go @@ -300,3 +300,46 @@ func NewMemoryStore(queries ...*NamedQuery) *Store { logger: slog.New(slog.NewTextHandler(io.Discard, nil)), } } + +// Watch subscribes to all pipe changes in the NATS KV store so all cluster nodes stay in sync. Blocks until ctx is cancelled. +func (s *Store) Watch(ctx context.Context) { + // If NATS KV isn't configured (e.g., running NewMemoryStore for tests), just exit. + if s.kv == nil { + return + } + + // Use WatchAll because we need to monitor EVERY key (every named query) in the WAVEHOUSE_PIPES bucket. + watcher, err := s.kv.WatchAll(ctx) + if err != nil { + s.logger.Error("failed to start kv watcher for pipes", "error", err) + return + } + defer watcher.Stop() + + for { + select { + case <-ctx.Done(): + return + case entry := <-watcher.Updates(): + if entry == nil { + continue + } + + s.mu.Lock() + switch entry.Operation() { + case jetstream.KeyValuePut: + var q NamedQuery + if err := json.Unmarshal(entry.Value(), &q); err != nil { + s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err) + } else { + s.cached[entry.Key()] = &q + s.logger.Info("pipe updated via cluster sync", "name", entry.Key()) + } + case jetstream.KeyValueDelete, jetstream.KeyValuePurge: + delete(s.cached, entry.Key()) + s.logger.Info("pipe deleted via cluster sync", "name", entry.Key()) + } + s.mu.Unlock() + } + } +} \ No newline at end of file diff --git a/internal/pipes/pipes_test.go b/internal/pipes/pipes_test.go index d1df3af1..72163d8b 100644 --- a/internal/pipes/pipes_test.go +++ b/internal/pipes/pipes_test.go @@ -4,8 +4,11 @@ import ( "context" "os" "path/filepath" + "encoding/json" "testing" + "time" + "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -266,3 +269,131 @@ func TestStore_LoadFromDirectory_EmptyDirReturnsNil(t *testing.T) { require.NoError(t, os.Mkdir(filepath.Join(dir, "sub"), 0o750)) assert.NoError(t, store.loadFromDirectory(ctx, dir)) } + +// ----------------------------------------------------------------------------- +// Watcher Tests & NATS Mocks +// ----------------------------------------------------------------------------- + +// mockWatcher implements jetstream.KeyWatcher for testing +type mockWatcher struct { + updates chan jetstream.KeyValueEntry +} + +func (m *mockWatcher) Updates() <-chan jetstream.KeyValueEntry { + return m.updates + +} +func (m *mockWatcher) Stop() error { + return nil +} + +// mockKVEntry implements jetstream.KeyValueEntry for testing +type mockKVEntry struct { + key string + val []byte + op jetstream.KeyValueOp + rev uint64 // Keeps track of any updates to a pipe (version number) + delta uint64 // Keeps track of how many more updates are waiting in the queue behind the current one +} + +func (m *mockKVEntry) Bucket() string { + return kvBucket +} + +func (m *mockKVEntry) Key() string { + return m.key +} + +func (m *mockKVEntry) Value() []byte { + return m.val +} + +func (m *mockKVEntry) Revision() uint64 { + return m.rev +} + +func (m *mockKVEntry) Created() time.Time { + return time.Now() +} + +func (m *mockKVEntry) Delta() uint64 { + return m.delta +} + +func (m *mockKVEntry) Operation() jetstream.KeyValueOp { + return m.op +} + +// mockKV implements a subset of jetstream.KeyValue for testing +type mockKV struct { + jetstream.KeyValue // embed to satisfy interface for unused methods + watcher *mockWatcher +} + +func (m *mockKV) WatchAll(ctx context.Context, opts ...jetstream.WatchOpt) (jetstream.KeyWatcher, error) { + return m.watcher, nil +} + +func TestStore_Watch_SyncsCluster(t *testing.T) { + t.Parallel() + + // Setup the mock NATS watcher + updatesCh := make(chan jetstream.KeyValueEntry) + watcher := &mockWatcher{updates: updatesCh} + kv := &mockKV{watcher: watcher} + + // Create a Store and inject our mock NATS KV + store := NewMemoryStore() + store.kv = kv // Override the nil kv with our mock + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the watcher in the background + go store.Watch(ctx) + + // Yield briefly to let the background goroutine start + time.Sleep(10 * time.Millisecond) + + // Simulate NATS sending a "Put" (A new Pipe was created) + newPipe := NamedQuery{Name: "test_pipe", SQL: "SELECT 1"} + data, err := json.Marshal(newPipe) + require.NoError(t, err) + + updatesCh <- &mockKVEntry{ + key: "test_pipe", + val: data, + op: jetstream.KeyValuePut, + } + + // Wait for the background goroutine to process the channel + time.Sleep(10 * time.Millisecond) + + // Verify the pipe is now in local memory + cached := store.Get("test_pipe") + require.NotNil(t, cached, "Pipe should have been added to cache by watcher") + assert.Equal(t, "SELECT 1", cached.SQL) + + // Simulate NATS sending a "Delete" (A Pipe was removed) + updatesCh <- &mockKVEntry{ + key: "test_pipe", + op: jetstream.KeyValueDelete, + } + + time.Sleep(10 * time.Millisecond) + + // Verify the pipe was removed from local memory + assert.Nil(t, store.Get("test_pipe"), "Pipe should have been removed from cache by watcher") +} + +func TestStore_Watch_NilKV(t *testing.T) { + t.Parallel() + + // If the Store is purely in-memory (no NATS configured), Watch should exit cleanly. + store := NewMemoryStore() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // This would panic if the guard clause `if s.kv == nil` wasn't there + store.Watch(ctx) +} From 49faf2acbcb82ff751544d43b16b34868321a22c Mon Sep 17 00:00:00 2001 From: taitelee Date: Tue, 28 Apr 2026 14:04:14 -0400 Subject: [PATCH 2/4] perf(pipes): optimize lock duration in watcher and fix flaky unit tests --- internal/pipes/pipes.go | 16 +++++++++++----- internal/pipes/pipes_test.go | 16 ++++++---------- 2 files changed, 17 insertions(+), 15 deletions(-) diff --git a/internal/pipes/pipes.go b/internal/pipes/pipes.go index 17bc4a4d..c8dfd426 100644 --- a/internal/pipes/pipes.go +++ b/internal/pipes/pipes.go @@ -325,21 +325,27 @@ func (s *Store) Watch(ctx context.Context) { continue } - s.mu.Lock() switch entry.Operation() { case jetstream.KeyValuePut: var q NamedQuery if err := json.Unmarshal(entry.Value(), &q); err != nil { s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err) - } else { - s.cached[entry.Key()] = &q - s.logger.Info("pipe updated via cluster sync", "name", entry.Key()) + continue } + + s.mu.Lock() + s.cached[entry.Key()] = &q + s.mu.Unlock() + + s.logger.Info("pipe updated via cluster sync", "name", entry.Key()) + case jetstream.KeyValueDelete, jetstream.KeyValuePurge: + s.mu.Lock() delete(s.cached, entry.Key()) + s.mu.Unlock() + s.logger.Info("pipe deleted via cluster sync", "name", entry.Key()) } - s.mu.Unlock() } } } \ No newline at end of file diff --git a/internal/pipes/pipes_test.go b/internal/pipes/pipes_test.go index 72163d8b..33844b89 100644 --- a/internal/pipes/pipes_test.go +++ b/internal/pipes/pipes_test.go @@ -352,9 +352,6 @@ func TestStore_Watch_SyncsCluster(t *testing.T) { // Start the watcher in the background go store.Watch(ctx) - // Yield briefly to let the background goroutine start - time.Sleep(10 * time.Millisecond) - // Simulate NATS sending a "Put" (A new Pipe was created) newPipe := NamedQuery{Name: "test_pipe", SQL: "SELECT 1"} data, err := json.Marshal(newPipe) @@ -366,12 +363,12 @@ func TestStore_Watch_SyncsCluster(t *testing.T) { op: jetstream.KeyValuePut, } - // Wait for the background goroutine to process the channel - time.Sleep(10 * time.Millisecond) + require.Eventually(t, func() bool { + return store.Get("test_pipe") != nil + }, 1*time.Second, 10*time.Millisecond, "Pipe should have been added to cache by watcher") // Verify the pipe is now in local memory cached := store.Get("test_pipe") - require.NotNil(t, cached, "Pipe should have been added to cache by watcher") assert.Equal(t, "SELECT 1", cached.SQL) // Simulate NATS sending a "Delete" (A Pipe was removed) @@ -380,10 +377,9 @@ func TestStore_Watch_SyncsCluster(t *testing.T) { op: jetstream.KeyValueDelete, } - time.Sleep(10 * time.Millisecond) - - // Verify the pipe was removed from local memory - assert.Nil(t, store.Get("test_pipe"), "Pipe should have been removed from cache by watcher") + require.Eventually(t, func() bool { + return store.Get("test_pipe") == nil + }, 1*time.Second, 10*time.Millisecond, "Pipe should have been removed from cache by watcher") } func TestStore_Watch_NilKV(t *testing.T) { From 48ddf2f400352acdca553b963d4fc0543cc48461 Mon Sep 17 00:00:00 2001 From: taitelee Date: Tue, 28 Apr 2026 14:22:30 -0400 Subject: [PATCH 3/4] make fmt --- internal/pipes/pipes.go | 4 ++-- internal/pipes/pipes_test.go | 12 ++++++------ 2 files changed, 8 insertions(+), 8 deletions(-) diff --git a/internal/pipes/pipes.go b/internal/pipes/pipes.go index c8dfd426..abb0992d 100644 --- a/internal/pipes/pipes.go +++ b/internal/pipes/pipes.go @@ -338,7 +338,7 @@ func (s *Store) Watch(ctx context.Context) { s.mu.Unlock() s.logger.Info("pipe updated via cluster sync", "name", entry.Key()) - + case jetstream.KeyValueDelete, jetstream.KeyValuePurge: s.mu.Lock() delete(s.cached, entry.Key()) @@ -348,4 +348,4 @@ func (s *Store) Watch(ctx context.Context) { } } } -} \ No newline at end of file +} diff --git a/internal/pipes/pipes_test.go b/internal/pipes/pipes_test.go index 33844b89..abdedc40 100644 --- a/internal/pipes/pipes_test.go +++ b/internal/pipes/pipes_test.go @@ -2,9 +2,9 @@ package pipes import ( "context" + "encoding/json" "os" "path/filepath" - "encoding/json" "testing" "time" @@ -281,8 +281,8 @@ type mockWatcher struct { func (m *mockWatcher) Updates() <-chan jetstream.KeyValueEntry { return m.updates - } + func (m *mockWatcher) Stop() error { return nil } @@ -292,8 +292,8 @@ type mockKVEntry struct { key string val []byte op jetstream.KeyValueOp - rev uint64 // Keeps track of any updates to a pipe (version number) - delta uint64 // Keeps track of how many more updates are waiting in the queue behind the current one + rev uint64 // Keeps track of any updates to a pipe (version number) + delta uint64 // Keeps track of how many more updates are waiting in the queue behind the current one } func (m *mockKVEntry) Bucket() string { @@ -336,7 +336,7 @@ func (m *mockKV) WatchAll(ctx context.Context, opts ...jetstream.WatchOpt) (jets func TestStore_Watch_SyncsCluster(t *testing.T) { t.Parallel() - + // Setup the mock NATS watcher updatesCh := make(chan jetstream.KeyValueEntry) watcher := &mockWatcher{updates: updatesCh} @@ -384,7 +384,7 @@ func TestStore_Watch_SyncsCluster(t *testing.T) { func TestStore_Watch_NilKV(t *testing.T) { t.Parallel() - + // If the Store is purely in-memory (no NATS configured), Watch should exit cleanly. store := NewMemoryStore() ctx, cancel := context.WithCancel(context.Background()) From 8ff50585702cf23a8592c493543a49fe26cf55ef Mon Sep 17 00:00:00 2001 From: taitelee Date: Thu, 30 Apr 2026 09:29:17 -0400 Subject: [PATCH 4/4] Checking watcher.Stop() for errcheck --- internal/pipes/pipes.go | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/internal/pipes/pipes.go b/internal/pipes/pipes.go index abb0992d..c612a124 100644 --- a/internal/pipes/pipes.go +++ b/internal/pipes/pipes.go @@ -314,7 +314,11 @@ func (s *Store) Watch(ctx context.Context) { s.logger.Error("failed to start kv watcher for pipes", "error", err) return } - defer watcher.Stop() + defer func() { + if err := watcher.Stop(); err != nil { + s.logger.Error("failed to stop watcher", "error", err) + } + }() for { select {