Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
15 changes: 4 additions & 11 deletions controller/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,8 @@ import (

// failure_reason tag values for errors that originate in the controller itself.
// Errors from the orchestrator carry their own reason via common.ClassifiedError.
// Shared reasons live in core/common as common.FailureReason*.
const (
// Request failed input validation before any work was attempted.
failureReasonValidation = "validation"
// Reading a cached target graph from storage failed.
failureReasonGraphFetch = "graph_fetch"
// Streaming a response message back to the client failed.
Expand All @@ -35,12 +34,6 @@ const (
failureReasonCompare = "compare"
// Reading a stored treehash from storage failed (not a cache miss).
failureReasonTreehashRead = "treehash_read"
// The caller's context was cancelled before the RPC completed.
failureReasonCancelled = "cancelled"
// The caller's context deadline elapsed before the RPC completed.
failureReasonDeadlineExceeded = "deadline_exceeded"
// Catch-all for errors that did not classify themselves.
failureReasonUnknown = "unknown"
)

// emitFailureMetric tags the failure counter with the reason and type from the
Expand All @@ -52,11 +45,11 @@ func emitFailureMetric(scope tally.Scope, err error) {
case errors.As(err, &ce):
// already classified — use the error's own reason and type
case errors.Is(err, context.Canceled):
ce = common.WithReason(failureReasonCancelled, common.ErrorTypeUser, err)
ce = common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, err)
case errors.Is(err, context.DeadlineExceeded):
ce = common.WithReason(failureReasonDeadlineExceeded, common.ErrorTypeUser, err)
ce = common.WithReason(common.FailureReasonDeadlineExceeded, common.ErrorTypeUser, err)
default:
ce = common.WithReason(failureReasonUnknown, common.ErrorTypeInfra, err)
ce = common.WithReason(common.FailureReasonUnknown, common.ErrorTypeInfra, err)
}
scope.Tagged(map[string]string{
"failure_type": ce.Type(),
Expand Down
8 changes: 4 additions & 4 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
}()
if err := validateGetChangedTargetsRequest(request); err != nil {
c.logger.Error("GetChangedTargets: Invalid request", zap.Error(err))
return common.WithReason(failureReasonValidation, common.ErrorTypeUser, err)
return common.WithReason(common.FailureReasonValidation, common.ErrorTypeUser, err)
}
scope = scope.Tagged(map[string]string{"repo": common.ToShortRemote(request.GetFirstRevision().GetRemote())})
ctx, cancelLink := c.linkRequestCtx(stream.Context())
Expand Down Expand Up @@ -106,7 +106,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
if err := ctx.Err(); err != nil {
cachedReader.Close()
// Client gave up while we were draining the cache. Surface as a user-cancelled error.
return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, err)
return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, err)
}
var resp *pb.GetChangedTargetsResponse
resp, readErr = cachedReader.Read()
Expand Down Expand Up @@ -236,7 +236,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str

if ctx.Err() != nil {
// If the context was cancelled by the upstream, just return the original error without additional augmentation
return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err())
return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err())
}

