From ed8a2e36dc19c50b2072c6d3182fbe0659237590 Mon Sep 17 00:00:00 2001 From: sergeyb Date: Fri, 12 Jun 2026 19:07:12 +0000 Subject: [PATCH] Pass app-lifetime context to controller, orchestrator, repomanager MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The async cache-write goroutine in GetChangedTargets ran on context.WithoutCancel(ctx), a context detached from all cancellation, so it had no way to honor server shutdown — a rolling restart could leak in-flight cache writes past the point where storage was being torn down. Streaming handlers also only watched the gRPC stream context, so an in-flight request kept running until the client disconnected even after the server began shutting down. Make all three top-level constructors (NewController, NewNativeOrchestrator, NewRepoManager) accept an appCtx context.Context as their first argument. The controller routes its async cache write through appCtx; the orchestrator and repomanager store it for future fire-and-forget goroutines, with comments documenting the convention. Because appCtx is not derived from the request context, the cache write no longer carries request-scoped values (tracing/identity). Add a controller.linkRequestCtx(reqCtx) helper that returns a context cancelled by either the request's stream context (client disconnect) or the controller's appCtx (server shutdown), backed by context.AfterFunc so the watcher handle is released on the request's defer. Each streaming handler now derives its working context from this helper and threads it through downstream calls. example/main.go derives appCtx from signal.NotifyContext on SIGINT/SIGTERM and propagates it to all three constructors, so background work and in-flight requests are cancelled at process shutdown rather than left to leak. 
--- controller/BUILD.bazel | 1 + controller/controller.go | 34 +++++- controller/controller_test.go | 103 ++++++++++++++++++ controller/getchangedtargets.go | 22 ++-- controller/getchangedtargets_test.go | 129 +++++++++++++++++++++-- controller/gettargetgraph.go | 3 +- controller/gettargetgraph_test.go | 22 ++-- controller/testhelper_test.go | 3 + core/repomanager/repo_manager.go | 18 +++- core/repomanager/repo_manager_test.go | 22 ++-- example/main.go | 22 ++-- orchestrator/native_orchestrator.go | 18 +++- orchestrator/native_orchestrator_test.go | 8 +- 13 files changed, 350 insertions(+), 55 deletions(-) create mode 100644 controller/controller_test.go diff --git a/controller/BUILD.bazel b/controller/BUILD.bazel index 6335f2f..581ba3b 100644 --- a/controller/BUILD.bazel +++ b/controller/BUILD.bazel @@ -28,6 +28,7 @@ go_library( go_test( name = "controller_test", srcs = [ + "controller_test.go", "distance_filter_test.go", "getchangedtargets_test.go", "gettargetgraph_test.go", diff --git a/controller/controller.go b/controller/controller.go index 5cacf81..34ca33c 100644 --- a/controller/controller.go +++ b/controller/controller.go @@ -15,6 +15,7 @@ package controller import ( + "context" "time" "github.com/uber-go/tally" @@ -49,10 +50,16 @@ type controller struct { changedTargetChunkSize int metadataMapChunkSize int totalDurationBuckets tally.Buckets + + // appCtx is the application lifetime; cancel it on process shutdown. + // Used by linkRequestCtx and any fire-and-forget goroutines so they + // abort instead of leaking past server teardown. + appCtx context.Context } -// NewController creates a new controller. -func NewController(p Params) pb.TangoYARPCServer { +// NewController creates a new controller. appCtx is cancelled on process +// shutdown to abort background work. 
+func NewController(appCtx context.Context, p Params) pb.TangoYARPCServer { scope := p.Scope if scope == nil { scope = tally.NoopScope @@ -78,5 +85,28 @@ func NewController(p Params) pb.TangoYARPCServer { changedTargetChunkSize: changedTargetChunkSize, metadataMapChunkSize: metadataMapChunkSize, totalDurationBuckets: _totalDurationBuckets, + appCtx: appCtx, + } +} + +// linkRequestCtx returns a context derived from reqCtx that is also cancelled +// when c.appCtx is cancelled. Use it at the top of every streaming handler +// (and pass the returned context to all downstream calls) so a request is +// aborted both by the client disconnecting (reqCtx) and by the server +// beginning to shut down (appCtx). +// +// The returned cancel function MUST be deferred; it releases the +// context.AfterFunc handle so we do not leak a watcher past the request. +func (c *controller) linkRequestCtx(reqCtx context.Context) (context.Context, context.CancelFunc) { + // Derive a per-request ctx whose cancel only affects this ctx and its + // children — it never propagates up to reqCtx. + ctx, cancel := context.WithCancel(reqCtx) + // Register a one-shot watcher that cancels the derived ctx if appCtx fires. + // AfterFunc only observes appCtx; it never cancels it. stop() deregisters + // the watcher so the closure is not retained past the request. + stop := context.AfterFunc(c.appCtx, cancel) + return ctx, func() { + stop() + cancel() } } diff --git a/controller/controller_test.go b/controller/controller_test.go new file mode 100644 index 0000000..b891f87 --- /dev/null +++ b/controller/controller_test.go @@ -0,0 +1,103 @@ +// Copyright (c) 2026 Uber Technologies, Inc. +// +// Licensed under the Apache License, Version 2.0 (the "License"); +// you may not use this file except in compliance with the License. 
+// You may obtain a copy of the License at +// +// http://www.apache.org/licenses/LICENSE-2.0 +// +// Unless required by applicable law or agreed to in writing, software +// distributed under the License is distributed on an "AS IS" BASIS, +// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. +// See the License for the specific language governing permissions and +// limitations under the License. + +package controller + +import ( + "context" + "testing" + + "github.com/stretchr/testify/assert" + orchestratormock "github.com/uber/tango/orchestrator/orchestratormock" + "go.uber.org/mock/gomock" + "go.uber.org/zap" +) + +// TestNewController_StoresAppContext verifies the caller-supplied context is +// retained and is the one observed by background goroutines. +func TestNewController_StoresAppContext(t *testing.T) { + ctrl := gomock.NewController(t) + appCtx, cancel := context.WithCancel(context.Background()) + defer cancel() + + c := NewController(appCtx, Params{ + Logger: zap.NewNop(), + Orchestrator: orchestratormock.NewMockOrchestrator(ctrl), + }).(*controller) + + assert.Same(t, appCtx, c.appCtx) + assert.NoError(t, c.appCtx.Err()) + + cancel() + assert.ErrorIs(t, c.appCtx.Err(), context.Canceled) +} + +// TestLinkRequestCtx_CancelsOnAppCtx verifies that the linked context is +// cancelled when the controller's appCtx is cancelled, even if the request +// context is still live. 
+func TestLinkRequestCtx_CancelsOnAppCtx(t *testing.T) { + appCtx, cancelApp := context.WithCancel(context.Background()) + defer cancelApp() + c := &controller{appCtx: appCtx} + + reqCtx, cancelReq := context.WithCancel(context.Background()) + defer cancelReq() + + linked, cancelLink := c.linkRequestCtx(reqCtx) + defer cancelLink() + assert.NoError(t, linked.Err()) + + cancelApp() + <-linked.Done() + assert.ErrorIs(t, linked.Err(), context.Canceled) + assert.NoError(t, reqCtx.Err(), "linkRequestCtx must not cancel the request ctx") +} + +// TestLinkRequestCtx_CancelsOnRequestCtx verifies that cancellation of the +// request context propagates to the linked context. +func TestLinkRequestCtx_CancelsOnRequestCtx(t *testing.T) { + appCtx, cancelApp := context.WithCancel(context.Background()) + defer cancelApp() + c := &controller{appCtx: appCtx} + + reqCtx, cancelReq := context.WithCancel(context.Background()) + linked, cancelLink := c.linkRequestCtx(reqCtx) + defer cancelLink() + assert.NoError(t, linked.Err()) + + cancelReq() + <-linked.Done() + assert.ErrorIs(t, linked.Err(), context.Canceled) + assert.NoError(t, appCtx.Err(), "linkRequestCtx must not cancel the app ctx") +} + +// TestLinkRequestCtx_CancelReleasesAfterFunc verifies that calling the returned +// cancel func stops the appCtx watcher so cancelling appCtx afterwards does +// not affect the now-detached linked context. +func TestLinkRequestCtx_CancelReleasesAfterFunc(t *testing.T) { + appCtx, cancelApp := context.WithCancel(context.Background()) + defer cancelApp() + c := &controller{appCtx: appCtx} + + linked, cancelLink := c.linkRequestCtx(context.Background()) + cancelLink() + <-linked.Done() + firstErr := linked.Err() + assert.ErrorIs(t, firstErr, context.Canceled) + + // Cancelling appCtx after the linked ctx is already cancelled must be a + // no-op (the AfterFunc handle should have been released by cancelLink). 
+ cancelApp() + assert.Equal(t, firstErr, linked.Err(), "linked.Err() must not change after cancelLink") +} diff --git a/controller/getchangedtargets.go b/controller/getchangedtargets.go index 55b9cbf..4374779 100644 --- a/controller/getchangedtargets.go +++ b/controller/getchangedtargets.go @@ -56,7 +56,8 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err) } scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())}) - ctx := stream.Context() + ctx, cancelLink := c.linkRequestCtx(stream.Context()) + defer cancelLink() start := time.Now() logger := c.logger.With( zap.Any("first_revision", request.GetFirstRevision()), @@ -272,17 +273,18 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str // during computation. Both the goroutine and the send loop below only read // changedTargetsResponses, so concurrent access is safe. go func() { - // Detach cancellation so the cache write survives client disconnect, - // but preserve request values (tracing/identity) by deriving from ctx. - // Per-operation deadlines are the storage backend's responsibility — - // the controller is backend-agnostic and must not encode any one - // implementation's I/O budget. - cacheCtx := context.WithoutCancel(ctx) - treehash1 := readTreehash(cacheCtx, c.storage, logger, request.GetFirstRevision()) - treehash2 := readTreehash(cacheCtx, c.storage, logger, request.GetSecondRevision()) + // Use c.appCtx directly: the cache write is fire-and-forget and must + // outlive the request (so a client disconnect doesn't abort it) but + // must NOT outlive the server (so it doesn't leak past shutdown). + // c.appCtx fits both: it's never cancelled by client disconnect and + // is cancelled on shutdown. 
Per-operation deadlines are the storage + // backend's responsibility — the controller is backend-agnostic and + // must not encode any one implementation's I/O budget. + treehash1 := readTreehash(c.appCtx, c.storage, logger, request.GetFirstRevision()) + treehash2 := readTreehash(c.appCtx, c.storage, logger, request.GetSecondRevision()) if treehash1 != "" && treehash2 != "" { cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions()) - if writeErr := storage.WriteChangedTargetsStream(cacheCtx, c.storage, cacheKey, changedTargetsResponses); writeErr != nil { + if writeErr := storage.WriteChangedTargetsStream(c.appCtx, c.storage, cacheKey, changedTargetsResponses); writeErr != nil { logger.Warn("GetChangedTargets: Failed to cache result", zap.Error(writeErr)) } } else { diff --git a/controller/getchangedtargets_test.go b/controller/getchangedtargets_test.go index f72bef0..f058a88 100644 --- a/controller/getchangedtargets_test.go +++ b/controller/getchangedtargets_test.go @@ -154,7 +154,7 @@ func TestGetChangedTargets_ValidationError(t *testing.T) { ctrl := gomock.NewController(t) stream := tangomock.NewMockTangoServiceGetChangedTargetsYARPCServer(ctrl) - c := NewController(Params{Logger: zap.NewNop(), Orchestrator: orchestratormock.NewMockOrchestrator(ctrl)}) + c := NewController(context.Background(), Params{Logger: zap.NewNop(), Orchestrator: orchestratormock.NewMockOrchestrator(ctrl)}) err := c.GetChangedTargets(nil, stream) assert.EqualError(t, err, "request cannot be nil") @@ -195,7 +195,7 @@ func TestGetChangedTargets_CacheHit(t *testing.T) { stream.EXPECT().Send(gomock.Any()).Return(nil).Times(2) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: storagemock, Orchestrator: orchestratormock.NewMockOrchestrator(ctrl), @@ -223,7 +223,7 @@ func TestGetChangedTargets_GetGraphError(t *testing.T) { 
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()). Return(nil, errors.New("graph error")).Times(4) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zap.NewNop(), Storage: storagemock, Orchestrator: orchestratormock.NewMockOrchestrator(ctrl), @@ -268,7 +268,7 @@ func TestGetChangedTargets_StreamSendError(t *testing.T) { return nil }) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: storagemock, Orchestrator: orchestratormock.NewMockOrchestrator(ctrl), @@ -375,7 +375,7 @@ func TestGetChangedTargets_streamChunks(t *testing.T) { return nil }) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: storagemock, Orchestrator: orchestratormock.NewMockOrchestrator(ctrl), @@ -410,6 +410,123 @@ func TestGetChangedTargets_streamChunks(t *testing.T) { assert.Equal(t, "//app:target2", metadata.GetTargetIdMapping()[targetID]) } +// TestGetChangedTargets_CacheWriteUsesAppCtx verifies the cache-write +// goroutine passes c.appCtx to storage (so a client disconnect does not abort +// the write, but server shutdown does). Drives a successful GetChangedTargets +// pipeline so the goroutine runs, captures the context the storage backend +// sees inside Put, and asserts each cancellation source independently. +func TestGetChangedTargets_CacheWriteUsesAppCtx(t *testing.T) { + ctrl := gomock.NewController(t) + stream := tangomock.NewMockTangoServiceGetChangedTargetsYARPCServer(ctrl) + reqCtx, cancelReq := context.WithCancel(context.Background()) + defer cancelReq() + stream.EXPECT().Context().Return(reqCtx) + stream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes() + + storagemock := storagemock.NewMockStorage(ctrl) + + // Minimal single-chunk graph so the comparison succeeds and the cache + // goroutine runs. Both revisions share the same target so there are no + // diffs to send beyond the metadata chunk. 
+ var graphBuf bytes.Buffer + w := gogio.NewDelimitedWriter(&graphBuf) + w.WriteMsg(&pb.GetTargetGraphResponse{ + Item: &pb.GetTargetGraphResponse_Targets{ + Targets: &pb.OptimizedTargets{ + Targets: []*pb.OptimizedTarget{{Id: 1, Hash: "h1", RuleType: 100}}, + }, + }, + }) + w.WriteMsg(&pb.GetTargetGraphResponse{ + Item: &pb.GetTargetGraphResponse_Metadata{ + Metadata: &pb.Metadata{ + TargetIdMapping: map[int32]string{1: "//app:t1"}, + RuleTypeMapping: map[int32]string{100: "go_library"}, + }, + }, + }) + graphBytes := graphBuf.Bytes() + + storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn( + func(_ context.Context, req storage.DownloadRequest) (*storage.DownloadResponse, error) { + switch { + case strings.Contains(req.Key, "compared-targets"): + return nil, &storage.NotFoundError{Path: req.Key} + case strings.Contains(req.Key, "sha1"): + return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil + case strings.Contains(req.Key, "sha2"): + return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil + case strings.Contains(req.Key, "treehash"): + return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graphBytes))}, nil + default: + return nil, fmt.Errorf("unexpected key: %s", req.Key) + } + }).AnyTimes() + + // Put captures the context the cache-write goroutine passes to storage + // and blocks until that context is cancelled, mimicking a slow backend. 
+ cacheCtxCh := make(chan context.Context, 1) + storagemock.EXPECT().Put(gomock.Any(), gomock.Any()).DoAndReturn( + func(ctx context.Context, _ storage.UploadRequest) error { + cacheCtxCh <- ctx + <-ctx.Done() + return ctx.Err() + }) + + appCtx, cancelApp := context.WithCancel(context.Background()) + defer cancelApp() + c := NewController(appCtx, Params{ + Logger: zaptest.NewLogger(t), + Storage: storagemock, + Orchestrator: orchestratormock.NewMockOrchestrator(ctrl), + }) + + request := &pb.GetChangedTargetsRequest{ + FirstRevision: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha1"}, + SecondRevision: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha2"}, + OutputConfig: &pb.OutputConfig{MaxDistance: -1}, + } + + handlerDone := make(chan error, 1) + go func() { handlerDone <- c.GetChangedTargets(request, stream) }() + + var cacheCtx context.Context + select { + case cacheCtx = <-cacheCtxCh: + case <-time.After(2 * time.Second): + t.Fatal("cache-write goroutine never reached storage.Put") + } + + // Handler should be free to return regardless of the still-running + // goroutine — that is the whole point of the detached cache write. + select { + case err := <-handlerDone: + require.NoError(t, err) + case <-time.After(time.Second): + t.Fatal("GetChangedTargets did not return while cache write was in flight") + } + + // Cancelling the request ctx (client disconnect) must NOT cancel the + // cache write. Give the cancellation a beat to propagate if it were + // going to. + cancelReq() + select { + case <-cacheCtx.Done(): + t.Fatal("cache-write ctx was cancelled by request ctx; should only follow appCtx") + case <-time.After(50 * time.Millisecond): + } + + // Cancelling appCtx (server shutdown) MUST cancel the cache write so + // the goroutine doesn't outlive the process. 
+ cancelApp() + select { + case <-cacheCtx.Done(): + assert.ErrorIs(t, cacheCtx.Err(), context.Canceled) + case <-time.After(time.Second): + t.Fatal("cache-write ctx was not cancelled by appCtx") + } +} + func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) { c := newTestController(zaptest.NewLogger(t)) @@ -994,7 +1111,7 @@ func TestGetChangedTargets_CacheHitWithDistanceFilter(t *testing.T) { return nil }).Times(2) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: storagemock, Orchestrator: orchestratormock.NewMockOrchestrator(ctrl), diff --git a/controller/gettargetgraph.go b/controller/gettargetgraph.go index b1b6516..ff853c8 100644 --- a/controller/gettargetgraph.go +++ b/controller/gettargetgraph.go @@ -42,7 +42,8 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb } }() start := time.Now() - ctx := stream.Context() + ctx, cancelLink := c.linkRequestCtx(stream.Context()) + defer cancelLink() logger := c.logger.With( zap.Any("build_description", request.GetBuildDescription()), ) diff --git a/controller/gettargetgraph_test.go b/controller/gettargetgraph_test.go index 218f805..c614f87 100644 --- a/controller/gettargetgraph_test.go +++ b/controller/gettargetgraph_test.go @@ -45,7 +45,7 @@ func TestGetTargetGraph_CacheMiss_NoSend(t *testing.T) { store.EXPECT().Get(gomock.Any(), gomock.Any()). 
Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser(nil)}, nil), ) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, }) @@ -70,7 +70,7 @@ func TestGetTargetGraph_StorageError_Propagates(t *testing.T) { stream.EXPECT().Context().Return(context.Background()) storagemock := storagemock.NewMockStorage(ctrl) storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(nil, expected) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: storagemock, }) @@ -96,7 +96,7 @@ func TestGetTargetGraph_DecodeError_ReturnsError(t *testing.T) { storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil), storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("bad-bytes"))}, nil), ) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: storagemock, }) @@ -129,7 +129,7 @@ func TestGetTargetGraph_SendsWhenItemPresent(t *testing.T) { store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-xyz"))}, nil), store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser(buf.Bytes())}, nil), ) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, }) @@ -151,7 +151,7 @@ func TestGetTargetGraph_BuildDescriptionMissingRequiredFields_ReturnsError(t *te stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl) stream.EXPECT().Context().Return(context.Background()) store := storagemock.NewMockStorage(ctrl) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, }) 
@@ -172,7 +172,7 @@ func TestGetTargetGraph_MissingBuildDescription_ReturnsError(t *testing.T) { stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl) stream.EXPECT().Context().Return(context.Background()) store := storagemock.NewMockStorage(ctrl) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, }) @@ -201,7 +201,7 @@ func TestGetTargetGraph_TreehashNotFound_NoError(t *testing.T) { graphReader.EXPECT().Read().Return(nil, io.EOF).Times(1) graphReader.EXPECT().Close().Return(nil) orchestrator.EXPECT().GetTargetGraph(gomock.Any(), gomock.Any()).Return(graphReader, nil) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, Orchestrator: orchestrator, @@ -219,7 +219,7 @@ func TestGetTargetGraph_TreehashReadError(t *testing.T) { stream.EXPECT().Context().Return(context.Background()) store := storagemock.NewMockStorage(ctrl) store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: &errReadCloser{err: errors.New("readfail")}}, nil) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, }) @@ -239,7 +239,7 @@ func TestGetTargetGraph_GraphFetchError(t *testing.T) { store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil), store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(nil, errors.New("graph error")), ) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, }) @@ -259,7 +259,7 @@ func TestGetTargetGraph_GraphReadError(t *testing.T) { store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil), store.EXPECT().Get(gomock.Any(), 
gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: &errReadCloser{err: errors.New("readfail")}}, nil), ) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: store, }) @@ -287,7 +287,7 @@ func TestGetTargetGraph_StreamSendError(t *testing.T) { storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil), storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser(buf.Bytes())}, nil), ) - c := NewController(Params{ + c := NewController(context.Background(), Params{ Logger: zaptest.NewLogger(t), Storage: storagemock, }) diff --git a/controller/testhelper_test.go b/controller/testhelper_test.go index 448a6e8..56ff439 100644 --- a/controller/testhelper_test.go +++ b/controller/testhelper_test.go @@ -15,6 +15,8 @@ package controller import ( + "context" + "github.com/uber-go/tally" "github.com/uber/tango/core/common" "go.uber.org/zap" @@ -28,5 +30,6 @@ func newTestController(logger *zap.Logger) *controller { changedTargetChunkSize: common.DefaultChangedTargetChunkSize, metadataMapChunkSize: common.DefaultMetadataMapChunkSize, totalDurationBuckets: _totalDurationBuckets, + appCtx: context.Background(), } } diff --git a/core/repomanager/repo_manager.go b/core/repomanager/repo_manager.go index 2aea031..2bd5f60 100644 --- a/core/repomanager/repo_manager.go +++ b/core/repomanager/repo_manager.go @@ -42,6 +42,17 @@ type repoManager struct { mu sync.Mutex pools map[string]*workerPool + + // appCtx represents the app's overall lifetime. It is passed in by the + // caller at construction and is expected to be cancelled when the whole + // application is shutting down (e.g. on SIGTERM/SIGINT). 
Any future + // fire-and-forget goroutines this manager starts should use this context + // instead of context.Background() so they abort promptly on shutdown + // rather than running unbounded past server teardown. + // + // Per-request cancellation should still use the request's own context; + // appCtx is only for work that intentionally outlives the request. + appCtx context.Context } // workerPool manages a fixed set of worker slots for a single repo. @@ -71,7 +82,11 @@ type Params struct { } // NewRepoManager creates a new repo manager with pooled worker workspaces. -func NewRepoManager(p Params) RepoManager { +// +// appCtx is the application-lifetime context. Cancel it when the process is +// shutting down (e.g. wire it to SIGTERM/SIGINT in main) to abort any +// background goroutines the manager spawns. +func NewRepoManager(appCtx context.Context, p Params) RepoManager { return &repoManager{ git: p.Git, repoManagerClonePath: p.RepoManagerClonePath, @@ -79,6 +94,7 @@ func NewRepoManager(p Params) RepoManager { logger: p.Logger, poolSize: p.PoolSize, pools: make(map[string]*workerPool), + appCtx: appCtx, } } diff --git a/core/repomanager/repo_manager_test.go b/core/repomanager/repo_manager_test.go index 1da9cca..1d0b951 100644 --- a/core/repomanager/repo_manager_test.go +++ b/core/repomanager/repo_manager_test.go @@ -43,7 +43,7 @@ func TestLease_ClonesOriginAndCreatesWorker(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws, err := rm.Lease(context.Background(), 
tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) assert.Equal(t, workerDir, ws.Path()) @@ -65,7 +65,7 @@ func TestLease_SkipsOriginClone_WhenExists(t *testing.T) { // Only worker clone expected g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) assert.Equal(t, workerDir, ws.Path()) @@ -86,7 +86,7 @@ func TestLease_ReusesWorker_AfterRelease(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() ws1, err := rm.Lease(ctx, tangopb.BuildDescription{Remote: remote}) @@ -115,7 +115,7 @@ func TestLease_CreatesMultipleWorkers(t *testing.T) { g.EXPECT().Clone(gomock.Any(), originDir, dir, "--local", "-c", "gc.auto=0").Return(nil) } - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 2}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 2}) ctx := 
context.Background() ws1, err := rm.Lease(ctx, tangopb.BuildDescription{Remote: remote}) @@ -141,7 +141,7 @@ func TestLease_BlocksUntilReturn(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() ws1, err := rm.Lease(ctx, tangopb.BuildDescription{Remote: remote}) @@ -187,7 +187,7 @@ func TestLease_CtxCanceled(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws1, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) @@ -209,7 +209,7 @@ func TestLease_OriginCloneFails(t *testing.T) { remote := "git@github.com:org/repo" g.EXPECT().Clone(gomock.Any(), remote, filepath.Join(root, "org/repo"), "-c", "gc.auto=0").Return(assert.AnError) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, 
".workers"), PoolSize: 1}) _, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.Error(t, err) assert.Contains(t, err.Error(), "clone origin") @@ -228,7 +228,7 @@ func TestLease_WorkerCloneFails(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote, originDir, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(assert.AnError) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) _, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.Error(t, err) assert.Contains(t, err.Error(), "create worker") @@ -247,7 +247,7 @@ func TestLease_DiscoversExistingWorker(t *testing.T) { require.NoError(t, os.MkdirAll(filepath.Join(root, ".workers", "org/repo", "worker-1", ".git"), 0o755)) // No Clone calls — everything already exists - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ws, err := rm.Lease(context.Background(), tangopb.BuildDescription{Remote: remote}) require.NoError(t, err) assert.Contains(t, ws.Path(), "worker-1") @@ -273,7 +273,7 @@ func TestLease_DifferentRepos_IndependentPools(t *testing.T) { g.EXPECT().Clone(gomock.Any(), remote2, origin2, "-c", "gc.auto=0").Return(nil) g.EXPECT().Clone(gomock.Any(), origin2, worker2, "--local", "-c", "gc.auto=0").Return(nil) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, 
WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() // Both repos can be leased concurrently even with pool size 1 @@ -306,7 +306,7 @@ func TestLease_WorkerCloneFails_SlotReturnedToPool(t *testing.T) { g.EXPECT().Clone(gomock.Any(), originDir, workerDir, "--local", "-c", "gc.auto=0").Return(nil), ) - rm := NewRepoManager(Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) + rm := NewRepoManager(context.Background(), Params{Git: g, Logger: zap.NewNop().Sugar(), RepoManagerClonePath: root, WorkerRootPath: filepath.Join(root, ".workers"), PoolSize: 1}) ctx := context.Background() // First attempt fails diff --git a/example/main.go b/example/main.go index b01f437..2f4077a 100644 --- a/example/main.go +++ b/example/main.go @@ -15,6 +15,7 @@ package main import ( + "context" "fmt" "net" "os" @@ -48,6 +49,12 @@ func run() error { defer zl.Sync() logger := zl.Sugar() + // appCtx is the application-lifetime context. It is cancelled on + // SIGINT/SIGTERM so background work (e.g. the controller's async + // cache write) is aborted instead of leaking past process exit. 
+ appCtx, stopSignals := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM) + defer stopSignals() + configFilePath := filepath.Join("example", "tango-config.yaml") cfg, err := config.Parse(configFilePath) if err != nil { @@ -72,14 +79,14 @@ func run() error { } defer os.RemoveAll(workerRootPath) - rm := repomanager.NewRepoManager(repomanager.Params{ + rm := repomanager.NewRepoManager(appCtx, repomanager.Params{ Git: git.New(repoManagerClonePath), Logger: logger, RepoManagerClonePath: repoManagerClonePath, WorkerRootPath: workerRootPath, PoolSize: cfg.Service.WorkerPoolSize, }) - orch := orchestrator.NewNativeOrchestrator(orchestrator.Params{ + orch := orchestrator.NewNativeOrchestrator(appCtx, orchestrator.Params{ Storage: store, RepoManager: rm, Logger: logger, @@ -87,8 +94,9 @@ func run() error { ConfigFilePath: configFilePath, }) - // Controller (YARPC server implementation) - ctrl := controller.NewController(controller.Params{ + // Controller (YARPC server implementation). appCtx is forwarded so the + // controller's background goroutines are tied to process lifetime. + ctrl := controller.NewController(appCtx, controller.Params{ Logger: zl, Storage: store, Orchestrator: orch, @@ -119,10 +127,8 @@ func run() error { logger.Infof("Tango server is running:") logger.Infof("- gRPC inbound: %s", port) logger.Infof("Press Ctrl+C to stop.") - // Wait for interrupt - sigCh := make(chan os.Signal, 1) - signal.Notify(sigCh, syscall.SIGINT, syscall.SIGTERM) - <-sigCh + // Block until SIGINT/SIGTERM cancels appCtx. + <-appCtx.Done() return nil } diff --git a/orchestrator/native_orchestrator.go b/orchestrator/native_orchestrator.go index b384dfe..e3741ed 100644 --- a/orchestrator/native_orchestrator.go +++ b/orchestrator/native_orchestrator.go @@ -45,6 +45,17 @@ type nativeOrchestrator struct { gitFactory func(directory string) git.Interface graphRunner graphrunner.GraphRunner configFilePath string + + // appCtx represents the app's overall lifetime. 
It is passed in by the + // caller at construction and is expected to be cancelled when the whole + // application is shutting down (e.g. on SIGTERM/SIGINT). Any future + // fire-and-forget goroutines this orchestrator starts should use this + // context instead of context.Background() so they abort promptly on + // shutdown rather than running unbounded past server teardown. + // + // Per-request cancellation should still use the request's own context; + // appCtx is only for work that intentionally outlives the request. + appCtx context.Context } type Params struct { @@ -58,7 +69,11 @@ type Params struct { } // NewNativeOrchestrator creates a new native orchestrator with the given parameters. -func NewNativeOrchestrator(p Params) Orchestrator { +// +// appCtx is the application-lifetime context. Cancel it when the process is +// shutting down (e.g. wire it to SIGTERM/SIGINT in main) to abort any +// background goroutines the orchestrator spawns. +func NewNativeOrchestrator(appCtx context.Context, p Params) Orchestrator { scope := p.Scope if scope == nil { scope = tally.NoopScope @@ -71,6 +86,7 @@ func NewNativeOrchestrator(p Params) Orchestrator { gitFactory: p.GitFactory, graphRunner: p.GraphRunner, configFilePath: p.ConfigFilePath, + appCtx: appCtx, } } diff --git a/orchestrator/native_orchestrator_test.go b/orchestrator/native_orchestrator_test.go index 6ee3601..257b6e8 100644 --- a/orchestrator/native_orchestrator_test.go +++ b/orchestrator/native_orchestrator_test.go @@ -63,7 +63,7 @@ func TestNative_GetTargetGraph_Success(t *testing.T) { rm := repomanagermock.NewMockRepoManager(ctrl) rm.EXPECT().Lease(gomock.Any(), gomock.Any()).Return(ws, nil) - o := NewNativeOrchestrator(Params{ + o := NewNativeOrchestrator(context.Background(), Params{ Storage: st, RepoManager: rm, Logger: zaptest.NewLogger(t).Sugar(), @@ -119,7 +119,7 @@ func TestNative_GetTargetGraph_TreehashNotFound_NoError(t *testing.T) { RuleType: "go_library", }, }}, nil) - o := 
NewNativeOrchestrator(Params{ + o := NewNativeOrchestrator(context.Background(), Params{ Storage: st, RepoManager: rm, Logger: zaptest.NewLogger(t).Sugar(), @@ -151,7 +151,7 @@ func TestNative_GetTargetGraph_RevParseError_Propagates(t *testing.T) { ws.EXPECT().Release().Return(nil) rm := repomanagermock.NewMockRepoManager(ctrl) rm.EXPECT().Lease(gomock.Any(), gomock.Any()).Return(ws, nil) - o := NewNativeOrchestrator(Params{ + o := NewNativeOrchestrator(context.Background(), Params{ Storage: st, RepoManager: rm, Logger: zaptest.NewLogger(t).Sugar(), @@ -190,7 +190,7 @@ func TestNative_GetTargetGraph_AppliesGitHubPR(t *testing.T) { ws.EXPECT().Release().Return(nil) rm := repomanagermock.NewMockRepoManager(ctrl) rm.EXPECT().Lease(gomock.Any(), gomock.Any()).Return(ws, nil) - o := NewNativeOrchestrator(Params{ + o := NewNativeOrchestrator(context.Background(), Params{ Storage: st, RepoManager: rm, Logger: zaptest.NewLogger(t).Sugar(),