Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 0 additions & 3 deletions controller/getchangedtargets.go
Original file line number Diff line number Diff line change
Expand Up @@ -1086,9 +1086,6 @@ func readTreehash(ctx context.Context, st storage.Storage, buildDescription *pb.
}
return "", fmt.Errorf("treehash read failed for key %q: %w", key, err)
}
if resp == nil || resp.ReadCloser == nil {
return "", fmt.Errorf("treehash read returned nil body for key %q", key)
}
defer resp.ReadCloser.Close()
b, err := io.ReadAll(resp.ReadCloser)
if err != nil {
Expand Down
65 changes: 27 additions & 38 deletions controller/getchangedtargets_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -187,11 +187,11 @@ func TestGetChangedTargets_CacheHit(t *testing.T) {
// First two Gets resolve the treehashes, third gets the cached comparison result.
gomock.InOrder(
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil),
Return(storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil),
Return(storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(cachedBytes))}, nil),
Return(storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(cachedBytes))}, nil),
)

stream.EXPECT().Send(gomock.Any()).Return(nil).Times(2)
Expand Down Expand Up @@ -224,7 +224,7 @@ func TestGetChangedTargets_TreehashReadError(t *testing.T) {
// read or graph fetch happens, so this is the only Get expected.
injected := errors.New("storage exploded")
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, injected).Times(1)
Return(storage.DownloadResponse{}, injected).Times(1)

c := NewController(context.Background(), Params{
Logger: zap.NewNop(),
Expand Down Expand Up @@ -254,7 +254,7 @@ func TestReadTreehash(t *testing.T) {
ctrl := gomock.NewController(t)
st := storagemock.NewMockStorage(ctrl)
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, &storage.NotFoundError{Path: "missing"})
Return(storage.DownloadResponse{}, &storage.NotFoundError{Path: "missing"})

val, err := readTreehash(context.Background(), st, bd)
require.NoError(t, err)
Expand All @@ -266,30 +266,19 @@ func TestReadTreehash(t *testing.T) {
st := storagemock.NewMockStorage(ctrl)
injected := errors.New("infra down")
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, injected)
Return(storage.DownloadResponse{}, injected)

val, err := readTreehash(context.Background(), st, bd)
require.Error(t, err)
assert.ErrorIs(t, err, injected)
assert.Empty(t, val)
})

t.Run("nil response surfaces as error", func(t *testing.T) {
ctrl := gomock.NewController(t)
st := storagemock.NewMockStorage(ctrl)
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(nil, nil)

val, err := readTreehash(context.Background(), st, bd)
require.Error(t, err)
assert.Empty(t, val)
})

t.Run("success returns treehash", func(t *testing.T) {
ctrl := gomock.NewController(t)
st := storagemock.NewMockStorage(ctrl)
st.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(strings.NewReader("deadbeef"))}, nil)
Return(storage.DownloadResponse{ReadCloser: io.NopCloser(strings.NewReader("deadbeef"))}, nil)

val, err := readTreehash(context.Background(), st, bd)
require.NoError(t, err)
Expand All @@ -309,14 +298,14 @@ func TestGetChangedTargets_StreamSendError(t *testing.T) {
gogio.NewDelimitedWriter(&buf).WriteMsg(&pb.GetTargetGraphResponse{
Item: &pb.GetTargetGraphResponse_Targets{Targets: &pb.OptimizedTargets{}},
})
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, req storage.DownloadRequest) (*storage.DownloadResponse, error) {
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn(func(_ context.Context, req storage.DownloadRequest) (storage.DownloadResponse, error) {
if strings.Contains(req.Key, "compared-targets") {
return nil, &storage.NotFoundError{Path: req.Key}
return storage.DownloadResponse{}, &storage.NotFoundError{Path: req.Key}
}
if strings.Contains(req.Key, "th") {
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(buf.Bytes()))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(buf.Bytes()))}, nil
}
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("th")))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("th")))}, nil
}).AnyTimes()

