Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 2 additions & 0 deletions controller/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,6 +33,8 @@ const (
failureReasonSend = "send"
// Diffing two target graphs failed.
failureReasonCompare = "compare"
// Reading a stored treehash from storage failed (not a cache miss).
failureReasonTreehashRead = "treehash_read"
// The caller's context was cancelled before the RPC completed.
failureReasonCancelled = "cancelled"
// The caller's context deadline elapsed before the RPC completed.
Expand Down
57 changes: 40 additions & 17 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -75,12 +75,22 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
}

// Try to serve from cache first using the stored treehashes for both revisions.
// readTreehash returns "" on a cache miss (and logs any real storage error) so
// we skip the cache when either treehash is not yet available.
// readTreehash returns ("", nil) on a cache miss (skip cache, recompute) but any
// real storage error surfaces here so an infra failure that disables the cache
// (e.g. a missing-deadline "missing TTL" reject) becomes a visible request failure
// rather than silent degradation.
if !request.GetBypassCache() {
cacheStart := time.Now()
treehash1 := readTreehash(ctx, c.storage, logger, request.GetFirstRevision())
treehash2 := readTreehash(ctx, c.storage, logger, request.GetSecondRevision())
treehash1, err := readTreehash(ctx, c.storage, request.GetFirstRevision())
if err != nil {
logger.Error("GetChangedTargets: Failed to read first revision treehash", zap.Error(err))
return common.WithReason(failureReasonTreehashRead, common.ErrorTypeInfra, err)
}
treehash2, err := readTreehash(ctx, c.storage, request.GetSecondRevision())
if err != nil {
logger.Error("GetChangedTargets: Failed to read second revision treehash", zap.Error(err))
return common.WithReason(failureReasonTreehashRead, common.ErrorTypeInfra, err)
}
if treehash1 != "" && treehash2 != "" {
cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
cachedReader, cacheErr := storage.NewChangedTargetsReader(ctx, c.storage, cacheKey)
Expand Down Expand Up @@ -280,8 +290,19 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
// is cancelled on shutdown. Per-operation deadlines are the storage
// backend's responsibility — the controller is backend-agnostic and
// must not encode any one implementation's I/O budget.
treehash1 := readTreehash(c.appCtx, c.storage, logger, request.GetFirstRevision())
treehash2 := readTreehash(c.appCtx, c.storage, logger, request.GetSecondRevision())
treehash1, err := readTreehash(c.appCtx, c.storage, request.GetFirstRevision())
if err != nil {
// Goroutine outlives the handler so we can't return; log loudly and
// abandon the cache write. Surfacing infra failures matters more than
// a missed cache opportunity.
logger.Error("GetChangedTargets: skipping cache write, failed to read first revision treehash", zap.Error(err))
return
}
treehash2, err := readTreehash(c.appCtx, c.storage, request.GetSecondRevision())
if err != nil {
logger.Error("GetChangedTargets: skipping cache write, failed to read second revision treehash", zap.Error(err))
return
}
if treehash1 != "" && treehash2 != "" {
cacheKey := common.GetComparedTargetsCachePath(request.GetFirstRevision().GetRemote(), treehash1, treehash2, request.GetRequestOptions())
if writeErr := storage.WriteChangedTargetsStream(c.appCtx, c.storage, cacheKey, changedTargetsResponses); writeErr != nil {
Expand Down Expand Up @@ -1053,23 +1074,25 @@ func validateGetChangedTargetsRequest(request *pb.GetChangedTargetsRequest) erro
}

// readTreehash fetches the treehash stored at GetTreehashCachePath for the given build description.
// Returns an empty string on any error or cache miss so callers can treat it as an optional optimistic lookup.
// A genuine cache miss (not-found) is silent; any other storage error is logged so an
// infra failure that disables the cache (e.g. a missing-deadline "missing TTL" reject) is visible.
func readTreehash(ctx context.Context, st storage.Storage, logger *zap.Logger, buildDescription *pb.BuildDescription) string {
// Returns ("", nil) on a cache miss (not-found is the normal "not yet computed" state).
// Returns ("", err) on any other storage or read failure so callers can decide whether to
// surface the error or fall back. Returns (treehash, nil) on a successful read.
func readTreehash(ctx context.Context, st storage.Storage, buildDescription *pb.BuildDescription) (string, error) {
key := common.GetTreehashCachePath(buildDescription)
resp, err := st.Get(ctx, storage.DownloadRequest{Key: key})
if err != nil || resp == nil || resp.ReadCloser == nil {
if err != nil && !storage.IsNotFound(err) {
logger.Warn("readTreehash: treehash read failed", zap.String("key", key), zap.Error(err))
if err != nil {
if storage.IsNotFound(err) {
return "", nil
}
return ""
return "", fmt.Errorf("treehash read failed for key %q: %w", key, err)
}
if resp == nil || resp.ReadCloser == nil {
return "", fmt.Errorf("treehash read returned nil body for key %q", key)
}
defer resp.ReadCloser.Close()
b, err := io.ReadAll(resp.ReadCloser)
if err != nil {
logger.Warn("readTreehash: treehash read body failed", zap.String("key", key), zap.Error(err))
return ""
return "", fmt.Errorf("treehash body read failed for key %q: %w", key, err)
}
return string(b)
return string(b), nil
}
70 changes: 64 additions & 6 deletions controller/getchangedtargets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
gogio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/tango/core/common"
"github.com/uber/tango/core/storage"
storagemock "github.com/uber/tango/core/storage/storagemock"
orchestratormock "github.com/uber/tango/orchestrator/orchestratormock"
Expand Down Expand Up @@ -211,17 +212,19 @@ func TestGetChangedTargets_CacheHit(t *testing.T) {
require.NoError(t, err)
}

func TestGetChangedTargets_GetGraphError(t *testing.T) {
func TestGetChangedTargets_TreehashReadError(t *testing.T) {
ctrl := gomock.NewController(t)
stream := tangomock.NewMockTangoServiceGetChangedTargetsYARPCServer(ctrl)
stream.EXPECT().Context().Return(context.Background())

storagemock := storagemock.NewMockStorage(ctrl)
// First two Gets are treehash pre-reads (both return error -> treated as cache miss, skip
// comparison cache check). The next two are the goroutine treehash lookups, which also fail
// with a non-NotFound error so getGraph propagates the error for both revisions.
// A non-NotFound storage error on the first treehash read must surface as
// a failed request (with failureReasonTreehashRead) rather than be silently
// treated as a cache miss. The handler returns before any second treehash
// read or graph fetch happens, so this is the only Get expected.
injected := errors.New("storage exploded")
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, errors.New("graph error")).Times(4)
Return(nil, injected).Times(1)

c := NewController(context.Background(), Params{
Logger: zap.NewNop(),
Expand All @@ -236,7 +239,62 @@ func TestGetChangedTargets_GetGraphError(t *testing.T) {
}

err := c.GetChangedTargets(request, stream)
assert.Error(t, err)
require.Error(t, err)
require.ErrorIs(t, err, injected)
var ce common.ClassifiedError
require.True(t, errors.As(err, &ce), "expected ClassifiedError, got %T", err)
assert.Equal(t, failureReasonTreehashRead, ce.Reason())
assert.Equal(t, common.ErrorTypeInfra, ce.Type())
}

func TestReadTreehash(t *testing.T) {
bd := &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha1"}

t.Run("cache miss returns empty and no error", func(t *testing.T) {
ctrl := gomock.NewController(t)
st := storagemock.NewMockStorage(ctrl)
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, &storage.NotFoundError{Path: "missing"})

val, err := readTreehash(context.Background(), st, bd)
require.NoError(t, err)
assert.Empty(t, val)
})

t.Run("storage error surfaces", func(t *testing.T) {
ctrl := gomock.NewController(t)
st := storagemock.NewMockStorage(ctrl)
injected := errors.New("infra down")
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, injected)

val, err := readTreehash(context.Background(), st, bd)
require.Error(t, err)
assert.ErrorIs(t, err, injected)
assert.Empty(t, val)
})

t.Run("nil response surfaces as error", func(t *testing.T) {
ctrl := gomock.NewController(t)
st := storagemock.NewMockStorage(ctrl)
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, nil)

val, err := readTreehash(context.Background(), st, bd)
require.Error(t, err)
assert.Empty(t, val)
})

t.Run("success returns treehash", func(t *testing.T) {
ctrl := gomock.NewController(t)
st := storagemock.NewMockStorage(ctrl)
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(strings.NewReader("deadbeef"))}, nil)

val, err := readTreehash(context.Background(), st, bd)
require.NoError(t, err)
assert.Equal(t, "deadbeef", val)
})
}

func TestGetChangedTargets_StreamSendError(t *testing.T) {
Expand Down
Loading