From 49b95043f2c63bc5db4f7423657c2e0e5ba8bd7b Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Tue, 23 Jun 2026 07:38:41 +0000 Subject: [PATCH 1/8] bundle/fuzz: add create-payload parity fuzz test for terraform vs direct Implements the first technique from DECO-25361: generate random job configs and check for differences in the create payload between the terraform and direct deploy engines. Both engines run the same `bundle deploy` pipeline in-process (via testcli) against a testserver, differing only in DATABRICKS_BUNDLE_ENGINE, and the POST /api/2.2/jobs/create body each sends is captured and diffed. Because only the engine differs, shared mutators cancel out and any remaining diff is a genuine engine divergence. The fuzzer already surfaced two real (benign) divergences, documented in DefaultIgnorePaths: - num_workers: 0 is sent explicitly by terraform but dropped by direct (omitempty). - the terraform provider strips the deprecated spark conf "spark.databricks.delta.preview.enabled"; direct forwards it. Run with: go test ./bundle/fuzz -run TestJobCreateParity (FUZZ_SEEDS overrides the seed count; auto-skips when terraform is not provisioned via acceptance/install_terraform.py). --- bundle/fuzz/capture.go | 59 +++++ bundle/fuzz/capture_deploy.go | 145 ++++++++++++ bundle/fuzz/capture_deploy_test.go | 35 +++ bundle/fuzz/compare.go | 204 +++++++++++++++++ bundle/fuzz/compare_test.go | 95 ++++++++ bundle/fuzz/fuzz_test.go | 68 ++++++ bundle/fuzz/generate.go | 349 +++++++++++++++++++++++++++++ bundle/fuzz/generate_test.go | 47 ++++ bundle/fuzz/rand.go | 47 ++++ 9 files changed, 1049 insertions(+) create mode 100644 bundle/fuzz/capture.go create mode 100644 bundle/fuzz/capture_deploy.go create mode 100644 bundle/fuzz/capture_deploy_test.go create mode 100644 bundle/fuzz/compare.go create mode 100644 bundle/fuzz/compare_test.go create mode 100644 bundle/fuzz/fuzz_test.go create mode 100644 bundle/fuzz/generate.go create mode 100644 bundle/fuzz/generate_test.go create mode 100644 bundle/fuzz/rand.go diff --git a/bundle/fuzz/capture.go b/bundle/fuzz/capture.go new file mode 100644 index 0000000000..330f485f82 --- /dev/null +++ b/bundle/fuzz/capture.go @@ -0,0 +1,59 @@ +package fuzz + +import ( + "encoding/json" + "sync" + + "github.com/databricks/cli/libs/testserver" +) + +// jobsCreatePath is the Jobs API route both engines must hit on create. The +// direct engine posts here via the SDK; the terraform provider is expected to +// post here too, and a mismatch (e.g. a different API version) is itself a +// divergence worth surfacing. +const jobsCreatePath = "/api/2.2/jobs/create" + +// CapturedRequest is a single mutating API request observed by the testserver. +type CapturedRequest struct { + Method string + Path string + Body json.RawMessage +} + +// recorder collects request bodies sent to a testserver. It is safe for +// concurrent use because the SDK and terraform may issue requests from multiple +// goroutines. +type recorder struct { + mu sync.Mutex + requests []CapturedRequest +} + +func (r *recorder) callback(req *testserver.Request) { + r.mu.Lock() + defer r.mu.Unlock() + + var body json.RawMessage + if json.Valid(req.Body) { + // Copy: testserver reuses the underlying buffer across requests. + body = append(json.RawMessage(nil), req.Body...) + } + + r.requests = append(r.requests, CapturedRequest{ + Method: req.Method, + Path: req.URL.Path, + Body: body, + }) +} + +// find returns the body of the first recorded request matching method and path. +func (r *recorder) find(method, path string) (json.RawMessage, bool) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, req := range r.requests { + if req.Method == method && req.Path == path { + return req.Body, true + } + } + return nil, false +} diff --git a/bundle/fuzz/capture_deploy.go b/bundle/fuzz/capture_deploy.go new file mode 100644 index 0000000000..6f06487bf3 --- /dev/null +++ b/bundle/fuzz/capture_deploy.go @@ -0,0 +1,145 @@ +package fuzz + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/internal/testcli" + "github.com/databricks/cli/libs/testserver" +) + +const ( + // bundleResourceKey is the map key the generated job is registered under. + bundleResourceKey = "fuzz_job" + fakeToken = "testtoken" +) + +// CaptureJobCreate deploys a bundle containing job through the given engine +// ("direct" or "terraform") and returns the create request body sent to the +// Jobs API. +// +// Both engines run the full `bundle deploy` pipeline against an in-process +// testserver, so the only difference between two captures with different engines +// is the engine itself. That is what makes the resulting payloads directly +// comparable: shared mutators (deployment metadata, presets, ...) are applied +// identically on both sides and cancel out in the diff. +// +// The terraform engine additionally requires DATABRICKS_TF_EXEC_PATH and +// DATABRICKS_TF_CLI_CONFIG_FILE to point at a provisioned terraform binary and +// provider mirror; see RequireTerraform. +func CaptureJobCreate(ctx context.Context, t *testing.T, job *resources.Job, engine string) (json.RawMessage, error) { + rec := &recorder{} + server := testserver.New(t) + server.RequestCallback = rec.callback + testserver.AddDefaultHandlers(server) + + dir := t.TempDir() + if err := writeJobBundle(dir, server.URL, job); err != nil { + return nil, err + } + + t.Setenv("DATABRICKS_HOST", server.URL) + t.Setenv("DATABRICKS_TOKEN", fakeToken) + t.Setenv("DATABRICKS_BUNDLE_ENGINE", engine) + t.Chdir(dir) + + stdout, stderr, err := testcli.NewRunner(t, ctx, "bundle", "deploy").Run() + if err != nil { + return nil, fmt.Errorf("bundle deploy (engine=%s) failed: %w\nstdout:\n%s\nstderr:\n%s", + engine, err, stdout.String(), stderr.String()) + } + + body, ok := rec.find("POST", jobsCreatePath) + if !ok { + return nil, fmt.Errorf("engine=%s did not POST %s during deploy", engine, jobsCreatePath) + } + return body, nil +} + +// CompareJobEngines deploys job under both engines and returns the create-payload +// differences that are not covered by DefaultIgnorePaths. An empty result means +// the engines produced equivalent create payloads. +func CompareJobEngines(ctx context.Context, t *testing.T, job *resources.Job) ([]Difference, error) { + direct, err := CaptureJobCreate(ctx, t, job, "direct") + if err != nil { + return nil, fmt.Errorf("capturing direct payload: %w", err) + } + terraform, err := CaptureJobCreate(ctx, t, job, "terraform") + if err != nil { + return nil, fmt.Errorf("capturing terraform payload: %w", err) + } + return DiffPayloads(direct, terraform, DefaultIgnorePaths) +} + +// writeJobBundle writes a minimal databricks.yml describing a single job. The +// document is emitted as JSON, which is valid YAML, so we can reuse the job's +// own JSON marshaling (which honors ForceSendFields) without a YAML dependency. +func writeJobBundle(dir, host string, job *resources.Job) error { + jobJSON, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("marshaling job: %w", err) + } + + var jobMap map[string]any + if err := json.Unmarshal(jobJSON, &jobMap); err != nil { + return fmt.Errorf("unmarshaling job: %w", err) + } + + doc := map[string]any{ + "bundle": map[string]any{"name": "fuzz"}, + "workspace": map[string]any{"host": host}, + "resources": map[string]any{ + "jobs": map[string]any{bundleResourceKey: jobMap}, + }, + } + + data, err := json.MarshalIndent(doc, "", " ") + if err != nil { + return fmt.Errorf("marshaling bundle: %w", err) + } + + return os.WriteFile(filepath.Join(dir, "databricks.yml"), data, 0o600) +} + +// RequireTerraform points the terraform engine at the binary and provider mirror +// provisioned by acceptance/install_terraform.py into /build, and skips the +// test when they are absent so the suite still runs where terraform is not set up. +func RequireTerraform(t testing.TB) { + buildDir := filepath.Join(repoRoot(t), "build") + execPath := filepath.Join(buildDir, "terraform") + cfgFile := filepath.Join(buildDir, ".terraformrc") + + if _, err := os.Stat(execPath); err != nil { + t.Skipf("terraform not provisioned (%s); run: python3 acceptance/install_terraform.py --targetdir build", execPath) + } + + t.Setenv("DATABRICKS_TF_EXEC_PATH", execPath) + t.Setenv("DATABRICKS_TF_CLI_CONFIG_FILE", cfgFile) + t.Setenv("TF_CLI_CONFIG_FILE", cfgFile) + // Terraform phones home to checkpoint-api.hashicorp.com otherwise; disable it + // so the testserver/network isn't hit. See acceptance_test.go. + t.Setenv("CHECKPOINT_DISABLE", "1") +} + +// repoRoot returns the repository root by walking up from the current directory. +func repoRoot(t testing.TB) string { + dir, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %s", err) + } + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir + } + parent := filepath.Dir(dir) + if parent == dir { + t.Fatal("could not locate repo root (go.mod not found)") + } + dir = parent + } +} diff --git a/bundle/fuzz/capture_deploy_test.go b/bundle/fuzz/capture_deploy_test.go new file mode 100644 index 0000000000..2518265d75 --- /dev/null +++ b/bundle/fuzz/capture_deploy_test.go @@ -0,0 +1,35 @@ +package fuzz + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCaptureJobCreateDirect(t *testing.T) { + job := GenerateJob(newRNG(1)) + + body, err := CaptureJobCreate(t.Context(), t, job, "direct") + require.NoError(t, err) + require.NotEmpty(t, body) + + var payload map[string]any + require.NoError(t, json.Unmarshal(body, &payload)) + assert.Equal(t, job.Name, payload["name"]) + assert.Contains(t, payload, "tasks") +} + +func TestCaptureJobCreateTerraform(t *testing.T) { + RequireTerraform(t) + job := GenerateJob(newRNG(1)) + + body, err := CaptureJobCreate(t.Context(), t, job, "terraform") + require.NoError(t, err) + require.NotEmpty(t, body) + + var payload map[string]any + require.NoError(t, json.Unmarshal(body, &payload)) + assert.Equal(t, job.Name, payload["name"]) +} diff --git a/bundle/fuzz/compare.go b/bundle/fuzz/compare.go new file mode 100644 index 0000000000..48b7d3e648 --- /dev/null +++ b/bundle/fuzz/compare.go @@ -0,0 +1,204 @@ +package fuzz + +import ( + "bytes" + "encoding/json" + "fmt" + "regexp" + "slices" + "strconv" + "strings" +) + +// Difference is a single mismatch between the two engines' create payloads, +// located by a JSON-ish path (e.g. "tasks[0].new_cluster.num_workers"). +type Difference struct { + Path string + Direct any + Terraform any +} + +func (d Difference) String() string { + return fmt.Sprintf("%s: direct=%s terraform=%s", d.Path, render(d.Direct), render(d.Terraform)) +} + +// missing marks a value that is absent on one side. +type missing struct{} + +func render(v any) string { + if _, ok := v.(missing); ok { + return "" + } + b, err := json.Marshal(v) + if err != nil { + return fmt.Sprintf("%v", v) + } + return string(b) +} + +// DiffPayloads decodes both create payloads and returns every difference whose +// path is not explicitly ignored. ignorePaths are matched exactly against the +// rendered path, with "[*]" standing in for any slice index. +func DiffPayloads(direct, terraform json.RawMessage, ignorePaths []string) ([]Difference, error) { + d, err := decode(direct) + if err != nil { + return nil, fmt.Errorf("decoding direct payload: %w", err) + } + tf, err := decode(terraform) + if err != nil { + return nil, fmt.Errorf("decoding terraform payload: %w", err) + } + + var diffs []Difference + diffValue("", d, tf, &diffs) + + ignore := make(map[string]bool, len(ignorePaths)) + for _, p := range ignorePaths { + ignore[p] = true + } + + filtered := diffs[:0] + for _, diff := range diffs { + if !ignore[normalizePath(diff.Path)] { + filtered = append(filtered, diff) + } + } + return filtered, nil +} + +// decode unmarshals JSON using UseNumber so large int64 values (e.g. job ids, +// spark_context_id) are not corrupted by float64 rounding. See the encoding rule +// in the repo style guide. +func decode(raw json.RawMessage) (any, error) { + if len(raw) == 0 { + return nil, nil + } + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.UseNumber() + var v any + if err := dec.Decode(&v); err != nil { + return nil, err + } + return v, nil +} + +func diffValue(path string, a, b any, diffs *[]Difference) { + switch av := a.(type) { + case map[string]any: + bv, ok := b.(map[string]any) + if !ok { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + return + } + keys := unionKeys(av, bv) + for _, k := range keys { + achild, aok := av[k] + bchild, bok := bv[k] + child := joinKey(path, k) + switch { + case aok && bok: + diffValue(child, achild, bchild, diffs) + case aok: + *diffs = append(*diffs, Difference{Path: child, Direct: achild, Terraform: missing{}}) + default: + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bchild}) + } + } + case []any: + bv, ok := b.([]any) + if !ok { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + return + } + n := max(len(av), len(bv)) + for i := range n { + child := fmt.Sprintf("%s[%d]", path, i) + switch { + case i < len(av) && i < len(bv): + diffValue(child, av[i], bv[i], diffs) + case i < len(av): + *diffs = append(*diffs, Difference{Path: child, Direct: av[i], Terraform: missing{}}) + default: + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bv[i]}) + } + } + default: + if !scalarEqual(a, b) { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + } + } +} + +// scalarEqual compares two JSON scalars. json.Number is compared by its string +// form so 1 and 1.0 don't masquerade as equal across engines. +func scalarEqual(a, b any) bool { + an, aok := a.(json.Number) + bn, bok := b.(json.Number) + if aok && bok { + return an.String() == bn.String() + } + return a == b +} + +func unionKeys(a, b map[string]any) []string { + seen := map[string]bool{} + var keys []string + for k := range a { + if !seen[k] { + seen[k] = true + keys = append(keys, k) + } + } + for k := range b { + if !seen[k] { + seen[k] = true + keys = append(keys, k) + } + } + slices.Sort(keys) + return keys +} + +func joinKey(path, key string) string { + // Map keys can themselves contain dots or brackets (e.g. spark_conf entries + // like "spark.databricks.delta.preview.enabled"). Render those as bracketed, + // quoted segments so the path stays unambiguous and ignore entries can target + // a single key. + if key == "" || strings.ContainsAny(key, `.[]"`) { + return path + "[" + strconv.Quote(key) + "]" + } + if path == "" { + return key + } + return path + "." + key +} + +// indexRe matches numeric slice indices like "[12]" but not quoted string keys +// like ["spark.x"]. +var indexRe = regexp.MustCompile(`\[\d+\]`) + +// normalizePath replaces concrete slice indices with [*] so a single ignore +// entry can cover every element of a slice. +func normalizePath(path string) string { + return indexRe.ReplaceAllString(path, "[*]") +} + +// DefaultIgnorePaths lists create-payload paths that legitimately differ between +// the engines and are not parity bugs. Keep this list small and well-justified; +// every entry is a known, intentional divergence. +var DefaultIgnorePaths = []string{ + // num_workers is a zero-able int: when a cluster has num_workers: 0 the + // terraform provider serializes it explicitly while the direct engine drops + // it via omitempty. The backend treats absent and 0 identically, so this is a + // benign serialization difference. See the update_single_node acceptance test + // ("issues with zero conversion"). + "tasks[*].new_cluster.num_workers", + "job_clusters[*].new_cluster.num_workers", + + // The terraform provider strips the deprecated/ignored spark conf + // "spark.databricks.delta.preview.enabled" from new_cluster.spark_conf, while + // the direct engine forwards it verbatim. The backend ignores the key either + // way, so this is a benign provider-side filter rather than a parity bug. + `tasks[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, + `job_clusters[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, +} diff --git a/bundle/fuzz/compare_test.go b/bundle/fuzz/compare_test.go new file mode 100644 index 0000000000..ec5818468b --- /dev/null +++ b/bundle/fuzz/compare_test.go @@ -0,0 +1,95 @@ +package fuzz + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiffPayloads(t *testing.T) { + tests := []struct { + name string + direct string + terraform string + ignore []string + want []string + }{ + { + name: "identical", + direct: `{"name":"a","tasks":[{"task_key":"t"}]}`, + terraform: `{"name":"a","tasks":[{"task_key":"t"}]}`, + want: nil, + }, + { + name: "scalar mismatch", + direct: `{"name":"a"}`, + terraform: `{"name":"b"}`, + want: []string{"name"}, + }, + { + name: "missing on terraform", + direct: `{"name":"a","queue":{"enabled":true}}`, + terraform: `{"name":"a"}`, + want: []string{"queue"}, + }, + { + name: "missing on direct", + direct: `{"name":"a"}`, + terraform: `{"name":"a","max_concurrent_runs":1}`, + want: []string{"max_concurrent_runs"}, + }, + { + name: "nested slice element mismatch", + direct: `{"tasks":[{"task_key":"t","timeout_seconds":1}]}`, + terraform: `{"tasks":[{"task_key":"t","timeout_seconds":2}]}`, + want: []string{"tasks[0].timeout_seconds"}, + }, + { + name: "slice length mismatch", + direct: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, + terraform: `{"tasks":[{"task_key":"a"}]}`, + want: []string{"tasks[1]"}, + }, + { + name: "number 1 vs 1.0 differ", + direct: `{"n":1}`, + terraform: `{"n":1.0}`, + want: []string{"n"}, + }, + { + name: "ignored path", + direct: `{"tasks":[{"timeout_seconds":1}]}`, + terraform: `{"tasks":[{"timeout_seconds":2}]}`, + ignore: []string{"tasks[*].timeout_seconds"}, + want: nil, + }, + { + name: "dotted map key is bracket-quoted", + direct: `{"spark_conf":{"spark.x.y":"1"}}`, + terraform: `{"spark_conf":{}}`, + want: []string{`spark_conf["spark.x.y"]`}, + }, + { + name: "dotted map key can be ignored", + direct: `{"c":{"spark_conf":{"spark.x.y":"1"}}}`, + terraform: `{"c":{"spark_conf":{}}}`, + ignore: []string{`c.spark_conf["spark.x.y"]`}, + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + diffs, err := DiffPayloads(json.RawMessage(tt.direct), json.RawMessage(tt.terraform), tt.ignore) + require.NoError(t, err) + + var paths []string + for _, d := range diffs { + paths = append(paths, d.Path) + } + assert.ElementsMatch(t, tt.want, paths) + }) + } +} diff --git a/bundle/fuzz/fuzz_test.go b/bundle/fuzz/fuzz_test.go new file mode 100644 index 0000000000..55e52eb0bb --- /dev/null +++ b/bundle/fuzz/fuzz_test.go @@ -0,0 +1,68 @@ +package fuzz + +import ( + "encoding/json" + "os" + "strconv" + "testing" + + "github.com/stretchr/testify/require" +) + +// defaultParitySeeds is the number of random jobs TestJobCreateParity checks by +// default. Each seed runs two real deploys (direct + terraform), so the count is +// kept modest; override with FUZZ_SEEDS for a deeper local run. +const defaultParitySeeds = 20 + +// TestJobCreateParity is the first DECO-25361 technique: for many random job +// configs, assert the terraform and direct engines produce equivalent create +// payloads. On divergence it prints the seed and the generated job so the failure +// can be reproduced and inspected. +func TestJobCreateParity(t *testing.T) { + RequireTerraform(t) + + seeds := defaultParitySeeds + if v := os.Getenv("FUZZ_SEEDS"); v != "" { + n, err := strconv.Atoi(v) + require.NoErrorf(t, err, "invalid FUZZ_SEEDS=%q", v) + seeds = n + } + + for seed := int64(0); seed < int64(seeds); seed++ { + t.Run("seed="+strconv.FormatInt(seed, 10), func(t *testing.T) { + checkJobParity(t, seed) + }) + } +} + +// FuzzJobCreateParity exposes the same parity check to Go's native fuzzer +// (`go test -fuzz=FuzzJobCreateParity`). Note each input runs two real deploys, +// so this is intended for ad-hoc deep runs, not the default `go test` path. +func FuzzJobCreateParity(f *testing.F) { + RequireTerraform(f) + for seed := int64(0); seed < 5; seed++ { + f.Add(seed) + } + f.Fuzz(func(t *testing.T, seed int64) { + checkJobParity(t, seed) + }) +} + +// checkJobParity generates the job for seed, deploys it under both engines, and +// fails the test with reproduction details if the create payloads diverge. +func checkJobParity(t *testing.T, seed int64) { + t.Helper() + job := GenerateJob(newRNG(seed)) + + diffs, err := CompareJobEngines(t.Context(), t, job) + require.NoErrorf(t, err, "seed %d", seed) + + if len(diffs) > 0 { + jobJSON, _ := json.MarshalIndent(job, "", " ") + t.Errorf("seed %d: terraform/direct create payloads diverge (%d differences):", seed, len(diffs)) + for _, d := range diffs { + t.Errorf(" %s", d) + } + t.Logf("reproduce with GenerateJob(newRNG(%d)):\n%s", seed, jobJSON) + } +} diff --git a/bundle/fuzz/generate.go b/bundle/fuzz/generate.go new file mode 100644 index 0000000000..a7c5e6056f --- /dev/null +++ b/bundle/fuzz/generate.go @@ -0,0 +1,349 @@ +// Package fuzz provides randomized generators and harnesses that compare how the +// terraform and direct deploy engines translate the same bundle resource into an +// API create payload. See DECO-25361. +// +// The first technique implemented here generates a random resource config and +// checks for differences in the create payload between the terraform and direct +// engines. Generators are seeded so that any divergence found by the fuzz driver +// can be reproduced from the printed seed. +package fuzz + +import ( + "fmt" + "math/rand/v2" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +// Value pools are intentionally small and valid-looking: the goal is to exercise +// the engines' config->payload translation across many field combinations, not to +// stress the API with invalid values (which the testserver would reject before we +// can compare payloads). +var ( + sparkVersions = []string{"13.3.x-scala2.12", "14.3.x-scala2.12", "15.4.x-scala2.12", "16.4.x-scala2.12"} + nodeTypeIDs = []string{"i3.xlarge", "m5.large", "r5.xlarge", "Standard_DS3_v2"} + timezones = []string{"UTC", "America/Los_Angeles", "Europe/Amsterdam"} + cronExprs = []string{"0 0 12 * * ?", "0 15 10 ? * MON-FRI", "0 0/30 * * * ?"} + pauseStatuses = []jobs.PauseStatus{jobs.PauseStatusPaused, jobs.PauseStatusUnpaused} + performance = []jobs.PerformanceTarget{jobs.PerformanceTargetPerformanceOptimized, jobs.PerformanceTargetStandard} + timeUnits = []string{"HOURS", "DAYS", "WEEKS"} + healthMetrics = []string{"RUN_DURATION_SECONDS", "STREAMING_BACKLOG_BYTES", "STREAMING_BACKLOG_RECORDS"} + conditionOps = []string{"EQUAL_TO", "NOT_EQUAL", "GREATER_THAN", "LESS_THAN_OR_EQUAL"} + runIfs = []string{"ALL_SUCCESS", "AT_LEAST_ONE_SUCCESS", "NONE_FAILED", "ALL_DONE"} + gitProviders = []jobs.GitProvider{jobs.GitProviderGitHub, jobs.GitProviderGitLab, jobs.GitProviderAzureDevOpsServices} +) + +// GenerateJob builds a random, well-formed job config driven entirely by rng, so +// the same seed always produces the same job. It deliberately favors fields whose +// translation tends to differ between engines (tasks, clusters, schedules, +// notifications, tags, zero-able scalars). +func GenerateJob(rng *rand.Rand) *resources.Job { + job := &resources.Job{} + job.Name = randName(rng, "job") + + if chance(rng, 0.5) { + job.Description = randSentence(rng) + } + if chance(rng, 0.4) { + job.MaxConcurrentRuns = rng.IntN(10) + 1 + } + if chance(rng, 0.4) { + job.TimeoutSeconds = rng.IntN(7200) + } + if chance(rng, 0.3) { + job.PerformanceTarget = oneOf(rng, performance) + } + if chance(rng, 0.5) { + job.Tags = randTags(rng) + } + if chance(rng, 0.3) { + job.GitSource = randGitSource(rng) + } + + randScheduling(rng, job) + + if chance(rng, 0.3) { + job.EmailNotifications = randEmailNotifications(rng) + } + if chance(rng, 0.2) { + job.WebhookNotifications = randWebhookNotifications(rng) + } + if chance(rng, 0.3) { + job.NotificationSettings = &jobs.JobNotificationSettings{ + NoAlertForCanceledRuns: chance(rng, 0.5), + NoAlertForSkippedRuns: chance(rng, 0.5), + } + } + if chance(rng, 0.3) { + job.Health = randHealth(rng) + } + if chance(rng, 0.3) { + job.Parameters = randParameters(rng) + } + if chance(rng, 0.3) { + job.Queue = &jobs.QueueSettings{Enabled: chance(rng, 0.5)} + } + + // Generate shared job clusters first so tasks can reference them by key. + var jobClusterKeys []string + if chance(rng, 0.5) { + n := rng.IntN(2) + 1 + for i := range n { + key := fmt.Sprintf("cluster_%d", i) + jobClusterKeys = append(jobClusterKeys, key) + job.JobClusters = append(job.JobClusters, jobs.JobCluster{ + JobClusterKey: key, + NewCluster: randClusterSpec(rng), + }) + } + } + + nTasks := rng.IntN(3) + 1 + var taskKeys []string + for i := range nTasks { + task := randTask(rng, i, jobClusterKeys) + // Randomly chain dependencies onto previously generated tasks. + if len(taskKeys) > 0 && chance(rng, 0.4) { + dep := taskKeys[rng.IntN(len(taskKeys))] + task.DependsOn = []jobs.TaskDependency{{TaskKey: dep}} + if chance(rng, 0.5) { + task.RunIf = jobs.RunIf(oneOf(rng, runIfs)) + } + } + taskKeys = append(taskKeys, task.TaskKey) + job.Tasks = append(job.Tasks, task) + } + + return job +} + +// randScheduling sets at most one of schedule/trigger/continuous, which are +// mutually exclusive ways to launch a job. +func randScheduling(rng *rand.Rand, job *resources.Job) { + switch rng.IntN(5) { + case 0: + job.Schedule = &jobs.CronSchedule{ + QuartzCronExpression: oneOf(rng, cronExprs), + TimezoneId: oneOf(rng, timezones), + PauseStatus: oneOf(rng, pauseStatuses), + } + case 1: + job.Trigger = &jobs.TriggerSettings{ + PauseStatus: oneOf(rng, pauseStatuses), + Periodic: &jobs.PeriodicTriggerConfiguration{ + Interval: rng.IntN(12) + 1, + Unit: jobs.PeriodicTriggerConfigurationTimeUnit(oneOf(rng, timeUnits)), + }, + } + case 2: + job.Trigger = &jobs.TriggerSettings{ + PauseStatus: oneOf(rng, pauseStatuses), + FileArrival: &jobs.FileArrivalTriggerConfiguration{ + Url: "s3://" + randWord(rng) + "/" + randWord(rng), + }, + } + case 3: + job.Continuous = &jobs.Continuous{PauseStatus: oneOf(rng, pauseStatuses)} + default: + // no scheduling + } +} + +func randTask(rng *rand.Rand, idx int, jobClusterKeys []string) jobs.Task { + task := jobs.Task{TaskKey: fmt.Sprintf("task_%d", idx)} + + // Use absolute workspace paths with source=WORKSPACE so the generated bundle + // never depends on local files existing on disk (which deploy would reject). + // condition_task needs no compute, so it is handled separately below. + needsCompute := true + switch rng.IntN(4) { + case 0: + task.NotebookTask = &jobs.NotebookTask{ + NotebookPath: "/Workspace/Users/test/" + randName(rng, "nb"), + Source: jobs.SourceWorkspace, + } + case 1: + task.SparkPythonTask = &jobs.SparkPythonTask{ + PythonFile: "/Workspace/Users/test/" + randName(rng, "main") + ".py", + Source: jobs.SourceWorkspace, + } + case 2: + task.PythonWheelTask = &jobs.PythonWheelTask{ + PackageName: randName(rng, "pkg"), + EntryPoint: "main", + } + case 3: + task.ConditionTask = &jobs.ConditionTask{ + Left: randWord(rng), + Op: jobs.ConditionTaskOp(oneOf(rng, conditionOps)), + Right: randWord(rng), + } + needsCompute = false + } + + if needsCompute { + assignCompute(rng, &task, jobClusterKeys) + if chance(rng, 0.4) { + task.Libraries = randLibraries(rng) + } + } + + if chance(rng, 0.3) { + task.TimeoutSeconds = rng.IntN(3600) + } + if chance(rng, 0.3) { + task.MaxRetries = rng.IntN(5) + task.MinRetryIntervalMillis = rng.IntN(60000) + task.RetryOnTimeout = chance(rng, 0.5) + } + return task +} + +// assignCompute attaches exactly one compute source, which notebook/python/wheel +// tasks require: a shared job cluster (when available), a brand-new cluster, or an +// existing cluster id. +func assignCompute(rng *rand.Rand, task *jobs.Task, jobClusterKeys []string) { + const ( + computeNew = iota + computeExisting + computeShared + ) + options := []int{computeNew, computeExisting} + if len(jobClusterKeys) > 0 { + options = append(options, computeShared) + } + switch oneOf(rng, options) { + case computeNew: + spec := randClusterSpec(rng) + task.NewCluster = &spec + case computeExisting: + task.ExistingClusterId = randName(rng, "cluster") + case computeShared: + task.JobClusterKey = oneOf(rng, jobClusterKeys) + } +} + +func randClusterSpec(rng *rand.Rand) compute.ClusterSpec { + spec := compute.ClusterSpec{ + SparkVersion: oneOf(rng, sparkVersions), + NodeTypeId: oneOf(rng, nodeTypeIDs), + } + if chance(rng, 0.5) { + spec.NumWorkers = rng.IntN(8) + } else { + spec.Autoscale = &compute.AutoScale{ + MinWorkers: 1, + MaxWorkers: rng.IntN(8) + 2, + } + } + if chance(rng, 0.4) { + spec.SparkConf = map[string]string{ + "spark.databricks.delta.preview.enabled": "true", + "spark.speculation": fmt.Sprintf("%t", chance(rng, 0.5)), + } + } + if chance(rng, 0.3) { + spec.CustomTags = randTags(rng) + } + if chance(rng, 0.3) { + spec.SparkEnvVars = map[string]string{"PYSPARK_PYTHON": "/databricks/python3/bin/python3"} + } + if chance(rng, 0.3) { + spec.DriverNodeTypeId = oneOf(rng, nodeTypeIDs) + } + return spec +} + +func randGitSource(rng *rand.Rand) *jobs.GitSource { + src := &jobs.GitSource{ + GitProvider: oneOf(rng, gitProviders), + GitUrl: "https://example.com/" + randWord(rng) + "/" + randWord(rng) + ".git", + } + switch rng.IntN(3) { + case 0: + src.GitBranch = oneOf(rng, []string{"main", "develop", "release"}) + case 1: + src.GitTag = "v" + fmt.Sprintf("%d.%d.0", rng.IntN(5), rng.IntN(10)) + case 2: + src.GitCommit = fmt.Sprintf("%040x", rng.Int64()) + } + return src +} + +func randEmailNotifications(rng *rand.Rand) *jobs.JobEmailNotifications { + email := randWord(rng) + "@example.com" + n := &jobs.JobEmailNotifications{NoAlertForSkippedRuns: chance(rng, 0.5)} + if chance(rng, 0.6) { + n.OnFailure = []string{email} + } + if chance(rng, 0.4) { + n.OnSuccess = []string{email} + } + if chance(rng, 0.3) { + n.OnStart = []string{email} + } + return n +} + +func randWebhookNotifications(rng *rand.Rand) *jobs.WebhookNotifications { + hook := []jobs.Webhook{{Id: randName(rng, "hook")}} + n := &jobs.WebhookNotifications{} + if chance(rng, 0.6) { + n.OnFailure = hook + } + if chance(rng, 0.4) { + n.OnSuccess = hook + } + return n +} + +func randHealth(rng *rand.Rand) *jobs.JobsHealthRules { + return &jobs.JobsHealthRules{ + Rules: []jobs.JobsHealthRule{ + { + Metric: jobs.JobsHealthMetric(oneOf(rng, healthMetrics)), + Op: jobs.JobsHealthOperatorGreaterThan, + Value: int64(rng.IntN(3600) + 1), + }, + }, + } +} + +func randLibraries(rng *rand.Rand) []compute.Library { + n := rng.IntN(2) + 1 + libs := make([]compute.Library, 0, n) + for range n { + switch rng.IntN(3) { + case 0: + libs = append(libs, compute.Library{Pypi: &compute.PythonPyPiLibrary{Package: randWord(rng)}}) + case 1: + libs = append(libs, compute.Library{Maven: &compute.MavenLibrary{Coordinates: "org.example:" + randWord(rng) + ":1.0.0"}}) + case 2: + libs = append(libs, compute.Library{Whl: "/Workspace/Users/test/" + randName(rng, "lib") + ".whl"}) + } + } + return libs +} + +func randParameters(rng *rand.Rand) []jobs.JobParameterDefinition { + n := rng.IntN(3) + 1 + params := make([]jobs.JobParameterDefinition, 0, n) + for i := range n { + params = append(params, jobs.JobParameterDefinition{ + Name: fmt.Sprintf("param_%d", i), + Default: randWord(rng), + }) + } + return params +} + +func randTags(rng *rand.Rand) map[string]string { + n := rng.IntN(3) + 1 + tags := make(map[string]string, n) + for i := range n { + tags[fmt.Sprintf("tag_%d", i)] = randWord(rng) + } + return tags +} diff --git a/bundle/fuzz/generate_test.go b/bundle/fuzz/generate_test.go new file mode 100644 index 0000000000..524e84864c --- /dev/null +++ b/bundle/fuzz/generate_test.go @@ -0,0 +1,47 @@ +package fuzz + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGenerateJobIsDeterministic(t *testing.T) { + a := GenerateJob(newRNG(42)) + b := GenerateJob(newRNG(42)) + assert.Equal(t, a, b, "same seed must produce identical job") +} + +func TestGenerateJobIsWellFormed(t *testing.T) { + for seed := int64(0); seed < 200; seed++ { + job := GenerateJob(newRNG(seed)) + require.NotEmptyf(t, job.Name, "seed %d: job must have a name", seed) + require.NotEmptyf(t, job.Tasks, "seed %d: job must have at least one task", seed) + + clusterKeys := map[string]bool{} + for _, jc := range job.JobClusters { + clusterKeys[jc.JobClusterKey] = true + } + + taskKeys := map[string]bool{} + for _, task := range job.Tasks { + require.NotEmptyf(t, task.TaskKey, "seed %d: task must have a key", seed) + taskKeys[task.TaskKey] = true + + // A task referencing a job cluster must reference one we generated. + if task.JobClusterKey != "" { + assert.Containsf(t, clusterKeys, task.JobClusterKey, + "seed %d: task %q references unknown job cluster %q", seed, task.TaskKey, task.JobClusterKey) + } + } + + // Every dependency must point at a task that exists in this job. + for _, task := range job.Tasks { + for _, dep := range task.DependsOn { + assert.Containsf(t, taskKeys, dep.TaskKey, + "seed %d: task %q depends on unknown task %q", seed, task.TaskKey, dep.TaskKey) + } + } + } +} diff --git a/bundle/fuzz/rand.go b/bundle/fuzz/rand.go new file mode 100644 index 0000000000..529e4da115 --- /dev/null +++ b/bundle/fuzz/rand.go @@ -0,0 +1,47 @@ +package fuzz + +import ( + "fmt" + "math/rand/v2" + "strings" +) + +var words = []string{ + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", + "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa", +} + +// newRNG returns a deterministic RNG for the given seed, so any job the fuzzer +// flags can be regenerated from the printed seed alone. +func newRNG(seed int64) *rand.Rand { + return rand.New(rand.NewPCG(uint64(seed), 0)) +} + +// chance returns true with probability p (0..1). +func chance(rng *rand.Rand, p float64) bool { + return rng.Float64() < p +} + +// oneOf returns a random element of s. s must be non-empty. +func oneOf[T any](rng *rand.Rand, s []T) T { + return s[rng.IntN(len(s))] +} + +func randWord(rng *rand.Rand) string { + return oneOf(rng, words) +} + +// randName returns a deterministic-but-varied identifier with the given prefix, +// e.g. "job_alpha_4271". +func randName(rng *rand.Rand, prefix string) string { + return fmt.Sprintf("%s_%s_%d", prefix, randWord(rng), rng.IntN(10000)) +} + +func randSentence(rng *rand.Rand) string { + n := rng.IntN(4) + 2 + parts := make([]string, 0, n) + for range n { + parts = append(parts, randWord(rng)) + } + return strings.Join(parts, " ") +} From 7b55664db1e56818a3d7e72dc8cb511a8e7e6e2e Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Tue, 23 Jun 2026 08:06:00 +0000 Subject: [PATCH 2/8] bundle/fuzz: fix lint (intrange, perfsprint) and correct num_workers ignore Address golangci-lint failures (intrange loops, strconv.FormatBool over fmt.Sprintf) and tighten the create-payload ignore list: drop the dead job_clusters num_workers entry (those are at parity) and document the task-level num_workers divergence as a real CLI gap to fix separately. --- bundle/fuzz/compare.go | 14 ++++++++------ bundle/fuzz/fuzz_test.go | 4 ++-- bundle/fuzz/generate.go | 3 ++- bundle/fuzz/generate_test.go | 2 +- 4 files changed, 13 insertions(+), 10 deletions(-) diff --git a/bundle/fuzz/compare.go b/bundle/fuzz/compare.go index 48b7d3e648..e893ab443d 100644 --- a/bundle/fuzz/compare.go +++ b/bundle/fuzz/compare.go @@ -187,13 +187,15 @@ func normalizePath(path string) string { // the engines and are not parity bugs. Keep this list small and well-justified; // every entry is a known, intentional divergence. var DefaultIgnorePaths = []string{ - // num_workers is a zero-able int: when a cluster has num_workers: 0 the - // terraform provider serializes it explicitly while the direct engine drops - // it via omitempty. The backend treats absent and 0 identically, so this is a - // benign serialization difference. See the update_single_node acceptance test - // ("issues with zero conversion"). + // A single-node task cluster (num_workers: 0, no autoscale) diverges: the + // terraform provider sends num_workers: 0 while the direct engine omits it. + // JobClustersFixups.initializeNumWorkers force-sends num_workers for + // job_clusters but is NOT applied to task-level new_cluster, so the fix-up + // only covers job_clusters (those are at parity and need no ignore here). + // This is a real CLI gap surfaced by the fuzzer, tracked separately; ignore + // it here so the fuzz suite stays green until the fix-up is extended to task + // clusters. "tasks[*].new_cluster.num_workers", - "job_clusters[*].new_cluster.num_workers", // The terraform provider strips the deprecated/ignored spark conf // "spark.databricks.delta.preview.enabled" from new_cluster.spark_conf, while diff --git a/bundle/fuzz/fuzz_test.go b/bundle/fuzz/fuzz_test.go index 55e52eb0bb..ace7a5efd3 100644 --- a/bundle/fuzz/fuzz_test.go +++ b/bundle/fuzz/fuzz_test.go @@ -28,7 +28,7 @@ func TestJobCreateParity(t *testing.T) { seeds = n } - for seed := int64(0); seed < int64(seeds); seed++ { + for seed := range int64(seeds) { t.Run("seed="+strconv.FormatInt(seed, 10), func(t *testing.T) { checkJobParity(t, seed) }) @@ -40,7 +40,7 @@ func TestJobCreateParity(t *testing.T) { // so this is intended for ad-hoc deep runs, not the default `go test` path. func FuzzJobCreateParity(f *testing.F) { RequireTerraform(f) - for seed := int64(0); seed < 5; seed++ { + for seed := range int64(5) { f.Add(seed) } f.Fuzz(func(t *testing.T, seed int64) { diff --git a/bundle/fuzz/generate.go b/bundle/fuzz/generate.go index a7c5e6056f..98db7a70f5 100644 --- a/bundle/fuzz/generate.go +++ b/bundle/fuzz/generate.go @@ -11,6 +11,7 @@ package fuzz import ( "fmt" "math/rand/v2" + "strconv" "github.com/databricks/cli/bundle/config/resources" "github.com/databricks/databricks-sdk-go/service/compute" @@ -241,7 +242,7 @@ func randClusterSpec(rng *rand.Rand) compute.ClusterSpec { if chance(rng, 0.4) { spec.SparkConf = map[string]string{ "spark.databricks.delta.preview.enabled": "true", - "spark.speculation": fmt.Sprintf("%t", chance(rng, 0.5)), + "spark.speculation": strconv.FormatBool(chance(rng, 0.5)), } } if chance(rng, 0.3) { diff --git a/bundle/fuzz/generate_test.go b/bundle/fuzz/generate_test.go index 524e84864c..f7a797e8f5 100644 --- a/bundle/fuzz/generate_test.go +++ b/bundle/fuzz/generate_test.go @@ -14,7 +14,7 @@ func TestGenerateJobIsDeterministic(t *testing.T) { } func TestGenerateJobIsWellFormed(t *testing.T) { - for seed := int64(0); seed < 200; seed++ { + for seed := range int64(200) { job := GenerateJob(newRNG(seed)) require.NotEmptyf(t, job.Name, "seed %d: job must have a name", seed) require.NotEmptyf(t, job.Tasks, "seed %d: job must have at least one task", seed) From 2d8ea6d252c5f2b67eeae81404302ac99b9fd9bf Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Tue, 23 Jun 2026 09:46:58 +0000 Subject: [PATCH 3/8] bundle/fuzz: wire parity tests into CI and harden harness - Add a `test-fuzz` task and a nightly CI job that provisions terraform and runs the create-payload parity tests. They previously always skipped because terraform was never provisioned in the test path. - Ignore repo-root build/ so the provisioned terraform binary and provider mirror are not accidentally committed. - Skip cleanly when build/ is only partially provisioned (missing provider mirror or .terraformrc) instead of failing mid-deploy. - Document that the harness covers jobs only for now (DECO-25361). --- .github/workflows/push.yml | 35 +++++++++++++++++++++++++++++++++++ .gitignore | 4 ++++ Taskfile.yml | 15 +++++++++++++++ bundle/fuzz/capture_deploy.go | 10 ++++++++-- bundle/fuzz/generate.go | 6 ++++++ 5 files changed, 68 insertions(+), 2 deletions(-) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index f80cfba7ad..3b2720ba3a 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -370,6 +370,41 @@ jobs: run: | go tool -modfile=tools/task/go.mod task test-sandbox + test-fuzz: + needs: + - cleanups + + # The terraform/direct create-payload parity tests run two real `bundle deploy` + # invocations per seed, so they are too slow for every PR and too noisy to gate + # the merge queue. Run them on the nightly schedule to catch engine drift; not + # part of test-result for that reason. + if: ${{ github.event_name == 'schedule' }} + name: "task test-fuzz" + runs-on: + group: databricks-protected-runner-group-large + labels: linux-ubuntu-latest-large + + defaults: + run: + shell: bash + + permissions: + id-token: write + contents: read + + steps: + - name: Checkout repository and submodules + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Setup build environment + uses: ./.github/actions/setup-build-environment + with: + cache-key: test-fuzz + + - name: Run tests + run: | + go tool -modfile=tools/task/go.mod task test-fuzz + # This job groups the result of all the above test jobs. # It is a required check, so it blocks auto-merge and the merge queue. # diff --git a/.gitignore b/.gitignore index 116ec5e976..9da32ca035 100644 --- a/.gitignore +++ b/.gitignore @@ -58,6 +58,10 @@ tools/testmask/testmask # Release artifacts dist/ +# Terraform binary + provider mirror provisioned by acceptance/install_terraform.py +# for the bundle/fuzz parity tests (see Taskfile `test-fuzz`). +/build/ + # Local development notes, tmp /pr-* /tmp/ diff --git a/Taskfile.yml b/Taskfile.yml index fe6966653b..b980fa32e6 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -678,6 +678,21 @@ tasks: --packages ./acceptance/... \ -- -timeout=${LOCAL_TIMEOUT:-30m} -run "TestAccept/cmd/sandbox" + test-fuzz: + desc: Run terraform/direct create-payload parity fuzz tests (provisions terraform) + sources: + - bundle/fuzz/** + cmds: + # The parity harness expects terraform + the provider mirror at /build; + # RequireTerraform skips when it's absent, so provision it first. + - python3 acceptance/install_terraform.py --targetdir build + - | + {{.GO_TOOL}} gotestsum \ + --format ${GOTESTSUM_FORMAT:-pkgname-and-test-fails} \ + --no-summary=skipped \ + --packages ./bundle/fuzz/... \ + -- -timeout=${LOCAL_TIMEOUT:-30m} + # --- Integration tests --- integration: diff --git a/bundle/fuzz/capture_deploy.go b/bundle/fuzz/capture_deploy.go index 6f06487bf3..0efeaa9ed1 100644 --- a/bundle/fuzz/capture_deploy.go +++ b/bundle/fuzz/capture_deploy.go @@ -114,8 +114,14 @@ func RequireTerraform(t testing.TB) { execPath := filepath.Join(buildDir, "terraform") cfgFile := filepath.Join(buildDir, ".terraformrc") - if _, err := os.Stat(execPath); err != nil { - t.Skipf("terraform not provisioned (%s); run: python3 acceptance/install_terraform.py --targetdir build", execPath) + // install_terraform.py provisions all three together; a partial build/ (e.g. + // the binary without the provider mirror or .terraformrc) would otherwise fail + // mid-deploy with a confusing error instead of skipping cleanly. + tfpluginsDir := filepath.Join(buildDir, "tfplugins") + for _, p := range []string{execPath, cfgFile, tfpluginsDir} { + if _, err := os.Stat(p); err != nil { + t.Skipf("terraform not fully provisioned (%s); run: python3 acceptance/install_terraform.py --targetdir build", p) + } } t.Setenv("DATABRICKS_TF_EXEC_PATH", execPath) diff --git a/bundle/fuzz/generate.go b/bundle/fuzz/generate.go index 98db7a70f5..697748e03f 100644 --- a/bundle/fuzz/generate.go +++ b/bundle/fuzz/generate.go @@ -6,6 +6,9 @@ // checks for differences in the create payload between the terraform and direct // engines. Generators are seeded so that any divergence found by the fuzz driver // can be reproduced from the printed seed. +// +// Only jobs are covered for now. Extending the harness to other resource kinds +// (pipelines, apps, ...) is tracked as follow-up work under DECO-25361. package fuzz import ( @@ -40,6 +43,9 @@ var ( // the same seed always produces the same job. It deliberately favors fields whose // translation tends to differ between engines (tasks, clusters, schedules, // notifications, tags, zero-able scalars). +// +// TODO(DECO-25361): generalize the harness across resource kinds so pipelines, +// apps, etc. get the same create-payload parity coverage as jobs. func GenerateJob(rng *rand.Rand) *resources.Job { job := &resources.Job{} job.Name = randName(rng, "job") From 27d29e1aba65b51a56160cee39130ad6b5cc4ecc Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Wed, 24 Jun 2026 08:28:25 +0000 Subject: [PATCH 4/8] bundle/fuzz: rotate nightly seeds and add single-seed reproduction Make the create-payload parity fuzz suite explore new configs over time and be reproducible from a reported seed: - FUZZ_SEED (comma-separated) runs exactly those seeds, overriding the range, so a reported divergence reproduces with one command. The failure message now prints this knob. - FUZZ_SEED_OFFSET shifts the deterministic window; push.yml derives it from GITHUB_RUN_NUMBER so each nightly run checks seeds it has never tested before instead of re-checking a fixed set. Windows are non-overlapping because the run number is unique and monotonic. - Guard FUZZ_SEEDS > 0 so a negative value no longer panics make() and zero no longer passes as a no-op. - Drop the test-fuzz Task sources fingerprint: the seeds depend on env vars Task can't see, so skipping on an unchanged checksum would silently no-op a repro run or a shifted window. - Keep the nightly window modest (25); exploration comes from rotation, not size, and it can be raised once nightly timings are known. --- .github/workflows/push.yml | 14 ++++++++++ Taskfile.yml | 6 ++-- bundle/fuzz/fuzz_test.go | 57 +++++++++++++++++++++++++++++++++----- 3 files changed, 68 insertions(+), 9 deletions(-) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 3b2720ba3a..bf995d7555 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -402,7 +402,21 @@ jobs: cache-key: test-fuzz - name: Run tests + env: + # Shift the seed window by the run number every nightly run so CI + # explores configs it has never tested before instead of re-checking a + # fixed set. The window is kept modest (each seed runs two real deploys) + # since the exploration comes from rotating the window, not its size; + # raise it once nightly timings are known. A divergence prints + # FUZZ_SEED= for one-command reproduction. + # + # offset = GITHUB_RUN_NUMBER * FUZZ_SEEDS. GITHUB_RUN_NUMBER is a + # built-in, monotonically increasing, unique-per-run integer, so as long + # as FUZZ_SEEDS is constant the windows are non-overlapping (gaps from + # non-schedule runs are fine; we only need fresh seeds, not every seed). + FUZZ_SEEDS: "25" run: | + export FUZZ_SEED_OFFSET=$(( GITHUB_RUN_NUMBER * FUZZ_SEEDS )) go tool -modfile=tools/task/go.mod task test-fuzz # This job groups the result of all the above test jobs. diff --git a/Taskfile.yml b/Taskfile.yml index b980fa32e6..d2293a148e 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -680,8 +680,10 @@ tasks: test-fuzz: desc: Run terraform/direct create-payload parity fuzz tests (provisions terraform) - sources: - - bundle/fuzz/** + # No `sources:` fingerprint: the seeds checked are a function of the FUZZ_SEED, + # FUZZ_SEEDS, and FUZZ_SEED_OFFSET env vars, which Task can't see. Skipping on + # an unchanged source checksum would silently no-op a FUZZ_SEED= repro run + # or a shifted nightly window, so always run. cmds: # The parity harness expects terraform + the provider mirror at /build; # RequireTerraform skips when it's absent, so provision it first. diff --git a/bundle/fuzz/fuzz_test.go b/bundle/fuzz/fuzz_test.go index ace7a5efd3..51471b3533 100644 --- a/bundle/fuzz/fuzz_test.go +++ b/bundle/fuzz/fuzz_test.go @@ -4,6 +4,7 @@ import ( "encoding/json" "os" "strconv" + "strings" "testing" "github.com/stretchr/testify/require" @@ -21,18 +22,60 @@ const defaultParitySeeds = 20 func TestJobCreateParity(t *testing.T) { RequireTerraform(t) - seeds := defaultParitySeeds + for _, seed := range paritySeeds(t) { + t.Run("seed="+strconv.FormatInt(seed, 10), func(t *testing.T) { + checkJobParity(t, seed) + }) + } +} + +// paritySeeds returns the seeds TestJobCreateParity should check. +// +// FUZZ_SEED (comma-separated list) runs exactly those seeds and overrides +// everything else. This is the knob the failure message prints so a single +// reported divergence can be reproduced with one command, without re-running +// every seed before it. +// +// Otherwise the test runs FUZZ_SEEDS seeds (default defaultParitySeeds) starting +// at FUZZ_SEED_OFFSET. The offset lets the nightly job shift the window every run +// (push.yml derives it from the run number) so CI explores configs it has never +// tested before instead of re-checking the same fixed set forever. +func paritySeeds(t *testing.T) []int64 { + if v := os.Getenv("FUZZ_SEED"); v != "" { + var seeds []int64 + for _, part := range strings.Split(v, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + n, err := strconv.ParseInt(part, 10, 64) + require.NoErrorf(t, err, "invalid FUZZ_SEED entry %q", part) + seeds = append(seeds, n) + } + require.NotEmptyf(t, seeds, "FUZZ_SEED=%q contained no seeds", v) + return seeds + } + + count := defaultParitySeeds if v := os.Getenv("FUZZ_SEEDS"); v != "" { n, err := strconv.Atoi(v) require.NoErrorf(t, err, "invalid FUZZ_SEEDS=%q", v) - seeds = n + require.Greaterf(t, n, 0, "FUZZ_SEEDS must be positive, got %d", n) + count = n } - for seed := range int64(seeds) { - t.Run("seed="+strconv.FormatInt(seed, 10), func(t *testing.T) { - checkJobParity(t, seed) - }) + var offset int64 + if v := os.Getenv("FUZZ_SEED_OFFSET"); v != "" { + n, err := strconv.ParseInt(v, 10, 64) + require.NoErrorf(t, err, "invalid FUZZ_SEED_OFFSET=%q", v) + offset = n + } + + seeds := make([]int64, 0, count) + for i := range int64(count) { + seeds = append(seeds, offset+i) } + return seeds } // FuzzJobCreateParity exposes the same parity check to Go's native fuzzer @@ -63,6 +106,6 @@ func checkJobParity(t *testing.T, seed int64) { for _, d := range diffs { t.Errorf(" %s", d) } - t.Logf("reproduce with GenerateJob(newRNG(%d)):\n%s", seed, jobJSON) + t.Logf("reproduce with: FUZZ_SEED=%d go test ./bundle/fuzz -run TestJobCreateParity\n%s", seed, jobJSON) } } From 59b1e2f128ac01d7bed781d562e0b429bef7c3c2 Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Wed, 24 Jun 2026 12:03:48 +0000 Subject: [PATCH 5/8] bundle: force-send num_workers for single-node task clusters The terraform provider force-sends num_workers: 0 for a single-node new_cluster (no autoscale) on both job_clusters and task-level clusters, but JobClustersFixups only applied initializeNumWorkers to job_clusters. The direct engine therefore omitted num_workers on task clusters, so the two engines produced divergent create payloads. This divergence was surfaced by the bundle/fuzz parity harness. Apply initializeNumWorkers to task new_cluster too so the direct engine matches terraform, and drop the now-obsolete tasks[*].new_cluster.num_workers entry from the fuzz DefaultIgnorePaths. --- acceptance/bundle/deploy/wal/chain-3-jobs/output.txt | 2 ++ .../bundle/deploy/wal/crash-after-create/output.txt | 1 + acceptance/bundle/override/job_tasks/output.txt | 2 ++ .../missing_map_key/out.validate.direct.json | 3 ++- .../missing_map_key/out.validate.terraform.json | 3 ++- .../config/mutator/resourcemutator/cluster_fixups.go | 1 + bundle/fuzz/compare.go | 10 ---------- 7 files changed, 10 insertions(+), 12 deletions(-) diff --git a/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt b/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt index f27bfaa3f2..f11dc173ee 100644 --- a/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt +++ b/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt @@ -35,6 +35,7 @@ Exit code: [KILLED] { "new_cluster": { "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 0, "spark_version": "15.4.x-scala2.12" }, "spark_python_task": { @@ -73,6 +74,7 @@ Exit code: [KILLED] { "new_cluster": { "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 0, "spark_version": "15.4.x-scala2.12" }, "spark_python_task": { diff --git a/acceptance/bundle/deploy/wal/crash-after-create/output.txt b/acceptance/bundle/deploy/wal/crash-after-create/output.txt index 2ab926a1dd..9cd95a0b5c 100644 --- a/acceptance/bundle/deploy/wal/crash-after-create/output.txt +++ b/acceptance/bundle/deploy/wal/crash-after-create/output.txt @@ -39,6 +39,7 @@ Exit code: [KILLED] { "new_cluster": { "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 0, "spark_version": "15.4.x-scala2.12" }, "spark_python_task": { diff --git a/acceptance/bundle/override/job_tasks/output.txt b/acceptance/bundle/override/job_tasks/output.txt index 2bee9738e3..59b6fc1c39 100644 --- a/acceptance/bundle/override/job_tasks/output.txt +++ b/acceptance/bundle/override/job_tasks/output.txt @@ -18,6 +18,7 @@ }, { "new_cluster": { + "num_workers": 0, "spark_version": "13.3.x-scala2.12" }, "spark_python_task": { @@ -42,6 +43,7 @@ Exit code: 1 "tasks": [ { "new_cluster": { + "num_workers": 0, "spark_version": "13.3.x-scala2.12" }, "spark_python_task": { diff --git a/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json b/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json index cfd1427ce4..7279aaeba3 100644 --- a/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json +++ b/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json @@ -30,7 +30,8 @@ "new_cluster": { "custom_tags": { "ResourceClass": "SingleNode" - } + }, + "num_workers": 0 }, "task_key": "test-task" } diff --git a/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json b/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json index 3cdf58f84e..3bad6f4619 100644 --- a/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json +++ b/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json @@ -30,7 +30,8 @@ "new_cluster": { "custom_tags": { "ResourceClass": "SingleNode" - } + }, + "num_workers": 0 }, "task_key": "test-task" } diff --git a/bundle/config/mutator/resourcemutator/cluster_fixups.go b/bundle/config/mutator/resourcemutator/cluster_fixups.go index 893cd248aa..04ddef6cc2 100644 --- a/bundle/config/mutator/resourcemutator/cluster_fixups.go +++ b/bundle/config/mutator/resourcemutator/cluster_fixups.go @@ -94,6 +94,7 @@ func prepareJobSettingsForUpdate(js *jobs.JobSettings) { for _, task := range js.Tasks { if task.NewCluster != nil { ModifyRequestOnInstancePool(task.NewCluster) + initializeNumWorkers(task.NewCluster) } } for ind := range js.JobClusters { diff --git a/bundle/fuzz/compare.go b/bundle/fuzz/compare.go index e893ab443d..de68171962 100644 --- a/bundle/fuzz/compare.go +++ b/bundle/fuzz/compare.go @@ -187,16 +187,6 @@ func normalizePath(path string) string { // the engines and are not parity bugs. Keep this list small and well-justified; // every entry is a known, intentional divergence. var DefaultIgnorePaths = []string{ - // A single-node task cluster (num_workers: 0, no autoscale) diverges: the - // terraform provider sends num_workers: 0 while the direct engine omits it. - // JobClustersFixups.initializeNumWorkers force-sends num_workers for - // job_clusters but is NOT applied to task-level new_cluster, so the fix-up - // only covers job_clusters (those are at parity and need no ignore here). - // This is a real CLI gap surfaced by the fuzzer, tracked separately; ignore - // it here so the fuzz suite stays green until the fix-up is extended to task - // clusters. - "tasks[*].new_cluster.num_workers", - // The terraform provider strips the deprecated/ignored spark conf // "spark.databricks.delta.preview.enabled" from new_cluster.spark_conf, while // the direct engine forwards it verbatim. The backend ignores the key either From bec3d5f5cc5935a6c5e2277083c8d67cadcd5f54 Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Wed, 24 Jun 2026 12:04:02 +0000 Subject: [PATCH 6/8] bundle/fuzz: report nightly parity failures and fix create-path comment The nightly test-fuzz job is intentionally excluded from test-result, so a failure was only visible in the Actions tab. Add a failure step that opens (or comments on) a single deduped GitHub issue with a one-command repro. Also correct the jobsCreatePath comment: a different API version shows up as a capture failure (the testserver registers only this route, so a mismatched version 404s and the deploy fails), not as a payload diff. --- .github/workflows/push.yml | 37 +++++++++++++++++++++++++++++++++++++ bundle/fuzz/capture.go | 8 +++++--- 2 files changed, 42 insertions(+), 3 deletions(-) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index bf995d7555..8c6f1e5f3a 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -391,6 +391,8 @@ jobs: permissions: id-token: write contents: read + # Needed by the failure-reporting step below to open/comment a tracking issue. + issues: write steps: - name: Checkout repository and submodules @@ -419,6 +421,41 @@ jobs: export FUZZ_SEED_OFFSET=$(( GITHUB_RUN_NUMBER * FUZZ_SEEDS )) go tool -modfile=tools/task/go.mod task test-fuzz + # This job is intentionally excluded from test-result, so a failure here is + # invisible unless someone watches the Actions tab. Surface it as a GitHub + # issue instead. Reuse a single open issue (deduped by label) so a recurring + # divergence doesn't open one issue per night. + - name: Report failure + if: ${{ failure() }} + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + run: | + gh label create fuzz-nightly \ + --description "Nightly terraform/direct create-payload parity failures" \ + --color FBCA04 2>/dev/null || true + + body=$(cat <\`. + Reproduce locally with: + + \`\`\` + FUZZ_SEED= go test ./bundle/fuzz -run TestJobCreateParity + \`\`\` + EOF + ) + + existing=$(gh issue list --state open --label fuzz-nightly --json number --jq '.[0].number') + if [ -n "$existing" ]; then + gh issue comment "$existing" --body "$body" + else + gh issue create --title "Nightly fuzz parity failure" --label fuzz-nightly --body "$body" + fi + # This job groups the result of all the above test jobs. # It is a required check, so it blocks auto-merge and the merge queue. # diff --git a/bundle/fuzz/capture.go b/bundle/fuzz/capture.go index 330f485f82..fe10bc10be 100644 --- a/bundle/fuzz/capture.go +++ b/bundle/fuzz/capture.go @@ -8,9 +8,11 @@ import ( ) // jobsCreatePath is the Jobs API route both engines must hit on create. The -// direct engine posts here via the SDK; the terraform provider is expected to -// post here too, and a mismatch (e.g. a different API version) is itself a -// divergence worth surfacing. +// direct engine posts here via the SDK and the terraform provider is expected to +// as well. The testserver registers only this exact route, so if an engine ever +// posted to a different version the deploy would 404 and CaptureJobCreate would +// fail with "did not POST". A version skew therefore surfaces as a capture +// failure, not as a payload diff. const jobsCreatePath = "/api/2.2/jobs/create" // CapturedRequest is a single mutating API request observed by the testserver. From 55d118233a16fe968391fa78c351db10ecb2890e Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Wed, 24 Jun 2026 13:26:12 +0000 Subject: [PATCH 7/8] bundle/fuzz: make harness files test-only and add num_workers regression test Rename the capture/deploy/recorder helpers to *_test.go so the parity harness compiles only under `go test` instead of into the package's regular build, and add a committed regression test (cluster_fixups_test.go) covering the single-node task-cluster num_workers force-send fix so the divergence is guarded at PR time, not just in the nightly suite. --- .github/workflows/push.yml | 5 +- Taskfile.yml | 8 +- .../resourcemutator/cluster_fixups_test.go | 92 +++++++++++++++++++ bundle/fuzz/compare.go | 72 +++++++++++++++ bundle/fuzz/compare_test.go | 24 +++++ ...re_deploy_test.go => deploy_smoke_test.go} | 6 +- .../{capture_deploy.go => deploy_test.go} | 44 ++++++--- bundle/fuzz/fuzz_test.go | 81 ++++++++++++++-- bundle/fuzz/{capture.go => recorder_test.go} | 10 +- 9 files changed, 311 insertions(+), 31 deletions(-) create mode 100644 bundle/config/mutator/resourcemutator/cluster_fixups_test.go rename bundle/fuzz/{capture_deploy_test.go => deploy_smoke_test.go} (82%) rename bundle/fuzz/{capture_deploy.go => deploy_test.go} (73%) rename bundle/fuzz/{capture.go => recorder_test.go} (86%) diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index 8c6f1e5f3a..cc69da23f4 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -444,8 +444,11 @@ jobs: Reproduce locally with: \`\`\` - FUZZ_SEED= go test ./bundle/fuzz -run TestJobCreateParity + FUZZ_SEED= task test-fuzz \`\`\` + + Once fixed, add the seed to \`regressionSeeds\` in \`bundle/fuzz/fuzz_test.go\` + in the same PR so the divergence can never silently regress. EOF ) diff --git a/Taskfile.yml b/Taskfile.yml index d2293a148e..2f59f4fa2a 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -684,9 +684,15 @@ tasks: # FUZZ_SEEDS, and FUZZ_SEED_OFFSET env vars, which Task can't see. Skipping on # an unchanged source checksum would silently no-op a FUZZ_SEED= repro run # or a shifted nightly window, so always run. + env: + # The terraform parity tests are opt-in (see requireFuzzOptIn): they skip + # unless a FUZZ_* var is set, so a leftover build/ never makes them run as + # part of a plain `task test`. This constant flag opts this target in + # without overriding the FUZZ_SEED(S)/OFFSET tuning knobs. + FUZZ_PARITY: "1" cmds: # The parity harness expects terraform + the provider mirror at /build; - # RequireTerraform skips when it's absent, so provision it first. + # requireTerraform skips when it's absent, so provision it first. - python3 acceptance/install_terraform.py --targetdir build - | {{.GO_TOOL}} gotestsum \ diff --git a/bundle/config/mutator/resourcemutator/cluster_fixups_test.go b/bundle/config/mutator/resourcemutator/cluster_fixups_test.go new file mode 100644 index 0000000000..5cb2e93749 --- /dev/null +++ b/bundle/config/mutator/resourcemutator/cluster_fixups_test.go @@ -0,0 +1,92 @@ +package resourcemutator + +import ( + "testing" + + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" +) + +func TestInitializeNumWorkers(t *testing.T) { + tests := []struct { + name string + spec compute.ClusterSpec + wantForceSend bool + }{ + { + name: "single-node cluster force-sends num_workers", + spec: compute.ClusterSpec{SparkVersion: "15.4.x-scala2.12", NodeTypeId: "i3.xlarge"}, + wantForceSend: true, + }, + { + name: "autoscale cluster does not force-send", + spec: compute.ClusterSpec{Autoscale: &compute.AutoScale{MinWorkers: 1, MaxWorkers: 4}}, + wantForceSend: false, + }, + { + name: "multi-node cluster does not force-send", + spec: compute.ClusterSpec{NumWorkers: 3}, + wantForceSend: false, + }, + { + name: "already force-sent stays force-sent without duplicating", + spec: compute.ClusterSpec{ForceSendFields: []string{"NumWorkers"}}, + wantForceSend: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + spec := tt.spec + initializeNumWorkers(&spec) + + count := 0 + for _, f := range spec.ForceSendFields { + if f == "NumWorkers" { + count++ + } + } + if tt.wantForceSend { + assert.Equal(t, 1, count, "NumWorkers must appear in ForceSendFields exactly once") + } else { + assert.Equal(t, 0, count, "NumWorkers must not be in ForceSendFields") + } + }) + } +} + +// TestPrepareJobSettingsForUpdateForcesNumWorkers locks the DECO-25361 fix: a +// single-node new_cluster must force-send num_workers on task-level clusters too, +// not just shared job_clusters. The terraform provider always sends num_workers:0 +// for such clusters, so missing it on the task side made the direct engine +// produce a divergent create payload. +func TestPrepareJobSettingsForUpdateForcesNumWorkers(t *testing.T) { + js := &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + TaskKey: "single_node_task", + NewCluster: &compute.ClusterSpec{SparkVersion: "15.4.x-scala2.12", NodeTypeId: "i3.xlarge"}, + }, + { + TaskKey: "autoscale_task", + NewCluster: &compute.ClusterSpec{Autoscale: &compute.AutoScale{MinWorkers: 1, MaxWorkers: 4}}, + }, + }, + JobClusters: []jobs.JobCluster{ + { + JobClusterKey: "single_node_cluster", + NewCluster: compute.ClusterSpec{SparkVersion: "15.4.x-scala2.12", NodeTypeId: "i3.xlarge"}, + }, + }, + } + + prepareJobSettingsForUpdate(js) + + assert.Contains(t, js.Tasks[0].NewCluster.ForceSendFields, "NumWorkers", + "single-node task cluster must force-send num_workers") + assert.NotContains(t, js.Tasks[1].NewCluster.ForceSendFields, "NumWorkers", + "autoscale task cluster must not force-send num_workers") + assert.Contains(t, js.JobClusters[0].NewCluster.ForceSendFields, "NumWorkers", + "single-node job cluster must force-send num_workers") +} diff --git a/bundle/fuzz/compare.go b/bundle/fuzz/compare.go index de68171962..81c1bc7afb 100644 --- a/bundle/fuzz/compare.go +++ b/bundle/fuzz/compare.go @@ -110,6 +110,14 @@ func diffValue(path string, a, b any, diffs *[]Difference) { *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) return } + // Slices whose elements carry a natural identity key (tasks, job clusters) + // are matched by that key so an engine emitting the same elements in a + // different order is not reported as a difference. Everything else is + // compared positionally. + if key := identityKey(av, bv); key != "" { + diffKeyedSlice(path, key, av, bv, diffs) + return + } n := max(len(av), len(bv)) for i := range n { child := fmt.Sprintf("%s[%d]", path, i) @@ -129,6 +137,70 @@ func diffValue(path string, a, b any, diffs *[]Difference) { } } +// identityFields are the keys, in priority order, that uniquely identify the +// elements of a payload slice. Job tasks and shared job clusters are the slices +// whose order is not significant but which the engines may emit differently. +var identityFields = []string{"task_key", "job_cluster_key"} + +// identityKey returns the field that identifies every element of both slices, or +// "" if the elements are not uniformly keyed objects (in which case the caller +// falls back to positional comparison). +func identityKey(a, b []any) string { + for _, field := range identityFields { + if allHaveKey(a, field) && allHaveKey(b, field) { + return field + } + } + return "" +} + +func allHaveKey(s []any, field string) bool { + if len(s) == 0 { + return false + } + for _, el := range s { + m, ok := el.(map[string]any) + if !ok { + return false + } + if _, ok := m[field].(string); !ok { + return false + } + } + return true +} + +// diffKeyedSlice matches elements of a and b by the value of key (which is unique +// within each slice for tasks/job clusters) and diffs each matched pair, +// reporting unmatched elements as present-on-one-side. Paths keep numeric indices +// so ignore-path [*] normalization still applies. +func diffKeyedSlice(path, key string, a, b []any, diffs *[]Difference) { + bByKey := make(map[string]any, len(b)) + for _, el := range b { + bByKey[el.(map[string]any)[key].(string)] = el + } + + matched := make(map[string]bool, len(a)) + for i, el := range a { + child := fmt.Sprintf("%s[%d]", path, i) + k := el.(map[string]any)[key].(string) + matched[k] = true + if bel, ok := bByKey[k]; ok { + diffValue(child, el, bel, diffs) + } else { + *diffs = append(*diffs, Difference{Path: child, Direct: el, Terraform: missing{}}) + } + } + for j, el := range b { + k := el.(map[string]any)[key].(string) + if matched[k] { + continue + } + child := fmt.Sprintf("%s[%d]", path, j) + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: el}) + } +} + // scalarEqual compares two JSON scalars. json.Number is compared by its string // form so 1 and 1.0 don't masquerade as equal across engines. func scalarEqual(a, b any) bool { diff --git a/bundle/fuzz/compare_test.go b/bundle/fuzz/compare_test.go index ec5818468b..46e506d75c 100644 --- a/bundle/fuzz/compare_test.go +++ b/bundle/fuzz/compare_test.go @@ -78,6 +78,30 @@ func TestDiffPayloads(t *testing.T) { ignore: []string{`c.spark_conf["spark.x.y"]`}, want: nil, }, + { + name: "tasks matched by key ignore order", + direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, + terraform: `{"tasks":[{"task_key":"b","timeout_seconds":2},{"task_key":"a","timeout_seconds":1}]}`, + want: nil, + }, + { + name: "tasks matched by key surface real diff at direct index", + direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, + terraform: `{"tasks":[{"task_key":"b","timeout_seconds":9},{"task_key":"a","timeout_seconds":1}]}`, + want: []string{"tasks[1].timeout_seconds"}, + }, + { + name: "task only on terraform reported at its index", + direct: `{"tasks":[{"task_key":"a"}]}`, + terraform: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, + want: []string{"tasks[1]"}, + }, + { + name: "job_clusters matched by key ignore order", + direct: `{"job_clusters":[{"job_cluster_key":"x","new_cluster":{"num_workers":1}},{"job_cluster_key":"y","new_cluster":{"num_workers":2}}]}`, + terraform: `{"job_clusters":[{"job_cluster_key":"y","new_cluster":{"num_workers":2}},{"job_cluster_key":"x","new_cluster":{"num_workers":1}}]}`, + want: nil, + }, } for _, tt := range tests { diff --git a/bundle/fuzz/capture_deploy_test.go b/bundle/fuzz/deploy_smoke_test.go similarity index 82% rename from bundle/fuzz/capture_deploy_test.go rename to bundle/fuzz/deploy_smoke_test.go index 2518265d75..d501ee7808 100644 --- a/bundle/fuzz/capture_deploy_test.go +++ b/bundle/fuzz/deploy_smoke_test.go @@ -11,7 +11,7 @@ import ( func TestCaptureJobCreateDirect(t *testing.T) { job := GenerateJob(newRNG(1)) - body, err := CaptureJobCreate(t.Context(), t, job, "direct") + body, err := captureJobCreate(t.Context(), t, job, "direct") require.NoError(t, err) require.NotEmpty(t, body) @@ -22,10 +22,10 @@ func TestCaptureJobCreateDirect(t *testing.T) { } func TestCaptureJobCreateTerraform(t *testing.T) { - RequireTerraform(t) + requireTerraform(t) job := GenerateJob(newRNG(1)) - body, err := CaptureJobCreate(t.Context(), t, job, "terraform") + body, err := captureJobCreate(t.Context(), t, job, "terraform") require.NoError(t, err) require.NotEmpty(t, body) diff --git a/bundle/fuzz/capture_deploy.go b/bundle/fuzz/deploy_test.go similarity index 73% rename from bundle/fuzz/capture_deploy.go rename to bundle/fuzz/deploy_test.go index 0efeaa9ed1..e42dbb7434 100644 --- a/bundle/fuzz/capture_deploy.go +++ b/bundle/fuzz/deploy_test.go @@ -19,7 +19,7 @@ const ( fakeToken = "testtoken" ) -// CaptureJobCreate deploys a bundle containing job through the given engine +// captureJobCreate deploys a bundle containing job through the given engine // ("direct" or "terraform") and returns the create request body sent to the // Jobs API. // @@ -31,8 +31,8 @@ const ( // // The terraform engine additionally requires DATABRICKS_TF_EXEC_PATH and // DATABRICKS_TF_CLI_CONFIG_FILE to point at a provisioned terraform binary and -// provider mirror; see RequireTerraform. -func CaptureJobCreate(ctx context.Context, t *testing.T, job *resources.Job, engine string) (json.RawMessage, error) { +// provider mirror; see requireTerraform. +func captureJobCreate(ctx context.Context, t *testing.T, job *resources.Job, engine string) (json.RawMessage, error) { rec := &recorder{} server := testserver.New(t) server.RequestCallback = rec.callback @@ -61,15 +61,15 @@ func CaptureJobCreate(ctx context.Context, t *testing.T, job *resources.Job, eng return body, nil } -// CompareJobEngines deploys job under both engines and returns the create-payload +// compareJobEngines deploys job under both engines and returns the create-payload // differences that are not covered by DefaultIgnorePaths. An empty result means // the engines produced equivalent create payloads. -func CompareJobEngines(ctx context.Context, t *testing.T, job *resources.Job) ([]Difference, error) { - direct, err := CaptureJobCreate(ctx, t, job, "direct") +func compareJobEngines(ctx context.Context, t *testing.T, job *resources.Job) ([]Difference, error) { + direct, err := captureJobCreate(ctx, t, job, "direct") if err != nil { return nil, fmt.Errorf("capturing direct payload: %w", err) } - terraform, err := CaptureJobCreate(ctx, t, job, "terraform") + terraform, err := captureJobCreate(ctx, t, job, "terraform") if err != nil { return nil, fmt.Errorf("capturing terraform payload: %w", err) } @@ -106,10 +106,32 @@ func writeJobBundle(dir, host string, job *resources.Job) error { return os.WriteFile(filepath.Join(dir, "databricks.yml"), data, 0o600) } -// RequireTerraform points the terraform engine at the binary and provider mirror -// provisioned by acceptance/install_terraform.py into /build, and skips the -// test when they are absent so the suite still runs where terraform is not set up. -func RequireTerraform(t testing.TB) { +// fuzzOptInVars are the environment variables that opt a run into the +// terraform-backed parity suite. FUZZ_SEED / FUZZ_SEEDS / FUZZ_SEED_OFFSET double +// as the tuning knobs (see paritySeeds), so setting any of them implies opt-in; +// FUZZ_PARITY is a no-tuning switch used by `task test-fuzz`. +var fuzzOptInVars = []string{"FUZZ_PARITY", "FUZZ_SEED", "FUZZ_SEEDS", "FUZZ_SEED_OFFSET"} + +// requireFuzzOptIn skips unless the run explicitly opted into the terraform +// parity suite. Gating on an env var rather than on the presence of build/ keeps +// a leftover terraform install (from a prior `task test-fuzz` or acceptance run) +// from silently turning a plain `task test` into dozens of real deploys. +func requireFuzzOptIn(t testing.TB) { + for _, name := range fuzzOptInVars { + if os.Getenv(name) != "" { + return + } + } + t.Skip("terraform parity suite is opt-in; run `task test-fuzz` or set FUZZ_SEED= to reproduce a single seed") +} + +// requireTerraform opts in via requireFuzzOptIn, then points the terraform engine +// at the binary and provider mirror provisioned by acceptance/install_terraform.py +// into /build, skipping when they are absent so the suite still skips +// cleanly where terraform is not set up. +func requireTerraform(t testing.TB) { + requireFuzzOptIn(t) + buildDir := filepath.Join(repoRoot(t), "build") execPath := filepath.Join(buildDir, "terraform") cfgFile := filepath.Join(buildDir, ".terraformrc") diff --git a/bundle/fuzz/fuzz_test.go b/bundle/fuzz/fuzz_test.go index 51471b3533..7b0d0df8ea 100644 --- a/bundle/fuzz/fuzz_test.go +++ b/bundle/fuzz/fuzz_test.go @@ -7,6 +7,7 @@ import ( "strings" "testing" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" ) @@ -15,12 +16,27 @@ import ( // kept modest; override with FUZZ_SEEDS for a deeper local run. const defaultParitySeeds = 20 +// regressionSeeds are seeds that previously surfaced a terraform/direct create +// payload divergence. They are always checked (in addition to the rotating +// nightly window) so a fixed divergence can never silently regress, even though +// the nightly window moves on every run and would otherwise never revisit them. +// +// When the nightly job reports a new failing FUZZ_SEED, add it here in the same +// PR that fixes the divergence. +// +// - 29: first seed that generates a single-node task-level new_cluster +// (num_workers 0, no autoscale). The direct engine omitted num_workers on +// task clusters while terraform force-sent num_workers:0, so the create +// payloads diverged. Fixed by applying initializeNumWorkers to task clusters +// in resourcemutator.prepareJobSettingsForUpdate. +var regressionSeeds = []int64{29} + // TestJobCreateParity is the first DECO-25361 technique: for many random job // configs, assert the terraform and direct engines produce equivalent create // payloads. On divergence it prints the seed and the generated job so the failure // can be reproduced and inspected. func TestJobCreateParity(t *testing.T) { - RequireTerraform(t) + requireTerraform(t) for _, seed := range paritySeeds(t) { t.Run("seed="+strconv.FormatInt(seed, 10), func(t *testing.T) { @@ -36,10 +52,12 @@ func TestJobCreateParity(t *testing.T) { // reported divergence can be reproduced with one command, without re-running // every seed before it. // -// Otherwise the test runs FUZZ_SEEDS seeds (default defaultParitySeeds) starting -// at FUZZ_SEED_OFFSET. The offset lets the nightly job shift the window every run -// (push.yml derives it from the run number) so CI explores configs it has never -// tested before instead of re-checking the same fixed set forever. +// Otherwise the test runs the regressionSeeds plus FUZZ_SEEDS seeds (default +// defaultParitySeeds) starting at FUZZ_SEED_OFFSET. The offset lets the nightly +// job shift the window every run (push.yml derives it from the run number) so CI +// explores configs it has never tested before instead of re-checking the same +// fixed set forever; the regressionSeeds are always included on top so known +// past divergences keep being verified. func paritySeeds(t *testing.T) []int64 { if v := os.Getenv("FUZZ_SEED"); v != "" { var seeds []int64 @@ -71,21 +89,64 @@ func paritySeeds(t *testing.T) []int64 { offset = n } - seeds := make([]int64, 0, count) + seeds := make([]int64, 0, len(regressionSeeds)+count) + seen := make(map[int64]bool, len(regressionSeeds)+count) + for _, s := range regressionSeeds { + if !seen[s] { + seen[s] = true + seeds = append(seeds, s) + } + } for i := range int64(count) { - seeds = append(seeds, offset+i) + s := offset + i + if !seen[s] { + seen[s] = true + seeds = append(seeds, s) + } } return seeds } +func TestParitySeeds(t *testing.T) { + t.Run("default includes regression seeds then window", func(t *testing.T) { + t.Setenv("FUZZ_SEEDS", "3") + t.Setenv("FUZZ_SEED_OFFSET", "100") + want := append(append([]int64{}, regressionSeeds...), 100, 101, 102) + assert.Equal(t, want, paritySeeds(t)) + }) + + t.Run("window overlapping a regression seed is deduplicated", func(t *testing.T) { + t.Setenv("FUZZ_SEEDS", "5") + t.Setenv("FUZZ_SEED_OFFSET", "27") + seeds := paritySeeds(t) + count := 0 + for _, s := range seeds { + if s == 29 { + count++ + } + } + assert.Equal(t, 1, count, "seed 29 must appear once even though it is both a regression seed and inside the window") + }) + + t.Run("FUZZ_SEED override ignores regression seeds", func(t *testing.T) { + t.Setenv("FUZZ_SEED", "7, 8") + assert.Equal(t, []int64{7, 8}, paritySeeds(t)) + }) +} + // FuzzJobCreateParity exposes the same parity check to Go's native fuzzer // (`go test -fuzz=FuzzJobCreateParity`). Note each input runs two real deploys, // so this is intended for ad-hoc deep runs, not the default `go test` path. func FuzzJobCreateParity(f *testing.F) { - RequireTerraform(f) + requireTerraform(f) for seed := range int64(5) { f.Add(seed) } + // Seed the corpus with known past divergences so the fuzzer always starts + // from inputs that previously exposed a bug. + for _, seed := range regressionSeeds { + f.Add(seed) + } f.Fuzz(func(t *testing.T, seed int64) { checkJobParity(t, seed) }) @@ -97,7 +158,7 @@ func checkJobParity(t *testing.T, seed int64) { t.Helper() job := GenerateJob(newRNG(seed)) - diffs, err := CompareJobEngines(t.Context(), t, job) + diffs, err := compareJobEngines(t.Context(), t, job) require.NoErrorf(t, err, "seed %d", seed) if len(diffs) > 0 { @@ -106,6 +167,6 @@ func checkJobParity(t *testing.T, seed int64) { for _, d := range diffs { t.Errorf(" %s", d) } - t.Logf("reproduce with: FUZZ_SEED=%d go test ./bundle/fuzz -run TestJobCreateParity\n%s", seed, jobJSON) + t.Logf("reproduce with: FUZZ_SEED=%d task test-fuzz\nonce fixed, add %d to regressionSeeds in bundle/fuzz/fuzz_test.go\n%s", seed, seed, jobJSON) } } diff --git a/bundle/fuzz/capture.go b/bundle/fuzz/recorder_test.go similarity index 86% rename from bundle/fuzz/capture.go rename to bundle/fuzz/recorder_test.go index fe10bc10be..244cb81480 100644 --- a/bundle/fuzz/capture.go +++ b/bundle/fuzz/recorder_test.go @@ -10,13 +10,13 @@ import ( // jobsCreatePath is the Jobs API route both engines must hit on create. The // direct engine posts here via the SDK and the terraform provider is expected to // as well. The testserver registers only this exact route, so if an engine ever -// posted to a different version the deploy would 404 and CaptureJobCreate would +// posted to a different version the deploy would 404 and captureJobCreate would // fail with "did not POST". A version skew therefore surfaces as a capture // failure, not as a payload diff. const jobsCreatePath = "/api/2.2/jobs/create" -// CapturedRequest is a single mutating API request observed by the testserver. -type CapturedRequest struct { +// capturedRequest is a single mutating API request observed by the testserver. +type capturedRequest struct { Method string Path string Body json.RawMessage @@ -27,7 +27,7 @@ type CapturedRequest struct { // goroutines. type recorder struct { mu sync.Mutex - requests []CapturedRequest + requests []capturedRequest } func (r *recorder) callback(req *testserver.Request) { @@ -40,7 +40,7 @@ func (r *recorder) callback(req *testserver.Request) { body = append(json.RawMessage(nil), req.Body...) } - r.requests = append(r.requests, CapturedRequest{ + r.requests = append(r.requests, capturedRequest{ Method: req.Method, Path: req.URL.Path, Body: body, From e596815384211c3f6defa8b107019da7d820e187 Mon Sep 17 00:00:00 2001 From: Rada Kamysheva Date: Wed, 24 Jun 2026 13:34:51 +0000 Subject: [PATCH 8/8] bundle/fuzz: make the whole package test-only and harden parity reporting Move the remaining generator/diff/rand implementation into _test.go files (keeping only a doc.go for the package comment) so nothing in the harness compiles into the regular build, since no product code imports it. Distinguish deploy/capture failures from create-payload divergences in checkJobParity: skip when neither engine deploys the generated config, fail distinctly when exactly one engine accepts it (an acceptance divergence, not a payload diff), and only diff payloads when both deploys succeed. This keeps nightly triage from misdirecting a deploy failure into regressionSeeds. Also document the unique-identity-key assumption in diffKeyedSlice. --- bundle/fuzz/compare.go | 268 ----------------- bundle/fuzz/compare_cases_test.go | 119 ++++++++ bundle/fuzz/compare_test.go | 374 +++++++++++++++++------- bundle/fuzz/deploy_test.go | 15 - bundle/fuzz/doc.go | 17 ++ bundle/fuzz/fuzz_test.go | 27 +- bundle/fuzz/generate.go | 356 ---------------------- bundle/fuzz/generate_invariants_test.go | 47 +++ bundle/fuzz/generate_test.go | 358 +++++++++++++++++++++-- bundle/fuzz/{rand.go => rand_test.go} | 0 10 files changed, 800 insertions(+), 781 deletions(-) delete mode 100644 bundle/fuzz/compare.go create mode 100644 bundle/fuzz/compare_cases_test.go create mode 100644 bundle/fuzz/doc.go delete mode 100644 bundle/fuzz/generate.go create mode 100644 bundle/fuzz/generate_invariants_test.go rename bundle/fuzz/{rand.go => rand_test.go} (100%) diff --git a/bundle/fuzz/compare.go b/bundle/fuzz/compare.go deleted file mode 100644 index 81c1bc7afb..0000000000 --- a/bundle/fuzz/compare.go +++ /dev/null @@ -1,268 +0,0 @@ -package fuzz - -import ( - "bytes" - "encoding/json" - "fmt" - "regexp" - "slices" - "strconv" - "strings" -) - -// Difference is a single mismatch between the two engines' create payloads, -// located by a JSON-ish path (e.g. "tasks[0].new_cluster.num_workers"). -type Difference struct { - Path string - Direct any - Terraform any -} - -func (d Difference) String() string { - return fmt.Sprintf("%s: direct=%s terraform=%s", d.Path, render(d.Direct), render(d.Terraform)) -} - -// missing marks a value that is absent on one side. -type missing struct{} - -func render(v any) string { - if _, ok := v.(missing); ok { - return "" - } - b, err := json.Marshal(v) - if err != nil { - return fmt.Sprintf("%v", v) - } - return string(b) -} - -// DiffPayloads decodes both create payloads and returns every difference whose -// path is not explicitly ignored. ignorePaths are matched exactly against the -// rendered path, with "[*]" standing in for any slice index. -func DiffPayloads(direct, terraform json.RawMessage, ignorePaths []string) ([]Difference, error) { - d, err := decode(direct) - if err != nil { - return nil, fmt.Errorf("decoding direct payload: %w", err) - } - tf, err := decode(terraform) - if err != nil { - return nil, fmt.Errorf("decoding terraform payload: %w", err) - } - - var diffs []Difference - diffValue("", d, tf, &diffs) - - ignore := make(map[string]bool, len(ignorePaths)) - for _, p := range ignorePaths { - ignore[p] = true - } - - filtered := diffs[:0] - for _, diff := range diffs { - if !ignore[normalizePath(diff.Path)] { - filtered = append(filtered, diff) - } - } - return filtered, nil -} - -// decode unmarshals JSON using UseNumber so large int64 values (e.g. job ids, -// spark_context_id) are not corrupted by float64 rounding. See the encoding rule -// in the repo style guide. -func decode(raw json.RawMessage) (any, error) { - if len(raw) == 0 { - return nil, nil - } - dec := json.NewDecoder(bytes.NewReader(raw)) - dec.UseNumber() - var v any - if err := dec.Decode(&v); err != nil { - return nil, err - } - return v, nil -} - -func diffValue(path string, a, b any, diffs *[]Difference) { - switch av := a.(type) { - case map[string]any: - bv, ok := b.(map[string]any) - if !ok { - *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) - return - } - keys := unionKeys(av, bv) - for _, k := range keys { - achild, aok := av[k] - bchild, bok := bv[k] - child := joinKey(path, k) - switch { - case aok && bok: - diffValue(child, achild, bchild, diffs) - case aok: - *diffs = append(*diffs, Difference{Path: child, Direct: achild, Terraform: missing{}}) - default: - *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bchild}) - } - } - case []any: - bv, ok := b.([]any) - if !ok { - *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) - return - } - // Slices whose elements carry a natural identity key (tasks, job clusters) - // are matched by that key so an engine emitting the same elements in a - // different order is not reported as a difference. Everything else is - // compared positionally. - if key := identityKey(av, bv); key != "" { - diffKeyedSlice(path, key, av, bv, diffs) - return - } - n := max(len(av), len(bv)) - for i := range n { - child := fmt.Sprintf("%s[%d]", path, i) - switch { - case i < len(av) && i < len(bv): - diffValue(child, av[i], bv[i], diffs) - case i < len(av): - *diffs = append(*diffs, Difference{Path: child, Direct: av[i], Terraform: missing{}}) - default: - *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bv[i]}) - } - } - default: - if !scalarEqual(a, b) { - *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) - } - } -} - -// identityFields are the keys, in priority order, that uniquely identify the -// elements of a payload slice. Job tasks and shared job clusters are the slices -// whose order is not significant but which the engines may emit differently. -var identityFields = []string{"task_key", "job_cluster_key"} - -// identityKey returns the field that identifies every element of both slices, or -// "" if the elements are not uniformly keyed objects (in which case the caller -// falls back to positional comparison). -func identityKey(a, b []any) string { - for _, field := range identityFields { - if allHaveKey(a, field) && allHaveKey(b, field) { - return field - } - } - return "" -} - -func allHaveKey(s []any, field string) bool { - if len(s) == 0 { - return false - } - for _, el := range s { - m, ok := el.(map[string]any) - if !ok { - return false - } - if _, ok := m[field].(string); !ok { - return false - } - } - return true -} - -// diffKeyedSlice matches elements of a and b by the value of key (which is unique -// within each slice for tasks/job clusters) and diffs each matched pair, -// reporting unmatched elements as present-on-one-side. Paths keep numeric indices -// so ignore-path [*] normalization still applies. -func diffKeyedSlice(path, key string, a, b []any, diffs *[]Difference) { - bByKey := make(map[string]any, len(b)) - for _, el := range b { - bByKey[el.(map[string]any)[key].(string)] = el - } - - matched := make(map[string]bool, len(a)) - for i, el := range a { - child := fmt.Sprintf("%s[%d]", path, i) - k := el.(map[string]any)[key].(string) - matched[k] = true - if bel, ok := bByKey[k]; ok { - diffValue(child, el, bel, diffs) - } else { - *diffs = append(*diffs, Difference{Path: child, Direct: el, Terraform: missing{}}) - } - } - for j, el := range b { - k := el.(map[string]any)[key].(string) - if matched[k] { - continue - } - child := fmt.Sprintf("%s[%d]", path, j) - *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: el}) - } -} - -// scalarEqual compares two JSON scalars. json.Number is compared by its string -// form so 1 and 1.0 don't masquerade as equal across engines. -func scalarEqual(a, b any) bool { - an, aok := a.(json.Number) - bn, bok := b.(json.Number) - if aok && bok { - return an.String() == bn.String() - } - return a == b -} - -func unionKeys(a, b map[string]any) []string { - seen := map[string]bool{} - var keys []string - for k := range a { - if !seen[k] { - seen[k] = true - keys = append(keys, k) - } - } - for k := range b { - if !seen[k] { - seen[k] = true - keys = append(keys, k) - } - } - slices.Sort(keys) - return keys -} - -func joinKey(path, key string) string { - // Map keys can themselves contain dots or brackets (e.g. spark_conf entries - // like "spark.databricks.delta.preview.enabled"). Render those as bracketed, - // quoted segments so the path stays unambiguous and ignore entries can target - // a single key. - if key == "" || strings.ContainsAny(key, `.[]"`) { - return path + "[" + strconv.Quote(key) + "]" - } - if path == "" { - return key - } - return path + "." + key -} - -// indexRe matches numeric slice indices like "[12]" but not quoted string keys -// like ["spark.x"]. -var indexRe = regexp.MustCompile(`\[\d+\]`) - -// normalizePath replaces concrete slice indices with [*] so a single ignore -// entry can cover every element of a slice. -func normalizePath(path string) string { - return indexRe.ReplaceAllString(path, "[*]") -} - -// DefaultIgnorePaths lists create-payload paths that legitimately differ between -// the engines and are not parity bugs. Keep this list small and well-justified; -// every entry is a known, intentional divergence. -var DefaultIgnorePaths = []string{ - // The terraform provider strips the deprecated/ignored spark conf - // "spark.databricks.delta.preview.enabled" from new_cluster.spark_conf, while - // the direct engine forwards it verbatim. The backend ignores the key either - // way, so this is a benign provider-side filter rather than a parity bug. - `tasks[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, - `job_clusters[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, -} diff --git a/bundle/fuzz/compare_cases_test.go b/bundle/fuzz/compare_cases_test.go new file mode 100644 index 0000000000..46e506d75c --- /dev/null +++ b/bundle/fuzz/compare_cases_test.go @@ -0,0 +1,119 @@ +package fuzz + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiffPayloads(t *testing.T) { + tests := []struct { + name string + direct string + terraform string + ignore []string + want []string + }{ + { + name: "identical", + direct: `{"name":"a","tasks":[{"task_key":"t"}]}`, + terraform: `{"name":"a","tasks":[{"task_key":"t"}]}`, + want: nil, + }, + { + name: "scalar mismatch", + direct: `{"name":"a"}`, + terraform: `{"name":"b"}`, + want: []string{"name"}, + }, + { + name: "missing on terraform", + direct: `{"name":"a","queue":{"enabled":true}}`, + terraform: `{"name":"a"}`, + want: []string{"queue"}, + }, + { + name: "missing on direct", + direct: `{"name":"a"}`, + terraform: `{"name":"a","max_concurrent_runs":1}`, + want: []string{"max_concurrent_runs"}, + }, + { + name: "nested slice element mismatch", + direct: `{"tasks":[{"task_key":"t","timeout_seconds":1}]}`, + terraform: `{"tasks":[{"task_key":"t","timeout_seconds":2}]}`, + want: []string{"tasks[0].timeout_seconds"}, + }, + { + name: "slice length mismatch", + direct: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, + terraform: `{"tasks":[{"task_key":"a"}]}`, + want: []string{"tasks[1]"}, + }, + { + name: "number 1 vs 1.0 differ", + direct: `{"n":1}`, + terraform: `{"n":1.0}`, + want: []string{"n"}, + }, + { + name: "ignored path", + direct: `{"tasks":[{"timeout_seconds":1}]}`, + terraform: `{"tasks":[{"timeout_seconds":2}]}`, + ignore: []string{"tasks[*].timeout_seconds"}, + want: nil, + }, + { + name: "dotted map key is bracket-quoted", + direct: `{"spark_conf":{"spark.x.y":"1"}}`, + terraform: `{"spark_conf":{}}`, + want: []string{`spark_conf["spark.x.y"]`}, + }, + { + name: "dotted map key can be ignored", + direct: `{"c":{"spark_conf":{"spark.x.y":"1"}}}`, + terraform: `{"c":{"spark_conf":{}}}`, + ignore: []string{`c.spark_conf["spark.x.y"]`}, + want: nil, + }, + { + name: "tasks matched by key ignore order", + direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, + terraform: `{"tasks":[{"task_key":"b","timeout_seconds":2},{"task_key":"a","timeout_seconds":1}]}`, + want: nil, + }, + { + name: "tasks matched by key surface real diff at direct index", + direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, + terraform: `{"tasks":[{"task_key":"b","timeout_seconds":9},{"task_key":"a","timeout_seconds":1}]}`, + want: []string{"tasks[1].timeout_seconds"}, + }, + { + name: "task only on terraform reported at its index", + direct: `{"tasks":[{"task_key":"a"}]}`, + terraform: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, + want: []string{"tasks[1]"}, + }, + { + name: "job_clusters matched by key ignore order", + direct: `{"job_clusters":[{"job_cluster_key":"x","new_cluster":{"num_workers":1}},{"job_cluster_key":"y","new_cluster":{"num_workers":2}}]}`, + terraform: `{"job_clusters":[{"job_cluster_key":"y","new_cluster":{"num_workers":2}},{"job_cluster_key":"x","new_cluster":{"num_workers":1}}]}`, + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + diffs, err := DiffPayloads(json.RawMessage(tt.direct), json.RawMessage(tt.terraform), tt.ignore) + require.NoError(t, err) + + var paths []string + for _, d := range diffs { + paths = append(paths, d.Path) + } + assert.ElementsMatch(t, tt.want, paths) + }) + } +} diff --git a/bundle/fuzz/compare_test.go b/bundle/fuzz/compare_test.go index 46e506d75c..fd6807b56c 100644 --- a/bundle/fuzz/compare_test.go +++ b/bundle/fuzz/compare_test.go @@ -1,119 +1,273 @@ package fuzz import ( + "bytes" "encoding/json" - "testing" - - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "fmt" + "regexp" + "slices" + "strconv" + "strings" ) -func TestDiffPayloads(t *testing.T) { - tests := []struct { - name string - direct string - terraform string - ignore []string - want []string - }{ - { - name: "identical", - direct: `{"name":"a","tasks":[{"task_key":"t"}]}`, - terraform: `{"name":"a","tasks":[{"task_key":"t"}]}`, - want: nil, - }, - { - name: "scalar mismatch", - direct: `{"name":"a"}`, - terraform: `{"name":"b"}`, - want: []string{"name"}, - }, - { - name: "missing on terraform", - direct: `{"name":"a","queue":{"enabled":true}}`, - terraform: `{"name":"a"}`, - want: []string{"queue"}, - }, - { - name: "missing on direct", - direct: `{"name":"a"}`, - terraform: `{"name":"a","max_concurrent_runs":1}`, - want: []string{"max_concurrent_runs"}, - }, - { - name: "nested slice element mismatch", - direct: `{"tasks":[{"task_key":"t","timeout_seconds":1}]}`, - terraform: `{"tasks":[{"task_key":"t","timeout_seconds":2}]}`, - want: []string{"tasks[0].timeout_seconds"}, - }, - { - name: "slice length mismatch", - direct: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, - terraform: `{"tasks":[{"task_key":"a"}]}`, - want: []string{"tasks[1]"}, - }, - { - name: "number 1 vs 1.0 differ", - direct: `{"n":1}`, - terraform: `{"n":1.0}`, - want: []string{"n"}, - }, - { - name: "ignored path", - direct: `{"tasks":[{"timeout_seconds":1}]}`, - terraform: `{"tasks":[{"timeout_seconds":2}]}`, - ignore: []string{"tasks[*].timeout_seconds"}, - want: nil, - }, - { - name: "dotted map key is bracket-quoted", - direct: `{"spark_conf":{"spark.x.y":"1"}}`, - terraform: `{"spark_conf":{}}`, - want: []string{`spark_conf["spark.x.y"]`}, - }, - { - name: "dotted map key can be ignored", - direct: `{"c":{"spark_conf":{"spark.x.y":"1"}}}`, - terraform: `{"c":{"spark_conf":{}}}`, - ignore: []string{`c.spark_conf["spark.x.y"]`}, - want: nil, - }, - { - name: "tasks matched by key ignore order", - direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, - terraform: `{"tasks":[{"task_key":"b","timeout_seconds":2},{"task_key":"a","timeout_seconds":1}]}`, - want: nil, - }, - { - name: "tasks matched by key surface real diff at direct index", - direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, - terraform: `{"tasks":[{"task_key":"b","timeout_seconds":9},{"task_key":"a","timeout_seconds":1}]}`, - want: []string{"tasks[1].timeout_seconds"}, - }, - { - name: "task only on terraform reported at its index", - direct: `{"tasks":[{"task_key":"a"}]}`, - terraform: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, - want: []string{"tasks[1]"}, - }, - { - name: "job_clusters matched by key ignore order", - direct: `{"job_clusters":[{"job_cluster_key":"x","new_cluster":{"num_workers":1}},{"job_cluster_key":"y","new_cluster":{"num_workers":2}}]}`, - terraform: `{"job_clusters":[{"job_cluster_key":"y","new_cluster":{"num_workers":2}},{"job_cluster_key":"x","new_cluster":{"num_workers":1}}]}`, - want: nil, - }, - } - - for _, tt := range tests { - t.Run(tt.name, func(t *testing.T) { - diffs, err := DiffPayloads(json.RawMessage(tt.direct), json.RawMessage(tt.terraform), tt.ignore) - require.NoError(t, err) - - var paths []string - for _, d := range diffs { - paths = append(paths, d.Path) +// Difference is a single mismatch between the two engines' create payloads, +// located by a JSON-ish path (e.g. "tasks[0].new_cluster.num_workers"). +type Difference struct { + Path string + Direct any + Terraform any +} + +func (d Difference) String() string { + return fmt.Sprintf("%s: direct=%s terraform=%s", d.Path, render(d.Direct), render(d.Terraform)) +} + +// missing marks a value that is absent on one side. +type missing struct{} + +func render(v any) string { + if _, ok := v.(missing); ok { + return "" + } + b, err := json.Marshal(v) + if err != nil { + return fmt.Sprintf("%v", v) + } + return string(b) +} + +// DiffPayloads decodes both create payloads and returns every difference whose +// path is not explicitly ignored. ignorePaths are matched exactly against the +// rendered path, with "[*]" standing in for any slice index. +func DiffPayloads(direct, terraform json.RawMessage, ignorePaths []string) ([]Difference, error) { + d, err := decode(direct) + if err != nil { + return nil, fmt.Errorf("decoding direct payload: %w", err) + } + tf, err := decode(terraform) + if err != nil { + return nil, fmt.Errorf("decoding terraform payload: %w", err) + } + + var diffs []Difference + diffValue("", d, tf, &diffs) + + ignore := make(map[string]bool, len(ignorePaths)) + for _, p := range ignorePaths { + ignore[p] = true + } + + filtered := diffs[:0] + for _, diff := range diffs { + if !ignore[normalizePath(diff.Path)] { + filtered = append(filtered, diff) + } + } + return filtered, nil +} + +// decode unmarshals JSON using UseNumber so large int64 values (e.g. job ids, +// spark_context_id) are not corrupted by float64 rounding. See the encoding rule +// in the repo style guide. +func decode(raw json.RawMessage) (any, error) { + if len(raw) == 0 { + return nil, nil + } + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.UseNumber() + var v any + if err := dec.Decode(&v); err != nil { + return nil, err + } + return v, nil +} + +func diffValue(path string, a, b any, diffs *[]Difference) { + switch av := a.(type) { + case map[string]any: + bv, ok := b.(map[string]any) + if !ok { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + return + } + keys := unionKeys(av, bv) + for _, k := range keys { + achild, aok := av[k] + bchild, bok := bv[k] + child := joinKey(path, k) + switch { + case aok && bok: + diffValue(child, achild, bchild, diffs) + case aok: + *diffs = append(*diffs, Difference{Path: child, Direct: achild, Terraform: missing{}}) + default: + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bchild}) + } + } + case []any: + bv, ok := b.([]any) + if !ok { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + return + } + // Slices whose elements carry a natural identity key (tasks, job clusters) + // are matched by that key so an engine emitting the same elements in a + // different order is not reported as a difference. Everything else is + // compared positionally. + if key := identityKey(av, bv); key != "" { + diffKeyedSlice(path, key, av, bv, diffs) + return + } + n := max(len(av), len(bv)) + for i := range n { + child := fmt.Sprintf("%s[%d]", path, i) + switch { + case i < len(av) && i < len(bv): + diffValue(child, av[i], bv[i], diffs) + case i < len(av): + *diffs = append(*diffs, Difference{Path: child, Direct: av[i], Terraform: missing{}}) + default: + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bv[i]}) } - assert.ElementsMatch(t, tt.want, paths) - }) + } + default: + if !scalarEqual(a, b) { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + } + } +} + +// identityFields are the keys, in priority order, that uniquely identify the +// elements of a payload slice. Job tasks and shared job clusters are the slices +// whose order is not significant but which the engines may emit differently. +var identityFields = []string{"task_key", "job_cluster_key"} + +// identityKey returns the field that identifies every element of both slices, or +// "" if the elements are not uniformly keyed objects (in which case the caller +// falls back to positional comparison). +func identityKey(a, b []any) string { + for _, field := range identityFields { + if allHaveKey(a, field) && allHaveKey(b, field) { + return field + } + } + return "" +} + +func allHaveKey(s []any, field string) bool { + if len(s) == 0 { + return false + } + for _, el := range s { + m, ok := el.(map[string]any) + if !ok { + return false + } + if _, ok := m[field].(string); !ok { + return false + } + } + return true +} + +// diffKeyedSlice matches elements of a and b by the value of key (which is unique +// within each slice for tasks/job clusters) and diffs each matched pair, +// reporting unmatched elements as present-on-one-side. Paths keep numeric indices +// so ignore-path [*] normalization still applies. +func diffKeyedSlice(path, key string, a, b []any, diffs *[]Difference) { + // identityFields are unique within a slice by API contract (no two job tasks + // share a task_key, no two job_clusters share a job_cluster_key), so keying by + // them is unambiguous. If a payload ever repeated a key, last-one-wins here and + // the duplicate would be mismatched rather than reported precisely; callers + // outside the job-create harness must not rely on this for non-unique keys. + bByKey := make(map[string]any, len(b)) + for _, el := range b { + bByKey[el.(map[string]any)[key].(string)] = el + } + + matched := make(map[string]bool, len(a)) + for i, el := range a { + child := fmt.Sprintf("%s[%d]", path, i) + k := el.(map[string]any)[key].(string) + matched[k] = true + if bel, ok := bByKey[k]; ok { + diffValue(child, el, bel, diffs) + } else { + *diffs = append(*diffs, Difference{Path: child, Direct: el, Terraform: missing{}}) + } + } + for j, el := range b { + k := el.(map[string]any)[key].(string) + if matched[k] { + continue + } + child := fmt.Sprintf("%s[%d]", path, j) + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: el}) + } +} + +// scalarEqual compares two JSON scalars. json.Number is compared by its string +// form so 1 and 1.0 don't masquerade as equal across engines. +func scalarEqual(a, b any) bool { + an, aok := a.(json.Number) + bn, bok := b.(json.Number) + if aok && bok { + return an.String() == bn.String() } + return a == b +} + +func unionKeys(a, b map[string]any) []string { + seen := map[string]bool{} + var keys []string + for k := range a { + if !seen[k] { + seen[k] = true + keys = append(keys, k) + } + } + for k := range b { + if !seen[k] { + seen[k] = true + keys = append(keys, k) + } + } + slices.Sort(keys) + return keys +} + +func joinKey(path, key string) string { + // Map keys can themselves contain dots or brackets (e.g. spark_conf entries + // like "spark.databricks.delta.preview.enabled"). Render those as bracketed, + // quoted segments so the path stays unambiguous and ignore entries can target + // a single key. + if key == "" || strings.ContainsAny(key, `.[]"`) { + return path + "[" + strconv.Quote(key) + "]" + } + if path == "" { + return key + } + return path + "." + key +} + +// indexRe matches numeric slice indices like "[12]" but not quoted string keys +// like ["spark.x"]. +var indexRe = regexp.MustCompile(`\[\d+\]`) + +// normalizePath replaces concrete slice indices with [*] so a single ignore +// entry can cover every element of a slice. +func normalizePath(path string) string { + return indexRe.ReplaceAllString(path, "[*]") +} + +// DefaultIgnorePaths lists create-payload paths that legitimately differ between +// the engines and are not parity bugs. Keep this list small and well-justified; +// every entry is a known, intentional divergence. +var DefaultIgnorePaths = []string{ + // The terraform provider strips the deprecated/ignored spark conf + // "spark.databricks.delta.preview.enabled" from new_cluster.spark_conf, while + // the direct engine forwards it verbatim. The backend ignores the key either + // way, so this is a benign provider-side filter rather than a parity bug. + `tasks[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, + `job_clusters[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, } diff --git a/bundle/fuzz/deploy_test.go b/bundle/fuzz/deploy_test.go index e42dbb7434..2328e0354e 100644 --- a/bundle/fuzz/deploy_test.go +++ b/bundle/fuzz/deploy_test.go @@ -61,21 +61,6 @@ func captureJobCreate(ctx context.Context, t *testing.T, job *resources.Job, eng return body, nil } -// compareJobEngines deploys job under both engines and returns the create-payload -// differences that are not covered by DefaultIgnorePaths. An empty result means -// the engines produced equivalent create payloads. -func compareJobEngines(ctx context.Context, t *testing.T, job *resources.Job) ([]Difference, error) { - direct, err := captureJobCreate(ctx, t, job, "direct") - if err != nil { - return nil, fmt.Errorf("capturing direct payload: %w", err) - } - terraform, err := captureJobCreate(ctx, t, job, "terraform") - if err != nil { - return nil, fmt.Errorf("capturing terraform payload: %w", err) - } - return DiffPayloads(direct, terraform, DefaultIgnorePaths) -} - // writeJobBundle writes a minimal databricks.yml describing a single job. The // document is emitted as JSON, which is valid YAML, so we can reuse the job's // own JSON marshaling (which honors ForceSendFields) without a YAML dependency. diff --git a/bundle/fuzz/doc.go b/bundle/fuzz/doc.go new file mode 100644 index 0000000000..cf898d3ec1 --- /dev/null +++ b/bundle/fuzz/doc.go @@ -0,0 +1,17 @@ +// Package fuzz provides randomized generators and harnesses that compare how the +// terraform and direct deploy engines translate the same bundle resource into an +// API create payload. See DECO-25361. +// +// The first technique implemented here generates a random resource config and +// checks for differences in the create payload between the terraform and direct +// engines. Generators are seeded so that any divergence found by the fuzz driver +// can be reproduced from the printed seed. +// +// Only jobs are covered for now. Extending the harness to other resource kinds +// (pipelines, apps, ...) is tracked as follow-up work under DECO-25361. +// +// Everything else in the package lives in _test.go files: the package is a +// test-only utility and nothing in the product imports it, so keeping the logic +// out of the regular build avoids shipping dead code. This file exists only to +// carry the package documentation in a non-test file. +package fuzz diff --git a/bundle/fuzz/fuzz_test.go b/bundle/fuzz/fuzz_test.go index 7b0d0df8ea..88a3c5a3b6 100644 --- a/bundle/fuzz/fuzz_test.go +++ b/bundle/fuzz/fuzz_test.go @@ -154,12 +154,35 @@ func FuzzJobCreateParity(f *testing.F) { // checkJobParity generates the job for seed, deploys it under both engines, and // fails the test with reproduction details if the create payloads diverge. +// +// A deploy/capture failure is not a create-payload divergence, so the three +// outcomes are handled distinctly to keep nightly triage from misdirecting a +// deploy failure into regressionSeeds (which is only for real payload diffs): +// - neither engine deployed: the generator produced a config nothing accepts, +// so skip (logging both errors) rather than flag a parity bug. +// - exactly one engine deployed: the engines disagree on whether the config is +// even valid. That is a real divergence worth failing on, but an acceptance +// divergence, not a payload diff, so it is reported as such. +// - both deployed: compare the captured create payloads. func checkJobParity(t *testing.T, seed int64) { t.Helper() job := GenerateJob(newRNG(seed)) - diffs, err := compareJobEngines(t.Context(), t, job) - require.NoErrorf(t, err, "seed %d", seed) + ctx := t.Context() + direct, directErr := captureJobCreate(ctx, t, job, "direct") + terraform, tfErr := captureJobCreate(ctx, t, job, "terraform") + + switch { + case directErr != nil && tfErr != nil: + t.Skipf("seed %d: config did not deploy under either engine (not a parity divergence)\ndirect: %v\nterraform: %v", seed, directErr, tfErr) + case directErr != nil: + t.Fatalf("seed %d: direct rejected a config terraform accepted (engine acceptance divergence, not a payload diff): %v", seed, directErr) + case tfErr != nil: + t.Fatalf("seed %d: terraform rejected a config direct accepted (engine acceptance divergence, not a payload diff): %v", seed, tfErr) + } + + diffs, err := DiffPayloads(direct, terraform, DefaultIgnorePaths) + require.NoErrorf(t, err, "seed %d: comparing create payloads", seed) if len(diffs) > 0 { jobJSON, _ := json.MarshalIndent(job, "", " ") diff --git a/bundle/fuzz/generate.go b/bundle/fuzz/generate.go deleted file mode 100644 index 697748e03f..0000000000 --- a/bundle/fuzz/generate.go +++ /dev/null @@ -1,356 +0,0 @@ -// Package fuzz provides randomized generators and harnesses that compare how the -// terraform and direct deploy engines translate the same bundle resource into an -// API create payload. See DECO-25361. -// -// The first technique implemented here generates a random resource config and -// checks for differences in the create payload between the terraform and direct -// engines. Generators are seeded so that any divergence found by the fuzz driver -// can be reproduced from the printed seed. -// -// Only jobs are covered for now. Extending the harness to other resource kinds -// (pipelines, apps, ...) is tracked as follow-up work under DECO-25361. -package fuzz - -import ( - "fmt" - "math/rand/v2" - "strconv" - - "github.com/databricks/cli/bundle/config/resources" - "github.com/databricks/databricks-sdk-go/service/compute" - "github.com/databricks/databricks-sdk-go/service/jobs" -) - -// Value pools are intentionally small and valid-looking: the goal is to exercise -// the engines' config->payload translation across many field combinations, not to -// stress the API with invalid values (which the testserver would reject before we -// can compare payloads). -var ( - sparkVersions = []string{"13.3.x-scala2.12", "14.3.x-scala2.12", "15.4.x-scala2.12", "16.4.x-scala2.12"} - nodeTypeIDs = []string{"i3.xlarge", "m5.large", "r5.xlarge", "Standard_DS3_v2"} - timezones = []string{"UTC", "America/Los_Angeles", "Europe/Amsterdam"} - cronExprs = []string{"0 0 12 * * ?", "0 15 10 ? * MON-FRI", "0 0/30 * * * ?"} - pauseStatuses = []jobs.PauseStatus{jobs.PauseStatusPaused, jobs.PauseStatusUnpaused} - performance = []jobs.PerformanceTarget{jobs.PerformanceTargetPerformanceOptimized, jobs.PerformanceTargetStandard} - timeUnits = []string{"HOURS", "DAYS", "WEEKS"} - healthMetrics = []string{"RUN_DURATION_SECONDS", "STREAMING_BACKLOG_BYTES", "STREAMING_BACKLOG_RECORDS"} - conditionOps = []string{"EQUAL_TO", "NOT_EQUAL", "GREATER_THAN", "LESS_THAN_OR_EQUAL"} - runIfs = []string{"ALL_SUCCESS", "AT_LEAST_ONE_SUCCESS", "NONE_FAILED", "ALL_DONE"} - gitProviders = []jobs.GitProvider{jobs.GitProviderGitHub, jobs.GitProviderGitLab, jobs.GitProviderAzureDevOpsServices} -) - -// GenerateJob builds a random, well-formed job config driven entirely by rng, so -// the same seed always produces the same job. It deliberately favors fields whose -// translation tends to differ between engines (tasks, clusters, schedules, -// notifications, tags, zero-able scalars). -// -// TODO(DECO-25361): generalize the harness across resource kinds so pipelines, -// apps, etc. get the same create-payload parity coverage as jobs. -func GenerateJob(rng *rand.Rand) *resources.Job { - job := &resources.Job{} - job.Name = randName(rng, "job") - - if chance(rng, 0.5) { - job.Description = randSentence(rng) - } - if chance(rng, 0.4) { - job.MaxConcurrentRuns = rng.IntN(10) + 1 - } - if chance(rng, 0.4) { - job.TimeoutSeconds = rng.IntN(7200) - } - if chance(rng, 0.3) { - job.PerformanceTarget = oneOf(rng, performance) - } - if chance(rng, 0.5) { - job.Tags = randTags(rng) - } - if chance(rng, 0.3) { - job.GitSource = randGitSource(rng) - } - - randScheduling(rng, job) - - if chance(rng, 0.3) { - job.EmailNotifications = randEmailNotifications(rng) - } - if chance(rng, 0.2) { - job.WebhookNotifications = randWebhookNotifications(rng) - } - if chance(rng, 0.3) { - job.NotificationSettings = &jobs.JobNotificationSettings{ - NoAlertForCanceledRuns: chance(rng, 0.5), - NoAlertForSkippedRuns: chance(rng, 0.5), - } - } - if chance(rng, 0.3) { - job.Health = randHealth(rng) - } - if chance(rng, 0.3) { - job.Parameters = randParameters(rng) - } - if chance(rng, 0.3) { - job.Queue = &jobs.QueueSettings{Enabled: chance(rng, 0.5)} - } - - // Generate shared job clusters first so tasks can reference them by key. - var jobClusterKeys []string - if chance(rng, 0.5) { - n := rng.IntN(2) + 1 - for i := range n { - key := fmt.Sprintf("cluster_%d", i) - jobClusterKeys = append(jobClusterKeys, key) - job.JobClusters = append(job.JobClusters, jobs.JobCluster{ - JobClusterKey: key, - NewCluster: randClusterSpec(rng), - }) - } - } - - nTasks := rng.IntN(3) + 1 - var taskKeys []string - for i := range nTasks { - task := randTask(rng, i, jobClusterKeys) - // Randomly chain dependencies onto previously generated tasks. - if len(taskKeys) > 0 && chance(rng, 0.4) { - dep := taskKeys[rng.IntN(len(taskKeys))] - task.DependsOn = []jobs.TaskDependency{{TaskKey: dep}} - if chance(rng, 0.5) { - task.RunIf = jobs.RunIf(oneOf(rng, runIfs)) - } - } - taskKeys = append(taskKeys, task.TaskKey) - job.Tasks = append(job.Tasks, task) - } - - return job -} - -// randScheduling sets at most one of schedule/trigger/continuous, which are -// mutually exclusive ways to launch a job. -func randScheduling(rng *rand.Rand, job *resources.Job) { - switch rng.IntN(5) { - case 0: - job.Schedule = &jobs.CronSchedule{ - QuartzCronExpression: oneOf(rng, cronExprs), - TimezoneId: oneOf(rng, timezones), - PauseStatus: oneOf(rng, pauseStatuses), - } - case 1: - job.Trigger = &jobs.TriggerSettings{ - PauseStatus: oneOf(rng, pauseStatuses), - Periodic: &jobs.PeriodicTriggerConfiguration{ - Interval: rng.IntN(12) + 1, - Unit: jobs.PeriodicTriggerConfigurationTimeUnit(oneOf(rng, timeUnits)), - }, - } - case 2: - job.Trigger = &jobs.TriggerSettings{ - PauseStatus: oneOf(rng, pauseStatuses), - FileArrival: &jobs.FileArrivalTriggerConfiguration{ - Url: "s3://" + randWord(rng) + "/" + randWord(rng), - }, - } - case 3: - job.Continuous = &jobs.Continuous{PauseStatus: oneOf(rng, pauseStatuses)} - default: - // no scheduling - } -} - -func randTask(rng *rand.Rand, idx int, jobClusterKeys []string) jobs.Task { - task := jobs.Task{TaskKey: fmt.Sprintf("task_%d", idx)} - - // Use absolute workspace paths with source=WORKSPACE so the generated bundle - // never depends on local files existing on disk (which deploy would reject). - // condition_task needs no compute, so it is handled separately below. - needsCompute := true - switch rng.IntN(4) { - case 0: - task.NotebookTask = &jobs.NotebookTask{ - NotebookPath: "/Workspace/Users/test/" + randName(rng, "nb"), - Source: jobs.SourceWorkspace, - } - case 1: - task.SparkPythonTask = &jobs.SparkPythonTask{ - PythonFile: "/Workspace/Users/test/" + randName(rng, "main") + ".py", - Source: jobs.SourceWorkspace, - } - case 2: - task.PythonWheelTask = &jobs.PythonWheelTask{ - PackageName: randName(rng, "pkg"), - EntryPoint: "main", - } - case 3: - task.ConditionTask = &jobs.ConditionTask{ - Left: randWord(rng), - Op: jobs.ConditionTaskOp(oneOf(rng, conditionOps)), - Right: randWord(rng), - } - needsCompute = false - } - - if needsCompute { - assignCompute(rng, &task, jobClusterKeys) - if chance(rng, 0.4) { - task.Libraries = randLibraries(rng) - } - } - - if chance(rng, 0.3) { - task.TimeoutSeconds = rng.IntN(3600) - } - if chance(rng, 0.3) { - task.MaxRetries = rng.IntN(5) - task.MinRetryIntervalMillis = rng.IntN(60000) - task.RetryOnTimeout = chance(rng, 0.5) - } - return task -} - -// assignCompute attaches exactly one compute source, which notebook/python/wheel -// tasks require: a shared job cluster (when available), a brand-new cluster, or an -// existing cluster id. -func assignCompute(rng *rand.Rand, task *jobs.Task, jobClusterKeys []string) { - const ( - computeNew = iota - computeExisting - computeShared - ) - options := []int{computeNew, computeExisting} - if len(jobClusterKeys) > 0 { - options = append(options, computeShared) - } - switch oneOf(rng, options) { - case computeNew: - spec := randClusterSpec(rng) - task.NewCluster = &spec - case computeExisting: - task.ExistingClusterId = randName(rng, "cluster") - case computeShared: - task.JobClusterKey = oneOf(rng, jobClusterKeys) - } -} - -func randClusterSpec(rng *rand.Rand) compute.ClusterSpec { - spec := compute.ClusterSpec{ - SparkVersion: oneOf(rng, sparkVersions), - NodeTypeId: oneOf(rng, nodeTypeIDs), - } - if chance(rng, 0.5) { - spec.NumWorkers = rng.IntN(8) - } else { - spec.Autoscale = &compute.AutoScale{ - MinWorkers: 1, - MaxWorkers: rng.IntN(8) + 2, - } - } - if chance(rng, 0.4) { - spec.SparkConf = map[string]string{ - "spark.databricks.delta.preview.enabled": "true", - "spark.speculation": strconv.FormatBool(chance(rng, 0.5)), - } - } - if chance(rng, 0.3) { - spec.CustomTags = randTags(rng) - } - if chance(rng, 0.3) { - spec.SparkEnvVars = map[string]string{"PYSPARK_PYTHON": "/databricks/python3/bin/python3"} - } - if chance(rng, 0.3) { - spec.DriverNodeTypeId = oneOf(rng, nodeTypeIDs) - } - return spec -} - -func randGitSource(rng *rand.Rand) *jobs.GitSource { - src := &jobs.GitSource{ - GitProvider: oneOf(rng, gitProviders), - GitUrl: "https://example.com/" + randWord(rng) + "/" + randWord(rng) + ".git", - } - switch rng.IntN(3) { - case 0: - src.GitBranch = oneOf(rng, []string{"main", "develop", "release"}) - case 1: - src.GitTag = "v" + fmt.Sprintf("%d.%d.0", rng.IntN(5), rng.IntN(10)) - case 2: - src.GitCommit = fmt.Sprintf("%040x", rng.Int64()) - } - return src -} - -func randEmailNotifications(rng *rand.Rand) *jobs.JobEmailNotifications { - email := randWord(rng) + "@example.com" - n := &jobs.JobEmailNotifications{NoAlertForSkippedRuns: chance(rng, 0.5)} - if chance(rng, 0.6) { - n.OnFailure = []string{email} - } - if chance(rng, 0.4) { - n.OnSuccess = []string{email} - } - if chance(rng, 0.3) { - n.OnStart = []string{email} - } - return n -} - -func randWebhookNotifications(rng *rand.Rand) *jobs.WebhookNotifications { - hook := []jobs.Webhook{{Id: randName(rng, "hook")}} - n := &jobs.WebhookNotifications{} - if chance(rng, 0.6) { - n.OnFailure = hook - } - if chance(rng, 0.4) { - n.OnSuccess = hook - } - return n -} - -func randHealth(rng *rand.Rand) *jobs.JobsHealthRules { - return &jobs.JobsHealthRules{ - Rules: []jobs.JobsHealthRule{ - { - Metric: jobs.JobsHealthMetric(oneOf(rng, healthMetrics)), - Op: jobs.JobsHealthOperatorGreaterThan, - Value: int64(rng.IntN(3600) + 1), - }, - }, - } -} - -func randLibraries(rng *rand.Rand) []compute.Library { - n := rng.IntN(2) + 1 - libs := make([]compute.Library, 0, n) - for range n { - switch rng.IntN(3) { - case 0: - libs = append(libs, compute.Library{Pypi: &compute.PythonPyPiLibrary{Package: randWord(rng)}}) - case 1: - libs = append(libs, compute.Library{Maven: &compute.MavenLibrary{Coordinates: "org.example:" + randWord(rng) + ":1.0.0"}}) - case 2: - libs = append(libs, compute.Library{Whl: "/Workspace/Users/test/" + randName(rng, "lib") + ".whl"}) - } - } - return libs -} - -func randParameters(rng *rand.Rand) []jobs.JobParameterDefinition { - n := rng.IntN(3) + 1 - params := make([]jobs.JobParameterDefinition, 0, n) - for i := range n { - params = append(params, jobs.JobParameterDefinition{ - Name: fmt.Sprintf("param_%d", i), - Default: randWord(rng), - }) - } - return params -} - -func randTags(rng *rand.Rand) map[string]string { - n := rng.IntN(3) + 1 - tags := make(map[string]string, n) - for i := range n { - tags[fmt.Sprintf("tag_%d", i)] = randWord(rng) - } - return tags -} diff --git a/bundle/fuzz/generate_invariants_test.go b/bundle/fuzz/generate_invariants_test.go new file mode 100644 index 0000000000..f7a797e8f5 --- /dev/null +++ b/bundle/fuzz/generate_invariants_test.go @@ -0,0 +1,47 @@ +package fuzz + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGenerateJobIsDeterministic(t *testing.T) { + a := GenerateJob(newRNG(42)) + b := GenerateJob(newRNG(42)) + assert.Equal(t, a, b, "same seed must produce identical job") +} + +func TestGenerateJobIsWellFormed(t *testing.T) { + for seed := range int64(200) { + job := GenerateJob(newRNG(seed)) + require.NotEmptyf(t, job.Name, "seed %d: job must have a name", seed) + require.NotEmptyf(t, job.Tasks, "seed %d: job must have at least one task", seed) + + clusterKeys := map[string]bool{} + for _, jc := range job.JobClusters { + clusterKeys[jc.JobClusterKey] = true + } + + taskKeys := map[string]bool{} + for _, task := range job.Tasks { + require.NotEmptyf(t, task.TaskKey, "seed %d: task must have a key", seed) + taskKeys[task.TaskKey] = true + + // A task referencing a job cluster must reference one we generated. + if task.JobClusterKey != "" { + assert.Containsf(t, clusterKeys, task.JobClusterKey, + "seed %d: task %q references unknown job cluster %q", seed, task.TaskKey, task.JobClusterKey) + } + } + + // Every dependency must point at a task that exists in this job. + for _, task := range job.Tasks { + for _, dep := range task.DependsOn { + assert.Containsf(t, taskKeys, dep.TaskKey, + "seed %d: task %q depends on unknown task %q", seed, task.TaskKey, dep.TaskKey) + } + } + } +} diff --git a/bundle/fuzz/generate_test.go b/bundle/fuzz/generate_test.go index f7a797e8f5..1b0acf55b0 100644 --- a/bundle/fuzz/generate_test.go +++ b/bundle/fuzz/generate_test.go @@ -1,47 +1,345 @@ package fuzz import ( - "testing" + "fmt" + "math/rand/v2" + "strconv" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/require" + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" ) -func TestGenerateJobIsDeterministic(t *testing.T) { - a := GenerateJob(newRNG(42)) - b := GenerateJob(newRNG(42)) - assert.Equal(t, a, b, "same seed must produce identical job") -} +// Value pools are intentionally small and valid-looking: the goal is to exercise +// the engines' config->payload translation across many field combinations, not to +// stress the API with invalid values (which the testserver would reject before we +// can compare payloads). +var ( + sparkVersions = []string{"13.3.x-scala2.12", "14.3.x-scala2.12", "15.4.x-scala2.12", "16.4.x-scala2.12"} + nodeTypeIDs = []string{"i3.xlarge", "m5.large", "r5.xlarge", "Standard_DS3_v2"} + timezones = []string{"UTC", "America/Los_Angeles", "Europe/Amsterdam"} + cronExprs = []string{"0 0 12 * * ?", "0 15 10 ? * MON-FRI", "0 0/30 * * * ?"} + pauseStatuses = []jobs.PauseStatus{jobs.PauseStatusPaused, jobs.PauseStatusUnpaused} + performance = []jobs.PerformanceTarget{jobs.PerformanceTargetPerformanceOptimized, jobs.PerformanceTargetStandard} + timeUnits = []string{"HOURS", "DAYS", "WEEKS"} + healthMetrics = []string{"RUN_DURATION_SECONDS", "STREAMING_BACKLOG_BYTES", "STREAMING_BACKLOG_RECORDS"} + conditionOps = []string{"EQUAL_TO", "NOT_EQUAL", "GREATER_THAN", "LESS_THAN_OR_EQUAL"} + runIfs = []string{"ALL_SUCCESS", "AT_LEAST_ONE_SUCCESS", "NONE_FAILED", "ALL_DONE"} + gitProviders = []jobs.GitProvider{jobs.GitProviderGitHub, jobs.GitProviderGitLab, jobs.GitProviderAzureDevOpsServices} +) + +// GenerateJob builds a random, well-formed job config driven entirely by rng, so +// the same seed always produces the same job. It deliberately favors fields whose +// translation tends to differ between engines (tasks, clusters, schedules, +// notifications, tags, zero-able scalars). +// +// TODO(DECO-25361): generalize the harness across resource kinds so pipelines, +// apps, etc. get the same create-payload parity coverage as jobs. +func GenerateJob(rng *rand.Rand) *resources.Job { + job := &resources.Job{} + job.Name = randName(rng, "job") + + if chance(rng, 0.5) { + job.Description = randSentence(rng) + } + if chance(rng, 0.4) { + job.MaxConcurrentRuns = rng.IntN(10) + 1 + } + if chance(rng, 0.4) { + job.TimeoutSeconds = rng.IntN(7200) + } + if chance(rng, 0.3) { + job.PerformanceTarget = oneOf(rng, performance) + } + if chance(rng, 0.5) { + job.Tags = randTags(rng) + } + if chance(rng, 0.3) { + job.GitSource = randGitSource(rng) + } -func TestGenerateJobIsWellFormed(t *testing.T) { - for seed := range int64(200) { - job := GenerateJob(newRNG(seed)) - require.NotEmptyf(t, job.Name, "seed %d: job must have a name", seed) - require.NotEmptyf(t, job.Tasks, "seed %d: job must have at least one task", seed) + randScheduling(rng, job) - clusterKeys := map[string]bool{} - for _, jc := range job.JobClusters { - clusterKeys[jc.JobClusterKey] = true + if chance(rng, 0.3) { + job.EmailNotifications = randEmailNotifications(rng) + } + if chance(rng, 0.2) { + job.WebhookNotifications = randWebhookNotifications(rng) + } + if chance(rng, 0.3) { + job.NotificationSettings = &jobs.JobNotificationSettings{ + NoAlertForCanceledRuns: chance(rng, 0.5), + NoAlertForSkippedRuns: chance(rng, 0.5), } + } + if chance(rng, 0.3) { + job.Health = randHealth(rng) + } + if chance(rng, 0.3) { + job.Parameters = randParameters(rng) + } + if chance(rng, 0.3) { + job.Queue = &jobs.QueueSettings{Enabled: chance(rng, 0.5)} + } - taskKeys := map[string]bool{} - for _, task := range job.Tasks { - require.NotEmptyf(t, task.TaskKey, "seed %d: task must have a key", seed) - taskKeys[task.TaskKey] = true + // Generate shared job clusters first so tasks can reference them by key. + var jobClusterKeys []string + if chance(rng, 0.5) { + n := rng.IntN(2) + 1 + for i := range n { + key := fmt.Sprintf("cluster_%d", i) + jobClusterKeys = append(jobClusterKeys, key) + job.JobClusters = append(job.JobClusters, jobs.JobCluster{ + JobClusterKey: key, + NewCluster: randClusterSpec(rng), + }) + } + } - // A task referencing a job cluster must reference one we generated. - if task.JobClusterKey != "" { - assert.Containsf(t, clusterKeys, task.JobClusterKey, - "seed %d: task %q references unknown job cluster %q", seed, task.TaskKey, task.JobClusterKey) + nTasks := rng.IntN(3) + 1 + var taskKeys []string + for i := range nTasks { + task := randTask(rng, i, jobClusterKeys) + // Randomly chain dependencies onto previously generated tasks. + if len(taskKeys) > 0 && chance(rng, 0.4) { + dep := taskKeys[rng.IntN(len(taskKeys))] + task.DependsOn = []jobs.TaskDependency{{TaskKey: dep}} + if chance(rng, 0.5) { + task.RunIf = jobs.RunIf(oneOf(rng, runIfs)) } } + taskKeys = append(taskKeys, task.TaskKey) + job.Tasks = append(job.Tasks, task) + } - // Every dependency must point at a task that exists in this job. - for _, task := range job.Tasks { - for _, dep := range task.DependsOn { - assert.Containsf(t, taskKeys, dep.TaskKey, - "seed %d: task %q depends on unknown task %q", seed, task.TaskKey, dep.TaskKey) - } + return job +} + +// randScheduling sets at most one of schedule/trigger/continuous, which are +// mutually exclusive ways to launch a job. +func randScheduling(rng *rand.Rand, job *resources.Job) { + switch rng.IntN(5) { + case 0: + job.Schedule = &jobs.CronSchedule{ + QuartzCronExpression: oneOf(rng, cronExprs), + TimezoneId: oneOf(rng, timezones), + PauseStatus: oneOf(rng, pauseStatuses), + } + case 1: + job.Trigger = &jobs.TriggerSettings{ + PauseStatus: oneOf(rng, pauseStatuses), + Periodic: &jobs.PeriodicTriggerConfiguration{ + Interval: rng.IntN(12) + 1, + Unit: jobs.PeriodicTriggerConfigurationTimeUnit(oneOf(rng, timeUnits)), + }, + } + case 2: + job.Trigger = &jobs.TriggerSettings{ + PauseStatus: oneOf(rng, pauseStatuses), + FileArrival: &jobs.FileArrivalTriggerConfiguration{ + Url: "s3://" + randWord(rng) + "/" + randWord(rng), + }, + } + case 3: + job.Continuous = &jobs.Continuous{PauseStatus: oneOf(rng, pauseStatuses)} + default: + // no scheduling + } +} + +func randTask(rng *rand.Rand, idx int, jobClusterKeys []string) jobs.Task { + task := jobs.Task{TaskKey: fmt.Sprintf("task_%d", idx)} + + // Use absolute workspace paths with source=WORKSPACE so the generated bundle + // never depends on local files existing on disk (which deploy would reject). + // condition_task needs no compute, so it is handled separately below. + needsCompute := true + switch rng.IntN(4) { + case 0: + task.NotebookTask = &jobs.NotebookTask{ + NotebookPath: "/Workspace/Users/test/" + randName(rng, "nb"), + Source: jobs.SourceWorkspace, + } + case 1: + task.SparkPythonTask = &jobs.SparkPythonTask{ + PythonFile: "/Workspace/Users/test/" + randName(rng, "main") + ".py", + Source: jobs.SourceWorkspace, + } + case 2: + task.PythonWheelTask = &jobs.PythonWheelTask{ + PackageName: randName(rng, "pkg"), + EntryPoint: "main", + } + case 3: + task.ConditionTask = &jobs.ConditionTask{ + Left: randWord(rng), + Op: jobs.ConditionTaskOp(oneOf(rng, conditionOps)), + Right: randWord(rng), + } + needsCompute = false + } + + if needsCompute { + assignCompute(rng, &task, jobClusterKeys) + if chance(rng, 0.4) { + task.Libraries = randLibraries(rng) + } + } + + if chance(rng, 0.3) { + task.TimeoutSeconds = rng.IntN(3600) + } + if chance(rng, 0.3) { + task.MaxRetries = rng.IntN(5) + task.MinRetryIntervalMillis = rng.IntN(60000) + task.RetryOnTimeout = chance(rng, 0.5) + } + return task +} + +// assignCompute attaches exactly one compute source, which notebook/python/wheel +// tasks require: a shared job cluster (when available), a brand-new cluster, or an +// existing cluster id. +func assignCompute(rng *rand.Rand, task *jobs.Task, jobClusterKeys []string) { + const ( + computeNew = iota + computeExisting + computeShared + ) + options := []int{computeNew, computeExisting} + if len(jobClusterKeys) > 0 { + options = append(options, computeShared) + } + switch oneOf(rng, options) { + case computeNew: + spec := randClusterSpec(rng) + task.NewCluster = &spec + case computeExisting: + task.ExistingClusterId = randName(rng, "cluster") + case computeShared: + task.JobClusterKey = oneOf(rng, jobClusterKeys) + } +} + +func randClusterSpec(rng *rand.Rand) compute.ClusterSpec { + spec := compute.ClusterSpec{ + SparkVersion: oneOf(rng, sparkVersions), + NodeTypeId: oneOf(rng, nodeTypeIDs), + } + if chance(rng, 0.5) { + spec.NumWorkers = rng.IntN(8) + } else { + spec.Autoscale = &compute.AutoScale{ + MinWorkers: 1, + MaxWorkers: rng.IntN(8) + 2, + } + } + if chance(rng, 0.4) { + spec.SparkConf = map[string]string{ + "spark.databricks.delta.preview.enabled": "true", + "spark.speculation": strconv.FormatBool(chance(rng, 0.5)), } } + if chance(rng, 0.3) { + spec.CustomTags = randTags(rng) + } + if chance(rng, 0.3) { + spec.SparkEnvVars = map[string]string{"PYSPARK_PYTHON": "/databricks/python3/bin/python3"} + } + if chance(rng, 0.3) { + spec.DriverNodeTypeId = oneOf(rng, nodeTypeIDs) + } + return spec +} + +func randGitSource(rng *rand.Rand) *jobs.GitSource { + src := &jobs.GitSource{ + GitProvider: oneOf(rng, gitProviders), + GitUrl: "https://example.com/" + randWord(rng) + "/" + randWord(rng) + ".git", + } + switch rng.IntN(3) { + case 0: + src.GitBranch = oneOf(rng, []string{"main", "develop", "release"}) + case 1: + src.GitTag = "v" + fmt.Sprintf("%d.%d.0", rng.IntN(5), rng.IntN(10)) + case 2: + src.GitCommit = fmt.Sprintf("%040x", rng.Int64()) + } + return src +} + +func randEmailNotifications(rng *rand.Rand) *jobs.JobEmailNotifications { + email := randWord(rng) + "@example.com" + n := &jobs.JobEmailNotifications{NoAlertForSkippedRuns: chance(rng, 0.5)} + if chance(rng, 0.6) { + n.OnFailure = []string{email} + } + if chance(rng, 0.4) { + n.OnSuccess = []string{email} + } + if chance(rng, 0.3) { + n.OnStart = []string{email} + } + return n +} + +func randWebhookNotifications(rng *rand.Rand) *jobs.WebhookNotifications { + hook := []jobs.Webhook{{Id: randName(rng, "hook")}} + n := &jobs.WebhookNotifications{} + if chance(rng, 0.6) { + n.OnFailure = hook + } + if chance(rng, 0.4) { + n.OnSuccess = hook + } + return n +} + +func randHealth(rng *rand.Rand) *jobs.JobsHealthRules { + return &jobs.JobsHealthRules{ + Rules: []jobs.JobsHealthRule{ + { + Metric: jobs.JobsHealthMetric(oneOf(rng, healthMetrics)), + Op: jobs.JobsHealthOperatorGreaterThan, + Value: int64(rng.IntN(3600) + 1), + }, + }, + } +} + +func randLibraries(rng *rand.Rand) []compute.Library { + n := rng.IntN(2) + 1 + libs := make([]compute.Library, 0, n) + for range n { + switch rng.IntN(3) { + case 0: + libs = append(libs, compute.Library{Pypi: &compute.PythonPyPiLibrary{Package: randWord(rng)}}) + case 1: + libs = append(libs, compute.Library{Maven: &compute.MavenLibrary{Coordinates: "org.example:" + randWord(rng) + ":1.0.0"}}) + case 2: + libs = append(libs, compute.Library{Whl: "/Workspace/Users/test/" + randName(rng, "lib") + ".whl"}) + } + } + return libs +} + +func randParameters(rng *rand.Rand) []jobs.JobParameterDefinition { + n := rng.IntN(3) + 1 + params := make([]jobs.JobParameterDefinition, 0, n) + for i := range n { + params = append(params, jobs.JobParameterDefinition{ + Name: fmt.Sprintf("param_%d", i), + Default: randWord(rng), + }) + } + return params +} + +func randTags(rng *rand.Rand) map[string]string { + n := rng.IntN(3) + 1 + tags := make(map[string]string, n) + for i := range n { + tags[fmt.Sprintf("tag_%d", i)] = randWord(rng) + } + return tags } diff --git a/bundle/fuzz/rand.go b/bundle/fuzz/rand_test.go similarity index 100% rename from bundle/fuzz/rand.go rename to bundle/fuzz/rand_test.go