diff --git a/.github/workflows/push.yml b/.github/workflows/push.yml index f80cfba7adc..cc69da23f48 100644 --- a/.github/workflows/push.yml +++ b/.github/workflows/push.yml @@ -370,6 +370,95 @@ jobs: run: | go tool -modfile=tools/task/go.mod task test-sandbox + test-fuzz: + needs: + - cleanups + + # The terraform/direct create-payload parity tests run two real `bundle deploy` + # invocations per seed, so they are too slow for every PR and too noisy to gate + # the merge queue. Run them on the nightly schedule to catch engine drift; not + # part of test-result for that reason. + if: ${{ github.event_name == 'schedule' }} + name: "task test-fuzz" + runs-on: + group: databricks-protected-runner-group-large + labels: linux-ubuntu-latest-large + + defaults: + run: + shell: bash + + permissions: + id-token: write + contents: read + # Needed by the failure-reporting step below to open/comment a tracking issue. + issues: write + + steps: + - name: Checkout repository and submodules + uses: actions/checkout@de0fac2e4500dabe0009e67214ff5f5447ce83dd # v6.0.2 + + - name: Setup build environment + uses: ./.github/actions/setup-build-environment + with: + cache-key: test-fuzz + + - name: Run tests + env: + # Shift the seed window by the run number every nightly run so CI + # explores configs it has never tested before instead of re-checking a + # fixed set. The window is kept modest (each seed runs two real deploys) + # since the exploration comes from rotating the window, not its size; + # raise it once nightly timings are known. A divergence prints + # FUZZ_SEED= for one-command reproduction. + # + # offset = GITHUB_RUN_NUMBER * FUZZ_SEEDS. GITHUB_RUN_NUMBER is a + # built-in, monotonically increasing, unique-per-run integer, so as long + # as FUZZ_SEEDS is constant the windows are non-overlapping (gaps from + # non-schedule runs are fine; we only need fresh seeds, not every seed). + FUZZ_SEEDS: "25" + run: | + export FUZZ_SEED_OFFSET=$(( GITHUB_RUN_NUMBER * FUZZ_SEEDS )) + go tool -modfile=tools/task/go.mod task test-fuzz + + # This job is intentionally excluded from test-result, so a failure here is + # invisible unless someone watches the Actions tab. Surface it as a GitHub + # issue instead. Reuse a single open issue (deduped by label) so a recurring + # divergence doesn't open one issue per night. + - name: Report failure + if: ${{ failure() }} + env: + GH_TOKEN: ${{ secrets.GITHUB_TOKEN }} + RUN_URL: ${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }} + run: | + gh label create fuzz-nightly \ + --description "Nightly terraform/direct create-payload parity failures" \ + --color FBCA04 2>/dev/null || true + + body=$(cat <\`. + Reproduce locally with: + + \`\`\` + FUZZ_SEED= task test-fuzz + \`\`\` + + Once fixed, add the seed to \`regressionSeeds\` in \`bundle/fuzz/fuzz_test.go\` + in the same PR so the divergence can never silently regress. + EOF + ) + + existing=$(gh issue list --state open --label fuzz-nightly --json number --jq '.[0].number') + if [ -n "$existing" ]; then + gh issue comment "$existing" --body "$body" + else + gh issue create --title "Nightly fuzz parity failure" --label fuzz-nightly --body "$body" + fi + # This job groups the result of all the above test jobs. # It is a required check, so it blocks auto-merge and the merge queue. # diff --git a/.gitignore b/.gitignore index 116ec5e976f..9da32ca0358 100644 --- a/.gitignore +++ b/.gitignore @@ -58,6 +58,10 @@ tools/testmask/testmask # Release artifacts dist/ +# Terraform binary + provider mirror provisioned by acceptance/install_terraform.py +# for the bundle/fuzz parity tests (see Taskfile `test-fuzz`). +/build/ + # Local development notes, tmp /pr-* /tmp/ diff --git a/Taskfile.yml b/Taskfile.yml index fe6966653bf..2f59f4fa2a4 100644 --- a/Taskfile.yml +++ b/Taskfile.yml @@ -678,6 +678,29 @@ tasks: --packages ./acceptance/... \ -- -timeout=${LOCAL_TIMEOUT:-30m} -run "TestAccept/cmd/sandbox" + test-fuzz: + desc: Run terraform/direct create-payload parity fuzz tests (provisions terraform) + # No `sources:` fingerprint: the seeds checked are a function of the FUZZ_SEED, + # FUZZ_SEEDS, and FUZZ_SEED_OFFSET env vars, which Task can't see. Skipping on + # an unchanged source checksum would silently no-op a FUZZ_SEED= repro run + # or a shifted nightly window, so always run. + env: + # The terraform parity tests are opt-in (see requireFuzzOptIn): they skip + # unless a FUZZ_* var is set, so a leftover build/ never makes them run as + # part of a plain `task test`. This constant flag opts this target in + # without overriding the FUZZ_SEED(S)/OFFSET tuning knobs. + FUZZ_PARITY: "1" + cmds: + # The parity harness expects terraform + the provider mirror at /build; + # requireTerraform skips when it's absent, so provision it first. + - python3 acceptance/install_terraform.py --targetdir build + - | + {{.GO_TOOL}} gotestsum \ + --format ${GOTESTSUM_FORMAT:-pkgname-and-test-fails} \ + --no-summary=skipped \ + --packages ./bundle/fuzz/... \ + -- -timeout=${LOCAL_TIMEOUT:-30m} + # --- Integration tests --- integration: diff --git a/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt b/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt index f27bfaa3f2c..f11dc173eef 100644 --- a/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt +++ b/acceptance/bundle/deploy/wal/chain-3-jobs/output.txt @@ -35,6 +35,7 @@ Exit code: [KILLED] { "new_cluster": { "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 0, "spark_version": "15.4.x-scala2.12" }, "spark_python_task": { @@ -73,6 +74,7 @@ Exit code: [KILLED] { "new_cluster": { "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 0, "spark_version": "15.4.x-scala2.12" }, "spark_python_task": { diff --git a/acceptance/bundle/deploy/wal/crash-after-create/output.txt b/acceptance/bundle/deploy/wal/crash-after-create/output.txt index 2ab926a1dd9..9cd95a0b5c3 100644 --- a/acceptance/bundle/deploy/wal/crash-after-create/output.txt +++ b/acceptance/bundle/deploy/wal/crash-after-create/output.txt @@ -39,6 +39,7 @@ Exit code: [KILLED] { "new_cluster": { "node_type_id": "[NODE_TYPE_ID]", + "num_workers": 0, "spark_version": "15.4.x-scala2.12" }, "spark_python_task": { diff --git a/acceptance/bundle/override/job_tasks/output.txt b/acceptance/bundle/override/job_tasks/output.txt index 2bee9738e33..59b6fc1c397 100644 --- a/acceptance/bundle/override/job_tasks/output.txt +++ b/acceptance/bundle/override/job_tasks/output.txt @@ -18,6 +18,7 @@ }, { "new_cluster": { + "num_workers": 0, "spark_version": "13.3.x-scala2.12" }, "spark_python_task": { @@ -42,6 +43,7 @@ Exit code: 1 "tasks": [ { "new_cluster": { + "num_workers": 0, "spark_version": "13.3.x-scala2.12" }, "spark_python_task": { diff --git a/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json b/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json index cfd1427ce4d..7279aaeba31 100644 --- a/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json +++ b/acceptance/bundle/resource_deps/missing_map_key/out.validate.direct.json @@ -30,7 +30,8 @@ "new_cluster": { "custom_tags": { "ResourceClass": "SingleNode" - } + }, + "num_workers": 0 }, "task_key": "test-task" } diff --git a/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json b/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json index 3cdf58f84ea..3bad6f46193 100644 --- a/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json +++ b/acceptance/bundle/resource_deps/missing_map_key/out.validate.terraform.json @@ -30,7 +30,8 @@ "new_cluster": { "custom_tags": { "ResourceClass": "SingleNode" - } + }, + "num_workers": 0 }, "task_key": "test-task" } diff --git a/bundle/config/mutator/resourcemutator/cluster_fixups.go b/bundle/config/mutator/resourcemutator/cluster_fixups.go index 893cd248aa4..04ddef6cc2f 100644 --- a/bundle/config/mutator/resourcemutator/cluster_fixups.go +++ b/bundle/config/mutator/resourcemutator/cluster_fixups.go @@ -94,6 +94,7 @@ func prepareJobSettingsForUpdate(js *jobs.JobSettings) { for _, task := range js.Tasks { if task.NewCluster != nil { ModifyRequestOnInstancePool(task.NewCluster) + initializeNumWorkers(task.NewCluster) } } for ind := range js.JobClusters { diff --git a/bundle/config/mutator/resourcemutator/cluster_fixups_test.go b/bundle/config/mutator/resourcemutator/cluster_fixups_test.go new file mode 100644 index 00000000000..5cb2e937494 --- /dev/null +++ b/bundle/config/mutator/resourcemutator/cluster_fixups_test.go @@ -0,0 +1,92 @@ +package resourcemutator + +import ( + "testing" + + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" + "github.com/stretchr/testify/assert" +) + +func TestInitializeNumWorkers(t *testing.T) { + tests := []struct { + name string + spec compute.ClusterSpec + wantForceSend bool + }{ + { + name: "single-node cluster force-sends num_workers", + spec: compute.ClusterSpec{SparkVersion: "15.4.x-scala2.12", NodeTypeId: "i3.xlarge"}, + wantForceSend: true, + }, + { + name: "autoscale cluster does not force-send", + spec: compute.ClusterSpec{Autoscale: &compute.AutoScale{MinWorkers: 1, MaxWorkers: 4}}, + wantForceSend: false, + }, + { + name: "multi-node cluster does not force-send", + spec: compute.ClusterSpec{NumWorkers: 3}, + wantForceSend: false, + }, + { + name: "already force-sent stays force-sent without duplicating", + spec: compute.ClusterSpec{ForceSendFields: []string{"NumWorkers"}}, + wantForceSend: true, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + spec := tt.spec + initializeNumWorkers(&spec) + + count := 0 + for _, f := range spec.ForceSendFields { + if f == "NumWorkers" { + count++ + } + } + if tt.wantForceSend { + assert.Equal(t, 1, count, "NumWorkers must appear in ForceSendFields exactly once") + } else { + assert.Equal(t, 0, count, "NumWorkers must not be in ForceSendFields") + } + }) + } +} + +// TestPrepareJobSettingsForUpdateForcesNumWorkers locks the DECO-25361 fix: a +// single-node new_cluster must force-send num_workers on task-level clusters too, +// not just shared job_clusters. The terraform provider always sends num_workers:0 +// for such clusters, so missing it on the task side made the direct engine +// produce a divergent create payload. +func TestPrepareJobSettingsForUpdateForcesNumWorkers(t *testing.T) { + js := &jobs.JobSettings{ + Tasks: []jobs.Task{ + { + TaskKey: "single_node_task", + NewCluster: &compute.ClusterSpec{SparkVersion: "15.4.x-scala2.12", NodeTypeId: "i3.xlarge"}, + }, + { + TaskKey: "autoscale_task", + NewCluster: &compute.ClusterSpec{Autoscale: &compute.AutoScale{MinWorkers: 1, MaxWorkers: 4}}, + }, + }, + JobClusters: []jobs.JobCluster{ + { + JobClusterKey: "single_node_cluster", + NewCluster: compute.ClusterSpec{SparkVersion: "15.4.x-scala2.12", NodeTypeId: "i3.xlarge"}, + }, + }, + } + + prepareJobSettingsForUpdate(js) + + assert.Contains(t, js.Tasks[0].NewCluster.ForceSendFields, "NumWorkers", + "single-node task cluster must force-send num_workers") + assert.NotContains(t, js.Tasks[1].NewCluster.ForceSendFields, "NumWorkers", + "autoscale task cluster must not force-send num_workers") + assert.Contains(t, js.JobClusters[0].NewCluster.ForceSendFields, "NumWorkers", + "single-node job cluster must force-send num_workers") +} diff --git a/bundle/fuzz/compare_cases_test.go b/bundle/fuzz/compare_cases_test.go new file mode 100644 index 00000000000..46e506d75c6 --- /dev/null +++ b/bundle/fuzz/compare_cases_test.go @@ -0,0 +1,119 @@ +package fuzz + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestDiffPayloads(t *testing.T) { + tests := []struct { + name string + direct string + terraform string + ignore []string + want []string + }{ + { + name: "identical", + direct: `{"name":"a","tasks":[{"task_key":"t"}]}`, + terraform: `{"name":"a","tasks":[{"task_key":"t"}]}`, + want: nil, + }, + { + name: "scalar mismatch", + direct: `{"name":"a"}`, + terraform: `{"name":"b"}`, + want: []string{"name"}, + }, + { + name: "missing on terraform", + direct: `{"name":"a","queue":{"enabled":true}}`, + terraform: `{"name":"a"}`, + want: []string{"queue"}, + }, + { + name: "missing on direct", + direct: `{"name":"a"}`, + terraform: `{"name":"a","max_concurrent_runs":1}`, + want: []string{"max_concurrent_runs"}, + }, + { + name: "nested slice element mismatch", + direct: `{"tasks":[{"task_key":"t","timeout_seconds":1}]}`, + terraform: `{"tasks":[{"task_key":"t","timeout_seconds":2}]}`, + want: []string{"tasks[0].timeout_seconds"}, + }, + { + name: "slice length mismatch", + direct: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, + terraform: `{"tasks":[{"task_key":"a"}]}`, + want: []string{"tasks[1]"}, + }, + { + name: "number 1 vs 1.0 differ", + direct: `{"n":1}`, + terraform: `{"n":1.0}`, + want: []string{"n"}, + }, + { + name: "ignored path", + direct: `{"tasks":[{"timeout_seconds":1}]}`, + terraform: `{"tasks":[{"timeout_seconds":2}]}`, + ignore: []string{"tasks[*].timeout_seconds"}, + want: nil, + }, + { + name: "dotted map key is bracket-quoted", + direct: `{"spark_conf":{"spark.x.y":"1"}}`, + terraform: `{"spark_conf":{}}`, + want: []string{`spark_conf["spark.x.y"]`}, + }, + { + name: "dotted map key can be ignored", + direct: `{"c":{"spark_conf":{"spark.x.y":"1"}}}`, + terraform: `{"c":{"spark_conf":{}}}`, + ignore: []string{`c.spark_conf["spark.x.y"]`}, + want: nil, + }, + { + name: "tasks matched by key ignore order", + direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, + terraform: `{"tasks":[{"task_key":"b","timeout_seconds":2},{"task_key":"a","timeout_seconds":1}]}`, + want: nil, + }, + { + name: "tasks matched by key surface real diff at direct index", + direct: `{"tasks":[{"task_key":"a","timeout_seconds":1},{"task_key":"b","timeout_seconds":2}]}`, + terraform: `{"tasks":[{"task_key":"b","timeout_seconds":9},{"task_key":"a","timeout_seconds":1}]}`, + want: []string{"tasks[1].timeout_seconds"}, + }, + { + name: "task only on terraform reported at its index", + direct: `{"tasks":[{"task_key":"a"}]}`, + terraform: `{"tasks":[{"task_key":"a"},{"task_key":"b"}]}`, + want: []string{"tasks[1]"}, + }, + { + name: "job_clusters matched by key ignore order", + direct: `{"job_clusters":[{"job_cluster_key":"x","new_cluster":{"num_workers":1}},{"job_cluster_key":"y","new_cluster":{"num_workers":2}}]}`, + terraform: `{"job_clusters":[{"job_cluster_key":"y","new_cluster":{"num_workers":2}},{"job_cluster_key":"x","new_cluster":{"num_workers":1}}]}`, + want: nil, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + diffs, err := DiffPayloads(json.RawMessage(tt.direct), json.RawMessage(tt.terraform), tt.ignore) + require.NoError(t, err) + + var paths []string + for _, d := range diffs { + paths = append(paths, d.Path) + } + assert.ElementsMatch(t, tt.want, paths) + }) + } +} diff --git a/bundle/fuzz/compare_test.go b/bundle/fuzz/compare_test.go new file mode 100644 index 00000000000..fd6807b56cc --- /dev/null +++ b/bundle/fuzz/compare_test.go @@ -0,0 +1,273 @@ +package fuzz + +import ( + "bytes" + "encoding/json" + "fmt" + "regexp" + "slices" + "strconv" + "strings" +) + +// Difference is a single mismatch between the two engines' create payloads, +// located by a JSON-ish path (e.g. "tasks[0].new_cluster.num_workers"). +type Difference struct { + Path string + Direct any + Terraform any +} + +func (d Difference) String() string { + return fmt.Sprintf("%s: direct=%s terraform=%s", d.Path, render(d.Direct), render(d.Terraform)) +} + +// missing marks a value that is absent on one side. +type missing struct{} + +func render(v any) string { + if _, ok := v.(missing); ok { + return "" + } + b, err := json.Marshal(v) + if err != nil { + return fmt.Sprintf("%v", v) + } + return string(b) +} + +// DiffPayloads decodes both create payloads and returns every difference whose +// path is not explicitly ignored. ignorePaths are matched exactly against the +// rendered path, with "[*]" standing in for any slice index. +func DiffPayloads(direct, terraform json.RawMessage, ignorePaths []string) ([]Difference, error) { + d, err := decode(direct) + if err != nil { + return nil, fmt.Errorf("decoding direct payload: %w", err) + } + tf, err := decode(terraform) + if err != nil { + return nil, fmt.Errorf("decoding terraform payload: %w", err) + } + + var diffs []Difference + diffValue("", d, tf, &diffs) + + ignore := make(map[string]bool, len(ignorePaths)) + for _, p := range ignorePaths { + ignore[p] = true + } + + filtered := diffs[:0] + for _, diff := range diffs { + if !ignore[normalizePath(diff.Path)] { + filtered = append(filtered, diff) + } + } + return filtered, nil +} + +// decode unmarshals JSON using UseNumber so large int64 values (e.g. job ids, +// spark_context_id) are not corrupted by float64 rounding. See the encoding rule +// in the repo style guide. +func decode(raw json.RawMessage) (any, error) { + if len(raw) == 0 { + return nil, nil + } + dec := json.NewDecoder(bytes.NewReader(raw)) + dec.UseNumber() + var v any + if err := dec.Decode(&v); err != nil { + return nil, err + } + return v, nil +} + +func diffValue(path string, a, b any, diffs *[]Difference) { + switch av := a.(type) { + case map[string]any: + bv, ok := b.(map[string]any) + if !ok { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + return + } + keys := unionKeys(av, bv) + for _, k := range keys { + achild, aok := av[k] + bchild, bok := bv[k] + child := joinKey(path, k) + switch { + case aok && bok: + diffValue(child, achild, bchild, diffs) + case aok: + *diffs = append(*diffs, Difference{Path: child, Direct: achild, Terraform: missing{}}) + default: + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bchild}) + } + } + case []any: + bv, ok := b.([]any) + if !ok { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + return + } + // Slices whose elements carry a natural identity key (tasks, job clusters) + // are matched by that key so an engine emitting the same elements in a + // different order is not reported as a difference. Everything else is + // compared positionally. + if key := identityKey(av, bv); key != "" { + diffKeyedSlice(path, key, av, bv, diffs) + return + } + n := max(len(av), len(bv)) + for i := range n { + child := fmt.Sprintf("%s[%d]", path, i) + switch { + case i < len(av) && i < len(bv): + diffValue(child, av[i], bv[i], diffs) + case i < len(av): + *diffs = append(*diffs, Difference{Path: child, Direct: av[i], Terraform: missing{}}) + default: + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: bv[i]}) + } + } + default: + if !scalarEqual(a, b) { + *diffs = append(*diffs, Difference{Path: path, Direct: a, Terraform: b}) + } + } +} + +// identityFields are the keys, in priority order, that uniquely identify the +// elements of a payload slice. Job tasks and shared job clusters are the slices +// whose order is not significant but which the engines may emit differently. +var identityFields = []string{"task_key", "job_cluster_key"} + +// identityKey returns the field that identifies every element of both slices, or +// "" if the elements are not uniformly keyed objects (in which case the caller +// falls back to positional comparison). +func identityKey(a, b []any) string { + for _, field := range identityFields { + if allHaveKey(a, field) && allHaveKey(b, field) { + return field + } + } + return "" +} + +func allHaveKey(s []any, field string) bool { + if len(s) == 0 { + return false + } + for _, el := range s { + m, ok := el.(map[string]any) + if !ok { + return false + } + if _, ok := m[field].(string); !ok { + return false + } + } + return true +} + +// diffKeyedSlice matches elements of a and b by the value of key (which is unique +// within each slice for tasks/job clusters) and diffs each matched pair, +// reporting unmatched elements as present-on-one-side. Paths keep numeric indices +// so ignore-path [*] normalization still applies. +func diffKeyedSlice(path, key string, a, b []any, diffs *[]Difference) { + // identityFields are unique within a slice by API contract (no two job tasks + // share a task_key, no two job_clusters share a job_cluster_key), so keying by + // them is unambiguous. If a payload ever repeated a key, last-one-wins here and + // the duplicate would be mismatched rather than reported precisely; callers + // outside the job-create harness must not rely on this for non-unique keys. + bByKey := make(map[string]any, len(b)) + for _, el := range b { + bByKey[el.(map[string]any)[key].(string)] = el + } + + matched := make(map[string]bool, len(a)) + for i, el := range a { + child := fmt.Sprintf("%s[%d]", path, i) + k := el.(map[string]any)[key].(string) + matched[k] = true + if bel, ok := bByKey[k]; ok { + diffValue(child, el, bel, diffs) + } else { + *diffs = append(*diffs, Difference{Path: child, Direct: el, Terraform: missing{}}) + } + } + for j, el := range b { + k := el.(map[string]any)[key].(string) + if matched[k] { + continue + } + child := fmt.Sprintf("%s[%d]", path, j) + *diffs = append(*diffs, Difference{Path: child, Direct: missing{}, Terraform: el}) + } +} + +// scalarEqual compares two JSON scalars. json.Number is compared by its string +// form so 1 and 1.0 don't masquerade as equal across engines. +func scalarEqual(a, b any) bool { + an, aok := a.(json.Number) + bn, bok := b.(json.Number) + if aok && bok { + return an.String() == bn.String() + } + return a == b +} + +func unionKeys(a, b map[string]any) []string { + seen := map[string]bool{} + var keys []string + for k := range a { + if !seen[k] { + seen[k] = true + keys = append(keys, k) + } + } + for k := range b { + if !seen[k] { + seen[k] = true + keys = append(keys, k) + } + } + slices.Sort(keys) + return keys +} + +func joinKey(path, key string) string { + // Map keys can themselves contain dots or brackets (e.g. spark_conf entries + // like "spark.databricks.delta.preview.enabled"). Render those as bracketed, + // quoted segments so the path stays unambiguous and ignore entries can target + // a single key. + if key == "" || strings.ContainsAny(key, `.[]"`) { + return path + "[" + strconv.Quote(key) + "]" + } + if path == "" { + return key + } + return path + "." + key +} + +// indexRe matches numeric slice indices like "[12]" but not quoted string keys +// like ["spark.x"]. +var indexRe = regexp.MustCompile(`\[\d+\]`) + +// normalizePath replaces concrete slice indices with [*] so a single ignore +// entry can cover every element of a slice. +func normalizePath(path string) string { + return indexRe.ReplaceAllString(path, "[*]") +} + +// DefaultIgnorePaths lists create-payload paths that legitimately differ between +// the engines and are not parity bugs. Keep this list small and well-justified; +// every entry is a known, intentional divergence. +var DefaultIgnorePaths = []string{ + // The terraform provider strips the deprecated/ignored spark conf + // "spark.databricks.delta.preview.enabled" from new_cluster.spark_conf, while + // the direct engine forwards it verbatim. The backend ignores the key either + // way, so this is a benign provider-side filter rather than a parity bug. + `tasks[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, + `job_clusters[*].new_cluster.spark_conf["spark.databricks.delta.preview.enabled"]`, +} diff --git a/bundle/fuzz/deploy_smoke_test.go b/bundle/fuzz/deploy_smoke_test.go new file mode 100644 index 00000000000..d501ee78089 --- /dev/null +++ b/bundle/fuzz/deploy_smoke_test.go @@ -0,0 +1,35 @@ +package fuzz + +import ( + "encoding/json" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestCaptureJobCreateDirect(t *testing.T) { + job := GenerateJob(newRNG(1)) + + body, err := captureJobCreate(t.Context(), t, job, "direct") + require.NoError(t, err) + require.NotEmpty(t, body) + + var payload map[string]any + require.NoError(t, json.Unmarshal(body, &payload)) + assert.Equal(t, job.Name, payload["name"]) + assert.Contains(t, payload, "tasks") +} + +func TestCaptureJobCreateTerraform(t *testing.T) { + requireTerraform(t) + job := GenerateJob(newRNG(1)) + + body, err := captureJobCreate(t.Context(), t, job, "terraform") + require.NoError(t, err) + require.NotEmpty(t, body) + + var payload map[string]any + require.NoError(t, json.Unmarshal(body, &payload)) + assert.Equal(t, job.Name, payload["name"]) +} diff --git a/bundle/fuzz/deploy_test.go b/bundle/fuzz/deploy_test.go new file mode 100644 index 00000000000..2328e0354e2 --- /dev/null +++ b/bundle/fuzz/deploy_test.go @@ -0,0 +1,158 @@ +package fuzz + +import ( + "context" + "encoding/json" + "fmt" + "os" + "path/filepath" + "testing" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/cli/internal/testcli" + "github.com/databricks/cli/libs/testserver" +) + +const ( + // bundleResourceKey is the map key the generated job is registered under. + bundleResourceKey = "fuzz_job" + fakeToken = "testtoken" +) + +// captureJobCreate deploys a bundle containing job through the given engine +// ("direct" or "terraform") and returns the create request body sent to the +// Jobs API. +// +// Both engines run the full `bundle deploy` pipeline against an in-process +// testserver, so the only difference between two captures with different engines +// is the engine itself. That is what makes the resulting payloads directly +// comparable: shared mutators (deployment metadata, presets, ...) are applied +// identically on both sides and cancel out in the diff. +// +// The terraform engine additionally requires DATABRICKS_TF_EXEC_PATH and +// DATABRICKS_TF_CLI_CONFIG_FILE to point at a provisioned terraform binary and +// provider mirror; see requireTerraform. +func captureJobCreate(ctx context.Context, t *testing.T, job *resources.Job, engine string) (json.RawMessage, error) { + rec := &recorder{} + server := testserver.New(t) + server.RequestCallback = rec.callback + testserver.AddDefaultHandlers(server) + + dir := t.TempDir() + if err := writeJobBundle(dir, server.URL, job); err != nil { + return nil, err + } + + t.Setenv("DATABRICKS_HOST", server.URL) + t.Setenv("DATABRICKS_TOKEN", fakeToken) + t.Setenv("DATABRICKS_BUNDLE_ENGINE", engine) + t.Chdir(dir) + + stdout, stderr, err := testcli.NewRunner(t, ctx, "bundle", "deploy").Run() + if err != nil { + return nil, fmt.Errorf("bundle deploy (engine=%s) failed: %w\nstdout:\n%s\nstderr:\n%s", + engine, err, stdout.String(), stderr.String()) + } + + body, ok := rec.find("POST", jobsCreatePath) + if !ok { + return nil, fmt.Errorf("engine=%s did not POST %s during deploy", engine, jobsCreatePath) + } + return body, nil +} + +// writeJobBundle writes a minimal databricks.yml describing a single job. The +// document is emitted as JSON, which is valid YAML, so we can reuse the job's +// own JSON marshaling (which honors ForceSendFields) without a YAML dependency. +func writeJobBundle(dir, host string, job *resources.Job) error { + jobJSON, err := json.Marshal(job) + if err != nil { + return fmt.Errorf("marshaling job: %w", err) + } + + var jobMap map[string]any + if err := json.Unmarshal(jobJSON, &jobMap); err != nil { + return fmt.Errorf("unmarshaling job: %w", err) + } + + doc := map[string]any{ + "bundle": map[string]any{"name": "fuzz"}, + "workspace": map[string]any{"host": host}, + "resources": map[string]any{ + "jobs": map[string]any{bundleResourceKey: jobMap}, + }, + } + + data, err := json.MarshalIndent(doc, "", " ") + if err != nil { + return fmt.Errorf("marshaling bundle: %w", err) + } + + return os.WriteFile(filepath.Join(dir, "databricks.yml"), data, 0o600) +} + +// fuzzOptInVars are the environment variables that opt a run into the +// terraform-backed parity suite. FUZZ_SEED / FUZZ_SEEDS / FUZZ_SEED_OFFSET double +// as the tuning knobs (see paritySeeds), so setting any of them implies opt-in; +// FUZZ_PARITY is a no-tuning switch used by `task test-fuzz`. +var fuzzOptInVars = []string{"FUZZ_PARITY", "FUZZ_SEED", "FUZZ_SEEDS", "FUZZ_SEED_OFFSET"} + +// requireFuzzOptIn skips unless the run explicitly opted into the terraform +// parity suite. Gating on an env var rather than on the presence of build/ keeps +// a leftover terraform install (from a prior `task test-fuzz` or acceptance run) +// from silently turning a plain `task test` into dozens of real deploys. +func requireFuzzOptIn(t testing.TB) { + for _, name := range fuzzOptInVars { + if os.Getenv(name) != "" { + return + } + } + t.Skip("terraform parity suite is opt-in; run `task test-fuzz` or set FUZZ_SEED= to reproduce a single seed") +} + +// requireTerraform opts in via requireFuzzOptIn, then points the terraform engine +// at the binary and provider mirror provisioned by acceptance/install_terraform.py +// into /build, skipping when they are absent so the suite still skips +// cleanly where terraform is not set up. +func requireTerraform(t testing.TB) { + requireFuzzOptIn(t) + + buildDir := filepath.Join(repoRoot(t), "build") + execPath := filepath.Join(buildDir, "terraform") + cfgFile := filepath.Join(buildDir, ".terraformrc") + + // install_terraform.py provisions all three together; a partial build/ (e.g. + // the binary without the provider mirror or .terraformrc) would otherwise fail + // mid-deploy with a confusing error instead of skipping cleanly. + tfpluginsDir := filepath.Join(buildDir, "tfplugins") + for _, p := range []string{execPath, cfgFile, tfpluginsDir} { + if _, err := os.Stat(p); err != nil { + t.Skipf("terraform not fully provisioned (%s); run: python3 acceptance/install_terraform.py --targetdir build", p) + } + } + + t.Setenv("DATABRICKS_TF_EXEC_PATH", execPath) + t.Setenv("DATABRICKS_TF_CLI_CONFIG_FILE", cfgFile) + t.Setenv("TF_CLI_CONFIG_FILE", cfgFile) + // Terraform phones home to checkpoint-api.hashicorp.com otherwise; disable it + // so the testserver/network isn't hit. See acceptance_test.go. + t.Setenv("CHECKPOINT_DISABLE", "1") +} + +// repoRoot returns the repository root by walking up from the current directory. +func repoRoot(t testing.TB) string { + dir, err := os.Getwd() + if err != nil { + t.Fatalf("getwd: %s", err) + } + for { + if _, err := os.Stat(filepath.Join(dir, "go.mod")); err == nil { + return dir + } + parent := filepath.Dir(dir) + if parent == dir { + t.Fatal("could not locate repo root (go.mod not found)") + } + dir = parent + } +} diff --git a/bundle/fuzz/doc.go b/bundle/fuzz/doc.go new file mode 100644 index 00000000000..cf898d3ec14 --- /dev/null +++ b/bundle/fuzz/doc.go @@ -0,0 +1,17 @@ +// Package fuzz provides randomized generators and harnesses that compare how the +// terraform and direct deploy engines translate the same bundle resource into an +// API create payload. See DECO-25361. +// +// The first technique implemented here generates a random resource config and +// checks for differences in the create payload between the terraform and direct +// engines. Generators are seeded so that any divergence found by the fuzz driver +// can be reproduced from the printed seed. +// +// Only jobs are covered for now. Extending the harness to other resource kinds +// (pipelines, apps, ...) is tracked as follow-up work under DECO-25361. +// +// Everything else in the package lives in _test.go files: the package is a +// test-only utility and nothing in the product imports it, so keeping the logic +// out of the regular build avoids shipping dead code. This file exists only to +// carry the package documentation in a non-test file. +package fuzz diff --git a/bundle/fuzz/fuzz_test.go b/bundle/fuzz/fuzz_test.go new file mode 100644 index 00000000000..88a3c5a3b6d --- /dev/null +++ b/bundle/fuzz/fuzz_test.go @@ -0,0 +1,195 @@ +package fuzz + +import ( + "encoding/json" + "os" + "strconv" + "strings" + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +// defaultParitySeeds is the number of random jobs TestJobCreateParity checks by +// default. Each seed runs two real deploys (direct + terraform), so the count is +// kept modest; override with FUZZ_SEEDS for a deeper local run. +const defaultParitySeeds = 20 + +// regressionSeeds are seeds that previously surfaced a terraform/direct create +// payload divergence. They are always checked (in addition to the rotating +// nightly window) so a fixed divergence can never silently regress, even though +// the nightly window moves on every run and would otherwise never revisit them. +// +// When the nightly job reports a new failing FUZZ_SEED, add it here in the same +// PR that fixes the divergence. +// +// - 29: first seed that generates a single-node task-level new_cluster +// (num_workers 0, no autoscale). The direct engine omitted num_workers on +// task clusters while terraform force-sent num_workers:0, so the create +// payloads diverged. Fixed by applying initializeNumWorkers to task clusters +// in resourcemutator.prepareJobSettingsForUpdate. +var regressionSeeds = []int64{29} + +// TestJobCreateParity is the first DECO-25361 technique: for many random job +// configs, assert the terraform and direct engines produce equivalent create +// payloads. On divergence it prints the seed and the generated job so the failure +// can be reproduced and inspected. +func TestJobCreateParity(t *testing.T) { + requireTerraform(t) + + for _, seed := range paritySeeds(t) { + t.Run("seed="+strconv.FormatInt(seed, 10), func(t *testing.T) { + checkJobParity(t, seed) + }) + } +} + +// paritySeeds returns the seeds TestJobCreateParity should check. +// +// FUZZ_SEED (comma-separated list) runs exactly those seeds and overrides +// everything else. This is the knob the failure message prints so a single +// reported divergence can be reproduced with one command, without re-running +// every seed before it. +// +// Otherwise the test runs the regressionSeeds plus FUZZ_SEEDS seeds (default +// defaultParitySeeds) starting at FUZZ_SEED_OFFSET. The offset lets the nightly +// job shift the window every run (push.yml derives it from the run number) so CI +// explores configs it has never tested before instead of re-checking the same +// fixed set forever; the regressionSeeds are always included on top so known +// past divergences keep being verified. +func paritySeeds(t *testing.T) []int64 { + if v := os.Getenv("FUZZ_SEED"); v != "" { + var seeds []int64 + for _, part := range strings.Split(v, ",") { + part = strings.TrimSpace(part) + if part == "" { + continue + } + n, err := strconv.ParseInt(part, 10, 64) + require.NoErrorf(t, err, "invalid FUZZ_SEED entry %q", part) + seeds = append(seeds, n) + } + require.NotEmptyf(t, seeds, "FUZZ_SEED=%q contained no seeds", v) + return seeds + } + + count := defaultParitySeeds + if v := os.Getenv("FUZZ_SEEDS"); v != "" { + n, err := strconv.Atoi(v) + require.NoErrorf(t, err, "invalid FUZZ_SEEDS=%q", v) + require.Greaterf(t, n, 0, "FUZZ_SEEDS must be positive, got %d", n) + count = n + } + + var offset int64 + if v := os.Getenv("FUZZ_SEED_OFFSET"); v != "" { + n, err := strconv.ParseInt(v, 10, 64) + require.NoErrorf(t, err, "invalid FUZZ_SEED_OFFSET=%q", v) + offset = n + } + + seeds := make([]int64, 0, len(regressionSeeds)+count) + seen := make(map[int64]bool, len(regressionSeeds)+count) + for _, s := range regressionSeeds { + if !seen[s] { + seen[s] = true + seeds = append(seeds, s) + } + } + for i := range int64(count) { + s := offset + i + if !seen[s] { + seen[s] = true + seeds = append(seeds, s) + } + } + return seeds +} + +func TestParitySeeds(t *testing.T) { + t.Run("default includes regression seeds then window", func(t *testing.T) { + t.Setenv("FUZZ_SEEDS", "3") + t.Setenv("FUZZ_SEED_OFFSET", "100") + want := append(append([]int64{}, regressionSeeds...), 100, 101, 102) + assert.Equal(t, want, paritySeeds(t)) + }) + + t.Run("window overlapping a regression seed is deduplicated", func(t *testing.T) { + t.Setenv("FUZZ_SEEDS", "5") + t.Setenv("FUZZ_SEED_OFFSET", "27") + seeds := paritySeeds(t) + count := 0 + for _, s := range seeds { + if s == 29 { + count++ + } + } + assert.Equal(t, 1, count, "seed 29 must appear once even though it is both a regression seed and inside the window") + }) + + t.Run("FUZZ_SEED override ignores regression seeds", func(t *testing.T) { + t.Setenv("FUZZ_SEED", "7, 8") + assert.Equal(t, []int64{7, 8}, paritySeeds(t)) + }) +} + +// FuzzJobCreateParity exposes the same parity check to Go's native fuzzer +// (`go test -fuzz=FuzzJobCreateParity`). Note each input runs two real deploys, +// so this is intended for ad-hoc deep runs, not the default `go test` path. +func FuzzJobCreateParity(f *testing.F) { + requireTerraform(f) + for seed := range int64(5) { + f.Add(seed) + } + // Seed the corpus with known past divergences so the fuzzer always starts + // from inputs that previously exposed a bug. + for _, seed := range regressionSeeds { + f.Add(seed) + } + f.Fuzz(func(t *testing.T, seed int64) { + checkJobParity(t, seed) + }) +} + +// checkJobParity generates the job for seed, deploys it under both engines, and +// fails the test with reproduction details if the create payloads diverge. +// +// A deploy/capture failure is not a create-payload divergence, so the three +// outcomes are handled distinctly to keep nightly triage from misdirecting a +// deploy failure into regressionSeeds (which is only for real payload diffs): +// - neither engine deployed: the generator produced a config nothing accepts, +// so skip (logging both errors) rather than flag a parity bug. +// - exactly one engine deployed: the engines disagree on whether the config is +// even valid. That is a real divergence worth failing on, but an acceptance +// divergence, not a payload diff, so it is reported as such. +// - both deployed: compare the captured create payloads. +func checkJobParity(t *testing.T, seed int64) { + t.Helper() + job := GenerateJob(newRNG(seed)) + + ctx := t.Context() + direct, directErr := captureJobCreate(ctx, t, job, "direct") + terraform, tfErr := captureJobCreate(ctx, t, job, "terraform") + + switch { + case directErr != nil && tfErr != nil: + t.Skipf("seed %d: config did not deploy under either engine (not a parity divergence)\ndirect: %v\nterraform: %v", seed, directErr, tfErr) + case directErr != nil: + t.Fatalf("seed %d: direct rejected a config terraform accepted (engine acceptance divergence, not a payload diff): %v", seed, directErr) + case tfErr != nil: + t.Fatalf("seed %d: terraform rejected a config direct accepted (engine acceptance divergence, not a payload diff): %v", seed, tfErr) + } + + diffs, err := DiffPayloads(direct, terraform, DefaultIgnorePaths) + require.NoErrorf(t, err, "seed %d: comparing create payloads", seed) + + if len(diffs) > 0 { + jobJSON, _ := json.MarshalIndent(job, "", " ") + t.Errorf("seed %d: terraform/direct create payloads diverge (%d differences):", seed, len(diffs)) + for _, d := range diffs { + t.Errorf(" %s", d) + } + t.Logf("reproduce with: FUZZ_SEED=%d task test-fuzz\nonce fixed, add %d to regressionSeeds in bundle/fuzz/fuzz_test.go\n%s", seed, seed, jobJSON) + } +} diff --git a/bundle/fuzz/generate_invariants_test.go b/bundle/fuzz/generate_invariants_test.go new file mode 100644 index 00000000000..f7a797e8f59 --- /dev/null +++ b/bundle/fuzz/generate_invariants_test.go @@ -0,0 +1,47 @@ +package fuzz + +import ( + "testing" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestGenerateJobIsDeterministic(t *testing.T) { + a := GenerateJob(newRNG(42)) + b := GenerateJob(newRNG(42)) + assert.Equal(t, a, b, "same seed must produce identical job") +} + +func TestGenerateJobIsWellFormed(t *testing.T) { + for seed := range int64(200) { + job := GenerateJob(newRNG(seed)) + require.NotEmptyf(t, job.Name, "seed %d: job must have a name", seed) + require.NotEmptyf(t, job.Tasks, "seed %d: job must have at least one task", seed) + + clusterKeys := map[string]bool{} + for _, jc := range job.JobClusters { + clusterKeys[jc.JobClusterKey] = true + } + + taskKeys := map[string]bool{} + for _, task := range job.Tasks { + require.NotEmptyf(t, task.TaskKey, "seed %d: task must have a key", seed) + taskKeys[task.TaskKey] = true + + // A task referencing a job cluster must reference one we generated. + if task.JobClusterKey != "" { + assert.Containsf(t, clusterKeys, task.JobClusterKey, + "seed %d: task %q references unknown job cluster %q", seed, task.TaskKey, task.JobClusterKey) + } + } + + // Every dependency must point at a task that exists in this job. + for _, task := range job.Tasks { + for _, dep := range task.DependsOn { + assert.Containsf(t, taskKeys, dep.TaskKey, + "seed %d: task %q depends on unknown task %q", seed, task.TaskKey, dep.TaskKey) + } + } + } +} diff --git a/bundle/fuzz/generate_test.go b/bundle/fuzz/generate_test.go new file mode 100644 index 00000000000..1b0acf55b0f --- /dev/null +++ b/bundle/fuzz/generate_test.go @@ -0,0 +1,345 @@ +package fuzz + +import ( + "fmt" + "math/rand/v2" + "strconv" + + "github.com/databricks/cli/bundle/config/resources" + "github.com/databricks/databricks-sdk-go/service/compute" + "github.com/databricks/databricks-sdk-go/service/jobs" +) + +// Value pools are intentionally small and valid-looking: the goal is to exercise +// the engines' config->payload translation across many field combinations, not to +// stress the API with invalid values (which the testserver would reject before we +// can compare payloads). +var ( + sparkVersions = []string{"13.3.x-scala2.12", "14.3.x-scala2.12", "15.4.x-scala2.12", "16.4.x-scala2.12"} + nodeTypeIDs = []string{"i3.xlarge", "m5.large", "r5.xlarge", "Standard_DS3_v2"} + timezones = []string{"UTC", "America/Los_Angeles", "Europe/Amsterdam"} + cronExprs = []string{"0 0 12 * * ?", "0 15 10 ? * MON-FRI", "0 0/30 * * * ?"} + pauseStatuses = []jobs.PauseStatus{jobs.PauseStatusPaused, jobs.PauseStatusUnpaused} + performance = []jobs.PerformanceTarget{jobs.PerformanceTargetPerformanceOptimized, jobs.PerformanceTargetStandard} + timeUnits = []string{"HOURS", "DAYS", "WEEKS"} + healthMetrics = []string{"RUN_DURATION_SECONDS", "STREAMING_BACKLOG_BYTES", "STREAMING_BACKLOG_RECORDS"} + conditionOps = []string{"EQUAL_TO", "NOT_EQUAL", "GREATER_THAN", "LESS_THAN_OR_EQUAL"} + runIfs = []string{"ALL_SUCCESS", "AT_LEAST_ONE_SUCCESS", "NONE_FAILED", "ALL_DONE"} + gitProviders = []jobs.GitProvider{jobs.GitProviderGitHub, jobs.GitProviderGitLab, jobs.GitProviderAzureDevOpsServices} +) + +// GenerateJob builds a random, well-formed job config driven entirely by rng, so +// the same seed always produces the same job. It deliberately favors fields whose +// translation tends to differ between engines (tasks, clusters, schedules, +// notifications, tags, zero-able scalars). +// +// TODO(DECO-25361): generalize the harness across resource kinds so pipelines, +// apps, etc. get the same create-payload parity coverage as jobs. +func GenerateJob(rng *rand.Rand) *resources.Job { + job := &resources.Job{} + job.Name = randName(rng, "job") + + if chance(rng, 0.5) { + job.Description = randSentence(rng) + } + if chance(rng, 0.4) { + job.MaxConcurrentRuns = rng.IntN(10) + 1 + } + if chance(rng, 0.4) { + job.TimeoutSeconds = rng.IntN(7200) + } + if chance(rng, 0.3) { + job.PerformanceTarget = oneOf(rng, performance) + } + if chance(rng, 0.5) { + job.Tags = randTags(rng) + } + if chance(rng, 0.3) { + job.GitSource = randGitSource(rng) + } + + randScheduling(rng, job) + + if chance(rng, 0.3) { + job.EmailNotifications = randEmailNotifications(rng) + } + if chance(rng, 0.2) { + job.WebhookNotifications = randWebhookNotifications(rng) + } + if chance(rng, 0.3) { + job.NotificationSettings = &jobs.JobNotificationSettings{ + NoAlertForCanceledRuns: chance(rng, 0.5), + NoAlertForSkippedRuns: chance(rng, 0.5), + } + } + if chance(rng, 0.3) { + job.Health = randHealth(rng) + } + if chance(rng, 0.3) { + job.Parameters = randParameters(rng) + } + if chance(rng, 0.3) { + job.Queue = &jobs.QueueSettings{Enabled: chance(rng, 0.5)} + } + + // Generate shared job clusters first so tasks can reference them by key. + var jobClusterKeys []string + if chance(rng, 0.5) { + n := rng.IntN(2) + 1 + for i := range n { + key := fmt.Sprintf("cluster_%d", i) + jobClusterKeys = append(jobClusterKeys, key) + job.JobClusters = append(job.JobClusters, jobs.JobCluster{ + JobClusterKey: key, + NewCluster: randClusterSpec(rng), + }) + } + } + + nTasks := rng.IntN(3) + 1 + var taskKeys []string + for i := range nTasks { + task := randTask(rng, i, jobClusterKeys) + // Randomly chain dependencies onto previously generated tasks. + if len(taskKeys) > 0 && chance(rng, 0.4) { + dep := taskKeys[rng.IntN(len(taskKeys))] + task.DependsOn = []jobs.TaskDependency{{TaskKey: dep}} + if chance(rng, 0.5) { + task.RunIf = jobs.RunIf(oneOf(rng, runIfs)) + } + } + taskKeys = append(taskKeys, task.TaskKey) + job.Tasks = append(job.Tasks, task) + } + + return job +} + +// randScheduling sets at most one of schedule/trigger/continuous, which are +// mutually exclusive ways to launch a job. +func randScheduling(rng *rand.Rand, job *resources.Job) { + switch rng.IntN(5) { + case 0: + job.Schedule = &jobs.CronSchedule{ + QuartzCronExpression: oneOf(rng, cronExprs), + TimezoneId: oneOf(rng, timezones), + PauseStatus: oneOf(rng, pauseStatuses), + } + case 1: + job.Trigger = &jobs.TriggerSettings{ + PauseStatus: oneOf(rng, pauseStatuses), + Periodic: &jobs.PeriodicTriggerConfiguration{ + Interval: rng.IntN(12) + 1, + Unit: jobs.PeriodicTriggerConfigurationTimeUnit(oneOf(rng, timeUnits)), + }, + } + case 2: + job.Trigger = &jobs.TriggerSettings{ + PauseStatus: oneOf(rng, pauseStatuses), + FileArrival: &jobs.FileArrivalTriggerConfiguration{ + Url: "s3://" + randWord(rng) + "/" + randWord(rng), + }, + } + case 3: + job.Continuous = &jobs.Continuous{PauseStatus: oneOf(rng, pauseStatuses)} + default: + // no scheduling + } +} + +func randTask(rng *rand.Rand, idx int, jobClusterKeys []string) jobs.Task { + task := jobs.Task{TaskKey: fmt.Sprintf("task_%d", idx)} + + // Use absolute workspace paths with source=WORKSPACE so the generated bundle + // never depends on local files existing on disk (which deploy would reject). + // condition_task needs no compute, so it is handled separately below. + needsCompute := true + switch rng.IntN(4) { + case 0: + task.NotebookTask = &jobs.NotebookTask{ + NotebookPath: "/Workspace/Users/test/" + randName(rng, "nb"), + Source: jobs.SourceWorkspace, + } + case 1: + task.SparkPythonTask = &jobs.SparkPythonTask{ + PythonFile: "/Workspace/Users/test/" + randName(rng, "main") + ".py", + Source: jobs.SourceWorkspace, + } + case 2: + task.PythonWheelTask = &jobs.PythonWheelTask{ + PackageName: randName(rng, "pkg"), + EntryPoint: "main", + } + case 3: + task.ConditionTask = &jobs.ConditionTask{ + Left: randWord(rng), + Op: jobs.ConditionTaskOp(oneOf(rng, conditionOps)), + Right: randWord(rng), + } + needsCompute = false + } + + if needsCompute { + assignCompute(rng, &task, jobClusterKeys) + if chance(rng, 0.4) { + task.Libraries = randLibraries(rng) + } + } + + if chance(rng, 0.3) { + task.TimeoutSeconds = rng.IntN(3600) + } + if chance(rng, 0.3) { + task.MaxRetries = rng.IntN(5) + task.MinRetryIntervalMillis = rng.IntN(60000) + task.RetryOnTimeout = chance(rng, 0.5) + } + return task +} + +// assignCompute attaches exactly one compute source, which notebook/python/wheel +// tasks require: a shared job cluster (when available), a brand-new cluster, or an +// existing cluster id. +func assignCompute(rng *rand.Rand, task *jobs.Task, jobClusterKeys []string) { + const ( + computeNew = iota + computeExisting + computeShared + ) + options := []int{computeNew, computeExisting} + if len(jobClusterKeys) > 0 { + options = append(options, computeShared) + } + switch oneOf(rng, options) { + case computeNew: + spec := randClusterSpec(rng) + task.NewCluster = &spec + case computeExisting: + task.ExistingClusterId = randName(rng, "cluster") + case computeShared: + task.JobClusterKey = oneOf(rng, jobClusterKeys) + } +} + +func randClusterSpec(rng *rand.Rand) compute.ClusterSpec { + spec := compute.ClusterSpec{ + SparkVersion: oneOf(rng, sparkVersions), + NodeTypeId: oneOf(rng, nodeTypeIDs), + } + if chance(rng, 0.5) { + spec.NumWorkers = rng.IntN(8) + } else { + spec.Autoscale = &compute.AutoScale{ + MinWorkers: 1, + MaxWorkers: rng.IntN(8) + 2, + } + } + if chance(rng, 0.4) { + spec.SparkConf = map[string]string{ + "spark.databricks.delta.preview.enabled": "true", + "spark.speculation": strconv.FormatBool(chance(rng, 0.5)), + } + } + if chance(rng, 0.3) { + spec.CustomTags = randTags(rng) + } + if chance(rng, 0.3) { + spec.SparkEnvVars = map[string]string{"PYSPARK_PYTHON": "/databricks/python3/bin/python3"} + } + if chance(rng, 0.3) { + spec.DriverNodeTypeId = oneOf(rng, nodeTypeIDs) + } + return spec +} + +func randGitSource(rng *rand.Rand) *jobs.GitSource { + src := &jobs.GitSource{ + GitProvider: oneOf(rng, gitProviders), + GitUrl: "https://example.com/" + randWord(rng) + "/" + randWord(rng) + ".git", + } + switch rng.IntN(3) { + case 0: + src.GitBranch = oneOf(rng, []string{"main", "develop", "release"}) + case 1: + src.GitTag = "v" + fmt.Sprintf("%d.%d.0", rng.IntN(5), rng.IntN(10)) + case 2: + src.GitCommit = fmt.Sprintf("%040x", rng.Int64()) + } + return src +} + +func randEmailNotifications(rng *rand.Rand) *jobs.JobEmailNotifications { + email := randWord(rng) + "@example.com" + n := &jobs.JobEmailNotifications{NoAlertForSkippedRuns: chance(rng, 0.5)} + if chance(rng, 0.6) { + n.OnFailure = []string{email} + } + if chance(rng, 0.4) { + n.OnSuccess = []string{email} + } + if chance(rng, 0.3) { + n.OnStart = []string{email} + } + return n +} + +func randWebhookNotifications(rng *rand.Rand) *jobs.WebhookNotifications { + hook := []jobs.Webhook{{Id: randName(rng, "hook")}} + n := &jobs.WebhookNotifications{} + if chance(rng, 0.6) { + n.OnFailure = hook + } + if chance(rng, 0.4) { + n.OnSuccess = hook + } + return n +} + +func randHealth(rng *rand.Rand) *jobs.JobsHealthRules { + return &jobs.JobsHealthRules{ + Rules: []jobs.JobsHealthRule{ + { + Metric: jobs.JobsHealthMetric(oneOf(rng, healthMetrics)), + Op: jobs.JobsHealthOperatorGreaterThan, + Value: int64(rng.IntN(3600) + 1), + }, + }, + } +} + +func randLibraries(rng *rand.Rand) []compute.Library { + n := rng.IntN(2) + 1 + libs := make([]compute.Library, 0, n) + for range n { + switch rng.IntN(3) { + case 0: + libs = append(libs, compute.Library{Pypi: &compute.PythonPyPiLibrary{Package: randWord(rng)}}) + case 1: + libs = append(libs, compute.Library{Maven: &compute.MavenLibrary{Coordinates: "org.example:" + randWord(rng) + ":1.0.0"}}) + case 2: + libs = append(libs, compute.Library{Whl: "/Workspace/Users/test/" + randName(rng, "lib") + ".whl"}) + } + } + return libs +} + +func randParameters(rng *rand.Rand) []jobs.JobParameterDefinition { + n := rng.IntN(3) + 1 + params := make([]jobs.JobParameterDefinition, 0, n) + for i := range n { + params = append(params, jobs.JobParameterDefinition{ + Name: fmt.Sprintf("param_%d", i), + Default: randWord(rng), + }) + } + return params +} + +func randTags(rng *rand.Rand) map[string]string { + n := rng.IntN(3) + 1 + tags := make(map[string]string, n) + for i := range n { + tags[fmt.Sprintf("tag_%d", i)] = randWord(rng) + } + return tags +} diff --git a/bundle/fuzz/rand_test.go b/bundle/fuzz/rand_test.go new file mode 100644 index 00000000000..529e4da1153 --- /dev/null +++ b/bundle/fuzz/rand_test.go @@ -0,0 +1,47 @@ +package fuzz + +import ( + "fmt" + "math/rand/v2" + "strings" +) + +var words = []string{ + "alpha", "bravo", "charlie", "delta", "echo", "foxtrot", "golf", "hotel", + "india", "juliet", "kilo", "lima", "mike", "november", "oscar", "papa", +} + +// newRNG returns a deterministic RNG for the given seed, so any job the fuzzer +// flags can be regenerated from the printed seed alone. +func newRNG(seed int64) *rand.Rand { + return rand.New(rand.NewPCG(uint64(seed), 0)) +} + +// chance returns true with probability p (0..1). +func chance(rng *rand.Rand, p float64) bool { + return rng.Float64() < p +} + +// oneOf returns a random element of s. s must be non-empty. +func oneOf[T any](rng *rand.Rand, s []T) T { + return s[rng.IntN(len(s))] +} + +func randWord(rng *rand.Rand) string { + return oneOf(rng, words) +} + +// randName returns a deterministic-but-varied identifier with the given prefix, +// e.g. "job_alpha_4271". +func randName(rng *rand.Rand, prefix string) string { + return fmt.Sprintf("%s_%s_%d", prefix, randWord(rng), rng.IntN(10000)) +} + +func randSentence(rng *rand.Rand) string { + n := rng.IntN(4) + 2 + parts := make([]string, 0, n) + for range n { + parts = append(parts, randWord(rng)) + } + return strings.Join(parts, " ") +} diff --git a/bundle/fuzz/recorder_test.go b/bundle/fuzz/recorder_test.go new file mode 100644 index 00000000000..244cb81480f --- /dev/null +++ b/bundle/fuzz/recorder_test.go @@ -0,0 +1,61 @@ +package fuzz + +import ( + "encoding/json" + "sync" + + "github.com/databricks/cli/libs/testserver" +) + +// jobsCreatePath is the Jobs API route both engines must hit on create. The +// direct engine posts here via the SDK and the terraform provider is expected to +// as well. The testserver registers only this exact route, so if an engine ever +// posted to a different version the deploy would 404 and captureJobCreate would +// fail with "did not POST". A version skew therefore surfaces as a capture +// failure, not as a payload diff. +const jobsCreatePath = "/api/2.2/jobs/create" + +// capturedRequest is a single mutating API request observed by the testserver. +type capturedRequest struct { + Method string + Path string + Body json.RawMessage +} + +// recorder collects request bodies sent to a testserver. It is safe for +// concurrent use because the SDK and terraform may issue requests from multiple +// goroutines. +type recorder struct { + mu sync.Mutex + requests []capturedRequest +} + +func (r *recorder) callback(req *testserver.Request) { + r.mu.Lock() + defer r.mu.Unlock() + + var body json.RawMessage + if json.Valid(req.Body) { + // Copy: testserver reuses the underlying buffer across requests. + body = append(json.RawMessage(nil), req.Body...) + } + + r.requests = append(r.requests, capturedRequest{ + Method: req.Method, + Path: req.URL.Path, + Body: body, + }) +} + +// find returns the body of the first recorded request matching method and path. +func (r *recorder) find(method, path string) (json.RawMessage, bool) { + r.mu.Lock() + defer r.mu.Unlock() + + for _, req := range r.requests { + if req.Method == method && req.Path == path { + return req.Body, true + } + } + return nil, false +}