Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions controller/BUILD.bazel
Original file line number Diff line number Diff line change
Expand Up @@ -28,6 +28,7 @@ go_library(
go_test(
name = "controller_test",
srcs = [
"controller_test.go",
"distance_filter_test.go",
"getchangedtargets_test.go",
"gettargetgraph_test.go",
Expand Down
34 changes: 32 additions & 2 deletions controller/controller.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package controller

import (
"context"
"time"

"github.com/uber-go/tally"
Expand Down Expand Up @@ -49,10 +50,16 @@ type controller struct {
changedTargetChunkSize int
metadataMapChunkSize int
totalDurationBuckets tally.Buckets

// appCtx is the application lifetime; cancel it on process shutdown.
// Used by linkRequestCtx and any fire-and-forget goroutines so they
// abort instead of leaking past server teardown.
appCtx context.Context
}

// NewController creates a new controller.
func NewController(p Params) pb.TangoYARPCServer {
// NewController creates a new controller. appCtx is cancelled on process
// shutdown to abort background work.
func NewController(appCtx context.Context, p Params) pb.TangoYARPCServer {
scope := p.Scope
if scope == nil {
scope = tally.NoopScope
Expand All @@ -78,5 +85,28 @@ func NewController(p Params) pb.TangoYARPCServer {
changedTargetChunkSize: changedTargetChunkSize,
metadataMapChunkSize: metadataMapChunkSize,
totalDurationBuckets: _totalDurationBuckets,
appCtx: appCtx,
}
}

// linkRequestCtx returns a context derived from reqCtx that is also cancelled
// when c.appCtx is cancelled. Use it at the top of every streaming handler
// (and pass the returned context to all downstream calls) so a request is
// aborted both by the client disconnecting (reqCtx) and by the server
// beginning to shut down (appCtx).
//
// The returned cancel function MUST be deferred; it releases the
// context.AfterFunc handle so we do not leak a watcher past the request.
func (c *controller) linkRequestCtx(reqCtx context.Context) (context.Context, context.CancelFunc) {
// Derive a per-request ctx whose cancel only affects this ctx and its
// children — it never propagates up to reqCtx.
ctx, cancel := context.WithCancel(reqCtx)
// Register a one-shot watcher that cancels the derived ctx if appCtx fires.
// AfterFunc only observes appCtx; it never cancels it. stop() deregisters
// the watcher so the closure is not retained past the request.
stop := context.AfterFunc(c.appCtx, cancel)
return ctx, func() {
stop()
cancel()
}
}
103 changes: 103 additions & 0 deletions controller/controller_test.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,103 @@
// Copyright (c) 2026 Uber Technologies, Inc.
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package controller

import (
"context"
"testing"

"github.com/stretchr/testify/assert"
orchestratormock "github.com/uber/tango/orchestrator/orchestratormock"
"go.uber.org/mock/gomock"
"go.uber.org/zap"
)

// TestNewController_StoresAppContext verifies the caller-supplied context is
// retained and is the one observed by background goroutines.
func TestNewController_StoresAppContext(t *testing.T) {
ctrl := gomock.NewController(t)
appCtx, cancel := context.WithCancel(context.Background())
defer cancel()

c := NewController(appCtx, Params{
Logger: zap.NewNop(),
Orchestrator: orchestratormock.NewMockOrchestrator(ctrl),
}).(*controller)

assert.Same(t, appCtx, c.appCtx)
assert.NoError(t, c.appCtx.Err())

cancel()
assert.ErrorIs(t, c.appCtx.Err(), context.Canceled)
}

// TestLinkRequestCtx_CancelsOnAppCtx verifies that the linked context is
// cancelled when the controller's appCtx is cancelled, even if the request
// context is still live.
func TestLinkRequestCtx_CancelsOnAppCtx(t *testing.T) {
appCtx, cancelApp := context.WithCancel(context.Background())
defer cancelApp()
c := &controller{appCtx: appCtx}

reqCtx, cancelReq := context.WithCancel(context.Background())
defer cancelReq()

linked, cancelLink := c.linkRequestCtx(reqCtx)
defer cancelLink()
assert.NoError(t, linked.Err())

cancelApp()
<-linked.Done()
assert.ErrorIs(t, linked.Err(), context.Canceled)
assert.NoError(t, reqCtx.Err(), "linkRequestCtx must not cancel the request ctx")
}

// TestLinkRequestCtx_CancelsOnRequestCtx verifies that cancellation of the
// request context propagates to the linked context.
func TestLinkRequestCtx_CancelsOnRequestCtx(t *testing.T) {
appCtx, cancelApp := context.WithCancel(context.Background())
defer cancelApp()
c := &controller{appCtx: appCtx}

reqCtx, cancelReq := context.WithCancel(context.Background())
linked, cancelLink := c.linkRequestCtx(reqCtx)
defer cancelLink()
assert.NoError(t, linked.Err())

cancelReq()
<-linked.Done()
assert.ErrorIs(t, linked.Err(), context.Canceled)
assert.NoError(t, appCtx.Err(), "linkRequestCtx must not cancel the app ctx")
}

// TestLinkRequestCtx_CancelReleasesAfterFunc verifies that calling the returned
// cancel func stops the appCtx watcher so cancelling appCtx afterwards does
// not affect the now-detached linked context.
func TestLinkRequestCtx_CancelReleasesAfterFunc(t *testing.T) {
appCtx, cancelApp := context.WithCancel(context.Background())
defer cancelApp()
c := &controller{appCtx: appCtx}

linked, cancelLink := c.linkRequestCtx(context.Background())
cancelLink()
<-linked.Done()
firstErr := linked.Err()
assert.ErrorIs(t, firstErr, context.Canceled)

// Cancelling appCtx after the linked ctx is already cancelled must be a
// no-op (the AfterFunc handle should have been released by cancelLink).
cancelApp()
assert.Equal(t, firstErr, linked.Err(), "linked.Err() must not change after cancelLink")
}
22 changes: 12 additions & 10 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,8 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err)
}
scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())})
ctx := stream.Context()
ctx, cancelLink := c.linkRequestCtx(stream.Context())
defer cancelLink()
start := time.Now()
logger := c.logger.With(
zap.Any("first_revision", request.GetFirstRevision()),
Expand Down Expand Up @@ -272,17 +273,18 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
// during computation. Both the goroutine and the send loop below only read
// changedTargetsResponses, so concurrent access is safe.
go func() {
// Detach cancellation so the cache write survives client disconnect,
// but preserve request values (tracing/identity) by deriving from ctx.
// Per-operation deadlines are the storage backend's responsibility —
// the controller is backend-agnostic and must not encode any one
// implementation's I/O budget.
cacheCtx := context.WithoutCancel(ctx)
treehash1 := readTreehash(cacheCtx, c.storage, logger, request.GetFirstRevision())
treehash2 := readTreehash(cacheCtx, c.storage, logger, request.GetSecondRevision())
// Use c.appCtx directly: the cache write is fire-and-forget and must
// outlive the request (so a client disconnect doesn't abort it) but
// must NOT outlive the server (so it doesn't leak past shutdown).
// c.appCtx fits both: it's never cancelled by client disconnect and
// is cancelled on shutdown. Per-operation deadlines are the storage
// backend's responsibility — the controller is backend-agnostic and
// must not encode any one implementation's I/O budget.
treehash1 := readTreehash(c.appCtx, c.storage, logger, request.GetFirstRevision())

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

the context here should be detached from existing one, so that when c.appctx is cancelled, the upload still continues on best effort

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

App context is cancelled when application shuts down as it is signalled by the host management platform. SIGKILL is very likely to follow soon after. How should "best effort" look like in this case?

@xytan0056 xytan0056 Jun 18, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

I'm thinking best effort of uploading results to survive both client disconnect and app shutdown -- nothing cancels it, upload whatever it can.

Copy link
Copy Markdown
Contributor Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

there are few severe anti-patterns in this decision as I see of.
https://dave.cheney.net/2016/12/22/never-start-a-goroutine-without-knowing-how-it-will-stop

treehash2 := readTreehash(c.appCtx, c.storage, logger, request.GetSecondRevision())
if treehash1 != "" && treehash2 != "" {
cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
if writeErr := storage.WriteChangedTargetsStream(cacheCtx, c.storage, cacheKey, changedTargetsResponses); writeErr != nil {
if writeErr := storage.WriteChangedTargetsStream(c.appCtx, c.storage, cacheKey, changedTargetsResponses); writeErr != nil {
logger.Warn("GetChangedTargets: Failed to cache result", zap.Error(writeErr))
}
} else {
Expand Down
129 changes: 123 additions & 6 deletions controller/getchangedtargets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -154,7 +154,7 @@ func TestGetChangedTargets_ValidationError(t *testing.T) {
ctrl := gomock.NewController(t)
stream := tangomock.NewMockTangoServiceGetChangedTargetsYARPCServer(ctrl)

c := NewController(Params{Logger: zap.NewNop(), Orchestrator: orchestratormock.NewMockOrchestrator(ctrl)})
c := NewController(context.Background(), Params{Logger: zap.NewNop(), Orchestrator: orchestratormock.NewMockOrchestrator(ctrl)})

err := c.GetChangedTargets(nil, stream)
assert.EqualError(t, err, "request cannot be nil")
Expand Down Expand Up @@ -195,7 +195,7 @@ func TestGetChangedTargets_CacheHit(t *testing.T) {

stream.EXPECT().Send(gomock.Any()).Return(nil).Times(2)

c := NewController(Params{
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: storagemock,
Orchestrator: orchestratormock.NewMockOrchestrator(ctrl),
Expand Down Expand Up @@ -223,7 +223,7 @@ func TestGetChangedTargets_GetGraphError(t *testing.T) {
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, errors.New("graph error")).Times(4)

c := NewController(Params{
c := NewController(context.Background(), Params{
Logger: zap.NewNop(),
Storage: storagemock,
Orchestrator: orchestratormock.NewMockOrchestrator(ctrl),
Expand Down Expand Up @@ -268,7 +268,7 @@ func TestGetChangedTargets_StreamSendError(t *testing.T) {
return nil
})

c := NewController(Params{
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: storagemock,
Orchestrator: orchestratormock.NewMockOrchestrator(ctrl),
Expand Down Expand Up @@ -375,7 +375,7 @@ func TestGetChangedTargets_streamChunks(t *testing.T) {
return nil
})

c := NewController(Params{
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: storagemock,
Orchestrator: orchestratormock.NewMockOrchestrator(ctrl),
Expand Down Expand Up @@ -410,6 +410,123 @@ func TestGetChangedTargets_streamChunks(t *testing.T) {
assert.Equal(t, "//app:target2", metadata.GetTargetIdMapping()[targetID])
}

// TestGetChangedTargets_CacheWriteUsesAppCtx verifies the cache-write
// goroutine passes c.appCtx to storage (so a client disconnect does not abort
// the write, but server shutdown does). Drives a successful GetChangedTargets
// pipeline so the goroutine runs, captures the context the storage backend
// sees inside Put, and asserts each cancellation source independently.
func TestGetChangedTargets_CacheWriteUsesAppCtx(t *testing.T) {
ctrl := gomock.NewController(t)
stream := tangomock.NewMockTangoServiceGetChangedTargetsYARPCServer(ctrl)
reqCtx, cancelReq := context.WithCancel(context.Background())
defer cancelReq()
stream.EXPECT().Context().Return(reqCtx)
stream.EXPECT().Send(gomock.Any()).Return(nil).AnyTimes()

storagemock := storagemock.NewMockStorage(ctrl)

// Minimal single-chunk graph so the comparison succeeds and the cache
// goroutine runs. Both revisions share the same target so there are no
// diffs to send beyond the metadata chunk.
var graphBuf bytes.Buffer
w := gogio.NewDelimitedWriter(&graphBuf)
w.WriteMsg(&pb.GetTargetGraphResponse{
Item: &pb.GetTargetGraphResponse_Targets{
Targets: &pb.OptimizedTargets{
Targets: []*pb.OptimizedTarget{{Id: 1, Hash: "h1", RuleType: 100}},
},
},
})
w.WriteMsg(&pb.GetTargetGraphResponse{
Item: &pb.GetTargetGraphResponse_Metadata{
Metadata: &pb.Metadata{
TargetIdMapping: map[int32]string{1: "//app:t1"},
RuleTypeMapping: map[int32]string{100: "go_library"},
},
},
})
graphBytes := graphBuf.Bytes()

storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, req storage.DownloadRequest) (*storage.DownloadResponse, error) {
switch {
case strings.Contains(req.Key, "compared-targets"):
return nil, &storage.NotFoundError{Path: req.Key}
case strings.Contains(req.Key, "sha1"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil
case strings.Contains(req.Key, "sha2"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil
case strings.Contains(req.Key, "treehash"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graphBytes))}, nil
default:
return nil, fmt.Errorf("unexpected key: %s", req.Key)
}
}).AnyTimes()

// Put captures the context the cache-write goroutine passes to storage
// and blocks until that context is cancelled, mimicking a slow backend.
cacheCtxCh := make(chan context.Context, 1)
storagemock.EXPECT().Put(gomock.Any(), gomock.Any()).DoAndReturn(
func(ctx context.Context, _ storage.UploadRequest) error {
cacheCtxCh <- ctx
<-ctx.Done()
return ctx.Err()
})

appCtx, cancelApp := context.WithCancel(context.Background())
defer cancelApp()
c := NewController(appCtx, Params{
Logger: zaptest.NewLogger(t),
Storage: storagemock,
Orchestrator: orchestratormock.NewMockOrchestrator(ctrl),
})

request := &pb.GetChangedTargetsRequest{
FirstRevision: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha1"},
SecondRevision: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha2"},
OutputConfig: &pb.OutputConfig{MaxDistance: -1},
}

handlerDone := make(chan error, 1)
go func() { handlerDone <- c.GetChangedTargets(request, stream) }()

var cacheCtx context.Context
select {
case cacheCtx = <-cacheCtxCh:
case <-time.After(2 * time.Second):
t.Fatal("cache-write goroutine never reached storage.Put")
}

// Handler should be free to return regardless of the still-running
// goroutine — that is the whole point of the detached cache write.
select {
case err := <-handlerDone:
require.NoError(t, err)
case <-time.After(time.Second):
t.Fatal("GetChangedTargets did not return while cache write was in flight")
}

// Cancelling the request ctx (client disconnect) must NOT cancel the
// cache write. Give the cancellation a beat to propagate if it were
// going to.
cancelReq()
select {
case <-cacheCtx.Done():
t.Fatal("cache-write ctx was cancelled by request ctx; should only follow appCtx")
case <-time.After(50 * time.Millisecond):
}

// Cancelling appCtx (server shutdown) MUST cancel the cache write so
// the goroutine doesn't outlive the process.
cancelApp()
select {
case <-cacheCtx.Done():
assert.ErrorIs(t, cacheCtx.Err(), context.Canceled)
case <-time.After(time.Second):
t.Fatal("cache-write ctx was not cancelled by appCtx")
}
}

func TestCompareTargetGraphs_NewTarget_CanonicalIDs(t *testing.T) {
c := newTestController(zaptest.NewLogger(t))

Expand Down Expand Up @@ -994,7 +1111,7 @@ func TestGetChangedTargets_CacheHitWithDistanceFilter(t *testing.T) {
return nil
}).Times(2)

c := NewController(Params{
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: storagemock,
Orchestrator: orchestratormock.NewMockOrchestrator(ctrl),
Expand Down
3 changes: 2 additions & 1 deletion controller/gettargetgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,7 +42,8 @@ func (c *controller) GetTargetGraph(request *pb.GetTargetGraphRequest, stream pb
}
}()
start := time.Now()
ctx := stream.Context()
ctx, cancelLink := c.linkRequestCtx(stream.Context())
defer cancelLink()
logger := c.logger.With(
zap.Any("build_description", request.GetBuildDescription()),
)
Expand Down
Loading
Loading