// Put is launched in a goroutine — use a channel to wait for it before the test ends.
Expand Down Expand Up @@ -409,20 +398,20 @@ func TestGetChangedTargets_streamChunks(t *testing.T) {

// Each revision needs: treehash lookup + graph lookup. Plus one initial cache miss.
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, req storage.DownloadRequest) (*storage.DownloadResponse, error) {
func(_ context.Context, req storage.DownloadRequest) (storage.DownloadResponse, error) {
switch {
case strings.Contains(req.Key, "compared-targets"):
return nil, &storage.NotFoundError{Path: req.Key}
return storage.DownloadResponse{}, &storage.NotFoundError{Path: req.Key}
case strings.Contains(req.Key, "sha1"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil
case strings.Contains(req.Key, "sha2"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil
case strings.Contains(req.Key, "treehash1"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graph1Bytes))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graph1Bytes))}, nil
case strings.Contains(req.Key, "treehash2"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graph2Bytes))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graph2Bytes))}, nil
default:
return nil, fmt.Errorf("unexpected key: %s", req.Key)
return storage.DownloadResponse{}, fmt.Errorf("unexpected key: %s", req.Key)
}
// readTreehash (×2 pre) + comparison cache miss (×1) + graph computation (×4) + readTreehash (×2 post) = 9
}).Times(9)
Expand Down Expand Up @@ -506,18 +495,18 @@ func TestGetChangedTargets_CacheWriteUsesAppCtx(t *testing.T) {
graphBytes := graphBuf.Bytes()

storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).DoAndReturn(
func(_ context.Context, req storage.DownloadRequest) (*storage.DownloadResponse, error) {
func(_ context.Context, req storage.DownloadRequest) (storage.DownloadResponse, error) {
switch {
case strings.Contains(req.Key, "compared-targets"):
return nil, &storage.NotFoundError{Path: req.Key}
return storage.DownloadResponse{}, &storage.NotFoundError{Path: req.Key}
case strings.Contains(req.Key, "sha1"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil
case strings.Contains(req.Key, "sha2"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil
case strings.Contains(req.Key, "treehash"):
return &storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graphBytes))}, nil
return storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(graphBytes))}, nil
default:
return nil, fmt.Errorf("unexpected key: %s", req.Key)
return storage.DownloadResponse{}, fmt.Errorf("unexpected key: %s", req.Key)
}
}).AnyTimes()

Expand Down Expand Up @@ -1156,11 +1145,11 @@ func TestGetChangedTargets_CacheHitWithDistanceFilter(t *testing.T) {
storagemock := storagemock.NewMockStorage(ctrl)
gomock.InOrder(
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil),
Return(storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash1")))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil),
Return(storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader([]byte("treehash2")))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(cachedBytes))}, nil),
Return(storage.DownloadResponse{ReadCloser: io.NopCloser(bytes.NewReader(cachedBytes))}, nil),
)

