diff --git a/cmd/wavehouse-api/main.go b/cmd/wavehouse-api/main.go index 3d8a8e71..d02f9880 100644 --- a/cmd/wavehouse-api/main.go +++ b/cmd/wavehouse-api/main.go @@ -188,6 +188,9 @@ func run() int { // Start policy watch for cluster-wide updates. go policyStore.Watch(ctx) + // Start pipes watch + go pipesStore.Watch(ctx) + // Hub bridge: MQ → broadcast to connected clients. consumerName := "hub-bridge-" + uuid.New().String() if err := remoteMQ.Subscribe(ctx, "ingest.>", consumerName, func(msg *mq.Message) error { diff --git a/cmd/wavehouse/main.go b/cmd/wavehouse/main.go index b81c5812..cc9205f7 100644 --- a/cmd/wavehouse/main.go +++ b/cmd/wavehouse/main.go @@ -187,6 +187,9 @@ func run() int { // Start policy watch for cluster-wide updates. go policyStore.Watch(ctx) + // Start pipes watch + go pipesStore.Watch(ctx) + // Start batch consumer → ClickHouse. ingestStream, err := ingest.StartIngestWorker( ctx, diff --git a/internal/pipes/pipes.go b/internal/pipes/pipes.go index 803c10be..c612a124 100644 --- a/internal/pipes/pipes.go +++ b/internal/pipes/pipes.go @@ -300,3 +300,56 @@ func NewMemoryStore(queries ...*NamedQuery) *Store { logger: slog.New(slog.NewTextHandler(io.Discard, nil)), } } + +// Watch subscribes to all pipe changes in the NATS KV store so all cluster nodes stay in sync. Blocks until ctx is cancelled. +func (s *Store) Watch(ctx context.Context) { + // If NATS KV isn't configured (e.g., running NewMemoryStore for tests), just exit. + if s.kv == nil { + return + } + + // Use WatchAll because we need to monitor EVERY key (every named query) in the WAVEHOUSE_PIPES bucket. + watcher, err := s.kv.WatchAll(ctx) + if err != nil { + s.logger.Error("failed to start kv watcher for pipes", "error", err) + return + } + defer func() { + if err := watcher.Stop(); err != nil { + s.logger.Error("failed to stop watcher", "error", err) + } + }() + + for { + select { + case <-ctx.Done(): + return + case entry := <-watcher.Updates(): + if entry == nil { + continue + } + + switch entry.Operation() { + case jetstream.KeyValuePut: + var q NamedQuery + if err := json.Unmarshal(entry.Value(), &q); err != nil { + s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err) + continue + } + + s.mu.Lock() + s.cached[entry.Key()] = &q + s.mu.Unlock() + + s.logger.Info("pipe updated via cluster sync", "name", entry.Key()) + + case jetstream.KeyValueDelete, jetstream.KeyValuePurge: + s.mu.Lock() + delete(s.cached, entry.Key()) + s.mu.Unlock() + + s.logger.Info("pipe deleted via cluster sync", "name", entry.Key()) + } + } + } +} diff --git a/internal/pipes/pipes_test.go b/internal/pipes/pipes_test.go index d1df3af1..abdedc40 100644 --- a/internal/pipes/pipes_test.go +++ b/internal/pipes/pipes_test.go @@ -2,10 +2,13 @@ package pipes import ( "context" + "encoding/json" "os" "path/filepath" "testing" + "time" + "github.com/nats-io/nats.go/jetstream" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -266,3 +269,127 @@ func TestStore_LoadFromDirectory_EmptyDirReturnsNil(t *testing.T) { require.NoError(t, os.Mkdir(filepath.Join(dir, "sub"), 0o750)) assert.NoError(t, store.loadFromDirectory(ctx, dir)) } + +// ----------------------------------------------------------------------------- +// Watcher Tests & NATS Mocks +// ----------------------------------------------------------------------------- + +// mockWatcher implements jetstream.KeyWatcher for testing +type mockWatcher struct { + updates chan jetstream.KeyValueEntry +} + +func (m *mockWatcher) Updates() <-chan jetstream.KeyValueEntry { + return m.updates +} + +func (m *mockWatcher) Stop() error { + return nil +} + +// mockKVEntry implements jetstream.KeyValueEntry for testing +type mockKVEntry struct { + key string + val []byte + op jetstream.KeyValueOp + rev uint64 // Keeps track of any updates to a pipe (version number) + delta uint64 // Keeps track of how many more updates are waiting in the queue behind the current one +} + +func (m *mockKVEntry) Bucket() string { + return kvBucket +} + +func (m *mockKVEntry) Key() string { + return m.key +} + +func (m *mockKVEntry) Value() []byte { + return m.val +} + +func (m *mockKVEntry) Revision() uint64 { + return m.rev +} + +func (m *mockKVEntry) Created() time.Time { + return time.Now() +} + +func (m *mockKVEntry) Delta() uint64 { + return m.delta +} + +func (m *mockKVEntry) Operation() jetstream.KeyValueOp { + return m.op +} + +// mockKV implements a subset of jetstream.KeyValue for testing +type mockKV struct { + jetstream.KeyValue // embed to satisfy interface for unused methods + watcher *mockWatcher +} + +func (m *mockKV) WatchAll(ctx context.Context, opts ...jetstream.WatchOpt) (jetstream.KeyWatcher, error) { + return m.watcher, nil +} + +func TestStore_Watch_SyncsCluster(t *testing.T) { + t.Parallel() + + // Setup the mock NATS watcher + updatesCh := make(chan jetstream.KeyValueEntry) + watcher := &mockWatcher{updates: updatesCh} + kv := &mockKV{watcher: watcher} + + // Create a Store and inject our mock NATS KV + store := NewMemoryStore() + store.kv = kv // Override the nil kv with our mock + + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // Start the watcher in the background + go store.Watch(ctx) + + // Simulate NATS sending a "Put" (A new Pipe was created) + newPipe := NamedQuery{Name: "test_pipe", SQL: "SELECT 1"} + data, err := json.Marshal(newPipe) + require.NoError(t, err) + + updatesCh <- &mockKVEntry{ + key: "test_pipe", + val: data, + op: jetstream.KeyValuePut, + } + + require.Eventually(t, func() bool { + return store.Get("test_pipe") != nil + }, 1*time.Second, 10*time.Millisecond, "Pipe should have been added to cache by watcher") + + // Verify the pipe is now in local memory + cached := store.Get("test_pipe") + assert.Equal(t, "SELECT 1", cached.SQL) + + // Simulate NATS sending a "Delete" (A Pipe was removed) + updatesCh <- &mockKVEntry{ + key: "test_pipe", + op: jetstream.KeyValueDelete, + } + + require.Eventually(t, func() bool { + return store.Get("test_pipe") == nil + }, 1*time.Second, 10*time.Millisecond, "Pipe should have been removed from cache by watcher") +} + +func TestStore_Watch_NilKV(t *testing.T) { + t.Parallel() + + // If the Store is purely in-memory (no NATS configured), Watch should exit cleanly. + store := NewMemoryStore() + ctx, cancel := context.WithCancel(context.Background()) + defer cancel() + + // This would panic if the guard clause `if s.kv == nil` wasn't there + store.Watch(ctx) +}