// Process errors, only aggregating the ones that are original ones and not a result of the other job being cancelled
Expand Down Expand Up @@ -267,7 +267,7 @@ func (c *controller) GetChangedTargets(request *pb.GetChangedTargetsRequest, str
secondGraph = nil
if err != nil {
if ctx.Err() != nil {
return common.WithReason(failureReasonCancelled, common.ErrorTypeUser, ctx.Err())
return common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err())
}
logger.Error("GetChangedTargets: Failed to compare target graphs", zap.Error(err))
return common.WithReason(failureReasonCompare, common.ErrorTypeInfra, fmt.Errorf("failed to compare target graphs: %w", err))
Expand Down
38 changes: 26 additions & 12 deletions controller/gettargetgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -123,26 +123,40 @@ func (c *controller) getGraph(ctx context.Context, buildDescription *pb.BuildDes
storageStart := time.Now()
graphReader, err := storage.NewGraphReader(ctx, c.storage, treehashPath)
if err != nil {
logger.Error("getGraph: Error reading graph from Storage", zap.Error(err))
return nil, err
if ctx.Err() != nil {
return nil, common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err())
}
if !storage.IsNotFound(err) {
logger.Error("getGraph: Error reading graph from Storage", zap.Error(err))
return nil, common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err)
}
logger.Warn("getGraph: graph not found at treehash path", zap.Error(err))
} else {
logger.Info("getGraph: loaded graph from storage",
zap.Duration("storage_duration", time.Since(storageStart)),
zap.Duration("total_duration", time.Since(start)),
)
scope := c.scope.SubScope("get_graph")
scope.Counter("graph_cache_hit").Inc(1)
scope.Timer("storage_duration").Record(time.Since(storageStart))
scope.Timer("total_duration").Record(time.Since(start))
return graphReader, nil
}
logger.Info("getGraph: loaded graph from storage",
zap.Duration("storage_duration", time.Since(storageStart)),
zap.Duration("total_duration", time.Since(start)),
)
scope := c.scope.SubScope("get_graph")
scope.Counter("graph_cache_hit").Inc(1)
scope.Timer("storage_duration").Record(time.Since(storageStart))
scope.Timer("total_duration").Record(time.Since(start))
return graphReader, nil
}
} else {
logger.Info("getGraph: bypass_cache=true, skipping cache lookup")
}
computeStart := time.Now()
graphReader, err := c.orchestrator.GetTargetGraph(ctx, orchestrator.GetTargetGraphParam{Req: &pb.GetTargetGraphRequest{BuildDescription: buildDescription, OutputConfig: outputConfig, RequestOptions: requestOptions}, BypassCache: bypassCache})
if err != nil {
return nil, err
if ctx.Err() != nil {
return nil, common.WithReason(common.FailureReasonCancelled, common.ErrorTypeUser, ctx.Err())
}
var ce common.ClassifiedError
if errors.As(err, &ce) {
return nil, err
}
return nil, common.WithReason(failureReasonGraphFetch, common.ErrorTypeInfra, err)
}
logger.Info("getGraph: computed target graph",
zap.Duration("compute_duration", time.Since(computeStart)),
Expand Down
89 changes: 87 additions & 2 deletions controller/gettargetgraph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
gogio "github.com/gogo/protobuf/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"github.com/uber/tango/core/common"
"github.com/uber/tango/core/storage"
storagemock "github.com/uber/tango/core/storage/storagemock"
orchestratormock "github.com/uber/tango/orchestrator/orchestratormock"
Expand Down Expand Up @@ -229,7 +230,7 @@ func TestGetTargetGraph_TreehashReadError(t *testing.T) {
assert.Error(t, err)
}

// New coverage: graph fetch returns error -> error returned.
// New coverage: graph fetch returns error -> classified as graph_fetch/infra.
func TestGetTargetGraph_GraphFetchError(t *testing.T) {
ctrl := gomock.NewController(t)
stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl)
Expand All @@ -246,7 +247,11 @@ func TestGetTargetGraph_GraphFetchError(t *testing.T) {
err := c.GetTargetGraph(&pb.GetTargetGraphRequest{
BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"},
}, stream)
assert.Error(t, err)
require.Error(t, err)
var ce common.ClassifiedError
require.True(t, errors.As(err, &ce))
assert.Equal(t, failureReasonGraphFetch, ce.Reason())
assert.Equal(t, common.ErrorTypeInfra, ce.Type())
}

// New coverage: io.ReadFrom fails on graph read -> error returned.
Expand Down Expand Up @@ -297,6 +302,86 @@ func TestGetTargetGraph_StreamSendError(t *testing.T) {
assert.Error(t, err)
}

func TestGetTargetGraph_GraphNotFound_FallsThrough(t *testing.T) {
ctrl := gomock.NewController(t)
stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl)
stream.EXPECT().Context().Return(context.Background())
stream.EXPECT().Send(gomock.Any()).Return(nil)

store := storagemock.NewMockStorage(ctrl)
gomock.InOrder(
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, &storage.NotFoundError{Path: "graphs/abc"}),
)
orch := orchestratormock.NewMockOrchestrator(ctrl)
graphReader := storagemock.NewMockGraphReader(ctrl)
graphReader.EXPECT().Read().Return(&pb.GetTargetGraphResponse{
Item: &pb.GetTargetGraphResponse_Targets{Targets: &pb.OptimizedTargets{}},
}, nil).Times(1)
graphReader.EXPECT().Read().Return(nil, io.EOF).Times(1)
graphReader.EXPECT().Close().Return(nil)
orch.EXPECT().GetTargetGraph(gomock.Any(), gomock.Any()).Return(graphReader, nil)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: store,
Orchestrator: orch,
})
err := c.GetTargetGraph(&pb.GetTargetGraphRequest{
BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"},
}, stream)
require.NoError(t, err)
}

func TestGetTargetGraph_GraphReadCancelled(t *testing.T) {
ctrl := gomock.NewController(t)
stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl)
ctx, cancel := context.WithCancel(context.Background())
cancel()
stream.EXPECT().Context().Return(ctx)
store := storagemock.NewMockStorage(ctrl)
gomock.InOrder(
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, errors.New("context canceled")),
)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: store,
})
err := c.GetTargetGraph(&pb.GetTargetGraphRequest{
BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"},
}, stream)
require.Error(t, err)
var ce common.ClassifiedError
require.True(t, errors.As(err, &ce))
assert.Equal(t, common.FailureReasonCancelled, ce.Reason())
assert.Equal(t, common.ErrorTypeUser, ce.Type())
}

