From f222bda70c2c104eaccd54711579d22bcca77b4f Mon Sep 17 00:00:00 2001 From: Justin Won Date: Mon, 22 Jun 2026 17:29:38 -0700 Subject: [PATCH] fix(controller): classify getGraph errors and consolidate failure reasons MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Consolidate shared failure reason constants (cancelled, unknown, storage, validation, deadline_exceeded) into core/common so they can be reused across controller, orchestrator, and external orchestrator implementations. Classify previously-unclassified error paths in getGraph: - context cancellation during graph read → cancelled/user - context cancellation during orchestrator call → cancelled/user - graph blob NotFound after treehash hit → falls through to compute - non-NotFound storage errors → graph_fetch/infra - unclassified orchestrator errors → graph_fetch/infra Co-Authored-By: Claude Opus 4.6 (1M context) --- controller/errors.go | 15 ++--- controller/getchangedtargets.go | 8 +-- controller/gettargetgraph.go | 38 ++++++++---- controller/gettargetgraph_test.go | 89 ++++++++++++++++++++++++++++- core/common/errors.go | 9 +++ orchestrator/errors.go | 3 +- orchestrator/native_orchestrator.go | 10 ++-- 7 files changed, 136 insertions(+), 36 deletions(-) diff --git a/controller/errors.go b/controller/errors.go index aa39cc8..7a0d069 100644 --- a/controller/errors.go +++ b/controller/errors.go @@ -24,9 +24,8 @@ import ( // failure_reason tag values for errors that originate in the controller itself. // Errors from the orchestrator carry their own reason via common.ClassifiedError. +// Shared reasons live in core/common as common.FailureReason*. const ( - // Request failed input validation before any work was attempted. - failureReasonValidation = "validation" // Reading a cached target graph from storage failed. failureReasonGraphFetch = "graph_fetch" // Streaming a response message back to the client failed. @@ -35,12 +34,6 @@ const ( failureReasonCompare = "compare" // Reading a stored treehash from storage failed (not a cache miss). failureReasonTreehashRead = "treehash_read" - // The caller's context was cancelled before the RPC completed. - failureReasonCancelled = "cancelled" - // The caller's context deadline elapsed before the RPC completed. - failureReasonDeadlineExceeded = "deadline_exceeded" - // Catch-all for errors that did not classify themselves. - failureReasonUnknown = "unknown" ) // emitFailureMetric tags the failure counter with the reason and type from the @@ -52,11 +45,11 @@ func emitFailureMetric(scope tally.Scope, err error) { case errors.As(err, &ce): // already classified — use the error's own reason and type case errors.Is(err, context.Canceled): - ce = common.WithReason(failureReasonCancelled, common.ErrorTypeUser, err) + ce = common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, err) case errors.Is(err, context.DeadlineExceeded): - ce = common.WithReason(failureReasonDeadlineExceeded, common.ErrorTypeUser, err) + ce = common.WithReason(common.FailureReasonDeadlineExceeded, common.ErrorTypeUser, err) default: - ce = common.WithReason(failureReasonUnknown, common.ErrorTypeInfra, err) + ce = common.WithReason(common.FailureReasonUnknown, common.ErrorTypeInfra, err) } scope.Tagged(map[string]string{ "failure_type": ce.Type(), diff --git a/controller/getchangedtargets.go b/controller/getchangedtargets.go index f131c97..92807fe 100644 --- a/controller/getchangedtargets.go +++ b/controller/getchangedtargets.go @@ -53,7 +53,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str }() if err := validateGetChangedTargetsRequest(request); err != nil { c.logger.Error("GetChangedTargets: Invalid request", zap.Error(err)) - return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err) + return common.WithReason(common.FailureReasonValidation, common.ErrorTypeUser, err) } scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())}) ctx, cancelLink := c.linkRequestCtx(stream.Context()) @@ -106,7 +106,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str if err := ctx.Err(); err != nil { cachedReader.Close() // Client gave up while we were draining the cache. Surface as a user-cancelled error. - return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, err) + return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, err) } var resp *pb.GetChangedTargetsResponse resp, readErr = cachedReader.Read() @@ -236,7 +236,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str if ctx.Err() != nil { // If the context was cancelled by the upstream, just return the original error without additional augmentation - return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) } // Process errors, only aggregating the ones that are original ones and not a result of the other job being cancelled @@ -267,7 +267,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str secondGraph = nil if err != nil { if ctx.Err() != nil { - return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) } logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err)) return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err)) diff --git a/controller/gettargetgraph.go b/controller/gettargetgraph.go index 03567b7..3ead896 100644 --- a/controller/gettargetgraph.go +++ b/controller/gettargetgraph.go @@ -123,18 +123,25 @@ func (c *controller) getGraph(ctx context.Context, buildDescription *pb.BuildDes storageStart := time.Now() graphReader, err := storage.NewGraphReader(ctx, c.storage, treehashPath) if err != nil { - logger.Error("getGraph: Error reading graph from Storage", zap.Error(err)) - return nil, err + if ctx.Err() != nil { + return nil, common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + } + if !storage.IsNotFound(err) { + logger.Error("getGraph: Error reading graph from Storage", zap.Error(err)) + return nil, common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err) + } + logger.Warn("getGraph: graph not found at treehash path", zap.Error(err)) + } else { + logger.Info("getGraph: loaded graph from storage", + zap.Duration("storage_duration", time.Since(storageStart)), + zap.Duration("total_duration", time.Since(start)), + ) + scope := c.scope.SubScope("get_graph") + scope.Counter("graph_cache_hit").Inc(1) + scope.Timer("storage_duration").Record(time.Since(storageStart)) + scope.Timer("total_duration").Record(time.Since(start)) + return graphReader, nil } - logger.Info("getGraph: loaded graph from storage", - zap.Duration("storage_duration", time.Since(storageStart)), - zap.Duration("total_duration", time.Since(start)), - ) - scope := c.scope.SubScope("get_graph") - scope.Counter("graph_cache_hit").Inc(1) - scope.Timer("storage_duration").Record(time.Since(storageStart)) - scope.Timer("total_duration").Record(time.Since(start)) - return graphReader, nil } } else { logger.Info("getGraph: bypass_cache=true, skipping cache lookup") @@ -142,7 +149,14 @@ func (c *controller) getGraph(ctx context.Context, buildDescription *pb.BuildDes computeStart := time.Now() graphReader, err := c.orchestrator.GetTargetGraph(ctx, orchestrator.GetTargetGraphParam{Req: &pb.GetTargetGraphRequest{BuildDescription: buildDescription, OutputConfig: outputConfig, RequestOptions: requestOptions}, BypassCache: bypassCache}) if err != nil { - return nil, err + if ctx.Err() != nil { + return nil, common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err()) + } + var ce common.ClassifiedError + if errors.As(err, &ce) { + return nil, err + } + return nil, common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err) } logger.Info("getGraph: computed target graph", zap.Duration("compute_duration", time.Since(computeStart)), diff --git a/controller/gettargetgraph_test.go b/controller/gettargetgraph_test.go index fe2d535..e62856f 100644 --- a/controller/gettargetgraph_test.go +++ b/controller/gettargetgraph_test.go @@ -24,6 +24,7 @@ import ( gogio "github.com/gogo/protobuf/io" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "github.com/uber/tango/core/common" "github.com/uber/tango/core/storage" storagemock "github.com/uber/tango/core/storage/storagemock" orchestratormock "github.com/uber/tango/orchestrator/orchestratormock" @@ -229,7 +230,7 @@ func TestGetTargetGraph_TreehashReadError(t *testing.T) { assert.Error(t, err) } -// New coverage: graph fetch returns error -> error returned. +// New coverage: graph fetch returns error -> classified as graph_fetch/infra. func TestGetTargetGraph_GraphFetchError(t *testing.T) { ctrl := gomock.NewController(t) stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl) @@ -246,7 +247,11 @@ func TestGetTargetGraph_GraphFetchError(t *testing.T) { err := c.GetTargetGraph(&pb.GetTargetGraphRequest{ BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"}, }, stream) - assert.Error(t, err) + require.Error(t, err) + var ce common.ClassifiedError + require.True(t, errors.As(err, &ce)) + assert.Equal(t, failureReasonGraphFetch, ce.Reason()) + assert.Equal(t, common.ErrorTypeInfra, ce.Type()) } // New coverage: io.ReadFrom fails on graph read -> error returned. @@ -297,6 +302,86 @@ func TestGetTargetGraph_StreamSendError(t *testing.T) { assert.Error(t, err) } +func TestGetTargetGraph_GraphNotFound_FallsThrough(t *testing.T) { + ctrl := gomock.NewController(t) + stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl) + stream.EXPECT().Context().Return(context.Background()) + stream.EXPECT().Send(gomock.Any()).Return(nil) + + store := storagemock.NewMockStorage(ctrl) + gomock.InOrder( + store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil), + store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, &storage.NotFoundError{Path: "graphs/abc"}), + ) + orch := orchestratormock.NewMockOrchestrator(ctrl) + graphReader := storagemock.NewMockGraphReader(ctrl) + graphReader.EXPECT().Read().Return(&pb.GetTargetGraphResponse{ + Item: &pb.GetTargetGraphResponse_Targets{Targets: &pb.OptimizedTargets{}}, + }, nil).Times(1) + graphReader.EXPECT().Read().Return(nil, io.EOF).Times(1) + graphReader.EXPECT().Close().Return(nil) + orch.EXPECT().GetTargetGraph(gomock.Any(), gomock.Any()).Return(graphReader, nil) + c := NewController(context.Background(), Params{ + Logger: zaptest.NewLogger(t), + Storage: store, + Orchestrator: orch, + }) + err := c.GetTargetGraph(&pb.GetTargetGraphRequest{ + BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"}, + }, stream) + require.NoError(t, err) +} + +func TestGetTargetGraph_GraphReadCancelled(t *testing.T) { + ctrl := gomock.NewController(t) + stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + stream.EXPECT().Context().Return(ctx) + store := storagemock.NewMockStorage(ctrl) + gomock.InOrder( + store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil), + store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, errors.New("context canceled")), + ) + c := NewController(context.Background(), Params{ + Logger: zaptest.NewLogger(t), + Storage: store, + }) + err := c.GetTargetGraph(&pb.GetTargetGraphRequest{ + BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"}, + }, stream) + require.Error(t, err) + var ce common.ClassifiedError + require.True(t, errors.As(err, &ce)) + assert.Equal(t, common.FailureReasonCancelled, ce.Reason()) + assert.Equal(t, common.ErrorTypeUser, ce.Type()) +} + +func TestGetTargetGraph_OrchestratorCancelled(t *testing.T) { + ctrl := gomock.NewController(t) + stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl) + ctx, cancel := context.WithCancel(context.Background()) + cancel() + stream.EXPECT().Context().Return(ctx) + store := storagemock.NewMockStorage(ctrl) + store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, &storage.NotFoundError{Path: "x"}) + orch := orchestratormock.NewMockOrchestrator(ctrl) + orch.EXPECT().GetTargetGraph(gomock.Any(), gomock.Any()).Return(nil, errors.New("context canceled")) + c := NewController(context.Background(), Params{ + Logger: zaptest.NewLogger(t), + Storage: store, + Orchestrator: orch, + }) + err := c.GetTargetGraph(&pb.GetTargetGraphRequest{ + BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"}, + }, stream) + require.Error(t, err) + var ce common.ClassifiedError + require.True(t, errors.As(err, &ce)) + assert.Equal(t, common.FailureReasonCancelled, ce.Reason()) + assert.Equal(t, common.ErrorTypeUser, ce.Type()) +} + func newMockReadCloser(data []byte) io.ReadCloser { if data == nil { return nil diff --git a/core/common/errors.go b/core/common/errors.go index 6b4bc49..3870c07 100644 --- a/core/common/errors.go +++ b/core/common/errors.go @@ -20,6 +20,15 @@ const ( ErrorTypeInfra = "infra" ) +// Common failure_reason tag values shared across packages. +const ( + FailureReasonCancelled = "cancelled" + FailureReasonDeadlineExceeded = "deadline_exceeded" + FailureReasonUnknown = "unknown" + FailureReasonStorage = "storage" + FailureReasonValidation = "validation" +) + // ClassifiedError is an error that carries an explicit failure reason and type // for metrics classification. External clients can implement this interface so // that classifyError in the controller picks it up automatically via errors.As, diff --git a/orchestrator/errors.go b/orchestrator/errors.go index 61a3f82..dd82381 100644 --- a/orchestrator/errors.go +++ b/orchestrator/errors.go @@ -15,6 +15,7 @@ package orchestrator // failure_reason tag values emitted by the orchestrator. +// Shared reasons live in core/common as common.FailureReason*. const ( failureReasonConfigParse = "config_parse" failureReasonNoRepoConfig = "no_repo_config" @@ -26,6 +27,4 @@ const ( failureReasonBazelClient = "bazel_client" failureReasonGraphCompute = "graph_compute" failureReasonGraphConvert = "graph_convert" - failureReasonStorage = "storage" - failureReasonUnknown = "unknown" ) diff --git a/orchestrator/native_orchestrator.go b/orchestrator/native_orchestrator.go index e3741ed..c3e2cc1 100644 --- a/orchestrator/native_orchestrator.go +++ b/orchestrator/native_orchestrator.go @@ -100,7 +100,7 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget scope.Counter("failure").Inc(1) var ce common.ClassifiedError if !errors.As(retErr, &ce) { - ce = common.WithReason(failureReasonUnknown, common.ErrorTypeInfra, retErr) + ce = common.WithReason(common.FailureReasonUnknown, common.ErrorTypeInfra, retErr) } scope.Tagged(map[string]string{ "failure_type": ce.Type(), @@ -182,7 +182,7 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget } if !storage.IsNotFound(err) { logger.Errorw("GetTargetGraph: Storage error", zap.Error(err)) - return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) + return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err) } logger.Infow("GetTargetGraph: Treehash not found, computing target graph", zap.String("treehash", treehash)) } else { @@ -223,19 +223,19 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget err = storage.WriteGraphStream(ctx, b.storage, treehashPath, responses) if err != nil { logger.Errorw("GetTargetGraph: Error writing target graph to storage", zap.Error(err)) - return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) + return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err) } treehashCachePath := common.GetTreehashCachePath(param.Req.BuildDescription) treehashReader := bytes.NewReader([]byte(treehash)) err = b.storage.Put(ctx, storage.UploadRequest{Key: treehashCachePath, Reader: treehashReader}) if err != nil { logger.Errorw("GetTargetGraph: Error storing treehash mapping", zap.Error(err)) - return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) + return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err) } graphReader, err := storage.NewGraphReader(ctx, b.storage, treehashPath) if err != nil { logger.Errorw("GetTargetGraph: Error creating graph reader", zap.Error(err)) - return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err) + return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err) } logger.Infow("GetTargetGraph: Done computing and storing target graph", zap.String("treehash", treehash)) return graphReader, nil