var sent []*pb.GetChangedTargetsResponse
Expand Down
4 changes: 0 additions & 4 deletions controller/gettargetgraph.go
Original file line number Diff line number Diff line change
Expand Up @@ -110,10 +110,6 @@ func (c *controller) getGraph(ctx context.Context, buildDescription *pb.BuildDes
logger.Error("getGraph: Storage error", zap.Error(err))
return nil, err
}
} else if treehashResponse == nil || treehashResponse.ReadCloser == nil {
// This shouldn't happen with valid Storage implementation, but handle gracefully
logger.Info("getGraph: Empty response from Storage")
return nil, nil // Return nil to indicate no send should happen
} else {
defer treehashResponse.ReadCloser.Close()
treehashBytes, err := io.ReadAll(treehashResponse.ReadCloser)
Expand Down
30 changes: 15 additions & 15 deletions controller/gettargetgraph_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,9 +41,9 @@ func TestGetTargetGraph_CacheMiss_NoSend(t *testing.T) {
// Return a valid treehash, then an empty graph blob (no messages).
gomock.InOrder(
store.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-empty"))}, nil),
Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-empty"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).
Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser(nil)}, nil),
Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte{})}, nil),
)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Expand All @@ -69,7 +69,7 @@ func TestGetTargetGraph_StorageError_Propagates(t *testing.T) {
stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl)
stream.EXPECT().Context().Return(context.Background())
storagemock := storagemock.NewMockStorage(ctrl)
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(nil, expected)
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, expected)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: storagemock,
Expand All @@ -93,8 +93,8 @@ func TestGetTargetGraph_DecodeError_ReturnsError(t *testing.T) {
stream.EXPECT().Context().Return(context.Background())
storagemock := storagemock.NewMockStorage(ctrl)
gomock.InOrder(
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("bad-bytes"))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("bad-bytes"))}, nil),
)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Expand Down Expand Up @@ -126,8 +126,8 @@ func TestGetTargetGraph_SendsWhenItemPresent(t *testing.T) {
require.NoError(t, err)

gomock.InOrder(
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-xyz"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser(buf.Bytes())}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-xyz"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser(buf.Bytes())}, nil),
)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Expand Down Expand Up @@ -188,7 +188,7 @@ func TestGetTargetGraph_TreehashNotFound_NoError(t *testing.T) {
stream.EXPECT().Send(gomock.Any()).Return(nil)

store := storagemock.NewMockStorage(ctrl)
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(nil, &storage.NotFoundError{Path: "x"})
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, &storage.NotFoundError{Path: "x"})
orchestrator := orchestratormock.NewMockOrchestrator(ctrl)
// Provide a fake GraphReader that yields one message then EOF
graphReader := storagemock.NewMockGraphReader(ctrl)
Expand Down Expand Up @@ -218,7 +218,7 @@ func TestGetTargetGraph_TreehashReadError(t *testing.T) {
stream := tangomock.NewMockTangoServiceGetTargetGraphYARPCServer(ctrl)
stream.EXPECT().Context().Return(context.Background())
store := storagemock.NewMockStorage(ctrl)
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: &errReadCloser{err: errors.New("readfail")}}, nil)
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: &errReadCloser{err: errors.New("readfail")}}, nil)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Storage: store,
Expand All @@ -236,8 +236,8 @@ func TestGetTargetGraph_GraphFetchError(t *testing.T) {
stream.EXPECT().Context().Return(context.Background())
store := storagemock.NewMockStorage(ctrl)
gomock.InOrder(
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(nil, errors.New("graph error")),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{}, errors.New("graph error")),
)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Expand All @@ -256,8 +256,8 @@ func TestGetTargetGraph_GraphReadError(t *testing.T) {
stream.EXPECT().Context().Return(context.Background())
store := storagemock.NewMockStorage(ctrl)
gomock.InOrder(
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: &errReadCloser{err: errors.New("readfail")}}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
store.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: &errReadCloser{err: errors.New("readfail")}}, nil),
)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Expand All @@ -284,8 +284,8 @@ func TestGetTargetGraph_StreamSendError(t *testing.T) {

stream.EXPECT().Send(gomock.Any()).Return(errors.New("send fail"))
gomock.InOrder(
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(&storage.DownloadResponse{ReadCloser: newMockReadCloser(buf.Bytes())}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser([]byte("treehash-abc"))}, nil),
storagemock.EXPECT().Get(gomock.Any(), gomock.Any()).Return(storage.DownloadResponse{ReadCloser: newMockReadCloser(buf.Bytes())}, nil),
)
c := NewController(context.Background(), Params{
Logger: zaptest.NewLogger(t),
Expand Down
3 changes: 0 additions & 3 deletions core/storage/changedtargetsreader.go
Original file line number Diff line number Diff line change
Expand Up @@ -58,9 +58,6 @@ func NewChangedTargetsReader(ctx context.Context, st Storage, key string) (Chang
if err != nil {
return nil, err
}
if resp == nil || resp.ReadCloser == nil {
return nil, nil
}
return &changedTargetsReaderCloser{
reader: gogio.NewDelimitedReader(resp.ReadCloser, 32<<20),
}, nil
Expand Down
30 changes: 21 additions & 9 deletions core/storage/disk/disk.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,6 +21,7 @@ import (
"io"
"os"
"path/filepath"
"strings"

"github.com/uber/tango/core/storage"
)
Expand All @@ -42,19 +43,19 @@ func New(rootDir string) (*diskStorage, error) {
}

// Get retrieves a blob by key. Returns storage.NotFoundError if not found.
func (d *diskStorage) Get(ctx context.Context, req storage.DownloadRequest) (*storage.DownloadResponse, error) {
func (d *diskStorage) Get(ctx context.Context, req storage.DownloadRequest) (storage.DownloadResponse, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
return storage.DownloadResponse{}, ctx.Err()
}
path := filepath.Join(d.rootDir, req.Key)
file, err := os.Open(path)
if err != nil {
if os.IsNotExist(err) {
return nil, &storage.NotFoundError{Path: req.Key}
return storage.DownloadResponse{}, &storage.NotFoundError{Path: req.Key}
}
return nil, err
return storage.DownloadResponse{}, err
}
return &storage.DownloadResponse{ReadCloser: file}, nil
return storage.DownloadResponse{ReadCloser: file}, nil
}

// Put stores a blob with the given key.
Expand Down Expand Up @@ -104,14 +105,22 @@ func (d *diskStorage) Exists(ctx context.Context, key string) (bool, error) {
return false, err
}

// List returns the relative paths of all regular files under the given directory prefix.
func (d *diskStorage) List(ctx context.Context, dir string) ([]string, error) {
// List returns all keys whose name starts with the given prefix.
//
// To honor the literal-prefix contract without walking the entire rootDir, this
// walks the longest path-prefix of `prefix` ending in "/" (or rootDir if none)
// and filters entries by strings.HasPrefix on the full key.
func (d *diskStorage) List(ctx context.Context, prefix string) ([]string, error) {
if ctx.Err() != nil {
return nil, ctx.Err()
}
root := filepath.Join(d.rootDir, dir)
walkSubdir := ""
if idx := strings.LastIndex(prefix, "/"); idx >= 0 {
walkSubdir = prefix[:idx+1]
}
walkRoot := filepath.Join(d.rootDir, walkSubdir)
var keys []string
err := filepath.Walk(root, func(path string, info os.FileInfo, err error) error {
err := filepath.Walk(walkRoot, func(path string, info os.FileInfo, err error) error {
if err != nil {
if os.IsNotExist(err) {
return nil
Expand All @@ -125,6 +134,9 @@ func (d *diskStorage) List(ctx context.Context, dir string) ([]string, error) {
if err != nil {
return err
}
if !strings.HasPrefix(rel, prefix) {
return nil
}
keys = append(keys, rel)
return nil
})
Expand Down
Loading
Loading