Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
24 changes: 15 additions & 9 deletions cmd/import.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,10 +16,13 @@
package cmd

import (
"context"
"fmt"
"os"
"os/signal"
"strconv"
"strings"
"syscall"

"github.com/microcks/microcks-cli/pkg/config"
"github.com/microcks/microcks-cli/pkg/connectors"
Expand Down Expand Up @@ -132,7 +135,7 @@ func NewImportCommand(globalClientOpts *connectors.ClientOptions) *cobra.Command
}

// Try uploading this artifact.
msg, err := mc.UploadArtifact(f, mainArtifact)
msg, err := mc.UploadArtifact(context.Background(), f, mainArtifact)
if err != nil {
fmt.Printf("Got error when invoking Microcks client importing Artifact: %s", err)
os.Exit(1)
Expand Down Expand Up @@ -173,16 +176,19 @@ func NewImportCommand(globalClientOpts *connectors.ClientOptions) *cobra.Command
}

// Start watcher if --watch flag is provided.
if watch {
watchFile, err := config.DefaultLocalWatchPath()
errors.CheckError(err)
if watch {
watchFile, err := config.DefaultLocalWatchPath()
errors.CheckError(err)

wm, err := watcher.NewWatchManger(watchFile)
errors.CheckError(err)
ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
defer stop()

fmt.Println("Watch mode enabled - microcks-watcher started...")
wm.Run()
}
wm, err := watcher.NewWatchManger(ctx, watchFile)
errors.CheckError(err)

fmt.Println("Watch mode enabled - microcks-watcher started...")
wm.Run()
}
},
}

