diff --git a/cmd/import.go b/cmd/import.go index c1a74c4..8bf869b 100644 --- a/cmd/import.go +++ b/cmd/import.go @@ -16,10 +16,13 @@ package cmd import ( + "context" "fmt" "os" + "os/signal" "strconv" "strings" + "syscall" "github.com/microcks/microcks-cli/pkg/config" "github.com/microcks/microcks-cli/pkg/connectors" @@ -132,7 +135,7 @@ func NewImportCommand(globalClientOpts *connectors.ClientOptions) *cobra.Command } // Try uploading this artifact. - msg, err := mc.UploadArtifact(f, mainArtifact) + msg, err := mc.UploadArtifact(context.Background(), f, mainArtifact) if err != nil { fmt.Printf("Got error when invoking Microcks client importing Artifact: %s", err) os.Exit(1) @@ -173,16 +176,19 @@ func NewImportCommand(globalClientOpts *connectors.ClientOptions) *cobra.Command } // Start watcher if --watch flag is provided. - if watch { - watchFile, err := config.DefaultLocalWatchPath() - errors.CheckError(err) + if watch { + watchFile, err := config.DefaultLocalWatchPath() + errors.CheckError(err) - wm, err := watcher.NewWatchManger(watchFile) - errors.CheckError(err) + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() - fmt.Println("Watch mode enabled - microcks-watcher started...") - wm.Run() - } + wm, err := watcher.NewWatchManger(ctx, watchFile) + errors.CheckError(err) + + fmt.Println("Watch mode enabled - microcks-watcher started...") + wm.Run() + } }, } diff --git a/cmd/importDir.go b/cmd/importDir.go index 16a9157..b1d66d8 100644 --- a/cmd/importDir.go +++ b/cmd/importDir.go @@ -16,6 +16,7 @@ package cmd import ( + "context" "fmt" "os" "path/filepath" @@ -28,7 +29,7 @@ import ( // MicrocksClient interface for dependency injection type MicrocksClient interface { - UploadArtifact(file string, main bool) (string, error) + UploadArtifact(ctx context.Context, file string, main bool) (string, error) } type FileType struct { @@ -231,7 +232,7 @@ func ImportDirectory(client MicrocksClient, fs FileSystem, dirPath string, confi for _, file := range files { fileType := detectFileType(file) - msg, err := client.UploadArtifact(file, fileType.IsPrimary) + msg, err := client.UploadArtifact(context.Background(), file, fileType.IsPrimary) if err != nil { result.FailedCount++ result.FailedFiles = append(result.FailedFiles, file) diff --git a/cmd/importDir_test.go b/cmd/importDir_test.go index 5693880..2b990ec 100644 --- a/cmd/importDir_test.go +++ b/cmd/importDir_test.go @@ -16,6 +16,7 @@ package cmd import ( + "context" "fmt" "os" "path/filepath" @@ -33,7 +34,7 @@ type MockMicrocksClient struct { UploadCalls int } -func (m *MockMicrocksClient) UploadArtifact(file string, main bool) (string, error) { +func (m *MockMicrocksClient) UploadArtifact(ctx context.Context, file string, main bool) (string, error) { m.UploadCalls++ m.Uploaded = append(m.Uploaded, file) diff --git a/pkg/connectors/microcks_client.go b/pkg/connectors/microcks_client.go index f76884b..a82a088 100644 --- a/pkg/connectors/microcks_client.go +++ b/pkg/connectors/microcks_client.go @@ -50,7 +50,7 @@ type MicrocksClient interface { SetOAuthToken(oauthToken string) CreateTestResult(serviceID string, testEndpoint string, runnerType string, secretName string, timeout int64, filteredOperations string, operationsHeaders string, oAuth2Context string) (string, error) GetTestResult(testResultID string) (*TestResultSummary, error) - UploadArtifact(specificationFilePath string, mainArtifact bool) (string, error) + UploadArtifact(ctx context.Context, specificationFilePath string, mainArtifact bool) (string, error) DownloadArtifact(artifactURL string, mainArtifact bool, secret string) (string, error) } @@ -417,7 +417,7 @@ func (c *microcksClient) GetTestResult(testResultID string) (*TestResultSummary, return &result, nil } -func (c *microcksClient) UploadArtifact(specificationFilePath string, mainArtifact bool) (string, error) { +func (c *microcksClient) UploadArtifact(ctx context.Context, specificationFilePath string, mainArtifact bool) (string, error) { // Ensure file exists on fs. file, err := os.Open(specificationFilePath) if err != nil { @@ -445,18 +445,16 @@ func (c *microcksClient) UploadArtifact(specificationFilePath string, mainArtifa return "", err } - // Ensure we have a correct URL. rel := &url.URL{Path: "artifact/upload"} u := c.APIURL.ResolveReference(rel) - req, err := http.NewRequest("POST", u.String(), body) + req, err := http.NewRequestWithContext(ctx, "POST", u.String(), body) if err != nil { return "", err } req.Header.Set("Content-Type", writer.FormDataContentType()) req.Header.Set("Authorization", "Bearer "+c.AuthToken) - // Dump request if verbose required. config.DumpRequestIfRequired("Microcks for uploading artifact", req, true) resp, err := c.httpClient.Do(req) diff --git a/pkg/watcher/executor.go b/pkg/watcher/executor.go index 3347fc6..d17ad93 100644 --- a/pkg/watcher/executor.go +++ b/pkg/watcher/executor.go @@ -1,6 +1,7 @@ package watcher import ( + "context" "fmt" "os" @@ -8,21 +9,18 @@ import ( "github.com/microcks/microcks-cli/pkg/connectors" ) -func TriggerImport(entry config.WatchEntry) { - // Retrieve config to get client options. +func TriggerImport(ctx context.Context, entry config.WatchEntry) { cfgPath, err := config.DefaultLocalConfigPath() if err != nil { - fmt.Errorf("Error while loading config: %s", err.Error()) + fmt.Printf("[ERROR] Error while loading config: %v\n", err) + return } fmt.Println("[INFO] Re-importing changed file: " + entry.FilePath) for _, context := range entry.Context { - - // Prepare Microcks client. var mc connectors.MicrocksClient - // If config path exist, instantiate client with it. if _, err := os.Stat(cfgPath); err == nil { globalClientOpts := &connectors.ClientOptions{ ConfigPath: cfgPath, @@ -32,13 +30,13 @@ func TriggerImport(entry config.WatchEntry) { mc, err = connectors.NewClient(*globalClientOpts) if err != nil { fmt.Printf("[ERROR] Cannot connect to Microcks client: %v in context '%s'\n", err, context) + continue } } else { - // We have no config file, so just create a client with context as server URL. mc = connectors.NewMicrocksClient(context) } - _, err = mc.UploadArtifact(entry.FilePath, entry.MainArtifact) + _, err = mc.UploadArtifact(ctx, entry.FilePath, entry.MainArtifact) if err != nil { fmt.Printf("[WARN] Error re-importing %s: %v\n", entry.FilePath, err) } else { diff --git a/pkg/watcher/watchManager.go b/pkg/watcher/watchManager.go index 5b0f717..d9bfa8f 100644 --- a/pkg/watcher/watchManager.go +++ b/pkg/watcher/watchManager.go @@ -1,23 +1,32 @@ package watcher import ( + "context" "fmt" "log" "sync" + "time" "github.com/fsnotify/fsnotify" "github.com/microcks/microcks-cli/pkg/config" "github.com/microcks/microcks-cli/pkg/errors" ) +const debounceInterval = 300 * time.Millisecond + type WatchManager struct { fileWatcher *fsnotify.Watcher configPath string watchEntries map[string]config.WatchEntry lock sync.Mutex + ctx context.Context + cancel context.CancelFunc + pending map[string]*time.Timer + importQueue chan config.WatchEntry + triggerFunc func(ctx context.Context, entry config.WatchEntry) } -func NewWatchManger(configPath string) (*WatchManager, error) { +func NewWatchManger(ctx context.Context, configPath string) (*WatchManager, error) { fw, err := fsnotify.NewWatcher() if err != nil { return nil, err @@ -28,10 +37,17 @@ func NewWatchManger(configPath string) (*WatchManager, error) { return nil, err } + childCtx, cancel := context.WithCancel(ctx) + wm := &WatchManager{ fileWatcher: fw, configPath: configPath, watchEntries: make(map[string]config.WatchEntry), + ctx: childCtx, + cancel: cancel, + pending: make(map[string]*time.Timer), + importQueue: make(chan config.WatchEntry, 1), + triggerFunc: TriggerImport, } err = wm.Reload() @@ -53,15 +69,12 @@ func (wm *WatchManager) Reload() error { newFiles[entry.FilePath] = entry } - // Remove stale watchers for file := range wm.watchEntries { - if _, exists := newFiles[file]; !exists { wm.fileWatcher.Remove(file) } } - // Add new watchers for file := range newFiles { if _, exists := wm.watchEntries[file]; !exists { err := wm.fileWatcher.Add(file) @@ -77,30 +90,99 @@ func (wm *WatchManager) Reload() error { return nil } +func (wm *WatchManager) Stop() { + if wm.cancel != nil { + wm.cancel() + } +} + func (wm *WatchManager) Run() { + go wm.worker() + for { select { + case <-wm.ctx.Done(): + wm.drainPendingTimers() + close(wm.importQueue) + return case event := <-wm.fileWatcher.Events: - if event.Op&fsnotify.Write == fsnotify.Write { - if event.Name == wm.configPath { - fmt.Println("[INFO] Reloading config...") - wm.lock.Lock() - err := wm.Reload() - wm.lock.Unlock() - if err != nil { - errors.CheckError(err) - } - } else { - wm.lock.Lock() - entry, exists := wm.watchEntries[event.Name] - wm.lock.Unlock() - if exists { - go TriggerImport(entry) - } - } - } + wm.handleEvent(event) case err := <-wm.fileWatcher.Errors: log.Printf("[ERROR] Watcher error: %v", err) } } } + +func (wm *WatchManager) handleEvent(event fsnotify.Event) { + if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename) == 0 { + return + } + + if event.Name == wm.configPath { + if event.Op&fsnotify.Write == fsnotify.Write { + fmt.Println("[INFO] Reloading config...") + wm.lock.Lock() + err := wm.Reload() + wm.lock.Unlock() + if err != nil { + errors.CheckError(err) + } + } + return + } + + wm.lock.Lock() + entry, exists := wm.watchEntries[event.Name] + wm.lock.Unlock() + + if !exists { + return + } + + if event.Op&(fsnotify.Create|fsnotify.Rename) != 0 { + wm.fileWatcher.Remove(event.Name) + if err := wm.fileWatcher.Add(event.Name); err != nil { + log.Printf("[WARN] Cannot re-watch file %s: %v", event.Name, err) + } + } + + wm.debounce(event.Name, entry) +} + +func (wm *WatchManager) debounce(path string, entry config.WatchEntry) { + wm.lock.Lock() + defer wm.lock.Unlock() + + if t, ok := wm.pending[path]; ok { + t.Stop() + delete(wm.pending, path) + } + + wm.pending[path] = time.AfterFunc(debounceInterval, func() { + wm.lock.Lock() + delete(wm.pending, path) + wm.lock.Unlock() + + select { + case <-wm.ctx.Done(): + return + case wm.importQueue <- entry: + } + }) +} + +func (wm *WatchManager) drainPendingTimers() { + wm.lock.Lock() + defer wm.lock.Unlock() + + for path, t := range wm.pending { + t.Stop() + delete(wm.pending, path) + } +} + +func (wm *WatchManager) worker() { + for entry := range wm.importQueue { + wm.triggerFunc(wm.ctx, entry) + } +} diff --git a/pkg/watcher/watchManager_test.go b/pkg/watcher/watchManager_test.go new file mode 100644 index 0000000..34ce872 --- /dev/null +++ b/pkg/watcher/watchManager_test.go @@ -0,0 +1,223 @@ +package watcher + +import ( + "context" + "sync/atomic" + "testing" + "time" + + "github.com/fsnotify/fsnotify" + "github.com/microcks/microcks-cli/pkg/config" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDebounceCollapsesRapidEvents(t *testing.T) { + var callCount int32 + + wm := &WatchManager{ + watchEntries: map[string]config.WatchEntry{ + "/tmp/test-spec.yaml": {FilePath: "/tmp/test-spec.yaml", Context: []string{"ctx1"}}, + }, + pending: make(map[string]*time.Timer), + importQueue: make(chan config.WatchEntry, 1), + ctx: context.Background(), + triggerFunc: func(_ context.Context, _ config.WatchEntry) { + atomic.AddInt32(&callCount, 1) + }, + } + + go wm.worker() + + for i := 0; i < 5; i++ { + wm.debounce("/tmp/test-spec.yaml", config.WatchEntry{FilePath: "/tmp/test-spec.yaml"}) + } + + time.Sleep(500 * time.Millisecond) + + wm.Stop() + + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount), "debounce should collapse 5 rapid events into 1 import") +} + +func TestDebounceDifferentFilesQueued(t *testing.T) { + var calls []string + var callMu chan struct{} = make(chan struct{}, 10) + + wm := &WatchManager{ + watchEntries: map[string]config.WatchEntry{ + "/tmp/a.yaml": {FilePath: "/tmp/a.yaml", Context: []string{"ctx1"}}, + "/tmp/b.yaml": {FilePath: "/tmp/b.yaml", Context: []string{"ctx1"}}, + }, + pending: make(map[string]*time.Timer), + importQueue: make(chan config.WatchEntry, 1), + ctx: context.Background(), + triggerFunc: func(_ context.Context, entry config.WatchEntry) { + calls = append(calls, entry.FilePath) + callMu <- struct{}{} + }, + } + + go wm.worker() + + wm.debounce("/tmp/a.yaml", config.WatchEntry{FilePath: "/tmp/a.yaml"}) + wm.debounce("/tmp/b.yaml", config.WatchEntry{FilePath: "/tmp/b.yaml"}) + + time.Sleep(500 * time.Millisecond) + + assert.Len(t, calls, 2, "two different files should each trigger one import") + assert.Contains(t, calls, "/tmp/a.yaml") + assert.Contains(t, calls, "/tmp/b.yaml") + + wm.Stop() +} + +func TestContextCancellationStopsRun(t *testing.T) { + fw, err := fsnotify.NewWatcher() + require.NoError(t, err) + + ctx, cancel := context.WithCancel(context.Background()) + + wm := &WatchManager{ + fileWatcher: fw, + watchEntries: make(map[string]config.WatchEntry), + pending: make(map[string]*time.Timer), + importQueue: make(chan config.WatchEntry, 1), + ctx: ctx, + cancel: cancel, + triggerFunc: func(_ context.Context, _ config.WatchEntry) {}, + } + + done := make(chan struct{}) + go func() { + wm.Run() + close(done) + }() + + cancel() + + select { + case <-done: + case <-time.After(2 * time.Second): + t.Fatal("Run() did not exit after context cancellation") + } +} + +func TestHandleEventIgnoresIrrelevantOps(t *testing.T) { + fw, err := fsnotify.NewWatcher() + require.NoError(t, err) + + var callCount int32 + + ctx, cancel := context.WithCancel(context.Background()) + + wm := &WatchManager{ + fileWatcher: fw, + watchEntries: map[string]config.WatchEntry{ + "/tmp/test.yaml": {FilePath: "/tmp/test.yaml", Context: []string{"ctx1"}}, + }, + pending: make(map[string]*time.Timer), + importQueue: make(chan config.WatchEntry, 1), + ctx: ctx, + cancel: cancel, + triggerFunc: func(_ context.Context, _ config.WatchEntry) { + atomic.AddInt32(&callCount, 1) + }, + } + + wm.handleEvent(fsnotify.Event{Name: "/tmp/test.yaml", Op: fsnotify.Chmod}) + + time.Sleep(100 * time.Millisecond) + + assert.Equal(t, int32(0), atomic.LoadInt32(&callCount), "Chmod events should be ignored") + + cancel() +} + +func TestHandleEventProcessesWrite(t *testing.T) { + var callCount int32 + + ctx, cancel := context.WithCancel(context.Background()) + + wm := &WatchManager{ + watchEntries: map[string]config.WatchEntry{ + "/tmp/test.yaml": {FilePath: "/tmp/test.yaml", Context: []string{"ctx1"}}, + }, + pending: make(map[string]*time.Timer), + importQueue: make(chan config.WatchEntry, 1), + ctx: ctx, + cancel: cancel, + triggerFunc: func(_ context.Context, _ config.WatchEntry) { + atomic.AddInt32(&callCount, 1) + }, + } + + go wm.worker() + + wm.handleEvent(fsnotify.Event{Name: "/tmp/test.yaml", Op: fsnotify.Write}) + + time.Sleep(500 * time.Millisecond) + + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount), "Write event should trigger import") + + wm.Stop() +} + +func TestHandleEventProcessesCreateAndRename(t *testing.T) { + var callCount int32 + + ctx, cancel := context.WithCancel(context.Background()) + + fw, err := fsnotify.NewWatcher() + require.NoError(t, err) + + wm := &WatchManager{ + fileWatcher: fw, + watchEntries: map[string]config.WatchEntry{ + "/tmp/test.yaml": {FilePath: "/tmp/test.yaml", Context: []string{"ctx1"}}, + }, + pending: make(map[string]*time.Timer), + importQueue: make(chan config.WatchEntry, 1), + ctx: ctx, + cancel: cancel, + triggerFunc: func(_ context.Context, _ config.WatchEntry) { + atomic.AddInt32(&callCount, 1) + }, + } + + go wm.worker() + + wm.handleEvent(fsnotify.Event{Name: "/tmp/test.yaml", Op: fsnotify.Create}) + wm.handleEvent(fsnotify.Event{Name: "/tmp/test.yaml", Op: fsnotify.Rename}) + + time.Sleep(500 * time.Millisecond) + + assert.Equal(t, int32(1), atomic.LoadInt32(&callCount), "Create+Rename bursts should be debounced to 1 import") + + wm.Stop() +} + +func TestDrainPendingTimers(t *testing.T) { + wm := &WatchManager{ + pending: make(map[string]*time.Timer), + } + + wm.pending["a"] = time.AfterFunc(10*time.Second, func() {}) + wm.pending["b"] = time.AfterFunc(10*time.Second, func() {}) + + wm.drainPendingTimers() + + assert.Len(t, wm.pending, 0, "all pending timers should be drained") +} + +func TestStopCancelsContext(t *testing.T) { + ctx, cancel := context.WithCancel(context.Background()) + wm := &WatchManager{ + ctx: ctx, + cancel: cancel, + } + + wm.Stop() + + assert.Error(t, wm.ctx.Err(), context.Canceled, "Stop() should cancel the context") +} diff --git a/watcher/main.go b/watcher/main.go index 54f06cd..48a64c4 100644 --- a/watcher/main.go +++ b/watcher/main.go @@ -1,7 +1,10 @@ package main import ( + "context" "fmt" + "os/signal" + "syscall" "github.com/microcks/microcks-cli/pkg/config" "github.com/microcks/microcks-cli/pkg/errors" @@ -12,7 +15,10 @@ func main() { watchFile, err := config.DefaultLocalWatchPath() errors.CheckError(err) - wm, err := watcher.NewWatchManger(watchFile) + ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stop() + + wm, err := watcher.NewWatchManger(ctx, watchFile) errors.CheckError(err) fmt.Println("[INFO] microcks-watcher started...")