Skip to content
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions cmd/wavehouse-api/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -188,6 +188,9 @@ func run() int {
// Start policy watch for cluster-wide updates.
go policyStore.Watch(ctx)

// Start pipes watch
go pipesStore.Watch(ctx)

// Hub bridge: MQ → broadcast to connected clients.
consumerName := "hub-bridge-" + uuid.New().String()
if err := remoteMQ.Subscribe(ctx, "ingest.>", consumerName, func(msg *mq.Message) error {
Expand Down
3 changes: 3 additions & 0 deletions cmd/wavehouse/main.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,6 +187,9 @@ func run() int {
// Start policy watch for cluster-wide updates.
go policyStore.Watch(ctx)

// Start pipes watch
go pipesStore.Watch(ctx)

// Start batch consumer → ClickHouse.
ingestStream, err := ingest.StartIngestWorker(
ctx,
Expand Down
53 changes: 53 additions & 0 deletions internal/pipes/pipes.go
Original file line number Diff line number Diff line change
Expand Up @@ -300,3 +300,56 @@ func NewMemoryStore(queries ...*NamedQuery) *Store {
logger: slog.New(slog.NewTextHandler(io.Discard, nil)),
}
}

// Watch subscribes to all pipe changes in the NATS KV store so all cluster nodes stay in sync. Blocks until ctx is cancelled.
func (s *Store) Watch(ctx context.Context) {
// If NATS KV isn't configured (e.g., running NewMemoryStore for tests), just exit.
if s.kv == nil {
return
}

// Use WatchAll because we need to monitor EVERY key (every named query) in the WAVEHOUSE_PIPES bucket.
watcher, err := s.kv.WatchAll(ctx)
if err != nil {
s.logger.Error("failed to start kv watcher for pipes", "error", err)
return
}
defer func() {
if err := watcher.Stop(); err != nil {
s.logger.Error("failed to stop watcher", "error", err)
}
}()

for {
select {
case <-ctx.Done():
return
case entry := <-watcher.Updates():
if entry == nil {
continue
}

switch entry.Operation() {
case jetstream.KeyValuePut:
var q NamedQuery
if err := json.Unmarshal(entry.Value(), &q); err != nil {
s.logger.Error("failed to unmarshal pipe from kv watch", "key", entry.Key(), "error", err)
continue
}

s.mu.Lock()
s.cached[entry.Key()] = &q
s.mu.Unlock()

s.logger.Info("pipe updated via cluster sync", "name", entry.Key())

case jetstream.KeyValueDelete, jetstream.KeyValuePurge:
s.mu.Lock()
delete(s.cached, entry.Key())
s.mu.Unlock()

s.logger.Info("pipe deleted via cluster sync", "name", entry.Key())
}
}
}
}
127 changes: 127 additions & 0 deletions internal/pipes/pipes_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -2,10 +2,13 @@ package pipes

import (
"context"
"encoding/json"
"os"
"path/filepath"
"testing"
"time"

"github.com/nats-io/nats.go/jetstream"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
)
Expand Down Expand Up @@ -266,3 +269,127 @@ func TestStore_LoadFromDirectory_EmptyDirReturnsNil(t *testing.T) {
require.NoError(t, os.Mkdir(filepath.Join(dir, "sub"), 0o750))
assert.NoError(t, store.loadFromDirectory(ctx, dir))
}

// -----------------------------------------------------------------------------
// Watcher Tests & NATS Mocks
// -----------------------------------------------------------------------------

// mockWatcher implements jetstream.KeyWatcher for testing
type mockWatcher struct {
updates chan jetstream.KeyValueEntry
}

func (m *mockWatcher) Updates() <-chan jetstream.KeyValueEntry {
return m.updates
}

func (m *mockWatcher) Stop() error {
return nil
}

// mockKVEntry implements jetstream.KeyValueEntry for testing
type mockKVEntry struct {
key string
val []byte
op jetstream.KeyValueOp
rev uint64 // Keeps track of any updates to a pipe (version number)
delta uint64 // Keeps track of how many more updates are waiting in the queue behind the current one
}

func (m *mockKVEntry) Bucket() string {
return kvBucket
}

func (m *mockKVEntry) Key() string {
return m.key
}

func (m *mockKVEntry) Value() []byte {
return m.val
}

func (m *mockKVEntry) Revision() uint64 {
return m.rev
}

func (m *mockKVEntry) Created() time.Time {
return time.Now()
}

func (m *mockKVEntry) Delta() uint64 {
return m.delta
}

func (m *mockKVEntry) Operation() jetstream.KeyValueOp {
return m.op
}

// mockKV implements a subset of jetstream.KeyValue for testing
type mockKV struct {
jetstream.KeyValue // embed to satisfy interface for unused methods
watcher *mockWatcher
}

func (m *mockKV) WatchAll(ctx context.Context, opts ...jetstream.WatchOpt) (jetstream.KeyWatcher, error) {
return m.watcher, nil
}

func TestStore_Watch_SyncsCluster(t *testing.T) {
t.Parallel()

// Setup the mock NATS watcher
updatesCh := make(chan jetstream.KeyValueEntry)
watcher := &mockWatcher{updates: updatesCh}
kv := &mockKV{watcher: watcher}

// Create a Store and inject our mock NATS KV
store := NewMemoryStore()
store.kv = kv // Override the nil kv with our mock

ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// Start the watcher in the background
go store.Watch(ctx)

// Simulate NATS sending a "Put" (A new Pipe was created)
newPipe := NamedQuery{Name: "test_pipe", SQL: "SELECT 1"}
data, err := json.Marshal(newPipe)
require.NoError(t, err)

updatesCh <- &mockKVEntry{
key: "test_pipe",
val: data,
op: jetstream.KeyValuePut,
}

require.Eventually(t, func() bool {
return store.Get("test_pipe") != nil
}, 1*time.Second, 10*time.Millisecond, "Pipe should have been added to cache by watcher")

// Verify the pipe is now in local memory
cached := store.Get("test_pipe")
assert.Equal(t, "SELECT 1", cached.SQL)

// Simulate NATS sending a "Delete" (A Pipe was removed)
updatesCh <- &mockKVEntry{
key: "test_pipe",
op: jetstream.KeyValueDelete,
}

require.Eventually(t, func() bool {
return store.Get("test_pipe") == nil
}, 1*time.Second, 10*time.Millisecond, "Pipe should have been removed from cache by watcher")
}

func TestStore_Watch_NilKV(t *testing.T) {
t.Parallel()

// If the Store is purely in-memory (no NATS configured), Watch should exit cleanly.
store := NewMemoryStore()
ctx, cancel := context.WithCancel(context.Background())
defer cancel()

// This would panic if the guard clause `if s.kv == nil` wasn't there
store.Watch(ctx)
}
Loading