[Cosmos] Add azure-cosmos-avad-test module for AVAD Change Feed Processor soak testing#49050

Draft
jeet1995 wants to merge 27 commits into Azure:main from jeet1995:users/abhmohanty/cosmos-avad-test

Conversation


@jeet1995 jeet1995 commented May 4, 2026

What

Adds `azure-cosmos-avad-test` — an internal soak test tool under `sdk/cosmos/` for validating All Versions and Deletes (AVAD) Change Feed Processor correctness under sustained load and chaos conditions.

Components

| Component | Purpose |
| --- | --- |
| Ingestor | Generates create/replace/upsert/delete workload at configurable rate |
| AvadReader | CFP in AVAD mode — validates previous-image, CRTS ordering, LSN from metadata |
| LatestVersionReader | CFP in latest-version mode — baseline for parity checks |
| Reconciler | Offline gap detection, LSN/CRTS ordering, AVAD-superset-of-LV assertion |
| HealthMonitor | Online health checks querying Cosmos reconciliation container |
| Helm chart | AKS deployment: StatefulSets for consumers, Deployment for ingestor, CronJob for health |
| Chaos scenarios | Pod kill, restart storm, lease throttle, partition split, network fault, node drain |

Key AVAD Validations

  • LSN extracted from `ChangeFeedMetaData.getLogSequenceNumber()` (correct for delete tombstones)
  • CRTS captured via `ChangeFeedMetaData.getConflictResolutionTimestamp()` with per-partition ordering validation
  • previousImage presence checked on replace/delete events
  • AVAD ⊇ LV parity assertion (every LV event must appear in AVAD)
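The AVAD ⊇ LV parity assertion reduces to a set-containment check over correlation IDs: every event recorded by the latest-version reader must also appear in the AVAD reader's set. A minimal self-contained sketch — class and method names here are illustrative, not the module's actual API:

```java
import java.util.*;

// Sketch of the AVAD-superset-of-LV parity check. The real Reconciler reads
// events from the reconciliation container; here we take plain ID sets.
public class ParityCheck {
    /** Returns the LV correlation IDs missing from AVAD (empty set = parity holds). */
    public static Set<String> lvEventsMissingFromAvad(Set<String> avadIds, Set<String> lvIds) {
        Set<String> missing = new HashSet<>(lvIds);
        missing.removeAll(avadIds);
        return missing;
    }

    public static void main(String[] args) {
        Set<String> avad = new HashSet<>(Arrays.asList("e1", "e2", "e3", "e4"));
        Set<String> lv = new HashSet<>(Arrays.asList("e1", "e3"));
        // Every LV event is present in AVAD, so the missing set is empty.
        System.out.println(lvEventsMissingFromAvad(avad, lv).isEmpty()); // true
    }
}
```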

Structure

Follows the pattern of `azure-cosmos-benchmark`:

  • Internal tool, not a published library
  • Static analysis disabled (checkstyle, spotbugs, revapi)
  • Java 17 source (tool runs in JDK 21 containers on AKS)
  • Fat JAR via maven-shade-plugin

Verification

```bash
cd sdk/cosmos/azure-cosmos-avad-test
mvn clean compile
```

… testing

Adds a new internal soak test tool under sdk/cosmos/ for validating
All Versions and Deletes (AVAD) Change Feed Processor correctness.

Components:
- Ingestor: generates create/replace/upsert/delete workload
- AvadReader: CFP in AVAD mode with previous-image and CRTS validation
- LatestVersionReader: CFP in latest-version mode for parity checks
- Reconciler: gap detection, LSN/CRTS ordering, AVAD-superset-of-LV checks
- HealthMonitor: online health checks via Cosmos reconciliation container
- Helm chart: AKS deployment with StatefulSets for consumers
- Chaos scenarios: pod kill, restart storm, lease throttle, partition split,
  network fault, node drain

Key AVAD validations:
- LSN extracted from ChangeFeedMetaData (correct for delete tombstones)
- CRTS (conflict resolution timestamp) captured and ordering validated
- previousImage presence checked on replace/delete events
- AVAD superset of LV parity assertion

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
@github-actions bot added the Cosmos label May 4, 2026
jeet1995 and others added 26 commits May 4, 2026 19:46
Moves AVAD code from standalone azure-cosmos-avad-test module into
azure-cosmos-benchmark:

- Java sources under com.azure.cosmos.avadtest package
- Operational files (Dockerfile, Helm, chaos, scripts) under avad-soak/
- Converted from picocli to jcommander (no new dependencies)
- Removed standalone module from sdk/cosmos/pom.xml
- Reverted picocli addition to external_dependencies.txt

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Convert Java 9+ features to Java 8 equivalents:
- Switch expressions -> if/else chains
- ProcessHandle.current().pid() -> ManagementFactory.getRuntimeMXBean().getName()
- Path.of() -> Paths.get()
- String.isBlank() -> .trim().isEmpty()
- List.of() -> Collections.singletonList()
- String.repeat() -> StringBuilder loop
- var -> explicit types
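Two of the conversions above can be sketched as stand-alone Java 8 helpers (illustrative, not the actual diff):

```java
// Java 8 equivalents for two of the Java 11 String APIs listed above.
public class Java8Equivalents {
    // String.repeat(n) rewritten as a StringBuilder loop.
    public static String repeat(String s, int n) {
        StringBuilder sb = new StringBuilder(s.length() * n);
        for (int i = 0; i < n; i++) {
            sb.append(s);
        }
        return sb.toString();
    }

    // String.isBlank() rewritten via trim().isEmpty().
    public static boolean isBlank(String s) {
        return s.trim().isEmpty();
    }

    public static void main(String[] args) {
        System.out.println(repeat("ab", 3)); // ababab
        System.out.println(isBlank("   "));  // true
    }
}
```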

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Blocking fixes:
- AvadReader: add shutdown hook (matching LatestVersionReader pattern)
- Ingestor: implement rate limiting with Flux.interval instead of
  unbounded Flux.generate + sample (which did not enforce OPS_PER_SEC)
- Ingestor: fix delete correlation by stamping eventId into document
  before delete so AVAD reader's previous image has correct key
- Reconciler: fix LSN/CRTS ordering checks to sort by delivery order
  (seqNo) then verify monotonicity, instead of sorting by value (no-op)
- Ingestor: fix SLF4J format - {:.1f} is not valid SLF4J syntax
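The ordering fix amounts to: sort by delivery order (seqNo), then scan for monotonicity — sorting by the LSN/CRTS value itself would trivially pass. A self-contained sketch with an illustrative event type:

```java
import java.util.*;

public class OrderingCheck {
    /** Minimal stand-in for a recorded change feed event (illustrative). */
    public static final class Event {
        final long seqNo; // delivery order as observed by the reader
        final long lsn;   // value whose monotonicity we verify
        public Event(long seqNo, long lsn) { this.seqNo = seqNo; this.lsn = lsn; }
    }

    /** Sort by seqNo (delivery order), then verify the LSN never decreases. */
    public static boolean isMonotonicByDeliveryOrder(List<Event> events) {
        List<Event> sorted = new ArrayList<>(events);
        // NOT Comparator.comparingLong(e -> e.lsn): that would make the check a no-op.
        sorted.sort(Comparator.comparingLong(e -> e.seqNo));
        for (int i = 1; i < sorted.size(); i++) {
            if (sorted.get(i).lsn < sorted.get(i - 1).lsn) {
                return false; // delivered out of LSN order
            }
        }
        return true;
    }
}
```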

Watch fixes:
- ReconciliationWriter: fix close() to drain before dispose
- ReconciliationWriter: use CosmosException.getStatusCode() for retry
- LatestVersionReader: use metadata.getLogSequenceNumber() not _lsn
- AvadReader: implement CRTS ordering validation per partition
- HealthMonitor: treat -1 (query failures) as unhealthy
- Dockerfile: fix build context path (point to module root)
- setup-acr.sh: fix PROJECT_DIR to reach module root
- application.properties: remove hardcoded endpoint
- infra/README: document secret prerequisites, remove hardcoded names

Scope changes:
- Remove 4 chaos scenarios, keep only pod-kill and partition-split
- Pre-compute payload string instead of per-operation allocation
- Clear deleted IDs from ring buffer to avoid retargeting

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace individual point operations (createItem, replaceItem, upsertItem,
deleteItem) with executeBulkOperations for higher throughput.

Changes:
- Build batch of CosmosItemOperation per tick, submit via bulk API
- Merge replace into upsert (both trigger AVAD previousImage)
- Operation mix: 40% create, 40% upsert, 20% delete
- Tick interval increased to 100ms (larger batches, better bulk efficiency)
- Track OpMeta per operation for correlating bulk responses to event log
- Remove read-before-write for replaces (upsert handles it)
- Delete uses docId as correlation key (no read-before-delete needed)
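The 40/40/20 operation mix can be sketched as a threshold draw over a uniform random value in [0, 1) — an illustrative sampler, not necessarily the module's actual selection logic:

```java
import java.util.Random;

public class OpMix {
    public enum OpType { CREATE, UPSERT, DELETE }

    // 40% create, 40% upsert, 20% delete.
    public static OpType pick(double r) {
        if (r < 0.40) return OpType.CREATE;
        if (r < 0.80) return OpType.UPSERT;
        return OpType.DELETE;
    }

    public static void main(String[] args) {
        Random rnd = new Random(42);
        int[] counts = new int[3];
        for (int i = 0; i < 100_000; i++) {
            counts[pick(rnd.nextDouble()).ordinal()]++;
        }
        // Counts land near the 40/40/20 split for a large sample.
        System.out.printf("create=%d upsert=%d delete=%d%n", counts[0], counts[1], counts[2]);
    }
}
```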

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
SoakMetrics was never wired to any workload component — /metrics
endpoint always returned zeros. Each component already tracks its
own counters via LongAdder and reports via SLF4J.

Removed SoakMetrics.java and the /metrics endpoint from HealthServer.
HealthServer now only serves /health and /ready (K8s probes).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
All components now follow the same close() sequence:
1. Log 'Closing {Component}...'
2. Stop workload-specific resources (CFP processors, running flag)
3. Close eventLog (try-catch)
4. Close reconWriter
5. Close Cosmos client
6. Log summary and 'closed' confirmation

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Each component (Ingestor, AvadReader, LatestVersionReader) was creating
2 CosmosAsyncClient instances — one for the workload, one inside
ReconciliationWriter. This wastes connections and memory.

Now ReconciliationWriter accepts a shared client instead of creating
its own. The component owns the client lifecycle and closes it in
close(). ReconciliationWriter only drains its sink and disposes
its subscription.

Also: store and dispose Reactor subscriptions in Ingestor to prevent
leaks, and explicitly shut down HealthServer's ExecutorService.

Before: 6 CosmosAsyncClients across 3 components
After:  3 CosmosAsyncClients (1 per component)

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Replace application.properties with a structured JSON config file.
TestConfig now supports two load paths:

  --config config.json    → JSON file with env var overrides
  (no --config)           → env vars only (backward compatible)

Precedence: env var > JSON value > built-in default.
Secrets (COSMOS_KEY) should always come from env vars.

JSON schema groups settings by concern:
  cosmos.*     — endpoint, database, containers, region
  ingestor.*   — opsPerSec, docSize, partitions, duration, workers
  logging.*    — produced/consumed log file paths

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Local mode (run-local.sh / run-local.ps1):
- Launches ingestor + avad-reader + lv-reader as local JVM processes
- Each on its own health port (8080/8081/8082)
- Builds module automatically on first run
- Filters classpath to use logback over log4j
- Monitors process health, logs to output dir
- Ctrl+C stops all three cleanly

AKS mode (run-soak.sh):
- Cleaned up to reference only pod-kill + partition-split chaos
- Removed references to deleted chaos scripts
- Simplified health check (pod readiness, no /metrics scraping)

Removed run-cutover.sh (SSH-to-VM orchestration — superseded).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
AllVersionsAndDeletes mode does not support startTime option.
Also removed from LV reader (not needed for fresh soak runs).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CFP lease management requires contentResponseOnWriteEnabled(true).
AvadReader and Ingestor were missing it — only LatestVersionReader
had it. Now all three clients set it consistently.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CFP's handleChanges callback must complete before the lease is
checkpointed. The old async Sink/Flux pipeline let the lease
advance before writes were persisted — causing drops on backpressure
and data loss on crash.

Now record() blocks until the upsert succeeds or retries exhaust.
This ensures no lease checkpoint races. Retries use exponential
backoff (500ms, 1s, 2s, 4s) with the same CosmosException-based
retryability check.

Removed: Sinks buffer, Disposable subscription, requeue logic,
dropCount (writes either succeed or error — no silent drops).
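The backoff schedule (500ms, 1s, 2s, 4s) doubles from a 500 ms base. A simplified stand-in for the blocking record() loop — the retryability check and Cosmos calls are elided:

```java
public class RetryBackoff {
    static final long BASE_DELAY_MS = 500;
    static final int MAX_ATTEMPTS = 4;

    /** Delay before retrying attempt 'attempt' (0-based): 500ms, 1s, 2s, 4s. */
    public static long delayMillis(int attempt) {
        return BASE_DELAY_MS << attempt;
    }

    /** Block until the operation succeeds or retries exhaust. */
    public static void runWithRetry(Runnable op) throws InterruptedException {
        for (int attempt = 0; ; attempt++) {
            try {
                op.run();
                return; // success: caller (and the CFP lease) may proceed
            } catch (RuntimeException e) {
                if (attempt + 1 >= MAX_ATTEMPTS) {
                    throw e; // permanent failure surfaces to the caller
                }
                Thread.sleep(delayMillis(attempt));
            }
        }
    }
}
```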

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The old approach matched bulk responses to metadata by scanning a
List for matching docId — this failed when multiple ops targeted the
same docId or when op.getId() didn't match expectations.

Now uses IdentityHashMap<CosmosItemOperation, OpMeta> keyed by object
reference. executeBulkOperations returns the same CosmosItemOperation
in the response, so reference equality gives O(1) lookup with no
ambiguity.
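`IdentityHashMap` keys by reference (`==`) rather than `equals()`, so two operations that are equal by value (same docId) still map to distinct metadata. A self-contained illustration using Strings in place of CosmosItemOperation:

```java
import java.util.IdentityHashMap;
import java.util.Map;

public class BulkCorrelation {
    /** Put two keys and report how many entries survive under identity semantics. */
    public static int entriesAfterIdentityPut(String a, String b) {
        Map<String, String> byIdentity = new IdentityHashMap<>();
        byIdentity.put(a, "meta-a");
        byIdentity.put(b, "meta-b");
        return byIdentity.size();
    }

    public static void main(String[] args) {
        String op1 = new String("doc-1"); // equal by value...
        String op2 = new String("doc-1"); // ...but a distinct reference
        // IdentityHashMap keeps both entries; a HashMap would collapse them into one.
        System.out.println(entriesAfterIdentityPut(op1, op2)); // 2
    }
}
```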

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Flux.interval + concatMap causes OverflowException when bulk batch
takes longer than the tick interval — concatMap requests 1 at a time
but interval keeps emitting ticks that can't be buffered.

Now uses a simple while loop: submit batch (blocking via toStream),
sleep for remaining tick time. This guarantees rate limiting without
reactive backpressure issues.
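The replacement loop is: record the tick start, submit the batch (blocking), then sleep for whatever remains of the tick. A sketch with a stand-in batch submitter:

```java
public class TickLoop {
    /** Time left to sleep after the batch took elapsedMs of the tick. */
    public static long remainingSleepMillis(long tickIntervalMs, long elapsedMs) {
        return Math.max(0, tickIntervalMs - elapsedMs);
    }

    /** Simplified rate-limited loop: one batch per tick, no reactive backpressure. */
    public static void run(Runnable submitBatch, long tickIntervalMs, int ticks)
            throws InterruptedException {
        for (int i = 0; i < ticks; i++) {
            long start = System.currentTimeMillis();
            submitBatch.run(); // blocking submit (e.g., draining a bulk response stream)
            long elapsed = System.currentTimeMillis() - start;
            Thread.sleep(remainingSleepMillis(tickIntervalMs, elapsed));
        }
    }
}
```

If the batch overruns the tick, the sleep clamps to zero and the next batch starts immediately — the loop degrades gracefully instead of overflowing a buffer.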

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
CRTS ordering validation belongs in the Reconciler (offline), not
in the reader. The reader should just record what it sees — the
Reconciler already has checkOrderingByCrts() for this.

Removes ConcurrentHashMap<String, AtomicLong> that would grow
unboundedly over multi-day runs (one entry per partition key).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
record() now throws RuntimeException on permanent write failure
instead of silently returning. This causes CFP's handleChanges to
fail, which prevents the lease continuation token from advancing.
CFP will retry the batch on the next poll.

Without this, a recon write failure would silently drop the event
and advance the lease — making the gap undetectable.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
HealthMonitor no longer writes snapshots to soak-health. It still
queries the reconciliation container for gap/parity/previousImage
checks and logs results, but doesn't persist them.

The offline Reconciler is the authoritative validation path.
soak-health was redundant and produced misleading UNHEALTHY entries
due to flaky N+1 gap detection queries.

Removed: healthContainer field, writeHealthSnapshot method,
soak-health from setup-cosmos.sh, unused Jackson imports.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Remove EventLog (CSV file logging) entirely. Single reconciliation
path through Cosmos 'reconciliation' container.

Reconciler rewritten to query Cosmos by source field:
- 5 source types: ingestor, cfp-lv, cfp-avad, spark-lv, spark-avad
- Per-reader gap detection, LSN/CRTS ordering, previousImage checks
- Auto-selects checks by source type (AVAD gets CRTS + previousImage)
- Skips sources that don't exist yet (e.g., spark-* before Spark runs)

CLI modes:
  --mode reconcile --source ingestor --against cfp-avad
  --mode reconcile --full  (runs all 8 check pairs)

Removed: EventLog.java, producedLogFile/consumedLogFile config,
--produced/--consumed/--lv/--avad CLI args, logging section from
config.json.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
Databricks-compatible PySpark notebooks that read the change feed
using azure-cosmos-spark connector and write to the reconciliation
container with source='spark-lv' and source='spark-avad'.

spark_lv_reader.py:
  - Incremental (Latest Version) mode
  - Structured streaming with 10s trigger
  - Writes correlationId, seqNo, opType, partitionKey, lsn

spark_avad_reader.py:
  - Full Fidelity (AVAD) mode
  - Extracts operationType, lsn, crts from metadata
  - Checks previousImage on replace/delete
  - Handles delete tombstones (extracts fields from previous image)
  - Includes ad-hoc correctness check cell

Both write to the reconciliation container using the same doc schema
as the Java CFP readers, enabling cross-engine parity checks via
the Reconciler's --full suite.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The Java reconciler chokes on cross-partition SELECT DISTINCT over
millions of docs. Spark handles this natively with bulk reads +
DataFrame operations.

spark_reconciler.py runs all 8 reconciliation checks:
  Q1: Summary dashboard (count/unique/min/max per source)
  Q2: Gap detection (ingestor → each of 4 consumers)
  Q3: Parity (cfp-lv ⊆ cfp-avad, spark-lv ⊆ spark-avad)
  Q4: Cross-engine (cfp ↔ spark, both modes)
  Q5: LSN ordering per partition (window function + lag)
  Q6: CRTS ordering per partition (AVAD only)
  Q7: previousImage validation (AVAD only)
  Q8: Duplicate detection (at-least-once rate)
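Q8's at-least-once duplicate rate reduces to (total events − distinct events) / total. The actual check runs as a Spark DataFrame query; the same computation in plain Java for illustration:

```java
import java.util.HashSet;
import java.util.List;

public class DuplicateRate {
    /** Fraction of delivered events that are redundant re-deliveries. */
    public static double duplicateRate(List<String> correlationIds) {
        if (correlationIds.isEmpty()) {
            return 0.0;
        }
        long distinct = new HashSet<>(correlationIds).size();
        return (correlationIds.size() - distinct) / (double) correlationIds.size();
    }
}
```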

Skips sources with 0 events (e.g., spark-* before notebooks run).
Returns PASS/FAIL via dbutils.notebook.exit for job orchestration.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The fat jar's manifest mainClass is com.azure.cosmos.benchmark.Main
(the benchmark's main), not our avadtest.Main. Use -cp instead of
-jar so we can specify the correct main class.

Simplified to single-stage Dockerfile — build fat jar locally first
(mvn package -Dpackage-with-dependencies), then COPY into runtime
image. Avoids dependency resolution failures in ACR Tasks.

Updated setup-acr.sh to build locally before pushing.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
ReconciliationWriter now uses bulk upsert (executeBulkOperations)
instead of per-event blocking upserts. Events are buffered via
record(), then flushed as a single bulk batch at the end of each
handleChanges callback. Still synchronous — flush blocks until
all writes complete, so lease doesn't advance until persisted.

CFP tuning:
- feedPollDelay: 1s → 100ms (tighter polling loop)
- maxItemCount: default(100) → 1000 (larger pages per poll)
- preferredRegion should match AKS region (East US)

AKS result: ~14x improvement (1,405 → 19,398 ops in 2 min).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
- Config: use try/except for dbutils.widgets.get() instead of
  getAll().name (which returns strings, not objects)
- Checkpoint: use /Workspace/ path instead of /tmp/ (DBFS disabled)
- LV reader: remove _lsn column reference (not exposed by connector)
- AVAD reader: remove _rawBody/previous/metadata references — Spark
  connector flattens AVAD events to the same schema as LV. Use
  available columns directly.

Verified: spark-lv wrote 60,801 docs to reconciliation container.
spark-avad running but needs live ingestor data (startFrom=Now).

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>
The Spark connector uses 'AllVersionsAndDeletes' (not 'FullFidelity')
as the changeFeed.mode value. startFrom must be 'Now' for AVAD mode.

Note: spark-avad requires the container to have changeFeedPolicy
enabled (Full Fidelity retention). Without it, the AVAD change feed
stream produces no events. Enable via Azure Portal:
Container → Settings → Change Feed → Full Fidelity.

Co-authored-by: Copilot <223556219+Copilot@users.noreply.github.com>