func TestGetTargetGraph_OrchestratorCancelled(t *testing.T) {
ctrl := gomock.NewController(t)
stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl)
ctx, cancel := context.WithCancel(context.Background())
cancel()
stream.EXPECT().Context().Return(ctx)
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, &storage.NotFoundError{Path: "x"})
orch := orchestratormock.NewMockOrchestrator(ctrl)
orch.EXPECT().GetTargetGraph(gomock.Any(), gomock.Any()).Return(nil, errors.New("context canceled"))
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: store,
Orchestrator: orch,
})
err := c.GetTargetGraph(&pb.GetTargetGraphRequest{
BuildDescription: &pb.BuildDescription{Remote: "repo:go-code", BaseSha: "sha"},
}, stream)
require.Error(t, err)
var ce common.ClassifiedError
require.True(t, errors.As(err, &ce))
assert.Equal(t, common.FailureReasonCancelled, ce.Reason())
assert.Equal(t, common.ErrorTypeUser, ce.Type())
}

func newMockReadCloser(data []byte) io.ReadCloser {
if data == nil {
return nil
Expand Down
9 changes: 9 additions & 0 deletions core/common/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,6 +20,15 @@ const (
ErrorTypeInfra = "infra"
)

// Common failure_reason tag values shared across packages.
const (
FailureReasonCancelled = "cancelled"
FailureReasonDeadlineExceeded = "deadline_exceeded"
FailureReasonUnknown = "unknown"
FailureReasonStorage = "storage"
FailureReasonValidation = "validation"
)

// ClassifiedError is an error that carries an explicit failure reason and type
// for metrics classification. External clients can implement this interface so
// that classifyError in the controller picks it up automatically via errors.As,
Expand Down
3 changes: 1 addition & 2 deletions orchestrator/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,7 @@
package orchestrator

// failure_reason tag values emitted by the orchestrator.
// Shared reasons live in core/common as common.FailureReason*.
const (
failureReasonConfigParse = "config_parse"
failureReasonNoRepoConfig = "no_repo_config"
Expand All @@ -26,6 +27,4 @@ const (
failureReasonBazelClient = "bazel_client"
failureReasonGraphCompute = "graph_compute"
failureReasonGraphConvert = "graph_convert"
failureReasonStorage = "storage"
failureReasonUnknown = "unknown"
)
10 changes: 5 additions & 5 deletions orchestrator/native_orchestrator.go
Original file line number Diff line number Diff line change
Expand Up @@ -100,7 +100,7 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget
scope.Counter("failure").Inc(1)
var ce common.ClassifiedError
if !errors.As(retErr, &ce) {
ce = common.WithReason(failureReasonUnknown, common.ErrorTypeInfra, retErr)
ce = common.WithReason(common.FailureReasonUnknown, common.ErrorTypeInfra, retErr)
}
scope.Tagged(map[string]string{
"failure_type": ce.Type(),
Expand Down Expand Up @@ -182,7 +182,7 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget
}
if !storage.IsNotFound(err) {
logger.Errorw("GetTargetGraph: Storage error", zap.Error(err))
return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err)
return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err)
}
logger.Infow("GetTargetGraph: Treehash not found, computing target graph", zap.String("treehash", treehash))
} else {
Expand Down Expand Up @@ -224,19 +224,19 @@ func (b *nativeOrchestrator) GetTargetGraph(ctx context.Context, param GetTarget
err = storage.WriteGraphStream(ctx, b.storage, treehashPath, responses)
if err != nil {
logger.Errorw("GetTargetGraph: Error writing target graph to storage", zap.Error(err))
return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err)
return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err)
}
treehashCachePath := common.GetTreehashCachePath(param.Req.BuildDescription)
treehashReader := bytes.NewReader([]byte(treehash))
err = b.storage.Put(ctx, storage.UploadRequest{Key: treehashCachePath, Reader: treehashReader})
if err != nil {
logger.Errorw("GetTargetGraph: Error storing treehash mapping", zap.Error(err))
return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err)
return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err)
}
graphReader, err := storage.NewGraphReader(ctx, b.storage, treehashPath)
if err != nil {
logger.Errorw("GetTargetGraph: Error creating graph reader", zap.Error(err))
return nil, common.WithReason(failureReasonStorage, common.ErrorTypeInfra, err)
return nil, common.WithReason(common.FailureReasonStorage, common.ErrorTypeInfra, err)
}
logger.Infow("GetTargetGraph: Done computing and storing target graph", zap.String("treehash", treehash))
return graphReader, nil
Expand Down
Loading