Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
9 changes: 8 additions & 1 deletion acceptance/acceptance_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -744,7 +744,7 @@ func runTest(t *testing.T,
args := []string{"bash", "-euo", "pipefail", EntryPointScript}
cmd := exec.CommandContext(ctx, args[0], args[1:]...)

cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir)
cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir, testEnv)
testdiff.PrepareReplacementsUser(t, &repls, user)
testdiff.PrepareReplacementsWorkspaceConfig(t, &repls, cfg)

Expand Down Expand Up @@ -825,6 +825,13 @@ func runTest(t *testing.T,
cmd.Env = addEnvVar(t, cmd.Env, &repls, key, value, config.EnvRepl, defaultRepl)
}

// When the testserver simulates eventual consistency locally, the direct engine
// retries reads on a 404. Keep that retry near-instant so tests don't sleep. This
// is local-only: on cloud the real propagation delay needs the real interval.
if !isRunningOnCloud && internal.StaleOnceEnabled(testEnv) && !hasKey(testEnv, "DATABRICKS_BUNDLE_RETRY_INTERVAL_MS") {
cmd.Env = append(cmd.Env, "DATABRICKS_BUNDLE_RETRY_INTERVAL_MS=1")
}

absDir, err := filepath.Abs(dir)
require.NoError(t, err)
cmd.Env = append(cmd.Env, "TESTDIR="+absDir)
Expand Down
52 changes: 52 additions & 0 deletions acceptance/bin/retry.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,52 @@
#!/usr/bin/env python3
"""Retry a command until it succeeds and its output matches expectations.

Usage: retry.py [--until SUBSTR] [--until-not SUBSTR] CMD [ARGS...]

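Runs CMD up to 5 times in total (configurable via RETRY_MAX_ATTEMPTS env var),
sleeping RETRY_INTERVAL_MS milliseconds (default 500) between attempts. The
stdout, stderr and exit code of the last attempt are passed through, even if
no attempt was successful.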
An attempt is considered successful when the command exits with code 0 and:
--until SUBSTR SUBSTR appears in stdout
--until-not SUBSTR SUBSTR does not appear in stdout
"""

import argparse
import os
import subprocess
import sys
import time


def main():
    """Run the command from the CLI arguments, re-running it until it succeeds.

    The command is executed once, then re-executed (with a pause in between)
    while it is not yet successful and attempts remain. The captured stdout and
    stderr of the last execution are written through, and the process exits
    with that execution's return code.
    """
    parser = argparse.ArgumentParser(prog="retry.py")
    parser.add_argument("--until")
    parser.add_argument("--until-not")
    parser.add_argument("cmd", nargs=argparse.REMAINDER)
    opts = parser.parse_args()
    if not opts.cmd:
        parser.error("no command given")

    pause_seconds = float(os.environ.get("RETRY_INTERVAL_MS", "500")) / 1000.0
    max_attempts = int(os.environ.get("RETRY_MAX_ATTEMPTS", "5"))

    def satisfied(proc):
        # Exit code 0 is required; the substring conditions only apply when given.
        if proc.returncode != 0:
            return False
        if opts.until is not None and opts.until.encode() not in proc.stdout:
            return False
        if opts.until_not is not None and opts.until_not.encode() in proc.stdout:
            return False
        return True

    proc = subprocess.run(opts.cmd, capture_output=True)
    retries_left = max_attempts - 1
    while retries_left > 0 and not satisfied(proc):
        time.sleep(pause_seconds)
        proc = subprocess.run(opts.cmd, capture_output=True)
        retries_left -= 1

    # Pass the last attempt through unchanged, whether or not it was successful.
    sys.stdout.buffer.write(proc.stdout)
    sys.stderr.buffer.write(proc.stderr)
    sys.exit(proc.returncode)


main()
6 changes: 6 additions & 0 deletions acceptance/bundle/invariant/no_drift/test.toml
Original file line number Diff line number Diff line change
@@ -1 +1,7 @@
EnvMatrix.READPLAN = ["", "1"]

# Simulate eventual consistency (first GET after create returns 404) for the
# direct engine, exercising its retry-on-missing path. Only no_drift runs a pure
# current-CLI direct flow; migrate/continue_293 invoke terraform or the old CLI,
# which do not retry, so the eventual-consistency simulation is scoped to this
# directory.
Env.TESTS_STALE_ONCE = "1"
5 changes: 3 additions & 2 deletions acceptance/bundle/resources/dashboards/detect-change/script
Original file line number Diff line number Diff line change
Expand Up @@ -31,8 +31,9 @@ $CLI lakeview get "${DASHBOARD_ID}" | jq '{display_name,page_display_name: (.ser
title "Make an out of band modification to the dashboard and confirm that it is detected:\n"
RESOURCE_ID=$($CLI workspace get-status "${DASHBOARD_PATH}" | jq -r '.resource_id')
DASHBOARD_JSON="{\"serialized_dashboard\": \"{}\", \"warehouse_id\": \"$TEST_DEFAULT_WAREHOUSE_ID\"}"
$CLI lakeview update "${RESOURCE_ID}" --json "${DASHBOARD_JSON}" | jq '{lifecycle_state}'
echo "$($CLI lakeview get "$DASHBOARD_ID" | jq -r '.etag'):ETAG_2" >> ACC_REPLS
UPDATE_RESP=$($CLI lakeview update "${RESOURCE_ID}" --json "${DASHBOARD_JSON}")
echo "$UPDATE_RESP" | jq '{lifecycle_state}'
echo "$(echo "$UPDATE_RESP" | jq -r '.etag'):ETAG_2" >> ACC_REPLS

title "Try to redeploy the bundle and confirm that the out of band modification is detected:"
trace $CLI bundle plan
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -12,7 +12,8 @@ unset MSYS_NO_PATHCONV
trace $CLI bundle deploy
replace_ids.py
DASHBOARD_ID=$($CLI bundle summary --output json | jq -r '.resources.dashboards.dashboard1.id')
add_repl.py "$($CLI lakeview get $DASHBOARD_ID | jq -r '.etag')" ETAG_1
ETAG_1=$($CLI lakeview get $DASHBOARD_ID | jq -r '.etag')
add_repl.py "$ETAG_1" ETAG_1
trace $CLI lakeview get $DASHBOARD_ID | jq '{display_name, etag}'
trace $CLI lakeview get-published $DASHBOARD_ID | jq '{display_name}'
trace $CLI bundle plan -o json | gron.py | grep -E "etag|published"
Expand All @@ -28,7 +29,8 @@ update_file.py databricks.yml "my dashboard" "my dashboard renamed"
# SaveState is only called on success, so state retains the pre-PATCH etag.
errcode trace $CLI bundle deploy
trace print_requests.py //lakeview/dashboards
add_repl.py "$($CLI lakeview get $DASHBOARD_ID | jq -r '.etag')" ETAG_2
# The PATCH bumped the remote etag to ETAG_2; retry until it is visible (eventual consistency).
add_repl.py "$(retry.py --until-not "$ETAG_1" $CLI lakeview get $DASHBOARD_ID | jq -r '.etag')" ETAG_2
trace $CLI lakeview get $DASHBOARD_ID | jq '{display_name, etag}'
trace $CLI lakeview get-published $DASHBOARD_ID | jq '{display_name}'
trace $CLI bundle plan -o json | gron.py | grep -E "etag|published"
Expand Down
4 changes: 4 additions & 0 deletions acceptance/bundle/resources/dashboards/test.toml
Original file line number Diff line number Diff line change
Expand Up @@ -10,3 +10,7 @@ RequiresWarehouse = true
# C:/Program Files/Git/Users/$username/UNIQUE_NAME before passing it to the CLI
# Setting this environment variable prevents that conversion on windows.
MSYS_NO_PATHCONV = "1"

# Simulate eventual consistency (first GET after create returns 404) for the
# direct engine, exercising its retry-on-missing path. Ignored on terraform.
TESTS_STALE_ONCE = "1"
32 changes: 31 additions & 1 deletion acceptance/internal/prepare_server.go
Original file line number Diff line number Diff line change
Expand Up @@ -63,7 +63,32 @@ func isTruePtr(value *bool) bool {
return value != nil && *value
}

func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string) (*sdkconfig.Config, iam.User) {
// StaleOnceEnabled reports whether the testserver should simulate eventual
// consistency, i.e. answer the first GET after a create with a 404.
//
// It returns true only when testEnv contains TESTS_STALE_ONCE=1 and
// DATABRICKS_BUNDLE_ENGINE=direct. The simulation is limited to the direct
// engine because that engine retries reads on a 404; the terraform provider
// does not.
//
// The values are read from testEnv rather than the process environment:
// testEnv carries the per-variant EnvMatrix values, which run in parallel and
// are only passed to the CLI subprocess.
func StaleOnceEnabled(testEnv []string) bool {
	staleOnce, _ := lookupEnv(testEnv, "TESTS_STALE_ONCE")
	engine, _ := lookupEnv(testEnv, "DATABRICKS_BUNDLE_ENGINE")
	return staleOnce == "1" && engine == "direct"
}

// lookupEnv searches testEnv, a list of "KEY=VALUE" entries, for key.
// It returns the value of the first entry that starts with key followed by
// "=" and true, or "" and false when there is no such entry.
func lookupEnv(testEnv []string, key string) (string, bool) {
	needle := key + "="
	for i := range testEnv {
		if entry := testEnv[i]; strings.HasPrefix(entry, needle) {
			return entry[len(needle):], true
		}
	}
	return "", false
}

func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string, testEnv []string) (*sdkconfig.Config, iam.User) {
cloudEnv := env.Get(t.Context(), "CLOUD_ENV")
recordRequests := isTruePtr(config.RecordRequests)

Expand All @@ -76,6 +101,11 @@ func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, o
if isTruePtr(config.IsServicePrincipal) {
token = testserver.ServicePrincipalTokenPrefix + tokenSuffix
testUser = testserver.TestUserSP
} else if StaleOnceEnabled(testEnv) {
// Use the eventual-consistency token so DashboardGet returns 404 on
// the first GET after a create, matching real cloud propagation delays.
token = testserver.EventualConsistencyTokenPrefix + tokenSuffix
testUser = testserver.TestUser
} else {
token = testserver.UserNameTokenPrefix + tokenSuffix
testUser = testserver.TestUser
Expand Down
12 changes: 8 additions & 4 deletions bundle/direct/apply.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,7 +80,8 @@ func (d *DeploymentUnit) Create(ctx context.Context, db *dstate.DeploymentState,
return fmt.Errorf("saving state after creating id=%s: %w", newID, err)
}

waitRemoteState, err := retryOnTransient(ctx, func() (any, error) {
// The resource may not be immediately visible after creation (eventual consistency).
waitRemoteState, err := retryOnTransientOrMissing(ctx, func() (any, error) {
return d.Adapter.WaitAfterCreate(ctx, newID, newState)
})
if err != nil {
Expand All @@ -106,7 +107,7 @@ func (d *DeploymentUnit) Recreate(ctx context.Context, db *dstate.DeploymentStat
// replace_existing=true will reconfigure the parent-managed resource in
// place, matching the Terraform provider's recreate behaviour.
err = retryOnTransientErr(ctx, func() error { return d.Adapter.DoDelete(ctx, oldID, oldState) })
if err != nil && !isResourceGone(err) && !isManagedByParent(err) {
if err != nil && !apierr.IsMissing(err) && !isManagedByParent(err) {
return fmt.Errorf("deleting old id=%s: %w", oldID, err)
}

Expand Down Expand Up @@ -218,7 +219,7 @@ func (d *DeploymentUnit) Delete(ctx context.Context, db *dstate.DeploymentState,
}

err = d.Adapter.DoDelete(ctx, oldID, oldState)
if err != nil && !isResourceGone(err) && !isManagedByParent(err) {
if err != nil && !apierr.IsMissing(err) && !isManagedByParent(err) {
// Rather than failing delete and requiring user to unbind, we perform unbind automatically there.
// Some services, e.g. jobs, return 403 for missing resources if caller did not have permissions to it when job existed.
// In those cases 403 hides 404. In other cases, user not having permissions to resource but having in the bundle might
Expand Down Expand Up @@ -291,7 +292,10 @@ func (d *DeploymentUnit) refreshRemoteState(ctx context.Context, id string) erro
if d.RemoteState != nil {
return nil
}
remoteState, err := retryOnTransient(ctx, func() (any, error) {
// Retry on 404: the resource may not be visible yet after a recent create
// (eventual consistency). The engine knows the resource should exist because
// it has the ID on record.
remoteState, err := retryOnTransientOrMissing(ctx, func() (any, error) {
return d.Adapter.DoRead(ctx, id)
})
if err != nil {
Expand Down
5 changes: 3 additions & 2 deletions bundle/direct/bundle_plan.go
Original file line number Diff line number Diff line change
Expand Up @@ -25,6 +25,7 @@ import (
"github.com/databricks/cli/libs/structs/structpath"
"github.com/databricks/cli/libs/structs/structvar"
"github.com/databricks/databricks-sdk-go"
"github.com/databricks/databricks-sdk-go/apierr"
)

var errDelayed = errors.New("must be resolved after apply")
Expand Down Expand Up @@ -161,7 +162,7 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks
return adapter.DoRead(ctx, id)
})
if err != nil {
if isResourceGone(err) {
if apierr.IsMissing(err) {
// no such resource
plan.RemoveEntry(resourceKey)
} else {
Expand Down Expand Up @@ -218,7 +219,7 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks
return adapter.DoRead(ctx, dbentry.ID)
})
if err != nil {
if isResourceGone(err) {
if apierr.IsMissing(err) {
remoteState = nil
} else {
logdiag.LogError(ctx, fmt.Errorf("%s: reading id=%q: %w", errorPrefix, dbentry.ID, err))
Expand Down
8 changes: 8 additions & 0 deletions bundle/direct/dresources/dashboard.go
Original file line number Diff line number Diff line change
Expand Up @@ -358,6 +358,14 @@ func (r *ResourceDashboard) DoUpdate(ctx context.Context, id string, config *Das
return responseToState(updateResp, publishResp, dashboard.SerializedDashboard, config.Published), nil
}

// WaitAfterCreate reads the dashboard identified by id back after creation by
// delegating to DoRead; the state argument is ignored.
//
// Under eventual consistency the first GET after a create can return 404. This
// method does not retry itself: the caller at the apply layer wraps it in
// retryOnTransientOrMissing, so the post-create stale read is consumed inside
// deploy rather than surfacing on the next read.
func (r *ResourceDashboard) WaitAfterCreate(ctx context.Context, id string, _ *DashboardState) (*DashboardState, error) {
return r.DoRead(ctx, id)
}

func (r *ResourceDashboard) DoDelete(ctx context.Context, id string, _ *DashboardState) error {
return r.client.Lakeview.Trash(ctx, dashboards.TrashDashboardRequest{
DashboardId: id,
Expand Down
16 changes: 15 additions & 1 deletion bundle/direct/retry.go
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,13 @@ func retryWith[T any](ctx context.Context, check func(error) bool, fn func() (T,
}
msg = fmt.Sprintf("retrying after %d %s%s", apiErr.StatusCode, http.StatusText(apiErr.StatusCode), endpoint)
}
log.Warnf(ctx, "%s", msg)
// A 404 right after a write is expected under eventual consistency and not
// user-actionable, so log it at debug. Transient 5xx are worth a warning.
if apierr.IsMissing(err) {
log.Debugf(ctx, "%s", msg)
} else {
log.Warnf(ctx, "%s", msg)
}
select {
case <-ctx.Done():
var zero T
Expand All @@ -76,6 +82,14 @@ func retryOnTransient[T any](ctx context.Context, fn func() (T, error)) (T, erro
return retryWith(ctx, func(err error) bool { return isTransient(ctx, err) }, fn)
}

// retryOnTransientOrMissing retries fn on transient errors and on 404 (resource not yet
// visible due to eventual consistency after a recent create).
func retryOnTransientOrMissing[T any](ctx context.Context, fn func() (T, error)) (T, error) {
return retryWith(ctx, func(err error) bool {
return isTransient(ctx, err) || apierr.IsMissing(err)
}, fn)
}

// retryOnTransientErr wraps retryOnTransient for functions that return only an error.
func retryOnTransientErr(ctx context.Context, fn func() error) error {
_, err := retryOnTransient(ctx, func() (struct{}, error) {
Expand Down
4 changes: 0 additions & 4 deletions bundle/direct/util.go
Original file line number Diff line number Diff line change
Expand Up @@ -6,10 +6,6 @@ import (
"github.com/databricks/databricks-sdk-go/apierr"
)

// isResourceGone reports whether err wraps one of the SDK sentinel errors
// apierr.ErrResourceDoesNotExist or apierr.ErrNotFound.
func isResourceGone(err error) bool {
	switch {
	case errors.Is(err, apierr.ErrResourceDoesNotExist):
		return true
	case errors.Is(err, apierr.ErrNotFound):
		return true
	default:
		return false
	}
}

// isManagedByParent reports whether err is an API error carrying the
// declarative_context=MANAGED_BY_PARENT marker in ErrorInfo.metadata. The
// server uses this to signal that a resource's lifecycle is owned by a
Expand Down
Loading
Loading