diff --git a/acceptance/acceptance_test.go b/acceptance/acceptance_test.go index d05035652ec..a6bb60d929a 100644 --- a/acceptance/acceptance_test.go +++ b/acceptance/acceptance_test.go @@ -744,7 +744,7 @@ func runTest(t *testing.T, args := []string{"bash", "-euo", "pipefail", EntryPointScript} cmd := exec.CommandContext(ctx, args[0], args[1:]...) - cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir) + cfg, user := internal.PrepareServerAndClient(t, config, LogRequests, tmpDir, testEnv) testdiff.PrepareReplacementsUser(t, &repls, user) testdiff.PrepareReplacementsWorkspaceConfig(t, &repls, cfg) @@ -825,6 +825,13 @@ func runTest(t *testing.T, cmd.Env = addEnvVar(t, cmd.Env, &repls, key, value, config.EnvRepl, defaultRepl) } + // When the testserver simulates eventual consistency locally, the direct engine + // retries reads on a 404. Keep that retry near-instant so tests don't sleep. This + // is local-only: on cloud the real propagation delay needs the real interval. + if !isRunningOnCloud && internal.StaleOnceEnabled(testEnv) && !hasKey(testEnv, "DATABRICKS_BUNDLE_RETRY_INTERVAL_MS") { + cmd.Env = append(cmd.Env, "DATABRICKS_BUNDLE_RETRY_INTERVAL_MS=1") + } + absDir, err := filepath.Abs(dir) require.NoError(t, err) cmd.Env = append(cmd.Env, "TESTDIR="+absDir) diff --git a/acceptance/bin/retry.py b/acceptance/bin/retry.py new file mode 100755 index 00000000000..f39f7a1e896 --- /dev/null +++ b/acceptance/bin/retry.py @@ -0,0 +1,52 @@ +#!/usr/bin/env python3 +"""Retry a command until it succeeds and its output matches expectations. + +Usage: retry.py [--until SUBSTR] [--until-not SUBSTR] CMD [ARGS...] + +Retries CMD up to 5 times (configurable via RETRY_MAX_ATTEMPTS env var), +sleeping RETRY_INTERVAL_MS milliseconds (default 500) between attempts. +An attempt is considered successful when the command exits with code 0 and: + --until SUBSTR SUBSTR appears in stdout + --until-not SUBSTR SUBSTR does not appear in stdout +""" + +import argparse +import os +import subprocess +import sys +import time + + +def main(): + parser = argparse.ArgumentParser(prog="retry.py") + parser.add_argument("--until") + parser.add_argument("--until-not") + parser.add_argument("cmd", nargs=argparse.REMAINDER) + args = parser.parse_args() + if not args.cmd: + parser.error("no command given") + until = args.until + until_not = args.until_not + argv = args.cmd + + interval = float(os.environ.get("RETRY_INTERVAL_MS", "500")) / 1000.0 + max_attempts = int(os.environ.get("RETRY_MAX_ATTEMPTS", "5")) + + result = subprocess.run(argv, capture_output=True) + for _ in range(1, max_attempts): + success = ( + result.returncode == 0 + and (until is None or until.encode() in result.stdout) + and (until_not is None or until_not.encode() not in result.stdout) + ) + if success: + break + time.sleep(interval) + result = subprocess.run(argv, capture_output=True) + + sys.stdout.buffer.write(result.stdout) + sys.stderr.buffer.write(result.stderr) + sys.exit(result.returncode) + + +main() diff --git a/acceptance/bundle/invariant/no_drift/test.toml b/acceptance/bundle/invariant/no_drift/test.toml index ff8a66c196e..3201ef41d3a 100644 --- a/acceptance/bundle/invariant/no_drift/test.toml +++ b/acceptance/bundle/invariant/no_drift/test.toml @@ -1 +1,7 @@ EnvMatrix.READPLAN = ["", "1"] + +# Simulate eventual consistency (first GET after create returns 404) for the +# direct engine, exercising its retry-on-missing path. Only no_drift runs a pure +# current-CLI direct flow; migrate/continue_293 invoke terraform or the old CLI, +# which do not retry, so EC is scoped to this directory. +Env.TESTS_STALE_ONCE = "1" diff --git a/acceptance/bundle/resources/dashboards/detect-change/script b/acceptance/bundle/resources/dashboards/detect-change/script index 36649c77cad..71cd75ab974 100644 --- a/acceptance/bundle/resources/dashboards/detect-change/script +++ b/acceptance/bundle/resources/dashboards/detect-change/script @@ -31,8 +31,9 @@ $CLI lakeview get "${DASHBOARD_ID}" | jq '{display_name,page_display_name: (.ser title "Make an out of band modification to the dashboard and confirm that it is detected:\n" RESOURCE_ID=$($CLI workspace get-status "${DASHBOARD_PATH}" | jq -r '.resource_id') DASHBOARD_JSON="{\"serialized_dashboard\": \"{}\", \"warehouse_id\": \"$TEST_DEFAULT_WAREHOUSE_ID\"}" -$CLI lakeview update "${RESOURCE_ID}" --json "${DASHBOARD_JSON}" | jq '{lifecycle_state}' -echo "$($CLI lakeview get "$DASHBOARD_ID" | jq -r '.etag'):ETAG_2" >> ACC_REPLS +UPDATE_RESP=$($CLI lakeview update "${RESOURCE_ID}" --json "${DASHBOARD_JSON}") +echo "$UPDATE_RESP" | jq '{lifecycle_state}' +echo "$(echo "$UPDATE_RESP" | jq -r '.etag'):ETAG_2" >> ACC_REPLS title "Try to redeploy the bundle and confirm that the out of band modification is detected:" trace $CLI bundle plan diff --git a/acceptance/bundle/resources/dashboards/publish-failure-stale-content/script b/acceptance/bundle/resources/dashboards/publish-failure-stale-content/script index 500d87c5d76..cc7551e9b68 100644 --- a/acceptance/bundle/resources/dashboards/publish-failure-stale-content/script +++ b/acceptance/bundle/resources/dashboards/publish-failure-stale-content/script @@ -12,7 +12,8 @@ unset MSYS_NO_PATHCONV trace $CLI bundle deploy replace_ids.py DASHBOARD_ID=$($CLI bundle summary --output json | jq -r '.resources.dashboards.dashboard1.id') -add_repl.py "$($CLI lakeview get $DASHBOARD_ID | jq -r '.etag')" ETAG_1 +ETAG_1=$($CLI lakeview get $DASHBOARD_ID | jq -r '.etag') +add_repl.py "$ETAG_1" ETAG_1 trace $CLI lakeview get $DASHBOARD_ID | jq '{display_name, etag}' trace $CLI lakeview get-published $DASHBOARD_ID | jq '{display_name}' trace $CLI bundle plan -o json | gron.py | grep -E "etag|published" @@ -28,7 +29,8 @@ update_file.py databricks.yml "my dashboard" "my dashboard renamed" # SaveState is only called on success, so state retains the pre-PATCH etag. errcode trace $CLI bundle deploy trace print_requests.py //lakeview/dashboards -add_repl.py "$($CLI lakeview get $DASHBOARD_ID | jq -r '.etag')" ETAG_2 +# The PATCH bumped the remote etag to ETAG_2; retry until it is visible (eventual consistency). +add_repl.py "$(retry.py --until-not "$ETAG_1" $CLI lakeview get $DASHBOARD_ID | jq -r '.etag')" ETAG_2 trace $CLI lakeview get $DASHBOARD_ID | jq '{display_name, etag}' trace $CLI lakeview get-published $DASHBOARD_ID | jq '{display_name}' trace $CLI bundle plan -o json | gron.py | grep -E "etag|published" diff --git a/acceptance/bundle/resources/dashboards/test.toml b/acceptance/bundle/resources/dashboards/test.toml index a0d0d5ea3f1..fa7f1bba1d6 100644 --- a/acceptance/bundle/resources/dashboards/test.toml +++ b/acceptance/bundle/resources/dashboards/test.toml @@ -10,3 +10,7 @@ RequiresWarehouse = true # C:/Program Files/Git/Users/$username/UNIQUE_NAME before passing it to the CLI # Setting this environment variable prevents that conversion on windows. MSYS_NO_PATHCONV = "1" + +# Simulate eventual consistency (first GET after create returns 404) for the +# direct engine, exercising its retry-on-missing path. Ignored on terraform. +TESTS_STALE_ONCE = "1" diff --git a/acceptance/internal/prepare_server.go b/acceptance/internal/prepare_server.go index 1f39bc323b1..5ab51729592 100644 --- a/acceptance/internal/prepare_server.go +++ b/acceptance/internal/prepare_server.go @@ -63,7 +63,32 @@ func isTruePtr(value *bool) bool { return value != nil && *value } -func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string) (*sdkconfig.Config, iam.User) { +// StaleOnceEnabled reports whether the testserver should simulate eventual +// consistency (the first GET after a create returns 404). It is opt-in via +// TESTS_STALE_ONCE=1 and only applies to the direct engine, which retries reads +// on a 404; the terraform provider does not, so it is skipped there. +// +// testEnv carries the per-variant EnvMatrix values, which are not visible via +// os/env because matrix variants run in parallel and only reach the CLI subprocess. +func StaleOnceEnabled(testEnv []string) bool { + if v, _ := lookupEnv(testEnv, "TESTS_STALE_ONCE"); v != "1" { + return false + } + engine, _ := lookupEnv(testEnv, "DATABRICKS_BUNDLE_ENGINE") + return engine == "direct" +} + +func lookupEnv(testEnv []string, key string) (string, bool) { + prefix := key + "=" + for _, kv := range testEnv { + if v, ok := strings.CutPrefix(kv, prefix); ok { + return v, true + } + } + return "", false +} + +func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, outputDir string, testEnv []string) (*sdkconfig.Config, iam.User) { cloudEnv := env.Get(t.Context(), "CLOUD_ENV") recordRequests := isTruePtr(config.RecordRequests) @@ -76,6 +101,11 @@ func PrepareServerAndClient(t *testing.T, config TestConfig, logRequests bool, o if isTruePtr(config.IsServicePrincipal) { token = testserver.ServicePrincipalTokenPrefix + tokenSuffix testUser = testserver.TestUserSP + } else if StaleOnceEnabled(testEnv) { + // Use the eventual-consistency token so DashboardGet returns 404 on + // the first GET after a create, matching real cloud propagation delays. + token = testserver.EventualConsistencyTokenPrefix + tokenSuffix + testUser = testserver.TestUser } else { token = testserver.UserNameTokenPrefix + tokenSuffix testUser = testserver.TestUser diff --git a/bundle/direct/apply.go b/bundle/direct/apply.go index a4a61f727f2..95b70d5f8cc 100644 --- a/bundle/direct/apply.go +++ b/bundle/direct/apply.go @@ -80,7 +80,8 @@ func (d *DeploymentUnit) Create(ctx context.Context, db *dstate.DeploymentState, return fmt.Errorf("saving state after creating id=%s: %w", newID, err) } - waitRemoteState, err := retryOnTransient(ctx, func() (any, error) { + // The resource may not be immediately visible after creation (eventual consistency). + waitRemoteState, err := retryOnTransientOrMissing(ctx, func() (any, error) { return d.Adapter.WaitAfterCreate(ctx, newID, newState) }) if err != nil { @@ -106,7 +107,7 @@ func (d *DeploymentUnit) Recreate(ctx context.Context, db *dstate.DeploymentStat // replace_existing=true will reconfigure the parent-managed resource in // place, matching the Terraform provider's recreate behaviour. err = retryOnTransientErr(ctx, func() error { return d.Adapter.DoDelete(ctx, oldID, oldState) }) - if err != nil && !isResourceGone(err) && !isManagedByParent(err) { + if err != nil && !apierr.IsMissing(err) && !isManagedByParent(err) { return fmt.Errorf("deleting old id=%s: %w", oldID, err) } @@ -218,7 +219,7 @@ func (d *DeploymentUnit) Delete(ctx context.Context, db *dstate.DeploymentState, } err = d.Adapter.DoDelete(ctx, oldID, oldState) - if err != nil && !isResourceGone(err) && !isManagedByParent(err) { + if err != nil && !apierr.IsMissing(err) && !isManagedByParent(err) { // Rather than failing delete and requiring user to unbind, we perform unbind automatically there. // Some services, e.g. jobs, return 403 for missing resources if caller did not have permissions to it when job existed. // In those cases 403 hides 404. In other cases, user not having permissions to resource but having in the bundle might @@ -291,7 +292,10 @@ func (d *DeploymentUnit) refreshRemoteState(ctx context.Context, id string) erro if d.RemoteState != nil { return nil } - remoteState, err := retryOnTransient(ctx, func() (any, error) { + // Retry on 404: the resource may not be visible yet after a recent create + // (eventual consistency). The engine knows the resource should exist because + // it has the ID on record. + remoteState, err := retryOnTransientOrMissing(ctx, func() (any, error) { return d.Adapter.DoRead(ctx, id) }) if err != nil { diff --git a/bundle/direct/bundle_plan.go b/bundle/direct/bundle_plan.go index a9527ce56d5..be08c2a3f22 100644 --- a/bundle/direct/bundle_plan.go +++ b/bundle/direct/bundle_plan.go @@ -25,6 +25,7 @@ import ( "github.com/databricks/cli/libs/structs/structpath" "github.com/databricks/cli/libs/structs/structvar" "github.com/databricks/databricks-sdk-go" + "github.com/databricks/databricks-sdk-go/apierr" ) var errDelayed = errors.New("must be resolved after apply") @@ -161,7 +162,7 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks return adapter.DoRead(ctx, id) }) if err != nil { - if isResourceGone(err) { + if apierr.IsMissing(err) { // no such resource plan.RemoveEntry(resourceKey) } else { @@ -218,7 +219,7 @@ func (b *DeploymentBundle) CalculatePlan(ctx context.Context, client *databricks return adapter.DoRead(ctx, dbentry.ID) }) if err != nil { - if isResourceGone(err) { + if apierr.IsMissing(err) { remoteState = nil } else { logdiag.LogError(ctx, fmt.Errorf("%s: reading id=%q: %w", errorPrefix, dbentry.ID, err)) diff --git a/bundle/direct/dresources/dashboard.go b/bundle/direct/dresources/dashboard.go index bb3b3921a43..b19b40a9301 100644 --- a/bundle/direct/dresources/dashboard.go +++ b/bundle/direct/dresources/dashboard.go @@ -358,6 +358,14 @@ func (r *ResourceDashboard) DoUpdate(ctx context.Context, id string, config *Das return responseToState(updateResp, publishResp, dashboard.SerializedDashboard, config.Published), nil } +// WaitAfterCreate reads the dashboard back after creation. Under eventual +// consistency the first GET can 404; the read is retried at the apply layer +// (retryOnTransientOrMissing), so the post-create stale is consumed inside +// deploy rather than surfacing on the next read. +func (r *ResourceDashboard) WaitAfterCreate(ctx context.Context, id string, _ *DashboardState) (*DashboardState, error) { + return r.DoRead(ctx, id) +} + func (r *ResourceDashboard) DoDelete(ctx context.Context, id string, _ *DashboardState) error { return r.client.Lakeview.Trash(ctx, dashboards.TrashDashboardRequest{ DashboardId: id, diff --git a/bundle/direct/retry.go b/bundle/direct/retry.go index 43f5d6f0a4b..ee0ff69c27f 100644 --- a/bundle/direct/retry.go +++ b/bundle/direct/retry.go @@ -61,7 +61,13 @@ func retryWith[T any](ctx context.Context, check func(error) bool, fn func() (T, } msg = fmt.Sprintf("retrying after %d %s%s", apiErr.StatusCode, http.StatusText(apiErr.StatusCode), endpoint) } - log.Warnf(ctx, "%s", msg) + // A 404 right after a write is expected under eventual consistency and not + // user-actionable, so log it at debug. Transient 5xx are worth a warning. + if apierr.IsMissing(err) { + log.Debugf(ctx, "%s", msg) + } else { + log.Warnf(ctx, "%s", msg) + } select { case <-ctx.Done(): var zero T @@ -76,6 +82,14 @@ func retryOnTransient[T any](ctx context.Context, fn func() (T, error)) (T, erro return retryWith(ctx, func(err error) bool { return isTransient(ctx, err) }, fn) } +// retryOnTransientOrMissing retries fn on transient errors and on 404 (resource not yet +// visible due to eventual consistency after a recent create). +func retryOnTransientOrMissing[T any](ctx context.Context, fn func() (T, error)) (T, error) { + return retryWith(ctx, func(err error) bool { + return isTransient(ctx, err) || apierr.IsMissing(err) + }, fn) +} + // retryOnTransientErr wraps retryOnTransient for functions that return only an error. func retryOnTransientErr(ctx context.Context, fn func() error) error { _, err := retryOnTransient(ctx, func() (struct{}, error) { diff --git a/bundle/direct/util.go b/bundle/direct/util.go index f3a16692860..1ec180fd20e 100644 --- a/bundle/direct/util.go +++ b/bundle/direct/util.go @@ -6,10 +6,6 @@ import ( "github.com/databricks/databricks-sdk-go/apierr" ) -func isResourceGone(err error) bool { - return errors.Is(err, apierr.ErrResourceDoesNotExist) || errors.Is(err, apierr.ErrNotFound) -} - // isManagedByParent reports whether err is an API error carrying the // declarative_context=MANAGED_BY_PARENT marker in ErrorInfo.metadata. The // server uses this to signal that a resource's lifecycle is owned by a diff --git a/libs/testserver/dashboards.go b/libs/testserver/dashboards.go index 420eb04c307..70378f4f799 100644 --- a/libs/testserver/dashboards.go +++ b/libs/testserver/dashboards.go @@ -81,6 +81,28 @@ func transformSerializedDashboard(serializedDashboard, datasetCatalog, datasetSc return result } +func (s *FakeWorkspace) DashboardGet(req Request) Response { + defer s.LockUnlock()() + + dashboardId := req.Vars["dashboard_id"] + ev, ok := s.Dashboards[dashboardId] + if !ok { + return Response{StatusCode: 404} + } + // When eventual consistency is enabled, the first GET after a create returns nil + // (404) to simulate propagation delay. Updates are immediately visible. + var ptr *fakeDashboard + if s.EventualConsistency { + ptr = ev.ReadEventual() + } else { + ptr = ev.ReadStrong() + } + if ptr == nil { + return Response{StatusCode: 404} + } + return Response{Body: *ptr} +} + func (s *FakeWorkspace) DashboardCreate(req Request) Response { defer s.LockUnlock()() @@ -144,10 +166,14 @@ func (s *FakeWorkspace) DashboardCreate(req Request) Response { } dashboard.Etag = "80611980" - s.Dashboards[dashboard.DashboardId] = fakeDashboard{ + fd := &fakeDashboard{ Dashboard: dashboard, InputSerializedDashboard: inputSerializedDashboard, } + // Write via EventualValue so the first GET returns 404 (eventual consistency). + ev := &EventualValue[*fakeDashboard]{} + ev.Write(fd) + s.Dashboards[dashboard.DashboardId] = ev workspacePath := path.Join("/Workspace", dashboard.Path) s.files[workspacePath] = FileEntry{ @@ -176,50 +202,60 @@ func (s *FakeWorkspace) DashboardUpdate(req Request) Response { } dashboardId := req.Vars["dashboard_id"] - dashboard, ok := s.Dashboards[dashboardId] + ev, ok := s.Dashboards[dashboardId] if !ok { return Response{ StatusCode: 404, } } + dashboard := ev.ReadStrong() + if dashboard == nil { + return Response{ + StatusCode: 404, + } + } + + updated := *dashboard // Bump etag on every write, matching cloud behavior. - prevEtag, err := strconv.Atoi(dashboard.Etag) + prevEtag, err := strconv.Atoi(updated.Etag) if err != nil { return Response{ Body: map[string]string{ - "message": "Invalid etag: " + dashboard.Etag, + "message": "Invalid etag: " + updated.Etag, }, StatusCode: 400, } } - dashboard.Etag = strconv.Itoa(prevEtag + 1) + updated.Etag = strconv.Itoa(prevEtag + 1) - if updateReq.SerializedDashboard != dashboard.InputSerializedDashboard { - dashboard.InputSerializedDashboard = updateReq.SerializedDashboard + if updateReq.SerializedDashboard != updated.InputSerializedDashboard { + updated.InputSerializedDashboard = updateReq.SerializedDashboard } // Update the dashboard. - dashboard.LifecycleState = dashboards.LifecycleStateActive + updated.LifecycleState = dashboards.LifecycleStateActive if updateReq.DisplayName != "" { - dashboard.DisplayName = updateReq.DisplayName - dir := path.Dir(dashboard.Path) + updated.DisplayName = updateReq.DisplayName + dir := path.Dir(updated.Path) base := updateReq.DisplayName + ".lvdash.json" - dashboard.Path = path.Join(dir, base) + updated.Path = path.Join(dir, base) } if updateReq.SerializedDashboard != "" { // Extract dataset_catalog and dataset_schema from query parameters datasetCatalog := req.URL.Query().Get("dataset_catalog") datasetSchema := req.URL.Query().Get("dataset_schema") - dashboard.SerializedDashboard = transformSerializedDashboard(updateReq.SerializedDashboard, datasetCatalog, datasetSchema) + updated.SerializedDashboard = transformSerializedDashboard(updateReq.SerializedDashboard, datasetCatalog, datasetSchema) } - dashboard.WarehouseId = updateReq.WarehouseId - dashboard.UpdateTime = time.Now().UTC().Format(time.RFC3339) + updated.WarehouseId = updateReq.WarehouseId + updated.UpdateTime = time.Now().UTC().Format(time.RFC3339) - s.Dashboards[dashboardId] = dashboard + // Put (not Write): updates are immediately visible. Only creates use Write so + // that the first GET after create returns 404 (eventual-consistency simulation). + ev.Put(&updated) return Response{ - Body: dashboard, + Body: updated.Dashboard, } } @@ -234,12 +270,18 @@ func (s *FakeWorkspace) DashboardPublish(req Request) Response { } dashboardId := req.Vars["dashboard_id"] - dashboard, ok := s.Dashboards[dashboardId] + ev, ok := s.Dashboards[dashboardId] if !ok { return Response{ StatusCode: 404, } } + dashboard := ev.ReadStrong() + if dashboard == nil { + return Response{ + StatusCode: 404, + } + } publishedDashboard := dashboards.PublishedDashboard{ WarehouseId: dashboard.WarehouseId, @@ -273,28 +315,35 @@ func (s *FakeWorkspace) DashboardTrash(req Request) Response { defer s.LockUnlock()() dashboardId := req.Vars["dashboard_id"] - dashboard, ok := s.Dashboards[dashboardId] + ev, ok := s.Dashboards[dashboardId] if !ok { return Response{ StatusCode: 404, } } + dashboard := ev.ReadStrong() + if dashboard == nil { + return Response{ + StatusCode: 404, + } + } // The dashboard is marked as trashed and moved to the trash. - s.Dashboards[dashboardId] = fakeDashboard{ + // Put (not Write) so the trashed state is immediately visible. + ev.Put(&fakeDashboard{ Dashboard: dashboards.Dashboard{ Etag: dashboard.Etag, DashboardId: dashboardId, LifecycleState: dashboards.LifecycleStateTrashed, ParentPath: path.Join("/Users", s.CurrentUser().UserName, "Trash"), }, - } + }) // The published dashboard is deleted. delete(s.PublishedDashboards, dashboardId) return Response{ - Body: dashboard, + Body: *dashboard, } } @@ -302,8 +351,8 @@ func (s *FakeWorkspace) DashboardUnpublish(req Request) Response { defer s.LockUnlock()() dashboardId := req.Vars["dashboard_id"] - _, ok := s.Dashboards[dashboardId] - if !ok { + ev, ok := s.Dashboards[dashboardId] + if !ok || ev.ReadStrong() == nil { return Response{ StatusCode: 404, } diff --git a/libs/testserver/eventual.go b/libs/testserver/eventual.go new file mode 100644 index 00000000000..d0e5824efa2 --- /dev/null +++ b/libs/testserver/eventual.go @@ -0,0 +1,43 @@ +package testserver + +// EventualValue provides eventual-consistency read semantics for a single +// value. After Write, the first Read returns the pre-write value; subsequent +// Reads return the written value. +// +// Not safe for concurrent use; callers must hold the workspace lock. +type EventualValue[T any] struct { + current T + stale T + pending bool +} + +// Write stages v as the new value. The pre-write value is returned by the +// next Read call. +func (e *EventualValue[T]) Write(v T) { + e.stale = e.current + e.current = v + e.pending = true +} + +// ReadEventual returns the current value, applying eventual-consistency: the +// first call after a Write returns the pre-write (stale) value. +func (e *EventualValue[T]) ReadEventual() T { + if e.pending { + e.pending = false + return e.stale + } + return e.current +} + +// ReadStrong returns the current value without consuming the pending stale +// state. Use for write operations that need the latest committed value. +func (e *EventualValue[T]) ReadStrong() T { + return e.current +} + +// Put sets v as the current value immediately without staging a stale +// version. Any pending stale state is discarded. +func (e *EventualValue[T]) Put(v T) { + e.current = v + e.pending = false +} diff --git a/libs/testserver/fake_workspace.go b/libs/testserver/fake_workspace.go index d2925fb1e38..3af1725c35e 100644 --- a/libs/testserver/fake_workspace.go +++ b/libs/testserver/fake_workspace.go @@ -33,10 +33,13 @@ import ( const ( UserNameTokenPrefix = "dbapi0" ServicePrincipalTokenPrefix = "dbapi1" - UserID = "1000012345" - TestDefaultClusterId = "0123-456789-cluster0" - TestDefaultWarehouseId = "8ec9edc1-db0c-40df-af8d-7580020fe61e" - TestDefaultInstancePoolId = "0123-456789-pool0" + // EventualConsistencyTokenPrefix identifies workspaces that simulate eventual + // consistency: the first GET after a create returns 404 (not yet visible). + EventualConsistencyTokenPrefix = "dbapi2" + UserID = "1000012345" + TestDefaultClusterId = "0123-456789-cluster0" + TestDefaultWarehouseId = "8ec9edc1-db0c-40df-af8d-7580020fe61e" + TestDefaultInstancePoolId = "0123-456789-pool0" ) var TestUser = iam.User{ @@ -124,6 +127,9 @@ type FakeWorkspace struct { mu sync.Mutex url string isServicePrincipal bool + // EventualConsistency enables eventual-consistency simulation: the first + // GET after a create returns 404 (resource not yet visible). + EventualConsistency bool directories map[string]workspace.ObjectInfo files map[string]FileEntry @@ -139,7 +145,7 @@ type FakeWorkspace struct { Schemas map[string]catalog.SchemaInfo Grants map[string][]catalog.PrivilegeAssignment Volumes map[string]catalog.VolumeInfo - Dashboards map[string]fakeDashboard + Dashboards map[string]*EventualValue[*fakeDashboard] PublishedDashboards map[string]dashboards.PublishedDashboard GenieSpaces map[string]dashboards.GenieSpace SqlWarehouses map[string]sql.GetWarehouseResponse @@ -249,8 +255,9 @@ func MapDelete[K comparable, V any](w *FakeWorkspace, collection map[K]V, key K) func NewFakeWorkspace(url, token string) *FakeWorkspace { return &FakeWorkspace{ - url: url, - isServicePrincipal: strings.HasPrefix(token, ServicePrincipalTokenPrefix), + url: url, + isServicePrincipal: strings.HasPrefix(token, ServicePrincipalTokenPrefix), + EventualConsistency: strings.HasPrefix(token, EventualConsistencyTokenPrefix), directories: map[string]workspace.ObjectInfo{ "/Workspace": { ObjectType: "DIRECTORY", @@ -289,7 +296,7 @@ func NewFakeWorkspace(url, token string) *FakeWorkspace { Schemas: map[string]catalog.SchemaInfo{}, RegisteredModels: map[string]catalog.RegisteredModelInfo{}, Volumes: map[string]catalog.VolumeInfo{}, - Dashboards: map[string]fakeDashboard{}, + Dashboards: map[string]*EventualValue[*fakeDashboard]{}, PublishedDashboards: map[string]dashboards.PublishedDashboard{}, GenieSpaces: map[string]dashboards.GenieSpace{}, SqlWarehouses: map[string]sql.GetWarehouseResponse{ diff --git a/libs/testserver/handlers.go b/libs/testserver/handlers.go index ee56cefe3e6..c16e6b4b879 100644 --- a/libs/testserver/handlers.go +++ b/libs/testserver/handlers.go @@ -299,7 +299,7 @@ func AddDefaultHandlers(server *Server) { // Dashboards: server.Handle("GET", "/api/2.0/lakeview/dashboards/{dashboard_id}", func(req Request) any { - return MapGet(req.Workspace, req.Workspace.Dashboards, req.Vars["dashboard_id"]) + return req.Workspace.DashboardGet(req) }) server.Handle("POST", "/api/2.0/lakeview/dashboards", func(req Request) any { return req.Workspace.DashboardCreate(req)