Expand Down
5 changes: 3 additions & 2 deletions cmd/importDir.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package cmd

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -28,7 +29,7 @@ import (

// MicrocksClient interface for dependency injection
type MicrocksClient interface {
UploadArtifact(file string, main bool) (string, error)
UploadArtifact(ctx context.Context, file string, main bool) (string, error)
}

type FileType struct {
Expand Down Expand Up @@ -231,7 +232,7 @@ func ImportDirectory(client MicrocksClient, fs FileSystem, dirPath string, confi
for _, file := range files {
fileType := detectFileType(file)

msg, err := client.UploadArtifact(file, fileType.IsPrimary)
msg, err := client.UploadArtifact(context.Background(), file, fileType.IsPrimary)
if err != nil {
result.FailedCount++
result.FailedFiles = append(result.FailedFiles, file)
Expand Down
3 changes: 2 additions & 1 deletion cmd/importDir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,7 @@
package cmd

import (
"context"
"fmt"
"os"
"path/filepath"
Expand All @@ -33,7 +34,7 @@ type MockMicrocksClient struct {
UploadCalls int
}

func (m *MockMicrocksClient) UploadArtifact(file string, main bool) (string, error) {
func (m *MockMicrocksClient) UploadArtifact(ctx context.Context, file string, main bool) (string, error) {
m.UploadCalls++
m.Uploaded = append(m.Uploaded, file)

Expand Down
8 changes: 3 additions & 5 deletions pkg/connectors/microcks_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,7 +50,7 @@ type MicrocksClient interface {
SetOAuthToken(oauthToken string)
CreateTestResult(serviceID string, testEndpoint string, runnerType string, secretName string, timeout int64, filteredOperations string, operationsHeaders string, oAuth2Context string) (string, error)
GetTestResult(testResultID string) (*TestResultSummary, error)
UploadArtifact(specificationFilePath string, mainArtifact bool) (string, error)
UploadArtifact(ctx context.Context, specificationFilePath string, mainArtifact bool) (string, error)
DownloadArtifact(artifactURL string, mainArtifact bool, secret string) (string, error)
}

Expand Down Expand Up @@ -417,7 +417,7 @@ func (c *microcksClient) GetTestResult(testResultID string) (*TestResultSummary,
return &result, nil
}

func (c *microcksClient) UploadArtifact(specificationFilePath string, mainArtifact bool) (string, error) {
func (c *microcksClient) UploadArtifact(ctx context.Context, specificationFilePath string, mainArtifact bool) (string, error) {
// Ensure file exists on fs.
file, err := os.Open(specificationFilePath)
if err != nil {
Expand Down Expand Up @@ -445,18 +445,16 @@ func (c *microcksClient) UploadArtifact(specificationFilePath string, mainArtifa
return "", err
}

// Ensure we have a correct URL.
rel := &url.URL{Path: "artifact/upload"}
u := c.APIURL.ResolveReference(rel)

req, err := http.NewRequest("POST", u.String(), body)
req, err := http.NewRequestWithContext(ctx, "POST", u.String(), body)
if err != nil {
return "", err
}
req.Header.Set("Content-Type", writer.FormDataContentType())
req.Header.Set("Authorization", "Bearer "+c.AuthToken)

// Dump request if verbose required.
config.DumpRequestIfRequired("Microcks for uploading artifact", req, true)

resp, err := c.httpClient.Do(req)
Expand Down
14 changes: 6 additions & 8 deletions pkg/watcher/executor.go
Original file line number Diff line number Diff line change
@@ -1,28 +1,26 @@
package watcher

import (
"context"
"fmt"
"os"

"github.com/microcks/microcks-cli/pkg/config"
"github.com/microcks/microcks-cli/pkg/connectors"
)

func TriggerImport(entry config.WatchEntry) {
// Retrieve config to get client options.
func TriggerImport(ctx context.Context, entry config.WatchEntry) {
cfgPath, err := config.DefaultLocalConfigPath()
if err != nil {
fmt.Errorf("Error while loading config: %s", err.Error())
fmt.Printf("[ERROR] Error while loading config: %v\n", err)
return
}

fmt.Println("[INFO] Re-importing changed file: " + entry.FilePath)

for _, context := range entry.Context {

// Prepare Microcks client.
var mc connectors.MicrocksClient

// If config path exist, instantiate client with it.
if _, err := os.Stat(cfgPath); err == nil {
globalClientOpts := &connectors.ClientOptions{
ConfigPath: cfgPath,
Expand All @@ -32,13 +30,13 @@ func TriggerImport(entry config.WatchEntry) {
mc, err = connectors.NewClient(*globalClientOpts)
if err != nil {
fmt.Printf("[ERROR] Cannot connect to Microcks client: %v in context '%s'\n", err, context)
continue
}
} else {
// We have no config file, so just create a client with context as server URL.
mc = connectors.NewMicrocksClient(context)
}

_, err = mc.UploadArtifact(entry.FilePath, entry.MainArtifact)
_, err = mc.UploadArtifact(ctx, entry.FilePath, entry.MainArtifact)
if err != nil {
fmt.Printf("[WARN] Error re-importing %s: %v\n", entry.FilePath, err)
} else {
Expand Down
126 changes: 104 additions & 22 deletions pkg/watcher/watchManager.go
Original file line number Diff line number Diff line change
@@ -1,23 +1,32 @@
package watcher

import (
"context"
"fmt"
"log"
"sync"
"time"

"github.com/fsnotify/fsnotify"
"github.com/microcks/microcks-cli/pkg/config"
"github.com/microcks/microcks-cli/pkg/errors"
)

const debounceInterval = 300 * time.Millisecond

type WatchManager struct {
fileWatcher *fsnotify.Watcher
configPath string
watchEntries map[string]config.WatchEntry
lock sync.Mutex
ctx context.Context
cancel context.CancelFunc
pending map[string]*time.Timer
importQueue chan config.WatchEntry
triggerFunc func(ctx context.Context, entry config.WatchEntry)
}

func NewWatchManger(configPath string) (*WatchManager, error) {
func NewWatchManger(ctx context.Context, configPath string) (*WatchManager, error) {
fw, err := fsnotify.NewWatcher()
if err != nil {
return nil, err
Expand All @@ -28,10 +37,17 @@ func NewWatchManger(configPath string) (*WatchManager, error) {
return nil, err
}

childCtx, cancel := context.WithCancel(ctx)

wm := &WatchManager{
fileWatcher: fw,
configPath: configPath,
watchEntries: make(map[string]config.WatchEntry),
ctx: childCtx,
cancel: cancel,
pending: make(map[string]*time.Timer),
importQueue: make(chan config.WatchEntry, 1),
triggerFunc: TriggerImport,
}

err = wm.Reload()
Expand All @@ -53,15 +69,12 @@ func (wm *WatchManager) Reload() error {
newFiles[entry.FilePath] = entry
}

// Remove stale watchers
for file := range wm.watchEntries {

if _, exists := newFiles[file]; !exists {
wm.fileWatcher.Remove(file)
}
}

// Add new watchers
for file := range newFiles {
if _, exists := wm.watchEntries[file]; !exists {
err := wm.fileWatcher.Add(file)
Expand All @@ -77,30 +90,99 @@ func (wm *WatchManager) Reload() error {
return nil
}

func (wm *WatchManager) Stop() {
if wm.cancel != nil {
wm.cancel()
}
}

func (wm *WatchManager) Run() {
go wm.worker()

for {
select {
case <-wm.ctx.Done():
wm.drainPendingTimers()
close(wm.importQueue)
return
case event := <-wm.fileWatcher.Events:
if event.Op&fsnotify.Write == fsnotify.Write {
if event.Name == wm.configPath {
fmt.Println("[INFO] Reloading config...")
wm.lock.Lock()
err := wm.Reload()
wm.lock.Unlock()
if err != nil {
errors.CheckError(err)
}
} else {
wm.lock.Lock()
entry, exists := wm.watchEntries[event.Name]
wm.lock.Unlock()
if exists {
go TriggerImport(entry)
}
}
}
wm.handleEvent(event)
case err := <-wm.fileWatcher.Errors:
log.Printf("[ERROR] Watcher error: %v", err)
}
}
}

func (wm *WatchManager) handleEvent(event fsnotify.Event) {
if event.Op&(fsnotify.Write|fsnotify.Create|fsnotify.Rename) == 0 {
return
}

if event.Name == wm.configPath {
if event.Op&fsnotify.Write == fsnotify.Write {
fmt.Println("[INFO] Reloading config...")
wm.lock.Lock()
err := wm.Reload()
wm.lock.Unlock()
if err != nil {
errors.CheckError(err)
}
}
return
}

wm.lock.Lock()
entry, exists := wm.watchEntries[event.Name]
wm.lock.Unlock()

if !exists {
return
}

if event.Op&(fsnotify.Create|fsnotify.Rename) != 0 {
wm.fileWatcher.Remove(event.Name)
if err := wm.fileWatcher.Add(event.Name); err != nil {
log.Printf("[WARN] Cannot re-watch file %s: %v", event.Name, err)
}
}

wm.debounce(event.Name, entry)
}

func (wm *WatchManager) debounce(path string, entry config.WatchEntry) {
wm.lock.Lock()
defer wm.lock.Unlock()

if t, ok := wm.pending[path]; ok {
t.Stop()
delete(wm.pending, path)
}

wm.pending[path] = time.AfterFunc(debounceInterval, func() {
wm.lock.Lock()
delete(wm.pending, path)
wm.lock.Unlock()

select {
case <-wm.ctx.Done():
return
case wm.importQueue <- entry:
}
})
}

func (wm *WatchManager) drainPendingTimers() {
wm.lock.Lock()
defer wm.lock.Unlock()

for path, t := range wm.pending {
t.Stop()
delete(wm.pending, path)
}
}

func (wm *WatchManager) worker() {
for entry := range wm.importQueue {
wm.triggerFunc(wm.ctx, entry)
}
}
Loading
Loading