diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/Dockerfile b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/Dockerfile
new file mode 100644
index 000000000000..80c929ae9a7a
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/Dockerfile
@@ -0,0 +1,21 @@
+# Single-stage build for cosmos-avad-test soak runner
+# Build context: azure-cosmos-benchmark/ (module root)
+# Requires: mvn package run locally first (produces fat jar)
+
+FROM eclipse-temurin:21-jre-jammy
+WORKDIR /app
+
+# Install curl for health probes
+RUN apt-get update && apt-get install -y --no-install-recommends \
+    curl \
+    && rm -rf /var/lib/apt/lists/*
+
+COPY target/azure-cosmos-benchmark-*-jar-with-dependencies.jar /app/app.jar
+
+# Health endpoint port
+EXPOSE 8080
+
+# JVM tuning; exec below makes java PID 1 so SIGTERM reaches it on pod kill
+ENV JAVA_OPTS="-XX:+UseContainerSupport -XX:MaxRAMPercentage=75.0 -XX:+ExitOnOutOfMemoryError"
+
+ENTRYPOINT ["sh", "-c", "exec java $JAVA_OPTS -cp /app/app.jar com.azure.cosmos.avadtest.Main \"$0\" \"$@\""]
diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/README.md b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/README.md
new file mode 100644
index 000000000000..0423dfc997c7
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/README.md
@@ -0,0 +1,46 @@
+# Cosmos DB Soak Test — Chaos Library
+
+Reusable chaos injection scenarios for AKS-hosted Cosmos DB
+consumers. Works with any workload deployed via the soak
+infra Helm chart.
+
+## Scenarios
+
+| Scenario | Script | What It Tests |
+|----------|--------|---------------|
+| Pod Kill | `pod-kill.sh` | Lease rebalancing after random pod loss |
+| Partition Split | `partition-split.sh` | Continuation token validity across splits |
+
+## Usage
+
+### Manual — run one scenario
+
+```bash
+export NAMESPACE=cosmos-soak
+export COSMOS_ACCOUNT=<cosmos-account-name>
+export COSMOS_RG=<resource-group>
+
+# Kill a random AVAD CFP pod
+bash chaos/scenarios/pod-kill.sh
+
+# Trigger partition split (2x throughput)
+SCALE_FACTOR=2 bash chaos/scenarios/partition-split.sh
+```
+
+### Automated — via soak orchestrator
+
+The `run-soak.sh` orchestrator reads `chaos-schedule.yaml`
+and fires scenarios on a phase-based schedule:
+
+```
+Warm-up → Steady → Chaos → Recovery → repeat
+```
+
+See `chaos-schedule.yaml` for interval/parameter config.
+
+## Adding a New Scenario
+
+1. Create `chaos/scenarios/my-scenario.sh`
+2. Use env vars for all parameters (no hardcoded values)
+3. Add an entry to `chaos-schedule.yaml`
+4. The soak orchestrator will pick it up automatically
diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/chaos-schedule.yaml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/chaos-schedule.yaml
new file mode 100644
index 000000000000..3f3b7380d94d
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/chaos-schedule.yaml
@@ -0,0 +1,22 @@
+# Chaos schedule configuration
+# The soak orchestrator reads this file to determine when to
+# fire each chaos scenario.
+# +# interval_hours: time between invocations of this scenario +# recovery_minutes: time to wait after chaos before checking health +# enabled: set to false to skip this scenario + +schedule: + - scenario: pod-kill + interval_hours: 2 + recovery_minutes: 5 + enabled: true + params: + component: avad-cfp + + - scenario: partition-split + interval_hours: 12 + recovery_minutes: 30 + enabled: true + params: + scale_factor: 2 diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/scenarios/partition-split.sh b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/scenarios/partition-split.sh new file mode 100644 index 000000000000..b9589ddbee80 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/scenarios/partition-split.sh @@ -0,0 +1,57 @@ +#!/bin/bash +# Partition Split — scale feed container throughput to trigger split +set -euo pipefail + +COSMOS_ACCOUNT="${COSMOS_ACCOUNT:?Set COSMOS_ACCOUNT}" +COSMOS_RG="${COSMOS_RG:?Set COSMOS_RG}" +COSMOS_DB="${COSMOS_DB:-graph_db}" +FEED_CONTAINER="${FEED_CONTAINER:-avad-test}" +SCALE_FACTOR="${SCALE_FACTOR:-2}" + +echo "[$(date '+%H:%M:%S')] Chaos: partition-split" + +# Get current throughput +CURRENT_RU=$(az cosmosdb sql container throughput show \ + --account-name "$COSMOS_ACCOUNT" \ + --resource-group "$COSMOS_RG" \ + --database-name "$COSMOS_DB" \ + --name "$FEED_CONTAINER" \ + --query "resource.throughput" -o tsv) + +TARGET_RU=$((CURRENT_RU * SCALE_FACTOR)) + +echo " Current feed RU: $CURRENT_RU" +echo " Scaling to: $TARGET_RU RU (${SCALE_FACTOR}x) to trigger split" + +# Get pre-split partition count +PRE_SPLIT_PARTITIONS=$(az cosmosdb sql container show \ + --account-name "$COSMOS_ACCOUNT" \ + --resource-group "$COSMOS_RG" \ + --database-name "$COSMOS_DB" \ + --name "$FEED_CONTAINER" \ + --query "resource.statistics[0].partitionCount" -o tsv 2>/dev/null || echo "unknown") + +echo " Pre-split partition count: $PRE_SPLIT_PARTITIONS" + +# Scale up +az cosmosdb sql container throughput update \ + 
--account-name "$COSMOS_ACCOUNT" \
+  --resource-group "$COSMOS_RG" \
+  --database-name "$COSMOS_DB" \
+  --name "$FEED_CONTAINER" \
+  --throughput "$TARGET_RU" \
+  --output none
+
+echo " Feed container scaled to $TARGET_RU RU"
+echo " Partition split may take several minutes to complete"
+
+# Poll for split completion (check partition count changes)
+WAIT_TIME=0
+MAX_WAIT=1800 # 30 minutes
+while [ $WAIT_TIME -lt $MAX_WAIT ]; do
+  sleep 60
+  WAIT_TIME=$((WAIT_TIME + 60))
+  CUR_PARTITIONS=$(az cosmosdb sql container show --account-name "$COSMOS_ACCOUNT" --resource-group "$COSMOS_RG" --database-name "$COSMOS_DB" --name "$FEED_CONTAINER" --query "resource.statistics[0].partitionCount" -o tsv 2>/dev/null || echo "unknown"); if [ "$CUR_PARTITIONS" != "unknown" ] && [ "$CUR_PARTITIONS" != "$PRE_SPLIT_PARTITIONS" ]; then echo " Split detected: $PRE_SPLIT_PARTITIONS -> $CUR_PARTITIONS partitions"; break; fi; echo " Waiting for split... (${WAIT_TIME}s elapsed)"
+done
+
+echo " Partition split chaos event complete (waited ${WAIT_TIME}s)"
diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/scenarios/pod-kill.sh b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/scenarios/pod-kill.sh
new file mode 100644
index 000000000000..3ceda4849f3d
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/chaos/scenarios/pod-kill.sh
@@ -0,0 +1,22 @@
+#!/bin/bash
+# Pod Kill — kill a random CFP pod to test lease rebalancing
+set -euo pipefail
+
+NAMESPACE="${NAMESPACE:-cosmos-soak}"
+COMPONENT="${COMPONENT:-avad-cfp}"
+LABEL="app.kubernetes.io/component=${COMPONENT}"
+
+echo "[$(date '+%H:%M:%S')] Chaos: pod-kill targeting $COMPONENT"
+
+POD=$(kubectl get pods -n "$NAMESPACE" -l "$LABEL" \
+  --field-selector=status.phase=Running \
+  -o jsonpath='{.items[*].metadata.name}' | tr ' ' '\n' | shuf -n 1)
+
+if [ -z "$POD" ]; then
+  echo " No running pods found for $LABEL"
+  exit 0
+fi
+
+echo " Killing pod: $POD"
+kubectl delete pod "$POD" -n "$NAMESPACE" --grace-period=0 --force
+echo " Pod $POD killed"
diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/config.json b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/config.json
new file mode 100644
index 000000000000..cb04c2793f4d
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/config.json
@@ -0,0 +1,17 @@
+{
+  "cosmos": {
+    "endpoint": "https://abhm-cfp-region-test.documents.azure.com:443/",
+    "regionalEndpoint": "",
+    "database":
"graph_db",
+    "feedContainer": "avad-test",
+    "leaseContainer": "avad-test-leases",
+    "preferredRegion": "West Central US"
+  },
+  "ingestor": {
+    "opsPerSec": 500,
+    "docSizeBytes": 512,
+    "logicalPartitionCount": 1000,
+    "durationSeconds": 1800,
+    "workerCount": 2
+  }
+}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/README.md b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/README.md
new file mode 100644
index 000000000000..359ef781176e
--- /dev/null
+++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/README.md
@@ -0,0 +1,76 @@
+# Cosmos DB Soak Test — Infrastructure
+
+Reusable Helm chart and setup scripts for running Cosmos DB
+change feed processor soak tests on AKS.
+
+## Prerequisites
+
+Before deploying, create the required Kubernetes secrets:
+
+```bash
+# Cosmos DB key secret (referenced by Helm chart)
+kubectl create secret generic <release-name>-secrets \
+  --namespace cosmos-soak \
+  --from-literal=cosmos-key="<cosmos-primary-key>"
+
+# ACR pull secret (if not using AKS-managed ACR attachment)
+kubectl create secret docker-registry acr-secret \
+  --namespace cosmos-soak \
+  --docker-server=<acr-name>.azurecr.io \
+  --docker-username=<acr-username> \
+  --docker-password=<acr-password>
+```
+
+If using AKS with `--attach-acr`, the `acr-secret` is not needed
+and can be removed from the chart templates.
+
+## Quick Start
+
+```bash
+# 1. Create AKS cluster
+./scripts/setup-aks.sh
+
+# 2. Create Cosmos containers
+./scripts/setup-cosmos.sh
+
+# 3. Build + push image to ACR
+./scripts/setup-acr.sh
+
+# 4. Create secrets (see Prerequisites above)
+
+# 5. Deploy (from repo root)
+cd ../..
+./run-soak.sh +``` + +## What This Provides + +| Component | Description | +|-----------|-------------| +| `chart/` | Helm chart with templated Deployments, StatefulSets, ConfigMaps, probes | +| `scripts/setup-aks.sh` | AKS cluster provisioning | +| `scripts/setup-cosmos.sh` | Cosmos containers (feed, lease, reconciliation, health) | +| `scripts/setup-acr.sh` | ACR creation + image build/push | + +## Reusing for Your Own Workload + +1. Build a container image with your workload logic +2. Implement HTTP endpoints: `/health` (liveness), `/ready` + (readiness), `/metrics` (optional) +3. Create a `values-myworkload.yaml` overriding: + - `image.repository` / `image.tag` + - `cosmos.*` (endpoint, containers, etc.) + - `avadConsumer.replicas` / `lvConsumer.replicas` +4. Deploy: `helm upgrade --install my-soak ./infra/chart -f values-myworkload.yaml` + +## Azure Resources + +Override default resource names via environment variables +in each script: + +```bash +export SUBSCRIPTION="" +export RG="" +export AKS_CLUSTER="" +export ACR_NAME="" +``` diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/Chart.yaml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/Chart.yaml new file mode 100644 index 000000000000..35b187435015 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/Chart.yaml @@ -0,0 +1,6 @@ +apiVersion: v2 +name: cosmos-soak +description: Reusable Helm chart for Cosmos DB soak testing on AKS +version: 0.1.0 +appVersion: "1.0" +type: application diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/_helpers.tpl b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/_helpers.tpl new file mode 100644 index 000000000000..aedf64eb14f3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/_helpers.tpl @@ -0,0 +1,68 @@ +{{- define "cosmos-soak.labels" -}} +app.kubernetes.io/name: {{ .Chart.Name }} +app.kubernetes.io/instance: {{ .Release.Name }} 
+app.kubernetes.io/managed-by: {{ .Release.Service }} +{{- end }} + +{{- define "cosmos-soak.selectorLabels" -}} +app.kubernetes.io/name: {{ .Chart.Name }} +app.kubernetes.io/instance: {{ .Release.Name }} +{{- end }} + +{{- define "cosmos-soak.cosmosEnv" -}} +- name: COSMOS_ENDPOINT + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: endpoint +- name: COSMOS_KEY + valueFrom: + secretKeyRef: + name: {{ .Release.Name }}-secrets + key: cosmos-key +- name: COSMOS_DATABASE + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: database +- name: COSMOS_FEED_CONTAINER + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: feedContainer +- name: COSMOS_LEASE_CONTAINER + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: leaseContainer +- name: COSMOS_PREFERRED_REGION + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: preferredRegion +- name: OPS_PER_SEC + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: opsPerSec +- name: DOC_SIZE_BYTES + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: docSizeBytes +- name: LOGICAL_PARTITION_COUNT + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: logicalPartitionCount +- name: DURATION_SECONDS + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: durationSeconds +- name: WORKER_COUNT + valueFrom: + configMapKeyRef: + name: {{ .Release.Name }}-config + key: workerCount +{{- end }} diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/configmap.yaml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/configmap.yaml new file mode 100644 index 000000000000..e40e405e01c7 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/configmap.yaml @@ -0,0 +1,18 @@ +apiVersion: v1 +kind: ConfigMap +metadata: + name: {{ .Release.Name }}-config + namespace: {{ .Values.namespace }} + labels: + 
{{- include "cosmos-soak.labels" . | nindent 4 }} +data: + endpoint: {{ .Values.cosmos.endpoint | quote }} + database: {{ .Values.cosmos.database | quote }} + feedContainer: {{ .Values.cosmos.feedContainer | quote }} + leaseContainer: {{ .Values.cosmos.leaseContainer | quote }} + preferredRegion: {{ .Values.cosmos.preferredRegion | quote }} + opsPerSec: {{ .Values.cosmos.opsPerSec | quote }} + docSizeBytes: {{ .Values.cosmos.docSizeBytes | quote }} + logicalPartitionCount: {{ .Values.cosmos.logicalPartitionCount | quote }} + durationSeconds: {{ .Values.cosmos.durationSeconds | quote }} + workerCount: {{ .Values.cosmos.workerCount | quote }} diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/consumer-statefulset.yaml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/consumer-statefulset.yaml new file mode 100644 index 000000000000..cd0b7752e623 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/consumer-statefulset.yaml @@ -0,0 +1,70 @@ +{{- range $consumerName, $consumer := dict "avad" .Values.avadConsumer "lv" .Values.lvConsumer }} +{{- if $consumer.enabled }} +--- +apiVersion: apps/v1 +kind: StatefulSet +metadata: + name: {{ $.Release.Name }}-{{ $consumerName }}-cfp + namespace: {{ $.Values.namespace }} + labels: + {{- include "cosmos-soak.labels" $ | nindent 4 }} + app.kubernetes.io/component: {{ $consumerName }}-cfp +spec: + serviceName: {{ $.Release.Name }}-{{ $consumerName }}-cfp + replicas: {{ $consumer.replicas }} + selector: + matchLabels: + {{- include "cosmos-soak.selectorLabels" $ | nindent 6 }} + app.kubernetes.io/component: {{ $consumerName }}-cfp + template: + metadata: + labels: + {{- include "cosmos-soak.selectorLabels" $ | nindent 8 }} + app.kubernetes.io/component: {{ $consumerName }}-cfp + spec: + imagePullSecrets: + - name: acr-secret + containers: + - name: {{ $consumerName }}-cfp + image: "{{ $.Values.image.repository }}:{{ $.Values.image.tag }}" + 
imagePullPolicy: {{ $.Values.image.pullPolicy }} + args: ["--mode", "{{ $consumer.mode }}"] + ports: + - containerPort: 8080 + name: health + env: + {{- include "cosmos-soak.cosmosEnv" $ | nindent 12 }} + resources: + {{- toYaml $consumer.resources | nindent 12 }} + livenessProbe: + httpGet: + path: {{ $.Values.probes.liveness.path }} + port: {{ $.Values.probes.liveness.port }} + initialDelaySeconds: {{ $.Values.probes.liveness.initialDelaySeconds }} + periodSeconds: {{ $.Values.probes.liveness.periodSeconds }} + readinessProbe: + httpGet: + path: {{ $.Values.probes.readiness.path }} + port: {{ $.Values.probes.readiness.port }} + initialDelaySeconds: {{ $.Values.probes.readiness.initialDelaySeconds }} + periodSeconds: {{ $.Values.probes.readiness.periodSeconds }} +--- +# Headless Service for StatefulSet DNS +apiVersion: v1 +kind: Service +metadata: + name: {{ $.Release.Name }}-{{ $consumerName }}-cfp + namespace: {{ $.Values.namespace }} + labels: + {{- include "cosmos-soak.labels" $ | nindent 4 }} + app.kubernetes.io/component: {{ $consumerName }}-cfp +spec: + clusterIP: None + selector: + {{- include "cosmos-soak.selectorLabels" $ | nindent 4 }} + app.kubernetes.io/component: {{ $consumerName }}-cfp + ports: + - port: 8080 + name: health +{{- end }} +{{- end }} diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/health-monitor-cronjob.yaml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/health-monitor-cronjob.yaml new file mode 100644 index 000000000000..ffab3a311c74 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/health-monitor-cronjob.yaml @@ -0,0 +1,47 @@ +{{- if .Values.healthMonitor.enabled }} +apiVersion: batch/v1 +kind: CronJob +metadata: + name: {{ .Release.Name }}-health-monitor + namespace: {{ .Values.namespace }} + labels: + {{- include "cosmos-soak.labels" . 
| nindent 4 }} + app.kubernetes.io/component: health-monitor +spec: + schedule: {{ .Values.healthMonitor.schedule | quote }} + concurrencyPolicy: Forbid + successfulJobsHistoryLimit: 5 + failedJobsHistoryLimit: 3 + jobTemplate: + spec: + backoffLimit: 1 + template: + metadata: + labels: + {{- include "cosmos-soak.selectorLabels" . | nindent 12 }} + app.kubernetes.io/component: health-monitor + spec: + restartPolicy: Never + imagePullSecrets: + - name: acr-secret + containers: + - name: health-monitor + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: + - "--mode" + - "health-monitor" + - "--run-id" + - "{{ .Release.Name }}" + - "--gap-sla-minutes" + - "{{ .Values.healthMonitor.gapSlaMinutes }}" + env: + {{- include "cosmos-soak.cosmosEnv" . | nindent 16 }} + resources: + requests: + cpu: "250m" + memory: "256Mi" + limits: + cpu: "500m" + memory: "512Mi" +{{- end }} diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/ingestor-deployment.yaml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/ingestor-deployment.yaml new file mode 100644 index 000000000000..f6aef0f2b4c1 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/templates/ingestor-deployment.yaml @@ -0,0 +1,48 @@ +{{- if .Values.ingestor.enabled }} +apiVersion: apps/v1 +kind: Deployment +metadata: + name: {{ .Release.Name }}-ingestor + namespace: {{ .Values.namespace }} + labels: + {{- include "cosmos-soak.labels" . | nindent 4 }} + app.kubernetes.io/component: ingestor +spec: + replicas: {{ .Values.ingestor.replicas }} + selector: + matchLabels: + {{- include "cosmos-soak.selectorLabels" . | nindent 6 }} + app.kubernetes.io/component: ingestor + template: + metadata: + labels: + {{- include "cosmos-soak.selectorLabels" . 
| nindent 8 }} + app.kubernetes.io/component: ingestor + spec: + imagePullSecrets: + - name: acr-secret + containers: + - name: ingestor + image: "{{ .Values.image.repository }}:{{ .Values.image.tag }}" + imagePullPolicy: {{ .Values.image.pullPolicy }} + args: ["--mode", "{{ .Values.ingestor.mode }}"] + ports: + - containerPort: 8080 + name: health + env: + {{- include "cosmos-soak.cosmosEnv" . | nindent 12 }} + resources: + {{- toYaml .Values.ingestor.resources | nindent 12 }} + livenessProbe: + httpGet: + path: {{ .Values.probes.liveness.path }} + port: {{ .Values.probes.liveness.port }} + initialDelaySeconds: {{ .Values.probes.liveness.initialDelaySeconds }} + periodSeconds: {{ .Values.probes.liveness.periodSeconds }} + readinessProbe: + httpGet: + path: {{ .Values.probes.readiness.path }} + port: {{ .Values.probes.readiness.port }} + initialDelaySeconds: {{ .Values.probes.readiness.initialDelaySeconds }} + periodSeconds: {{ .Values.probes.readiness.periodSeconds }} +{{- end }} diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/values.yaml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/values.yaml new file mode 100644 index 000000000000..6b201871c18e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/chart/values.yaml @@ -0,0 +1,90 @@ +# Default values for cosmos-soak Helm chart. +# Override with values-avad.yaml or your own values file. 
+ +namespace: cosmos-soak + +image: + repository: abhmavadsoakacr.azurecr.io/cosmos-avad-test + tag: latest + pullPolicy: Always + +# Ingestor deployment +ingestor: + enabled: true + replicas: 3 + mode: ingestor + resources: + requests: + cpu: "500m" + memory: "512Mi" + limits: + cpu: "1000m" + memory: "1Gi" + +# AVAD CFP consumer (StatefulSet for stable hostnames) +avadConsumer: + enabled: true + replicas: 20 + mode: avad-reader + leasePrefix: "avad-" + resources: + requests: + cpu: "500m" + memory: "512Mi" + limits: + cpu: "1000m" + memory: "1Gi" + +# LV CFP consumer (StatefulSet, parity baseline) +lvConsumer: + enabled: true + replicas: 20 + mode: lv-reader + leasePrefix: "lv-" + resources: + requests: + cpu: "500m" + memory: "512Mi" + limits: + cpu: "1000m" + memory: "1Gi" + +# Health monitor CronJob +healthMonitor: + enabled: true + schedule: "*/5 * * * *" + gapSlaMinutes: 10 + +# Cosmos DB configuration +cosmos: + endpoint: "" + database: "graph_db" + feedContainer: "avad-test" + leaseContainer: "avad-test-leases" + preferredRegion: "West Central US" + opsPerSec: 5000 + docSizeBytes: 1024 + logicalPartitionCount: 100000 + durationSeconds: 0 # 0 = run forever + workerCount: 2 + +# Key Vault CSI +keyVault: + enabled: true + vaultName: "abhm-avad-soak-kv" + tenantId: "" + clientId: "" + secretName: "cosmos-key" + +# Probes +probes: + liveness: + path: /health + port: 8080 + initialDelaySeconds: 30 + periodSeconds: 10 + readiness: + path: /ready + port: 8080 + initialDelaySeconds: 15 + periodSeconds: 5 diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-acr.sh b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-acr.sh new file mode 100644 index 000000000000..7987c1b193bc --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-acr.sh @@ -0,0 +1,65 @@ +#!/bin/bash +# ============================================================================= +# ACR Setup + Image Build/Push +# 
============================================================================= +# Builds the benchmark module locally (fat jar), then pushes a minimal +# runtime image to ACR. No multi-stage Docker build needed — avoids +# dependency resolution issues with internal SDK modules. +# +# Usage: +# ./setup-acr.sh +# IMAGE_TAG=v2 ./setup-acr.sh +# ============================================================================= + +set -euo pipefail + +SUBSCRIPTION="${SUBSCRIPTION:-b31b6408-0fb5-4688-9a3c-33ffb3983297}" +RG="${RG:-abhm-rg}" +ACR_NAME="${ACR_NAME:-abhmavadsoakacr}" +IMAGE_NAME="${IMAGE_NAME:-cosmos-avad-test}" +IMAGE_TAG="${IMAGE_TAG:-latest}" +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +PROJECT_DIR="$SCRIPT_DIR/../../.." + +az account set --subscription "$SUBSCRIPTION" + +echo "=== ACR Setup ===" + +# Create ACR (if not exists) +az acr create \ + --resource-group "$RG" \ + --name "$ACR_NAME" \ + --sku Basic \ + --output none 2>/dev/null || echo "ACR already exists" + +# Attach ACR to AKS (if AKS exists) +AKS_CLUSTER="${AKS_CLUSTER:-abhm-avad-soak-aks}" +az aks update \ + --resource-group "$RG" \ + --name "$AKS_CLUSTER" \ + --attach-acr "$ACR_NAME" \ + --output none 2>/dev/null || echo "AKS-ACR attachment skipped" + +echo "=== Building module locally ===" + +cd "$PROJECT_DIR" +mvn package -DskipTests -DskipCheckstyle -Dspotbugs.skip=true -Drevapi.skip=true -B -q + +# Verify the fat jar exists +FAT_JAR=$(ls target/azure-cosmos-benchmark-*-jar-with-dependencies.jar 2>/dev/null | head -1) +if [ -z "$FAT_JAR" ]; then + echo "ERROR: Fat jar not found. Check maven-shade/assembly plugin config." 
+ exit 1 +fi +echo "Fat jar: $FAT_JAR" + +echo "=== Pushing image to ACR ===" + +# Build and push using ACR Tasks — context is the module root (has target/ + Dockerfile path) +az acr build \ + --registry "$ACR_NAME" \ + --image "${IMAGE_NAME}:${IMAGE_TAG}" \ + --file "$PROJECT_DIR/avad-soak/Dockerfile" \ + "$PROJECT_DIR" + +echo "Image pushed: ${ACR_NAME}.azurecr.io/${IMAGE_NAME}:${IMAGE_TAG}" diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-aks.sh b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-aks.sh new file mode 100644 index 000000000000..c0ebec583eec --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-aks.sh @@ -0,0 +1,49 @@ +#!/bin/bash +# ============================================================================= +# AKS Cluster Setup for Cosmos DB Soak Testing +# ============================================================================= +# Creates an AKS cluster in the abhm-rg resource group. +# Reusable: change variables for your own resource group/subscription. 
+# ============================================================================= + +set -euo pipefail + +SUBSCRIPTION="${SUBSCRIPTION:-b31b6408-0fb5-4688-9a3c-33ffb3983297}" +RG="${RG:-abhm-rg}" +LOCATION="${LOCATION:-eastus}" +CLUSTER_NAME="${CLUSTER_NAME:-abhm-avad-soak-aks}" +NODE_COUNT="${NODE_COUNT:-3}" +NODE_VM_SIZE="${NODE_VM_SIZE:-Standard_D4s_v5}" +K8S_VERSION="${K8S_VERSION:-1.29}" + +echo "=== Setting up AKS cluster ===" +echo " Subscription: $SUBSCRIPTION" +echo " Resource Group: $RG" +echo " Cluster: $CLUSTER_NAME" +echo " Nodes: $NODE_COUNT x $NODE_VM_SIZE" + +az account set --subscription "$SUBSCRIPTION" + +# Create cluster +az aks create \ + --resource-group "$RG" \ + --name "$CLUSTER_NAME" \ + --location "$LOCATION" \ + --node-count "$NODE_COUNT" \ + --node-vm-size "$NODE_VM_SIZE" \ + --kubernetes-version "$K8S_VERSION" \ + --enable-managed-identity \ + --enable-addons monitoring \ + --generate-ssh-keys \ + --output none + +echo "AKS cluster created" + +# Get credentials +az aks get-credentials \ + --resource-group "$RG" \ + --name "$CLUSTER_NAME" \ + --overwrite-existing + +echo "kubectl context set to $CLUSTER_NAME" +kubectl get nodes diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-cosmos.sh b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-cosmos.sh new file mode 100644 index 000000000000..b0a9d02e98a6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/infra/scripts/setup-cosmos.sh @@ -0,0 +1,54 @@ +#!/bin/bash +# ============================================================================= +# Cosmos DB Container Setup for Soak Testing +# ============================================================================= +# Creates required containers in an existing Cosmos account. +# Idempotent — safe to run multiple times. 
+# ============================================================================= + +set -euo pipefail + +COSMOS_ACCOUNT="${COSMOS_ACCOUNT:-abhm-cfp-region-test}" +COSMOS_RG="${COSMOS_RG:-abhm-rg}" +COSMOS_DB="${COSMOS_DB:-graph_db}" +SUBSCRIPTION="${SUBSCRIPTION:-b31b6408-0fb5-4688-9a3c-33ffb3983297}" + +az account set --subscription "$SUBSCRIPTION" + +echo "=== Setting up Cosmos DB containers ===" + +# Feed container (AVAD-enabled, /tenantId PK) +echo "Creating feed container: avad-test" +az cosmosdb sql container create \ + --account-name "$COSMOS_ACCOUNT" \ + --resource-group "$COSMOS_RG" \ + --database-name "$COSMOS_DB" \ + --name "avad-test" \ + --partition-key-path "/tenantId" \ + --throughput 10000 \ + --output none 2>/dev/null || echo " already exists" + +# Lease container (/id PK) +echo "Creating lease container: avad-test-leases" +az cosmosdb sql container create \ + --account-name "$COSMOS_ACCOUNT" \ + --resource-group "$COSMOS_RG" \ + --database-name "$COSMOS_DB" \ + --name "avad-test-leases" \ + --partition-key-path "/id" \ + --throughput 1000 \ + --output none 2>/dev/null || echo " already exists" + +# Reconciliation container (/correlationId PK, TTL 24h) +echo "Creating reconciliation container" +az cosmosdb sql container create \ + --account-name "$COSMOS_ACCOUNT" \ + --resource-group "$COSMOS_RG" \ + --database-name "$COSMOS_DB" \ + --name "reconciliation" \ + --partition-key-path "/correlationId" \ + --throughput 5000 \ + --ttl 86400 \ + --output none 2>/dev/null || echo " already exists" + +echo "=== All containers ready ===" diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/logback.xml b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/logback.xml new file mode 100644 index 000000000000..ee24b16d9ac3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/logback.xml @@ -0,0 +1,32 @@ + + + + %d{HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + cosmos-avad-test.log + + cosmos-avad-test.%d{yyyy-MM-dd}.%i.log + 100MB + 7 + + 
+ %d{yyyy-MM-dd HH:mm:ss.SSS} [%thread] %-5level %logger{36} - %msg%n + + + + + + + + + + + + + + + + diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-local.ps1 b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-local.ps1 new file mode 100644 index 000000000000..f6f0e4e27241 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-local.ps1 @@ -0,0 +1,139 @@ +<# +.SYNOPSIS + AVAD Soak Test — Local Mode (Windows) + +.DESCRIPTION + Runs ingestor + avad-reader + lv-reader as local JVM processes. + No AKS/Helm required. For dev-box validation before deploying to AKS. + +.EXAMPLE + .\run-local.ps1 -ConfigFile config.json + .\run-local.ps1 -ConfigFile config.json -DurationSeconds 1800 -OpsPerSec 200 + $env:COSMOS_KEY = "xxx"; .\run-local.ps1 -ConfigFile config.json +#> + +param( + [Parameter(Mandatory)] [string]$ConfigFile, + [int]$DurationSeconds = 0, + [int]$OpsPerSec = 0 +) + +$ErrorActionPreference = "Stop" +$ScriptDir = Split-Path -Parent $MyInvocation.MyCommand.Path +$ModuleDir = Split-Path -Parent $ScriptDir + +# Apply overrides +if ($DurationSeconds -gt 0) { $env:DURATION_SECONDS = $DurationSeconds } +if ($OpsPerSec -gt 0) { $env:OPS_PER_SEC = $OpsPerSec } + +# ── Build if needed ─────────────────────────────────────────────────────── +$Jar = "$ModuleDir\target\azure-cosmos-benchmark-4.0.1-beta.1.jar" +$CpFile = "$ModuleDir\target\cp.txt" + +if (-not (Test-Path $Jar)) { + Write-Host "=== Building module ===" + Push-Location $ModuleDir + mvn package "-DskipTests" "-DskipCheckstyle" "-Dspotbugs.skip=true" "-Drevapi.skip=true" -B -q + Pop-Location +} + +if (-not (Test-Path $CpFile)) { + Push-Location $ModuleDir + mvn dependency:build-classpath "-Dmdep.outputFile=target\cp.txt" -B -q + Pop-Location +} + +# Build classpath: logback dir first, exclude log4j-slf4j-impl +$CpRaw = Get-Content $CpFile +$CpFiltered = ($CpRaw -split ';' | Where-Object { $_ -notmatch 'log4j-slf4j-impl' }) -join ';' +$Classpath = "$ScriptDir;$Jar;$CpFiltered" + +$JavaCmd 
= "java" +$MainClass = "com.azure.cosmos.avadtest.Main" + +# ── Output directory ────────────────────────────────────────────────────── +$RunId = Get-Date -Format "yyyyMMdd-HHmmss" +$OutputDir = "$ScriptDir\local-run-$RunId" +New-Item -ItemType Directory -Path $OutputDir -Force | Out-Null + +function Log($msg) { + $ts = Get-Date -Format "HH:mm:ss" + $line = "[$ts] $msg" + Write-Host $line + Add-Content "$OutputDir\run.log" $line +} + +# ── Launch processes ────────────────────────────────────────────────────── + +Log "=== AVAD Local Soak Test ===" +Log "Config: $ConfigFile" +Log "Output: $OutputDir" + +$ConfigPath = Resolve-Path $ConfigFile + +Log "Starting ingestor (port 8080)..." +$ingestor = Start-Process -FilePath $JavaCmd -ArgumentList @( + "-cp", $Classpath, $MainClass, + "--mode", "ingestor", "--config", $ConfigPath, "--health-port", "8080" +) -RedirectStandardOutput "$OutputDir\ingestor.log" ` + -RedirectStandardError "$OutputDir\ingestor-err.log" ` + -PassThru -NoNewWindow +Log " Ingestor PID: $($ingestor.Id)" + +Start-Sleep -Seconds 10 + +Log "Starting avad-reader (port 8081)..." +$env:CONSUMED_LOG = "$OutputDir\consumed-avad.log" +$avadReader = Start-Process -FilePath $JavaCmd -ArgumentList @( + "-cp", $Classpath, $MainClass, + "--mode", "avad-reader", "--config", $ConfigPath, "--health-port", "8081" +) -RedirectStandardOutput "$OutputDir\avad-reader.log" ` + -RedirectStandardError "$OutputDir\avad-reader-err.log" ` + -PassThru -NoNewWindow +Log " AVAD reader PID: $($avadReader.Id)" + +Log "Starting lv-reader (port 8082)..." 
+$env:CONSUMED_LOG = "$OutputDir\consumed-lv.log" +$lvReader = Start-Process -FilePath $JavaCmd -ArgumentList @( + "-cp", $Classpath, $MainClass, + "--mode", "lv-reader", "--config", $ConfigPath, "--health-port", "8082" +) -RedirectStandardOutput "$OutputDir\lv-reader.log" ` + -RedirectStandardError "$OutputDir\lv-reader-err.log" ` + -PassThru -NoNewWindow +Log " LV reader PID: $($lvReader.Id)" + +Log "All 3 processes running" +Log " Ingestor log: $OutputDir\ingestor.log" +Log " AVAD log: $OutputDir\avad-reader.log" +Log " LV log: $OutputDir\lv-reader.log" + +# ── Monitor loop ────────────────────────────────────────────────────────── + +try { + Log "Monitoring (Ctrl+C to stop)..." + while ($true) { + Start-Sleep -Seconds 30 + + $dead = @() + if ($ingestor.HasExited) { $dead += "ingestor (exit $($ingestor.ExitCode))" } + if ($avadReader.HasExited) { $dead += "avad-reader (exit $($avadReader.ExitCode))" } + if ($lvReader.HasExited) { $dead += "lv-reader (exit $($lvReader.ExitCode))" } + + if ($dead.Count -gt 0) { + Log "❌ Process(es) exited: $($dead -join ', ')" + Log "Check logs in $OutputDir" + break + } + + # Print last ingestor progress + $lastLine = Get-Content "$OutputDir\ingestor.log" -Tail 1 -ErrorAction SilentlyContinue + if ($lastLine -match 'Progress:') { Log " $lastLine" } + } +} finally { + Log "=== Stopping all processes ===" + if (-not $ingestor.HasExited) { Stop-Process -Id $ingestor.Id -Force -ErrorAction SilentlyContinue } + if (-not $avadReader.HasExited) { Stop-Process -Id $avadReader.Id -Force -ErrorAction SilentlyContinue } + if (-not $lvReader.HasExited) { Stop-Process -Id $lvReader.Id -Force -ErrorAction SilentlyContinue } + Log "All processes stopped" + Log "Logs: $OutputDir" +} diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-local.sh b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-local.sh new file mode 100644 index 000000000000..f9b89e018ade --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-local.sh @@ -0,0 
+1,159 @@ +#!/bin/bash +# ============================================================================= +# AVAD Soak Test — Local Mode +# ============================================================================= +# Runs ingestor + avad-reader + lv-reader as local JVM processes. +# No AKS/Helm required. For dev-box validation before deploying to AKS. +# +# Usage: +# ./run-local.sh --config config.json +# ./run-local.sh --config config.json --duration 1800 +# COSMOS_KEY=xxx ./run-local.sh --config config.json +# +# Prerequisites: +# - JDK 17+ +# - Maven (for first build) +# - COSMOS_KEY env var set (or in config.json) +# - Cosmos DB containers created (see infra/scripts/setup-cosmos.sh) +# ============================================================================= + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" +MODULE_DIR="$SCRIPT_DIR/.." +CONFIG_FILE="" +DURATION_OVERRIDE="" +OPS_OVERRIDE="" + +# ── Parse args ──────────────────────────────────────────────────────────── +while [[ $# -gt 0 ]]; do + case "$1" in + --config) CONFIG_FILE="$2"; shift 2 ;; + --duration) DURATION_OVERRIDE="$2"; shift 2 ;; + --ops) OPS_OVERRIDE="$2"; shift 2 ;; + -h|--help) + echo "Usage: $0 --config [--duration ] [--ops ]" + exit 0 ;; + *) echo "Unknown arg: $1"; exit 1 ;; + esac +done + +if [ -z "$CONFIG_FILE" ]; then + echo "ERROR: --config is required" + exit 1 +fi + +# ── Apply overrides via env vars ────────────────────────────────────────── +[ -n "$DURATION_OVERRIDE" ] && export DURATION_SECONDS="$DURATION_OVERRIDE" +[ -n "$OPS_OVERRIDE" ] && export OPS_PER_SEC="$OPS_OVERRIDE" + +# ── Build if needed ─────────────────────────────────────────────────────── +JAR="$MODULE_DIR/target/azure-cosmos-benchmark-4.0.1-beta.1.jar" +CP_FILE="$MODULE_DIR/target/cp.txt" + +if [ ! 
-f "$JAR" ]; then + echo "=== Building module (first run) ===" + cd "$MODULE_DIR" + mvn package -DskipTests -DskipCheckstyle -Dspotbugs.skip=true -Drevapi.skip=true -B -q + mvn dependency:build-classpath -Dmdep.outputFile=target/cp.txt -B -q +fi + +if [ ! -f "$CP_FILE" ]; then + cd "$MODULE_DIR" + mvn dependency:build-classpath -Dmdep.outputFile=target/cp.txt -B -q +fi + +# Build classpath with logback config, excluding log4j-slf4j-impl +CP_RAW=$(cat "$CP_FILE") +CP_FILTERED=$(echo "$CP_RAW" | tr ';' '\n' | tr ':' '\n' | grep -v 'log4j-slf4j-impl' | tr '\n' ':') +CLASSPATH="$SCRIPT_DIR:$JAR:$CP_FILTERED" + +JAVA_CMD="java -cp $CLASSPATH" +MAIN_CLASS="com.azure.cosmos.avadtest.Main" + +# ── Output directory ────────────────────────────────────────────────────── +OUTPUT_DIR="$SCRIPT_DIR/local-run-$(date +%Y%m%d-%H%M%S)" +mkdir -p "$OUTPUT_DIR" + +log() { echo "[$(date '+%H:%M:%S')] $*" | tee -a "$OUTPUT_DIR/run.log"; } + +# ── PIDs for cleanup ───────────────────────────────────────────────────── +INGESTOR_PID="" +AVAD_PID="" +LV_PID="" + +cleanup() { + log "=== Stopping all processes ===" + [ -n "$INGESTOR_PID" ] && kill "$INGESTOR_PID" 2>/dev/null && wait "$INGESTOR_PID" 2>/dev/null || true + [ -n "$AVAD_PID" ] && kill "$AVAD_PID" 2>/dev/null && wait "$AVAD_PID" 2>/dev/null || true + [ -n "$LV_PID" ] && kill "$LV_PID" 2>/dev/null && wait "$LV_PID" 2>/dev/null || true + log "All processes stopped" + log "Logs: $OUTPUT_DIR" +} +trap cleanup EXIT INT TERM + +# ── Launch processes ────────────────────────────────────────────────────── + +log "=== AVAD Local Soak Test ===" +log "Config: $CONFIG_FILE" +log "Output: $OUTPUT_DIR" + +# 1. Ingestor +log "Starting ingestor (port 8080)..." +$JAVA_CMD $MAIN_CLASS --mode ingestor --config "$CONFIG_FILE" --health-port 8080 \ + > "$OUTPUT_DIR/ingestor.log" 2>&1 & +INGESTOR_PID=$! +log " Ingestor PID: $INGESTOR_PID" + +# Wait for ingestor to start producing +sleep 10 + +# 2. AVAD reader +log "Starting avad-reader (port 8081)..." 
+CONSUMED_LOG="$OUTPUT_DIR/consumed-avad.log" \ +$JAVA_CMD $MAIN_CLASS --mode avad-reader --config "$CONFIG_FILE" --health-port 8081 \ + > "$OUTPUT_DIR/avad-reader.log" 2>&1 & +AVAD_PID=$! +log " AVAD reader PID: $AVAD_PID" + +# 3. LV reader +log "Starting lv-reader (port 8082)..." +CONSUMED_LOG="$OUTPUT_DIR/consumed-lv.log" \ +$JAVA_CMD $MAIN_CLASS --mode lv-reader --config "$CONFIG_FILE" --health-port 8082 \ + > "$OUTPUT_DIR/lv-reader.log" 2>&1 & +LV_PID=$! +log " LV reader PID: $LV_PID" + +log "All 3 processes running" +log " tail -f $OUTPUT_DIR/ingestor.log" +log " tail -f $OUTPUT_DIR/avad-reader.log" +log " tail -f $OUTPUT_DIR/lv-reader.log" + +# ── Monitor loop ────────────────────────────────────────────────────────── + +check_alive() { + local name="$1" pid="$2" + if ! kill -0 "$pid" 2>/dev/null; then + log "❌ $name (PID $pid) has exited!" + return 1 + fi + return 0 +} + +log "Monitoring processes (Ctrl+C to stop)..." +while true; do + sleep 30 + + alive=true + check_alive "ingestor" "$INGESTOR_PID" || alive=false + check_alive "avad-reader" "$AVAD_PID" || alive=false + check_alive "lv-reader" "$LV_PID" || alive=false + + if [ "$alive" = false ]; then + log "One or more processes died — check logs in $OUTPUT_DIR" + exit 1 + fi + + # Print last progress line from ingestor + tail -1 "$OUTPUT_DIR/ingestor.log" 2>/dev/null | grep -o 'Progress:.*' || true +done diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-soak.sh b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-soak.sh new file mode 100644 index 000000000000..f270beea9a42 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/run-soak.sh @@ -0,0 +1,185 @@ +#!/bin/bash +# ============================================================================= +# AVAD Soak Test — AKS Mode +# ============================================================================= +# Deploys ingestor + avad-reader + lv-reader to AKS via Helm, monitors health, +# and runs chaos scenarios on a schedule. 
+# +# Usage: +# ./run-soak.sh +# SOAK_DURATION_HOURS=12 CHAOS_ENABLED=false ./run-soak.sh +# VALUES_OVERRIDE=values-prod.yaml ./run-soak.sh +# +# Prerequisites: +# - AKS cluster configured (kubectl context set) +# - Helm 3 installed +# - ACR image pushed (see infra/scripts/setup-acr.sh) +# - Cosmos containers created (see infra/scripts/setup-cosmos.sh) +# - K8s secrets created (see infra/README.md) +# ============================================================================= + +set -euo pipefail + +SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" + +# ── Configuration ───────────────────────────────────────────────────────── +NAMESPACE="${NAMESPACE:-cosmos-soak}" +RELEASE="${RELEASE:-cosmos-soak}" +VALUES_FILE="${VALUES_FILE:-$SCRIPT_DIR/infra/chart/values.yaml}" +VALUES_OVERRIDE="${VALUES_OVERRIDE:-}" + +SOAK_DURATION_HOURS="${SOAK_DURATION_HOURS:-24}" +WARMUP_SEC="${WARMUP_SEC:-1800}" +STEADY_SEC="${STEADY_SEC:-3600}" +RECOVERY_SEC="${RECOVERY_SEC:-1800}" +HEALTH_CHECK_INTERVAL="${HEALTH_CHECK_INTERVAL:-300}" + +CHAOS_ENABLED="${CHAOS_ENABLED:-true}" +ABORT_ON_GAP="${ABORT_ON_GAP:-false}" + +export COSMOS_ACCOUNT="${COSMOS_ACCOUNT:-abhm-cfp-region-test}" +export COSMOS_RG="${COSMOS_RG:-abhm-rg}" + +OUTPUT_DIR="$SCRIPT_DIR/soak-results-$(date +%Y%m%d-%H%M%S)" +mkdir -p "$OUTPUT_DIR" + +SOAK_DURATION_SEC=$((SOAK_DURATION_HOURS * 3600)) +START_TIME=$(date +%s) +CHAOS_PID="" + +# ── Helpers ─────────────────────────────────────────────────────────────── + +log() { echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$OUTPUT_DIR/soak.log"; } +elapsed() { echo $(( $(date +%s) - START_TIME )); } +is_expired() { [ "$SOAK_DURATION_SEC" -gt 0 ] && [ "$(elapsed)" -ge "$SOAK_DURATION_SEC" ]; } + +# ── Cleanup ─────────────────────────────────────────────────────────────── + +cleanup() { + log "=== Cleanup triggered ===" + [ -n "$CHAOS_PID" ] && kill "$CHAOS_PID" 2>/dev/null || true + + log "Collecting pod logs..." 
+ for pod in $(kubectl get pods -n "$NAMESPACE" -o name 2>/dev/null); do + local name=$(basename "$pod") + kubectl logs "$pod" -n "$NAMESPACE" --tail=5000 \ + > "$OUTPUT_DIR/${name}.log" 2>/dev/null || true + done + + kubectl get pods -n "$NAMESPACE" -o wide \ + > "$OUTPUT_DIR/pods-final.txt" 2>/dev/null || true + + log "Results: $OUTPUT_DIR" + log "=== Soak test ended (elapsed: $(elapsed)s) ===" +} +trap cleanup EXIT ERR INT TERM + +# ── Health check ────────────────────────────────────────────────────────── + +check_health() { + log "Running health check..." + local healthy=true + + local not_ready=$(kubectl get pods -n "$NAMESPACE" \ + --field-selector=status.phase!=Running \ + -o name 2>/dev/null | wc -l) + + if [ "$not_ready" -gt 0 ]; then + log " ⚠️ $not_ready pods not in Running state" + healthy=false + fi + + # Check pod readiness + local total_pods=$(kubectl get pods -n "$NAMESPACE" --no-headers 2>/dev/null | wc -l) + local ready_pods=$(kubectl get pods -n "$NAMESPACE" --no-headers 2>/dev/null \ + | awk '$2 ~ /1\/1/' | wc -l) + log " Pods: $ready_pods/$total_pods ready" + + if [ "$healthy" = true ]; then + log " ✅ Health check passed" + else + log " ❌ Health check FAILED" + if [ "$ABORT_ON_GAP" = "true" ]; then + log "ABORT_ON_GAP set — stopping" + exit 1 + fi + fi +} + +# ── Chaos runner ────────────────────────────────────────────────────────── + +run_chaos_loop() { + log "Chaos loop starting" + local iteration=0 + + while ! 
is_expired; do + iteration=$((iteration + 1)) + log "=== Chaos iteration $iteration ===" + + # Pod kill (every iteration) + log "Firing: pod-kill" + bash "$SCRIPT_DIR/chaos/scenarios/pod-kill.sh" 2>&1 | tee -a "$OUTPUT_DIR/chaos.log" + sleep "$RECOVERY_SEC" + check_health + + # Partition split (every 3rd iteration) + if [ $((iteration % 3)) -eq 0 ]; then + log "Firing: partition-split" + bash "$SCRIPT_DIR/chaos/scenarios/partition-split.sh" 2>&1 | tee -a "$OUTPUT_DIR/chaos.log" + sleep "$RECOVERY_SEC" + check_health + fi + + log "Steady state for ${STEADY_SEC}s..." + sleep "$STEADY_SEC" + done +} + +# ── Main ────────────────────────────────────────────────────────────────── + +log "=== AVAD AKS Soak Test ===" +log "Duration: ${SOAK_DURATION_HOURS}h | Chaos: $CHAOS_ENABLED" +log "Output: $OUTPUT_DIR" + +# 1. Namespace +kubectl create namespace "$NAMESPACE" --dry-run=client -o yaml | kubectl apply -f - + +# 2. Helm deploy +log "Deploying via Helm..." +HELM_ARGS=( + upgrade --install "$RELEASE" + "$SCRIPT_DIR/infra/chart" + --namespace "$NAMESPACE" + --values "$VALUES_FILE" +) +[ -n "$VALUES_OVERRIDE" ] && HELM_ARGS+=(--values "$VALUES_OVERRIDE") +helm "${HELM_ARGS[@]}" +log "Helm deploy complete" + +# 3. Wait for pods +log "Warm-up (${WARMUP_SEC}s)..." +sleep 30 +kubectl wait --for=condition=ready pods \ + --all -n "$NAMESPACE" \ + --timeout="${WARMUP_SEC}s" || { + log "⚠️ Not all pods ready after warm-up" +} +check_health + +# 4. Chaos (background) +if [ "$CHAOS_ENABLED" = "true" ]; then + run_chaos_loop & + CHAOS_PID=$! + log "Chaos loop started (PID: $CHAOS_PID)" +fi + +# 5. Monitor +log "Monitoring (Ctrl+C to stop)..." +while ! 
is_expired; do + sleep "$HEALTH_CHECK_INTERVAL" + check_health +done + +log "=== Soak duration reached (${SOAK_DURATION_HOURS}h) ===" +check_health +log "=== Soak Test Complete ===" diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_avad_reader.py b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_avad_reader.py new file mode 100644 index 000000000000..fbf890e4a000 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_avad_reader.py @@ -0,0 +1,197 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Spark AVAD (Full Fidelity) Change Feed Reader +# MAGIC +# MAGIC Reads the change feed in **All Versions and Deletes (Full Fidelity)** mode +# MAGIC using `azure-cosmos-spark` connector and writes consumed events to the +# MAGIC `reconciliation` container with `source = "spark-avad"`. +# MAGIC +# MAGIC ## AVAD-Specific Validations +# MAGIC - Extracts `operationType` from change feed metadata +# MAGIC - Checks `previousImage` presence on replace/delete events +# MAGIC - Captures CRTS (conflict resolution timestamp) from metadata +# MAGIC +# MAGIC ## Prerequisites +# MAGIC - Databricks cluster with `azure-cosmos-spark_3-4_2-12` (or compatible) installed +# MAGIC - Cosmos DB account with AVAD-enabled `avad-test` container and `reconciliation` container +# MAGIC - Cluster env vars: `COSMOS_ENDPOINT`, `COSMOS_KEY` + +# COMMAND ---------- + +# Configuration — reads from notebook widgets (set via job parameters or manually) +import os + +try: + cosmos_endpoint = dbutils.widgets.get("cosmos_endpoint") +except: + cosmos_endpoint = os.environ.get("COSMOS_ENDPOINT", "") + +try: + cosmos_key = dbutils.widgets.get("cosmos_key") +except: + cosmos_key = os.environ.get("COSMOS_KEY", "") + +try: + database = dbutils.widgets.get("database") +except: + database = "graph_db" + +feed_container = "avad-test" +recon_container = "reconciliation" + +assert cosmos_endpoint, "Set cosmos_endpoint widget or COSMOS_ENDPOINT env var" +assert 
cosmos_key, "Set cosmos_key widget or COSMOS_KEY env var" + +print(f"Endpoint: {cosmos_endpoint}") +print(f"Database: {database}") +print(f"Feed container: {feed_container}") + +# COMMAND ---------- + +# Spark Cosmos config — read change feed in Full Fidelity (AVAD) mode +feed_cfg = { + "spark.cosmos.accountEndpoint": cosmos_endpoint, + "spark.cosmos.accountKey": cosmos_key, + "spark.cosmos.database": database, + "spark.cosmos.container": feed_container, + "spark.cosmos.read.partitioning.strategy": "Default", + "spark.cosmos.changeFeed.mode": "AllVersionsAndDeletes", + "spark.cosmos.changeFeed.startFrom": "Now", + "spark.cosmos.changeFeed.itemCountPerTriggerHint": "1000", +} + +# Write config — reconciliation container +recon_cfg = { + "spark.cosmos.accountEndpoint": cosmos_endpoint, + "spark.cosmos.accountKey": cosmos_key, + "spark.cosmos.database": database, + "spark.cosmos.container": recon_container, + "spark.cosmos.write.strategy": "ItemOverwrite", + "spark.cosmos.write.bulk.enabled": "true", +} + +# COMMAND ---------- + +from pyspark.sql.functions import col, lit, concat, current_timestamp, coalesce +from pyspark.sql.types import StringType, LongType, BooleanType + +SOURCE = "spark-avad" + +# Read change feed as streaming DataFrame — Full Fidelity mode +raw_df = ( + spark.readStream + .format("cosmos.oltp.changeFeed") + .options(**feed_cfg) + .load() +) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Schema Notes +# MAGIC +# MAGIC In Full Fidelity mode, the Spark connector exposes the same columns +# MAGIC as Incremental mode: `id`, `eventId`, `seqNo`, `operationType`, +# MAGIC `tenantId`, `payload`, `timestamp`. The connector flattens the change +# MAGIC feed item — metadata like `lsn` and `crts` are not directly exposed +# MAGIC as columns. previousImage availability depends on container config. + +# COMMAND ---------- + +# Transform to reconciliation schema +# The Spark connector flattens AVAD events — use available columns directly. 
+# LSN/CRTS/previousImage not exposed as columns by the connector. +recon_df = ( + raw_df + .select( + concat(lit(SOURCE + "-"), col("eventId")).alias("id"), + col("eventId").alias("correlationId"), + lit(SOURCE).alias("source"), + coalesce(col("seqNo"), lit(-1)).cast(LongType()).alias("seqNo"), + coalesce(col("operationType"), lit("unknown")).alias("opType"), + coalesce(col("tenantId"), lit("")).alias("partitionKey"), + lit(-1).cast(LongType()).alias("lsn"), + lit(False).cast(BooleanType()).alias("hasPreviousImage"), + lit(-1).cast(LongType()).alias("crts"), + current_timestamp().cast(StringType()).alias("timestamp"), + ) + .filter(col("correlationId").isNotNull()) +) + +# COMMAND ---------- + +# Write to reconciliation container as a streaming job +query = ( + recon_df.writeStream + .format("cosmos.oltp") + .options(**recon_cfg) + .option("checkpointLocation", f"/Workspace/avad-soak/checkpoints/spark-avad") + .outputMode("append") + .trigger(processingTime="10 seconds") + .start() +) + +print(f"Spark AVAD streaming query started: {query.id}") +print(f"Status: {query.status}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Monitor +# MAGIC Run this cell periodically to check progress and AVAD correctness. + +# COMMAND ---------- + +# Check streaming query progress +if query.isActive: + progress = query.lastProgress + if progress: + print(f"Batch: {progress['batchId']}") + print(f"Input rows: {progress['numInputRows']}") + print(f"Processing time: {progress['batchDuration']} ms") + else: + print("No progress yet — waiting for first batch") +else: + print(f"Query stopped. Exception: {query.exception()}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### AVAD Correctness Check (ad-hoc) +# MAGIC Query the reconciliation container directly to check previousImage counts. 
+ +# COMMAND ---------- + +# Ad-hoc correctness check — read reconciliation container +recon_read_cfg = { + "spark.cosmos.accountEndpoint": cosmos_endpoint, + "spark.cosmos.accountKey": cosmos_key, + "spark.cosmos.database": database, + "spark.cosmos.container": recon_container, + "spark.cosmos.read.partitioning.strategy": "Default", +} + +recon_data = spark.read.format("cosmos.oltp").options(**recon_read_cfg).load() + +# previousImage check for spark-avad +missing_prev = ( + recon_data + .filter((col("source") == SOURCE) & col("opType").isin("replace", "delete") & (col("hasPreviousImage") == False)) + .count() +) +total_avad = recon_data.filter(col("source") == SOURCE).count() + +print(f"Spark AVAD total events: {total_avad}") +print(f"Missing previousImage: {missing_prev}") +print("✅ OK" if missing_prev == 0 else f"❌ {missing_prev} events missing previousImage") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Stop +# MAGIC Run this cell to stop the streaming query gracefully. + +# COMMAND ---------- + +# query.stop() +# print("Spark AVAD query stopped") diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_lv_reader.py b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_lv_reader.py new file mode 100644 index 000000000000..edc4eab562b6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_lv_reader.py @@ -0,0 +1,147 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # Spark Latest Version Change Feed Reader +# MAGIC +# MAGIC Reads the change feed in **Latest Version (Incremental)** mode using +# MAGIC `azure-cosmos-spark` connector and writes consumed events to the +# MAGIC `reconciliation` container with `source = "spark-lv"`. 
+# MAGIC +# MAGIC ## Prerequisites +# MAGIC - Databricks cluster with `azure-cosmos-spark_3-4_2-12` (or compatible) installed +# MAGIC - Cosmos DB account with `avad-test` and `reconciliation` containers +# MAGIC - Cluster env vars: `COSMOS_ENDPOINT`, `COSMOS_KEY` + +# COMMAND ---------- + +# Configuration — reads from notebook widgets (set via job parameters or manually) +import os + +try: + cosmos_endpoint = dbutils.widgets.get("cosmos_endpoint") +except: + cosmos_endpoint = os.environ.get("COSMOS_ENDPOINT", "") + +try: + cosmos_key = dbutils.widgets.get("cosmos_key") +except: + cosmos_key = os.environ.get("COSMOS_KEY", "") + +try: + database = dbutils.widgets.get("database") +except: + database = "graph_db" + +feed_container = "avad-test" +recon_container = "reconciliation" + +assert cosmos_endpoint, "Set cosmos_endpoint widget or COSMOS_ENDPOINT env var" +assert cosmos_key, "Set cosmos_key widget or COSMOS_KEY env var" + +print(f"Endpoint: {cosmos_endpoint}") +print(f"Database: {database}") +print(f"Feed container: {feed_container}") + +# COMMAND ---------- + +# Spark Cosmos config — read change feed in incremental (LV) mode +feed_cfg = { + "spark.cosmos.accountEndpoint": cosmos_endpoint, + "spark.cosmos.accountKey": cosmos_key, + "spark.cosmos.database": database, + "spark.cosmos.container": feed_container, + "spark.cosmos.read.partitioning.strategy": "Default", + "spark.cosmos.changeFeed.mode": "Incremental", + "spark.cosmos.changeFeed.startFrom": "Beginning", + "spark.cosmos.changeFeed.itemCountPerTriggerHint": "1000", +} + +# Write config — reconciliation container +recon_cfg = { + "spark.cosmos.accountEndpoint": cosmos_endpoint, + "spark.cosmos.accountKey": cosmos_key, + "spark.cosmos.database": database, + "spark.cosmos.container": recon_container, + "spark.cosmos.write.strategy": "ItemOverwrite", + "spark.cosmos.write.bulk.enabled": "true", +} + +# COMMAND ---------- + +from pyspark.sql.functions import col, lit, concat, current_timestamp, coalesce +from 
pyspark.sql.types import StringType, LongType, BooleanType + +SOURCE = "spark-lv" + +# Read change feed as streaming DataFrame +raw_df = ( + spark.readStream + .format("cosmos.oltp.changeFeed") + .options(**feed_cfg) + .load() +) + +# Transform to reconciliation schema +recon_df = ( + raw_df + .select( + concat(lit(SOURCE + "-"), col("eventId")).alias("id"), + col("eventId").alias("correlationId"), + lit(SOURCE).alias("source"), + coalesce(col("seqNo"), lit(-1)).cast(LongType()).alias("seqNo"), + coalesce(col("operationType"), lit("unknown")).alias("opType"), + coalesce(col("tenantId"), lit("")).alias("partitionKey"), + lit(-1).cast(LongType()).alias("lsn"), + lit(False).cast(BooleanType()).alias("hasPreviousImage"), + lit(-1).cast(LongType()).alias("crts"), + current_timestamp().cast(StringType()).alias("timestamp"), + ) + .filter(col("correlationId").isNotNull()) +) + +# COMMAND ---------- + +# Write to reconciliation container as a streaming job +query = ( + recon_df.writeStream + .format("cosmos.oltp") + .options(**recon_cfg) + .option("checkpointLocation", f"/Workspace/avad-soak/checkpoints/spark-lv") + .outputMode("append") + .trigger(processingTime="10 seconds") + .start() +) + +print(f"Spark LV streaming query started: {query.id}") +print(f"Status: {query.status}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Monitor +# MAGIC Run this cell periodically to check progress. + +# COMMAND ---------- + +# Check streaming query progress +if query.isActive: + progress = query.lastProgress + if progress: + print(f"Batch: {progress['batchId']}") + print(f"Input rows: {progress['numInputRows']}") + print(f"Processing time: {progress['batchDuration']} ms") + print(f"Sources: {progress['sources']}") + else: + print("No progress yet — waiting for first batch") +else: + print(f"Query stopped. Exception: {query.exception()}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ### Stop +# MAGIC Run this cell to stop the streaming query gracefully. 
+ +# COMMAND ---------- + +# query.stop() +# print("Spark LV query stopped") diff --git a/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_reconciler.py b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_reconciler.py new file mode 100644 index 000000000000..dd365cd32b65 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/avad-soak/spark/spark_reconciler.py @@ -0,0 +1,307 @@ +# Databricks notebook source +# MAGIC %md +# MAGIC # AVAD Soak Test — Spark Reconciler +# MAGIC +# MAGIC Runs all reconciliation checks against the `reconciliation` container using PySpark. +# MAGIC Handles millions of docs efficiently via bulk read + DataFrame operations. +# MAGIC +# MAGIC ## Checks +# MAGIC | # | Check | Sources | +# MAGIC |---|-------|---------| +# MAGIC | Q1 | Summary dashboard | All | +# MAGIC | Q2 | Gap detection (producer → consumer) | ingestor → cfp-lv, cfp-avad, spark-lv, spark-avad | +# MAGIC | Q3 | Parity (LV ⊆ AVAD) | cfp-lv → cfp-avad, spark-lv → spark-avad | +# MAGIC | Q4 | Cross-engine parity | cfp-lv ↔ spark-lv, cfp-avad ↔ spark-avad | +# MAGIC | Q5 | LSN ordering per partition | cfp-lv, cfp-avad, spark-lv, spark-avad | +# MAGIC | Q6 | CRTS ordering per partition | cfp-avad, spark-avad | +# MAGIC | Q7 | previousImage validation | cfp-avad, spark-avad | +# MAGIC | Q8 | Duplicate detection | All | +# MAGIC +# MAGIC ## Prerequisites +# MAGIC - `azure-cosmos-spark_3-4_2-12` connector installed on cluster +# MAGIC - Cluster env vars or widgets: `COSMOS_ENDPOINT`, `COSMOS_KEY` + +# COMMAND ---------- + +import os + +try: + cosmos_endpoint = dbutils.widgets.get("cosmos_endpoint") +except: + cosmos_endpoint = os.environ.get("COSMOS_ENDPOINT", "") + +try: + cosmos_key = dbutils.widgets.get("cosmos_key") +except: + cosmos_key = os.environ.get("COSMOS_KEY", "") + +try: + database = dbutils.widgets.get("database") +except: + database = "graph_db" + +assert cosmos_endpoint, "Set cosmos_endpoint widget or COSMOS_ENDPOINT env var" +assert cosmos_key, "Set 
cosmos_key widget or COSMOS_KEY env var" + +recon_cfg = { + "spark.cosmos.accountEndpoint": cosmos_endpoint, + "spark.cosmos.accountKey": cosmos_key, + "spark.cosmos.database": database, + "spark.cosmos.container": "reconciliation", + "spark.cosmos.read.partitioning.strategy": "Default", +} + +print(f"Endpoint: {cosmos_endpoint}") +print(f"Database: {database}") + +# COMMAND ---------- + +# Load entire reconciliation container into a cached DataFrame +recon = ( + spark.read + .format("cosmos.oltp") + .options(**recon_cfg) + .load() + .select("id", "correlationId", "source", "seqNo", "opType", + "partitionKey", "lsn", "hasPreviousImage", "crts", "timestamp") + .cache() +) + +total = recon.count() +print(f"Total reconciliation docs: {total:,}") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q1 — Summary Dashboard + +# COMMAND ---------- + +from pyspark.sql.functions import count, countDistinct, min as spark_min, max as spark_max, col + +summary = ( + recon + .groupBy("source") + .agg( + count("*").alias("totalEvents"), + countDistinct("correlationId").alias("uniqueEvents"), + spark_min("seqNo").alias("minSeq"), + spark_max("seqNo").alias("maxSeq"), + spark_min("lsn").alias("minLsn"), + spark_max("lsn").alias("maxLsn"), + ) + .orderBy("source") +) + +summary.show(truncate=False) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q2 — Gap Detection (Ingestor → Each Consumer) + +# COMMAND ---------- + +from pyspark.sql.functions import lit + +failures = 0 + +def check_gaps(source_a, source_b, label): + global failures + ids_a = recon.filter(col("source") == source_a).select("correlationId").distinct() + ids_b = recon.filter(col("source") == source_b).select("correlationId").distinct() + + count_a = ids_a.count() + count_b = ids_b.count() + + if count_a == 0: + print(f" ⏭️ {label}: {source_a} has 0 events — skipping") + return + if count_b == 0: + print(f" ⏭️ {label}: {source_b} has 0 events — skipping") + return + + missing = ids_a.subtract(ids_b) + gap_count = 
missing.count() + + status = "✅" if gap_count == 0 else "❌" + print(f" {status} {label}: {count_a:,} produced, {count_b:,} consumed, {gap_count:,} gaps") + + if gap_count > 0: + failures += 1 + print(f" Sample missing IDs:") + for row in missing.limit(10).collect(): + print(f" {row.correlationId}") + +print("=== Gap Detection ===") +check_gaps("ingestor", "cfp-lv", "Ingestor → CFP LV") +check_gaps("ingestor", "cfp-avad", "Ingestor → CFP AVAD") +check_gaps("ingestor", "spark-lv", "Ingestor → Spark LV") +check_gaps("ingestor", "spark-avad","Ingestor → Spark AVAD") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q3 — Parity (LV ⊆ AVAD) + +# COMMAND ---------- + +print("=== Parity (AVAD ⊇ LV) ===") +check_gaps("cfp-lv", "cfp-avad", "CFP Parity") +check_gaps("spark-lv", "spark-avad", "Spark Parity") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q4 — Cross-Engine Parity (CFP ↔ Spark) + +# COMMAND ---------- + +print("=== Cross-Engine Parity ===") +check_gaps("cfp-lv", "spark-lv", "LV: CFP → Spark") +check_gaps("spark-lv", "cfp-lv", "LV: Spark → CFP") +check_gaps("cfp-avad", "spark-avad", "AVAD: CFP → Spark") +check_gaps("spark-avad","cfp-avad", "AVAD: Spark → CFP") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q5 — LSN Ordering Per Partition + +# COMMAND ---------- + +from pyspark.sql.window import Window +from pyspark.sql.functions import lag, sum as spark_sum, when + +def check_lsn_ordering(source): + global failures + events = recon.filter((col("source") == source) & (col("lsn") >= 0)) + event_count = events.count() + + if event_count == 0: + print(f" ⏭️ {source}: no events with LSN — skipping") + return + + w = Window.partitionBy("partitionKey").orderBy("seqNo") + violations = ( + events + .withColumn("prevLsn", lag("lsn").over(w)) + .filter(col("prevLsn").isNotNull() & (col("lsn") < col("prevLsn"))) + .count() + ) + + status = "✅" if violations == 0 else "❌" + print(f" {status} {source}: {event_count:,} events, {violations:,} LSN ordering violations") + 
if violations > 0: + failures += 1 + +print("=== LSN Ordering ===") +for s in ["cfp-lv", "cfp-avad", "spark-lv", "spark-avad"]: + check_lsn_ordering(s) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q6 — CRTS Ordering Per Partition (AVAD Only) + +# COMMAND ---------- + +def check_crts_ordering(source): + global failures + events = recon.filter((col("source") == source) & (col("crts") >= 0)) + event_count = events.count() + + if event_count == 0: + print(f" ⏭️ {source}: no events with CRTS — skipping") + return + + w = Window.partitionBy("partitionKey").orderBy("seqNo") + violations = ( + events + .withColumn("prevCrts", lag("crts").over(w)) + .filter(col("prevCrts").isNotNull() & (col("crts") < col("prevCrts"))) + .count() + ) + + status = "✅" if violations == 0 else "❌" + print(f" {status} {source}: {event_count:,} events, {violations:,} CRTS ordering violations") + if violations > 0: + failures += 1 + +print("=== CRTS Ordering ===") +check_crts_ordering("cfp-avad") +check_crts_ordering("spark-avad") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q7 — previousImage Validation (AVAD Only) + +# COMMAND ---------- + +def check_previous_image(source): + global failures + missing = ( + recon + .filter( + (col("source") == source) & + col("opType").isin("replace", "delete") & + (col("hasPreviousImage") == False) + ) + .count() + ) + + total_rd = ( + recon + .filter( + (col("source") == source) & + col("opType").isin("replace", "delete") + ) + .count() + ) + + status = "✅" if missing == 0 else "❌" + print(f" {status} {source}: {total_rd:,} replace/delete events, {missing:,} missing previousImage") + if missing > 0: + failures += 1 + +print("=== previousImage Validation ===") +check_previous_image("cfp-avad") +check_previous_image("spark-avad") + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Q8 — Duplicate Detection (At-Least-Once) + +# COMMAND ---------- + +print("=== Duplicate Detection ===") +dupes = ( + recon + .groupBy("source") + .agg( + 
count("*").alias("total"), + countDistinct("correlationId").alias("unique"), + ) + .withColumn("duplicates", col("total") - col("unique")) + .withColumn("dupeRate", (col("duplicates") / col("total") * 100).cast("decimal(5,2)")) + .orderBy("source") +) + +dupes.show(truncate=False) + +# COMMAND ---------- + +# MAGIC %md +# MAGIC ## Final Verdict + +# COMMAND ---------- + +if failures == 0: + print("✅ ALL CHECKS PASSED") +else: + print(f"❌ {failures} CHECK(S) FAILED") + +# Return exit-like status for job runners +dbutils.notebook.exit("PASS" if failures == 0 else f"FAIL:{failures}") diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/Main.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/Main.java new file mode 100644 index 000000000000..b4facf95a42b --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/Main.java @@ -0,0 +1,167 @@ +package com.azure.cosmos.avadtest; + +import com.azure.cosmos.avadtest.config.TestConfig; +import com.azure.cosmos.avadtest.health.HealthMonitor; +import com.azure.cosmos.avadtest.health.HealthServer; +import com.azure.cosmos.avadtest.ingestor.Ingestor; +import com.azure.cosmos.avadtest.reader.AvadReader; +import com.azure.cosmos.avadtest.reader.LatestVersionReader; +import com.azure.cosmos.avadtest.reconciliation.Reconciler; +import com.beust.jcommander.JCommander; +import com.beust.jcommander.Parameter; +import com.beust.jcommander.ParameterException; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +public final class Main { + + private static final Logger log = LoggerFactory.getLogger(Main.class); + + @Parameter(names = "--mode", required = true, + description = "Mode: ingestor, lv-reader, avad-reader, reconcile, health-monitor") + private String mode; + + @Parameter(names = "--source", description = "Source to reconcile from (e.g., ingestor)") + private String reconcileSource; + + @Parameter(names = "--against", 
description = "Source to reconcile against (e.g., cfp-avad)") + private String reconcileAgainst; + + @Parameter(names = "--full", description = "Run full reconciliation suite") + private boolean reconcileFull; + + @Parameter(names = "--health-port", description = "Health server port (default: 8080)") + private int healthPort = 8080; + + @Parameter(names = "--run-id", description = "Soak run identifier (for health monitor)") + private String runId = "soak-default"; + + @Parameter(names = "--gap-sla-minutes", description = "Minutes before an unconsumed event is flagged as a gap") + private int gapSlaMinutes = 10; + + @Parameter(names = "--config", description = "Path to JSON config file (env vars override JSON values)") + private String configFile; + + @Parameter(names = {"-h", "--help"}, description = "Help", help = true) + private boolean help; + + private TestConfig loadConfig() throws Exception { + if (configFile != null) { + log.info("Loading config from: {}", configFile); + return TestConfig.fromJson(configFile); + } + return TestConfig.fromEnv(); + } + + private int run() throws Exception { + log.info("Starting cosmos-avad-test in mode: {}", mode); + + switch (mode) { + case "ingestor": + return runIngestor(); + case "lv-reader": + return runLvReader(); + case "avad-reader": + return runAvadReader(); + case "reconcile": + return runReconcile(); + case "health-monitor": + return runHealthMonitor(); + default: + log.error("Unknown mode: {}. 
Use: ingestor, lv-reader, avad-reader, reconcile, health-monitor", mode); + return 1; + } + } + + private int runIngestor() throws Exception { + HealthServer healthServer = new HealthServer(healthPort); + healthServer.start(); + + TestConfig config = loadConfig(); + try (Ingestor ingestor = new Ingestor(config)) { + healthServer.setReady(true); + ingestor.run(); + return 0; + } finally { + healthServer.stop(); + } + } + + private int runLvReader() throws Exception { + HealthServer healthServer = new HealthServer(healthPort); + healthServer.start(); + + TestConfig config = loadConfig(); + try (LatestVersionReader reader = new LatestVersionReader(config)) { + healthServer.setReady(true); + reader.run(); + return 0; + } finally { + healthServer.stop(); + } + } + + private int runAvadReader() throws Exception { + HealthServer healthServer = new HealthServer(healthPort); + healthServer.start(); + + TestConfig config = loadConfig(); + try (AvadReader reader = new AvadReader(config)) { + healthServer.setReady(true); + reader.run(); + return 0; + } finally { + healthServer.stop(); + } + } + + private int runHealthMonitor() throws Exception { + TestConfig config = loadConfig(); + HealthMonitor monitor = new HealthMonitor(config, runId, gapSlaMinutes); + try { + return monitor.runChecks(); + } finally { + monitor.close(); + } + } + + private int runReconcile() throws Exception { + TestConfig config = loadConfig(); + try (Reconciler reconciler = new Reconciler(config)) { + if (reconcileFull) { + return reconciler.runFullSuite(); + } else if (reconcileSource != null && reconcileAgainst != null) { + return reconciler.reconcilePair(reconcileSource, reconcileAgainst); + } else { + log.error("Reconcile mode requires --full or --source + --against"); + return 1; + } + } + } + + public static void main(String[] args) { + Main main = new Main(); + JCommander jc = JCommander.newBuilder().addObject(main).build(); + jc.setProgramName("cosmos-avad-test"); + + try { + jc.parse(args); + } 
catch (ParameterException e) { + log.error("Invalid arguments: {}", e.getMessage()); + jc.usage(); + System.exit(1); + } + + if (main.help) { + jc.usage(); + return; + } + + try { + System.exit(main.run()); + } catch (Exception e) { + log.error("Fatal error", e); + System.exit(1); + } + } +} diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/config/TestConfig.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/config/TestConfig.java new file mode 100644 index 000000000000..effb372748b3 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/config/TestConfig.java @@ -0,0 +1,177 @@ +package com.azure.cosmos.avadtest.config; + +import com.fasterxml.jackson.databind.JsonNode; +import com.fasterxml.jackson.databind.ObjectMapper; + +import java.io.File; +import java.io.IOException; +import java.util.Collections; +import java.util.List; + +/** + * Configuration loaded from a JSON file with environment variable overrides. + *

+ * <p>Load order (highest precedence first):
+ * <ol>
+ *   <li>Environment variables (e.g. COSMOS_KEY — always wins for secrets)</li>
+ *   <li>JSON config file (--config path)</li>
+ *   <li>Built-in defaults</li>
+ * </ol>
+ */ +public final class TestConfig { + + private static final ObjectMapper MAPPER = new ObjectMapper(); + + private final String endpoint; + private final String regionalEndpoint; + private final String key; + private final String database; + private final String feedContainer; + private final String leaseContainer; + private final String preferredRegion; + private final int opsPerSec; + private final int docSizeBytes; + private final int logicalPartitionCount; + private final int durationSeconds; + private final int workerCount; + + private TestConfig(Builder builder) { + this.endpoint = builder.endpoint; + this.regionalEndpoint = builder.regionalEndpoint; + this.key = builder.key; + this.database = builder.database; + this.feedContainer = builder.feedContainer; + this.leaseContainer = builder.leaseContainer; + this.preferredRegion = builder.preferredRegion; + this.opsPerSec = builder.opsPerSec; + this.docSizeBytes = builder.docSizeBytes; + this.logicalPartitionCount = builder.logicalPartitionCount; + this.durationSeconds = builder.durationSeconds; + this.workerCount = builder.workerCount; + } + + /** + * Load config from a JSON file. Environment variables override JSON values. 
+ */ + public static TestConfig fromJson(String filePath) throws IOException { + JsonNode root = MAPPER.readTree(new File(filePath)); + JsonNode cosmos = root.path("cosmos"); + JsonNode ingestor = root.path("ingestor"); + + return new Builder() + .endpoint(resolve("COSMOS_ENDPOINT", textOrNull(cosmos, "endpoint"), null)) + .regionalEndpoint(resolve("COSMOS_REGIONAL_ENDPOINT", textOrNull(cosmos, "regionalEndpoint"), "")) + .key(resolve("COSMOS_KEY", textOrNull(cosmos, "key"), null)) + .database(resolve("COSMOS_DATABASE", textOrNull(cosmos, "database"), "graph_db")) + .feedContainer(resolve("COSMOS_FEED_CONTAINER", textOrNull(cosmos, "feedContainer"), "avad-test")) + .leaseContainer(resolve("COSMOS_LEASE_CONTAINER", textOrNull(cosmos, "leaseContainer"), "avad-test-leases")) + .preferredRegion(resolve("COSMOS_PREFERRED_REGION", textOrNull(cosmos, "preferredRegion"), "West Central US")) + .opsPerSec(resolveInt("OPS_PER_SEC", intOrNull(ingestor, "opsPerSec"), 5000)) + .docSizeBytes(resolveInt("DOC_SIZE_BYTES", intOrNull(ingestor, "docSizeBytes"), 1024)) + .logicalPartitionCount(resolveInt("LOGICAL_PARTITION_COUNT", intOrNull(ingestor, "logicalPartitionCount"), 100000)) + .durationSeconds(resolveInt("DURATION_SECONDS", intOrNull(ingestor, "durationSeconds"), 3600)) + .workerCount(resolveInt("WORKER_COUNT", intOrNull(ingestor, "workerCount"), 2)) + .build(); + } + + /** + * Load config from environment variables only (no JSON file). 
+ */ + public static TestConfig fromEnv() { + return new Builder() + .endpoint(envRequired("COSMOS_ENDPOINT")) + .regionalEndpoint(envOrDefault("COSMOS_REGIONAL_ENDPOINT", "")) + .key(envRequired("COSMOS_KEY")) + .database(envOrDefault("COSMOS_DATABASE", "graph_db")) + .feedContainer(envOrDefault("COSMOS_FEED_CONTAINER", "avad-test")) + .leaseContainer(envOrDefault("COSMOS_LEASE_CONTAINER", "avad-test-leases")) + .preferredRegion(envOrDefault("COSMOS_PREFERRED_REGION", "West Central US")) + .opsPerSec(Integer.parseInt(envOrDefault("OPS_PER_SEC", "5000"))) + .docSizeBytes(Integer.parseInt(envOrDefault("DOC_SIZE_BYTES", "1024"))) + .logicalPartitionCount(Integer.parseInt(envOrDefault("LOGICAL_PARTITION_COUNT", "100000"))) + .durationSeconds(Integer.parseInt(envOrDefault("DURATION_SECONDS", "3600"))) + .workerCount(Integer.parseInt(envOrDefault("WORKER_COUNT", "2"))) + .build(); + } + + /** Resolve: env var > JSON value > default. */ + private static String resolve(String envName, String jsonValue, String defaultValue) { + String env = envOrNull(envName); + if (env != null) return env; + if (jsonValue != null && !jsonValue.trim().isEmpty()) return jsonValue; + if (defaultValue != null) return defaultValue; + throw new IllegalStateException("Required config missing: " + envName); + } + + private static int resolveInt(String envName, Integer jsonValue, int defaultValue) { + String env = envOrNull(envName); + if (env != null) return Integer.parseInt(env); + if (jsonValue != null) return jsonValue; + return defaultValue; + } + + private static String textOrNull(JsonNode parent, String field) { + JsonNode node = parent.path(field); + return node.isMissingNode() || node.isNull() ? null : node.asText(); + } + + private static Integer intOrNull(JsonNode parent, String field) { + JsonNode node = parent.path(field); + return node.isMissingNode() || node.isNull() ? 
null : node.asInt(); + } + + private static String envOrNull(String name) { + String val = System.getenv(name); + if (val != null && !val.trim().isEmpty()) return val; + val = System.getProperty(name); + return (val != null && !val.trim().isEmpty()) ? val : null; + } + + private static String envRequired(String name) { + String val = envOrNull(name); + if (val == null) throw new IllegalStateException("Required config missing: " + name); + return val; + } + + private static String envOrDefault(String name, String defaultVal) { + String val = envOrNull(name); + return val != null ? val : defaultVal; + } + + public String endpoint() { return endpoint; } + public String regionalEndpoint() { return regionalEndpoint; } + public String readerEndpoint() { + return (regionalEndpoint != null && !regionalEndpoint.trim().isEmpty()) ? regionalEndpoint : endpoint; + } + public String key() { return key; } + public String database() { return database; } + public String feedContainer() { return feedContainer; } + public String leaseContainer() { return leaseContainer; } + public String preferredRegion() { return preferredRegion; } + public List preferredRegions() { return Collections.singletonList(preferredRegion); } + public int opsPerSec() { return opsPerSec; } + public int docSizeBytes() { return docSizeBytes; } + public int logicalPartitionCount() { return logicalPartitionCount; } + public int durationSeconds() { return durationSeconds; } + public int workerCount() { return workerCount; } + + public static final class Builder { + private String endpoint, regionalEndpoint, key, database, feedContainer, leaseContainer; + private String preferredRegion; + private int opsPerSec, docSizeBytes, logicalPartitionCount, durationSeconds, workerCount; + + public Builder endpoint(String v) { this.endpoint = v; return this; } + public Builder regionalEndpoint(String v) { this.regionalEndpoint = v; return this; } + public Builder key(String v) { this.key = v; return this; } + public Builder 
database(String v) { this.database = v; return this; } + public Builder feedContainer(String v) { this.feedContainer = v; return this; } + public Builder leaseContainer(String v) { this.leaseContainer = v; return this; } + public Builder preferredRegion(String v) { this.preferredRegion = v; return this; } + public Builder opsPerSec(int v) { this.opsPerSec = v; return this; } + public Builder docSizeBytes(int v) { this.docSizeBytes = v; return this; } + public Builder logicalPartitionCount(int v) { this.logicalPartitionCount = v; return this; } + public Builder durationSeconds(int v) { this.durationSeconds = v; return this; } + public Builder workerCount(int v) { this.workerCount = v; return this; } + public TestConfig build() { return new TestConfig(this); } + } +} diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/health/HealthMonitor.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/health/HealthMonitor.java new file mode 100644 index 000000000000..5a8e7a90575f --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/health/HealthMonitor.java @@ -0,0 +1,219 @@ +package com.azure.cosmos.avadtest.health; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.avadtest.config.TestConfig; +import com.azure.cosmos.models.CosmosQueryRequestOptions; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.JsonNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Mono; + +import java.time.Duration; +import java.time.Instant; +import java.util.concurrent.atomic.AtomicLong; + +/** + * Online health monitor that queries the reconciliation + * container for gap detection and correctness checks. + * + * Designed to run as a standalone mode ("health-monitor") + * or as a K8s CronJob. 
Queries once, reports, and exits. + * + * Checks: + * 1. Gap detection — produced events not consumed within + * SLA window (default 10 min) + * 2. previousImage correctness — AVAD replace/delete + * events missing previousImage + * 3. Parity — LV events not in AVAD (AVAD ⊇ LV) + * + * Writes a health snapshot to the "soak-health" container. + */ +public final class HealthMonitor { + + private static final Logger log = LoggerFactory.getLogger(HealthMonitor.class); + private static final String RECONCILIATION_CONTAINER = "reconciliation"; + + private final CosmosAsyncClient client; + private final CosmosAsyncContainer reconContainer; + private final String runId; + private final int gapSlaMinutes; + + public HealthMonitor(TestConfig config, String runId, int gapSlaMinutes) { + this.runId = runId; + this.gapSlaMinutes = gapSlaMinutes; + + this.client = new CosmosClientBuilder() + .endpoint(config.endpoint()) + .key(config.key()) + .gatewayMode() + .preferredRegions(config.preferredRegions()) + .buildAsyncClient(); + + this.reconContainer = client + .getDatabase(config.database()) + .getContainer(RECONCILIATION_CONTAINER); + } + + /** + * Run all health checks once and write a snapshot. + * Returns 0 if healthy, 1 if any check failed. + */ + public int runChecks() { + log.info("=== Health Monitor Check (runId={}) ===", runId); + Instant now = Instant.now(); + boolean healthy = true; + + // 1. Count produced events + long producedCount = countBySource("ingestor"); + log.info(" Produced (ingestor): {}", producedCount); + if (producedCount < 0) { log.error(" ❌ Query failed for ingestor count"); healthy = false; } + + // 2. Count AVAD consumed events + long avadConsumed = countBySource("cfp-avad"); + log.info(" AVAD consumed: {}", avadConsumed); + if (avadConsumed < 0) { log.error(" ❌ Query failed for AVAD count"); healthy = false; } + + // 3. 
Count LV consumed events + long lvConsumed = countBySource("cfp-lv"); + log.info(" LV consumed: {}", lvConsumed); + if (lvConsumed < 0) { log.error(" ❌ Query failed for LV count"); healthy = false; } + + // 4. Gap detection — produced but not in AVAD + // (older than SLA window) + long gapCount = countGaps("ingestor", "cfp-avad"); + log.info(" Gaps (produced not in AVAD, >{} min): {}", + gapSlaMinutes, gapCount); + if (gapCount > 0) { + log.error(" ❌ {} missed changes detected", gapCount); + healthy = false; + } + + // 5. Parity — LV not in AVAD + long parityGaps = countGaps("cfp-lv", "cfp-avad"); + log.info(" Parity gaps (LV not in AVAD): {}", parityGaps); + if (parityGaps > 0) { + log.error(" ❌ {} LV events missing in AVAD", parityGaps); + healthy = false; + } + + // 6. Missing previousImage + long missingPrev = countMissingPreviousImage(); + log.info(" Missing previousImage: {}", missingPrev); + if (missingPrev > 0) { + log.error(" ❌ {} replace/delete events missing previousImage", + missingPrev); + healthy = false; + } + + String status = healthy ? "✅ HEALTHY" : "❌ UNHEALTHY"; + log.info(" Status: {}", status); + return healthy ? 
0 : 1; + } + + private long countBySource(String source) { + String query = "SELECT VALUE COUNT(1) FROM c WHERE c.source = '" + source + "'"; + try { + return reconContainer.queryItems(query, new CosmosQueryRequestOptions(), JsonNode.class) + .byPage(1) + .flatMap(page -> { + if (page.getResults().isEmpty()) return Mono.just(0L); + return Mono.just(page.getResults().get(0).asLong()); + }) + .blockFirst(Duration.ofSeconds(30)); + } catch (Exception e) { + log.warn("Failed to count source={}: {}", source, e.getMessage()); + return -1; + } + } + + private long countGaps(String producerSource, String consumerSource) { + // Count events in producer that are not in consumer + // and are older than the SLA window + String query = String.format( + "SELECT VALUE COUNT(1) FROM c " + + "WHERE c.source = '%s' " + + "AND NOT IS_DEFINED(" + + " (SELECT VALUE 1 FROM c2 IN c " + + " WHERE c2.source = '%s' " + + " AND c2.correlationId = c.correlationId)" + + ") " + + "AND c.timestamp < '%s'", + producerSource, consumerSource, + Instant.now().minus(Duration.ofMinutes(gapSlaMinutes)).toString() + ); + + // Simplified approach: count producer IDs not in consumer + // Cross-partition query is expensive; use a simpler approach + // by sampling recent events + String sampleQuery = String.format( + "SELECT TOP 100 c.correlationId FROM c " + + "WHERE c.source = '%s' " + + "AND c.timestamp < '%s' " + + "ORDER BY c.timestamp DESC", + producerSource, + Instant.now().minus(Duration.ofMinutes(gapSlaMinutes)).toString() + ); + + try { + AtomicLong gaps = new AtomicLong(0); + reconContainer.queryItems(sampleQuery, new CosmosQueryRequestOptions(), JsonNode.class) + .byPage(100) + .flatMap(page -> { + for (JsonNode item : page.getResults()) { + String corrId = item.get("correlationId").asText(); + // Check if consumer has this event + String checkQuery = String.format( + "SELECT VALUE COUNT(1) FROM c " + + "WHERE c.correlationId = '%s' " + + "AND c.source = '%s'", + corrId, consumerSource); + Long 
count = reconContainer.queryItems( + checkQuery, + new CosmosQueryRequestOptions().setPartitionKey(new PartitionKey(corrId)), + JsonNode.class) + .byPage(1) + .map(p -> p.getResults().isEmpty() ? 0L : p.getResults().get(0).asLong()) + .blockFirst(Duration.ofSeconds(5)); + if (count == null || count == 0) { + gaps.incrementAndGet(); + } + } + return Mono.empty(); + }) + .blockLast(Duration.ofSeconds(60)); + return gaps.get(); + } catch (Exception e) { + log.warn("Failed gap detection {}->{}: {}", producerSource, consumerSource, e.getMessage()); + return -1; + } + } + + private long countMissingPreviousImage() { + String query = + "SELECT VALUE COUNT(1) FROM c " + + "WHERE c.source = 'cfp-avad' " + + "AND c.opType IN ('replace', 'delete') " + + "AND c.hasPreviousImage = false"; + try { + return reconContainer.queryItems(query, new CosmosQueryRequestOptions(), JsonNode.class) + .byPage(1) + .flatMap(page -> { + if (page.getResults().isEmpty()) return Mono.just(0L); + return Mono.just(page.getResults().get(0).asLong()); + }) + .blockFirst(Duration.ofSeconds(30)); + } catch (Exception e) { + log.warn("Failed previousImage check: {}", e.getMessage()); + return -1; + } + } + + public void close() { + log.info("Closing HealthMonitor..."); + client.close(); + log.info("HealthMonitor closed"); + } +} diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/health/HealthServer.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/health/HealthServer.java new file mode 100644 index 000000000000..fb103241cae1 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/health/HealthServer.java @@ -0,0 +1,84 @@ +package com.azure.cosmos.avadtest.health; + +import com.sun.net.httpserver.HttpServer; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; + +import java.io.IOException; +import java.io.OutputStream; +import java.net.InetSocketAddress; +import 
java.nio.charset.StandardCharsets; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Executors; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; + +/** + * Lightweight HTTP health server for Kubernetes probes. + * + * Endpoints: + * GET /health — liveness probe (always 200 if JVM is up) + * GET /ready — readiness probe (200 when workload is ready) + */ +public final class HealthServer { + + private static final Logger log = LoggerFactory.getLogger(HealthServer.class); + private static final int DEFAULT_PORT = 8080; + + private final HttpServer server; + private final ExecutorService executor; + private final AtomicBoolean ready = new AtomicBoolean(false); + + public HealthServer() throws IOException { + this(DEFAULT_PORT); + } + + public HealthServer(int port) throws IOException { + this.executor = Executors.newFixedThreadPool(2); + this.server = HttpServer.create(new InetSocketAddress(port), 0); + this.server.setExecutor(executor); + + server.createContext("/health", exchange -> { + byte[] body = "{\"status\":\"UP\"}".getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().set("Content-Type", "application/json"); + exchange.sendResponseHeaders(200, body.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(body); + } + }); + + server.createContext("/ready", exchange -> { + boolean isReady = ready.get(); + String json = "{\"ready\":" + isReady + "}"; + byte[] body = json.getBytes(StandardCharsets.UTF_8); + exchange.getResponseHeaders().set("Content-Type", "application/json"); + exchange.sendResponseHeaders(isReady ? 
200 : 503, body.length); + try (OutputStream os = exchange.getResponseBody()) { + os.write(body); + } + }); + } + + public void start() { + server.start(); + log.info("Health server started on port {}", server.getAddress().getPort()); + } + + public void setReady(boolean isReady) { + ready.set(isReady); + log.info("Readiness set to: {}", isReady); + } + + public void stop() { + server.stop(2); + executor.shutdown(); + try { + if (!executor.awaitTermination(5, TimeUnit.SECONDS)) { + executor.shutdownNow(); + } + } catch (InterruptedException e) { + executor.shutdownNow(); + } + log.info("Health server stopped"); + } +} diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/ingestor/Ingestor.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/ingestor/Ingestor.java new file mode 100644 index 000000000000..e92fc13dde1b --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/ingestor/Ingestor.java @@ -0,0 +1,307 @@ +package com.azure.cosmos.avadtest.ingestor; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosClientBuilder; +import com.azure.cosmos.avadtest.config.TestConfig; +import com.azure.cosmos.avadtest.reconciliation.ReconciliationWriter; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import java.time.Duration; +import java.time.Instant; +import java.util.ArrayList; +import java.util.IdentityHashMap; +import java.util.List; +import java.util.Map; +import 
java.util.UUID; +import java.util.concurrent.ThreadLocalRandom; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicLong; +import java.util.concurrent.atomic.LongAdder; + +/** + * Ingestion workload using the Cosmos DB bulk API. + * Operation mix: creates (40%), upserts (40% — replaces + upserts), deletes (20%). + * Every operation gets a unique eventId for per-event reconciliation. + * Batch of N operations submitted via executeBulkOperations every tick. + */ +public final class Ingestor implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(Ingestor.class); + private static final int TICK_INTERVAL_MS = 100; + private static final double FAILURE_ABORT_THRESHOLD = 0.5; + + private final TestConfig config; + private final CosmosAsyncClient client; + private final CosmosAsyncContainer container; + private final ReconciliationWriter reconWriter; + private final AtomicLong seqCounter = new AtomicLong(0); + private final AtomicBoolean running = new AtomicBoolean(true); + + private final LongAdder successCount = new LongAdder(); + private final LongAdder failureCount = new LongAdder(); + + // Track recently created doc IDs for upsert/delete operations + private final String[] recentDocIds; + private final AtomicLong recentIndex = new AtomicLong(0); + + private final int opsPerTick; + private final String precomputedPayload; + + // Reactor subscriptions — disposed on close to prevent leaks + private volatile reactor.core.Disposable progressSubscription; + private final CosmosBulkExecutionOptions bulkOptions; + + public Ingestor(TestConfig config) throws Exception { + this.config = config; + this.recentDocIds = new String[10_000]; + this.opsPerTick = Math.max(1, config.opsPerSec() * TICK_INTERVAL_MS / 1000); + this.bulkOptions = new CosmosBulkExecutionOptions(); + + int size = Math.min(Math.max(config.docSizeBytes(), 0), 10_000); + StringBuilder sb = new StringBuilder(size); + for (int i = 0; i < 
size; i++) { sb.append('x'); } + this.precomputedPayload = sb.toString(); + + this.client = new CosmosClientBuilder() + .endpoint(config.endpoint()) + .key(config.key()) + .gatewayMode() + .contentResponseOnWriteEnabled(true) + .preferredRegions(config.preferredRegions()) + .buildAsyncClient(); + + this.container = client + .getDatabase(config.database()) + .getContainer(config.feedContainer()); + + this.reconWriter = new ReconciliationWriter(client, config.database(), "ingestor"); + + log.info("Ingestor initialized (bulk mode): endpoint={}, db={}, container={}, ops/sec={}, opsPerTick={}", + config.endpoint(), config.database(), config.feedContainer(), + config.opsPerSec(), opsPerTick); + } + + public void run() throws InterruptedException { + int durationSec = config.durationSeconds(); + log.info("Starting bulk ingestion at {} ops/sec, duration={}", + config.opsPerSec(), durationSec > 0 ? durationSec + "s" : "unlimited"); + + long deadline = durationSec > 0 ? System.currentTimeMillis() + (durationSec * 1000L) : Long.MAX_VALUE; + + this.progressSubscription = Flux.interval(Duration.ofSeconds(30)) + .takeWhile(tick -> running.get()) + .subscribe(tick -> { + long s = successCount.sum(); + long f = failureCount.sum(); + long total = s + f; + double failRate = total > 0 ? 
(double) f / total : 0;
+                log.info("Progress: success={}, failures={}, failRate={}%, seq={}",
+                    s, f, String.format("%.1f", failRate * 100), seqCounter.get());
+                if (total > 100 && failRate > FAILURE_ABORT_THRESHOLD) {
+                    log.error("Failure rate {}% exceeds threshold, aborting!", String.format("%.1f", failRate * 100));
+                    running.set(false);
+                }
+            });
+
+        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
+            log.info("Shutdown signal received, stopping ingestor...");
+            running.set(false);
+        }));
+
+        // Main ingestion loop: submit a bulk batch, then sleep for the remaining tick interval
+        while (running.get() && System.currentTimeMillis() < deadline) {
+            long tickStart = System.currentTimeMillis();
+
+            try {
+                executeBulkBatch();
+            } catch (Exception e) {
+                log.error("Bulk batch error", e);
+            }
+
+            long elapsed = System.currentTimeMillis() - tickStart;
+            long sleepMs = TICK_INTERVAL_MS - elapsed;
+            if (sleepMs > 0) {
+                Thread.sleep(sleepMs);
+            }
+        }
+
+        if (System.currentTimeMillis() >= deadline) {
+            log.info("Duration {}s reached, stopping ingestor...", durationSec);
+        }
+        running.set(false);
+    }
+
+    /**
+     * Build one tick's worth of operations (40% create / 40% upsert / 20% delete),
+     * submit them through the bulk API, then flush reconciliation records.
+     */
+    private void executeBulkBatch() {
+        List<CosmosItemOperation> operations = new ArrayList<>(opsPerTick);
+        // Keyed by operation instance (IdentityHashMap) so responses map back to
+        // the exact queued operation via response.getOperation().
+        Map<CosmosItemOperation, OpMeta> opToMeta = new IdentityHashMap<>(opsPerTick);
+
+        for (int i = 0; i < opsPerTick; i++) {
+            int roll = ThreadLocalRandom.current().nextInt(100);
+            if (roll < 40) {
+                addCreate(operations, opToMeta);
+            } else if (roll < 80) {
+                addUpsert(operations, opToMeta);
+            } else {
+                addDelete(operations, opToMeta);
+            }
+        }
+
+        if (operations.isEmpty()) return;
+
+        container.executeBulkOperations(Flux.fromIterable(operations), bulkOptions)
+            .toStream()
+            .forEach(response -> handleBulkResponse(response, opToMeta));
+
+        reconWriter.flush();
+    }
+
+    /** Queue a create of a brand-new document under a random logical partition. */
+    private void addCreate(List<CosmosItemOperation> ops, Map<CosmosItemOperation, OpMeta> opToMeta) {
+        String docId = UUID.randomUUID().toString();
+        String eventId = UUID.randomUUID().toString();
+        String pk = "tenant-" + ThreadLocalRandom.current().nextInt(config.logicalPartitionCount());
+        long seq = seqCounter.incrementAndGet();
+        String ts = Instant.now().toString();
+
+        ObjectNode doc = buildDoc(docId, pk, seq, eventId, "create", ts);
+        CosmosItemOperation op = CosmosBulkOperations.getCreateItemOperation(doc, new PartitionKey(pk));
+        ops.add(op);
+        opToMeta.put(op, new OpMeta(eventId, seq, "create", pk, ts, docId));
+    }
+
+    /** Queue an upsert of a recently-written document, or a brand-new one if none is tracked. */
+    private void addUpsert(List<CosmosItemOperation> ops, Map<CosmosItemOperation, OpMeta> opToMeta) {
+        String recent = getRecentId();
+        String docId;
+        String pk;
+        if (recent != null) {
+            String[] parts = recent.split("\\|");
+            docId = parts[0];
+            pk = parts[1];
+        } else {
+            docId = UUID.randomUUID().toString();
+            pk = "tenant-" + ThreadLocalRandom.current().nextInt(config.logicalPartitionCount());
+        }
+
+        String eventId = UUID.randomUUID().toString();
+        long seq = seqCounter.incrementAndGet();
+        String ts = Instant.now().toString();
+        ObjectNode doc = buildDoc(docId, pk, seq, eventId, "upsert", ts);
+
+        CosmosItemOperation op = CosmosBulkOperations.getUpsertItemOperation(doc, new PartitionKey(pk));
+        ops.add(op);
+        opToMeta.put(op, new OpMeta(eventId, seq, "upsert", pk, ts, docId));
+    }
+
+    /** Queue a delete of a recently-written document; falls back to a create if none is tracked. */
+    private void addDelete(List<CosmosItemOperation> ops, Map<CosmosItemOperation, OpMeta> opToMeta) {
+        String recent = getRecentId();
+        if (recent == null) {
+            addCreate(ops, opToMeta);
+            return;
+        }
+
+        String[] parts = recent.split("\\|");
+        String docId = parts[0];
+        String pk = parts[1];
+        long seq = seqCounter.incrementAndGet();
+        String ts = Instant.now().toString();
+
+        CosmosItemOperation op = CosmosBulkOperations.getDeleteItemOperation(docId, new PartitionKey(pk));
+        ops.add(op);
+        // NOTE(review): deletes reuse docId as the eventId (a delete carries no
+        // document body) — confirm the reconciliation side correlates deletes by docId.
+        opToMeta.put(op, new OpMeta(docId, seq, "delete", pk, ts, docId));
+        clearRecentId(recent);
+    }
+
+    /** Update success/failure counters and reconciliation records for one bulk response. */
+    private void handleBulkResponse(CosmosBulkOperationResponse<?> response,
+                                    Map<CosmosItemOperation, OpMeta> opToMeta) {
+        OpMeta meta = opToMeta.get(response.getOperation());
+        if (meta == null) return;
+
+        if (response.getResponse() != null && response.getResponse().isSuccessStatusCode()) {
+            successCount.increment();
+            reconWriter.record(meta.eventId, meta.seq, meta.opType, meta.pk, -1,
false, -1);
+            if (!"delete".equals(meta.opType)) {
+                // Track non-deleted docs so later upserts/deletes can target them.
+                trackRecentId(meta.docId + "|" + meta.pk);
+            }
+        } else {
+            failureCount.increment();
+            int status = response.getResponse() != null ? response.getResponse().getStatusCode() : -1;
+            // 404s are not logged — presumably expected when a tracked doc was
+            // already deleted by a racing operation; TODO confirm intentional.
+            if (status != 404) {
+                log.warn("Bulk op failed: op={}, docId={}, status={}", meta.opType, meta.docId, status);
+            }
+        }
+    }
+
+    // Assembles the test document; "payload" is the precomputed filler string
+    // sized from config.docSizeBytes() in the constructor.
+    private ObjectNode buildDoc(String docId, String pk, long seq,
+                                String eventId, String opType, String ts) {
+        ObjectNode doc = JsonNodeFactory.instance.objectNode();
+        doc.put("id", docId);
+        doc.put("tenantId", pk);
+        doc.put("eventId", eventId);
+        doc.put("seqNo", seq);
+        doc.put("operationType", opType);
+        doc.put("timestamp", ts);
+        doc.put("payload", precomputedPayload);
+        return doc;
+    }
+
+    // Ring-buffer insert; the oldest entries are overwritten once the array wraps.
+    private void trackRecentId(String idAndPk) {
+        int idx = (int) (recentIndex.incrementAndGet() % recentDocIds.length);
+        recentDocIds[idx] = idAndPk;
+    }
+
+    // Linear scan clearing every slot holding this id — trackRecentId can insert
+    // the same "docId|pk" more than once, so all occurrences are removed.
+    private void clearRecentId(String idAndPk) {
+        for (int i = 0; i < recentDocIds.length; i++) {
+            if (idAndPk.equals(recentDocIds[i])) {
+                recentDocIds[i] = null;
+            }
+        }
+    }
+
+    // Picks a random recently-tracked "docId|pk" pair, walking backwards from the
+    // newest slot. Returns null when nothing has been tracked yet, or when the
+    // randomly chosen slot was cleared by an earlier delete.
+    private String getRecentId() {
+        long idx = recentIndex.get();
+        if (idx == 0) return null;
+        int start = (int) (idx % recentDocIds.length);
+        int offset = ThreadLocalRandom.current().nextInt(Math.min((int) idx, recentDocIds.length));
+        String val = recentDocIds[(start - offset + recentDocIds.length) % recentDocIds.length];
+        return val; // may be null if cleared
+    }
+
+    // Stops the loop, disposes the progress subscription, and closes the
+    // reconciliation writer before the client so pending records can flush.
+    @Override
+    public void close() {
+        log.info("Closing Ingestor...");
+        running.set(false);
+        if (progressSubscription != null) { progressSubscription.dispose(); }
+        reconWriter.close();
+        client.close();
+        log.info("Ingestor closed. Total ops: {}, success: {}, failures: {}",
+            seqCounter.get(), successCount.sum(), failureCount.sum());
+    }
+
+    /** Metadata for correlating bulk operation responses back to produced events.
     */
    private static final class OpMeta {
        // Immutable value holder; all fields final, set once in the constructor.
        final String eventId;
        final long seq;
        final String opType;
        final String pk;
        final String ts;
        final String docId;

        OpMeta(String eventId, long seq, String opType, String pk, String ts, String docId) {
            this.eventId = eventId;
            this.seq = seq;
            this.opType = opType;
            this.pk = pk;
            this.ts = ts;
            this.docId = docId;
        }
    }
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reader/AvadReader.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reader/AvadReader.java new file mode 100644 index 000000000000..f102859c9ab8 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reader/AvadReader.java @@ -0,0 +1,202 @@
package com.azure.cosmos.avadtest.reader;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.avadtest.config.TestConfig;
import com.azure.cosmos.avadtest.reconciliation.ReconciliationWriter;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.ChangeFeedMetaData;
import com.azure.cosmos.models.ChangeFeedOperationType;
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.atomic.LongAdder;

/**
 * All Versions and Deletes (AVAD) ChangeFeedProcessor reader.
 * Lease prefix "avad-" — isolated from Latest Version reader leases.
 * Gateway mode, preferred region configurable (default: West Central US).
 *
 * Additional validations vs LatestVersionReader:
 * - previousImage must be non-null on replace and delete events
 * - operationType metadata is checked for create/replace/delete
 */
public final class AvadReader implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(AvadReader.class);
    private static final String LEASE_PREFIX = "avad-";

    private final TestConfig config;
    private final CosmosAsyncClient client;
    private final CosmosAsyncContainer feedContainer;
    private final CosmosAsyncContainer leaseContainer;
    private final ReconciliationWriter reconWriter;
    private final List processors = new ArrayList<>();

    // Correctness counters (thread-safe for concurrent CFP batch processing)
    private final LongAdder missingPreviousImageCount = new LongAdder();
    private final LongAdder totalReplaces = new LongAdder();
    private final LongAdder totalDeletes = new LongAdder();
    private final LongAdder totalCreates = new LongAdder();

    /**
     * Builds the Cosmos client (gateway mode, content response enabled) and the
     * reconciliation writer tagged with source "cfp-avad".
     */
    public AvadReader(TestConfig config) throws Exception {
        this.config = config;
        this.client = new CosmosClientBuilder()
            .endpoint(config.readerEndpoint())
            .key(config.key())
            .gatewayMode()
            .contentResponseOnWriteEnabled(true)
            .preferredRegions(config.preferredRegions())
            .buildAsyncClient();

        this.feedContainer = client
            .getDatabase(config.database())
            .getContainer(config.feedContainer());

        this.leaseContainer = client
            .getDatabase(config.database())
            .getContainer(config.leaseContainer());

        this.reconWriter = new ReconciliationWriter(client, config.database(), "cfp-avad");

        log.info("AvadReader initialized: prefix={}, endpoint={}, region={}, workers={}",
            LEASE_PREFIX, config.readerEndpoint(), config.preferredRegion(), config.workerCount());
    }

    /** Start one CFP instance per configured worker, then block until shutdown. */
    public void run() throws InterruptedException {
        CountDownLatch latch = new CountDownLatch(1);
        int workers = config.workerCount();

        for (int i = 0; i < workers; i++) {
            final int workerIdx = i;
            ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions();
            options.setLeasePrefix(LEASE_PREFIX);
            options.setFeedPollDelay(Duration.ofMillis(100));
            options.setMaxItemCount(1000);

            ChangeFeedProcessor processor = new ChangeFeedProcessorBuilder()
                // Host name includes the JVM runtime name so leases stay unique per process.
                .hostName("avad-host-" + ManagementFactory.getRuntimeMXBean().getName() + "-w" + workerIdx)
                .feedContainer(feedContainer)
                .leaseContainer(leaseContainer)
                .options(options)
                .handleAllVersionsAndDeletesChanges(this::handleChanges)
                .buildChangeFeedProcessor();

            processor.start()
                .doOnSuccess(v -> log.info("AVAD ChangeFeedProcessor worker-{} started", workerIdx))
                .doOnError(e -> log.error("Failed to start AVAD CFP worker-{}", workerIdx, e))
                .block();

            processors.add(processor);
        }

        log.info("All {} AVAD workers started", workers);

        Runtime.getRuntime().addShutdownHook(new Thread(() -> {
            log.info("Shutdown signal, stopping {} AVAD CFP workers...", processors.size());
            for (ChangeFeedProcessor p : processors) {
                try { p.stop().block(Duration.ofSeconds(30)); } catch (Exception e) { /* ignore */ }
            }
            latch.countDown();
        }));

        latch.await();
    }

    /**
     * AVAD change handler: validates operationType metadata and previousImage
     * presence, then records each event for reconciliation.
     */
    private void handleChanges(List items) {
        for (ChangeFeedProcessorItem item : items) {
            JsonNode current = item.getCurrent();
            JsonNode previous = item.getPrevious();
            ChangeFeedMetaData metadata = item.getChangeFeedMetaData();

            if (metadata == null) {
                log.warn("Null metadata on AVAD change feed item, skipping");
                continue;
            }

            // LSN and CRTS come from metadata — not from the document body.
            // This is critical for deletes where current may be a tombstone.
            ChangeFeedOperationType opEnum = metadata.getOperationType();
            String opType = opEnum != null ? opEnum.toString().toLowerCase() : "unknown";
            long lsn = metadata.getLogSequenceNumber();
            Instant crtsInstant = metadata.getConflictResolutionTimestamp();
            long crts = crtsInstant != null ?
crtsInstant.toEpochMilli() : -1; + + // For deletes, current is a tombstone — extract business fields from previous + JsonNode source; + if (opEnum == ChangeFeedOperationType.DELETE) { + source = previous; + } else { + source = (current != null && !current.isNull()) ? current : previous; + } + + String eventId = source != null ? getTextOrEmpty(source, "eventId") : ""; + long seqNo = source != null && source.has("seqNo") ? source.get("seqNo").asLong() : -1; + String pk = source != null ? getTextOrEmpty(source, "tenantId") : ""; + String timestamp = source != null ? getTextOrEmpty(source, "timestamp") : ""; + + boolean hasPrevious = previous != null && !previous.isNull(); + + // Track operation types and validate previousImage + if ("create".equals(opType)) { + totalCreates.increment(); + } else if ("replace".equals(opType)) { + totalReplaces.increment(); + if (!hasPrevious) { + missingPreviousImageCount.increment(); + log.warn("⚠️ MISSING previous on REPLACE: eventId={}, pk={}", eventId, pk); + } + } else if ("delete".equals(opType)) { + totalDeletes.increment(); + if (!hasPrevious) { + missingPreviousImageCount.increment(); + log.warn("⚠️ MISSING previous on DELETE: eventId={}, pk={}", eventId, pk); + } + } + + reconWriter.record(eventId, seqNo, opType, pk, lsn, hasPrevious, crts); + } + + reconWriter.flush(); + } + + private void logCorrectnessReport() { + log.info("=== AVAD Correctness Report ==="); + log.info(" Creates: {}", totalCreates.sum()); + log.info(" Replaces: {} (missing previous: {})", totalReplaces.sum(), missingPreviousImageCount.sum()); + log.info(" Deletes: {}", totalDeletes.sum()); + long missing = missingPreviousImageCount.sum(); + if (missing > 0) { + log.error("❌ previous MISSING on {} replace/delete events", missing); + } else { + log.info("✅ All replace/delete events have previous image"); + } + } + + private static String getTextOrEmpty(JsonNode node, String field) { + return node.has(field) ? 
            node.get(field).asText() : "";
    }

    /** Emit the correctness report, stop all CFP workers, then close writer and client. */
    @Override
    public void close() {
        log.info("Closing AvadReader...");
        logCorrectnessReport();
        for (ChangeFeedProcessor p : processors) {
            try { p.stop().block(Duration.ofSeconds(30)); } catch (Exception e) { /* ignore */ }
        }
        reconWriter.close();
        client.close();
        log.info("AvadReader closed ({} workers)", processors.size());
    }
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reader/LatestVersionReader.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reader/LatestVersionReader.java new file mode 100644 index 000000000000..4fb1acb7c048 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reader/LatestVersionReader.java @@ -0,0 +1,138 @@
package com.azure.cosmos.avadtest.reader;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.avadtest.config.TestConfig;
import com.azure.cosmos.avadtest.reconciliation.ReconciliationWriter;
import com.azure.cosmos.models.ChangeFeedProcessorOptions;
import com.azure.cosmos.models.ChangeFeedProcessorItem;
import com.azure.cosmos.models.ChangeFeedMetaData;
import com.azure.cosmos.ChangeFeedProcessor;
import com.azure.cosmos.ChangeFeedProcessorBuilder;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.time.Instant;
import java.lang.management.ManagementFactory;
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CountDownLatch;

/**
 * Latest Version ChangeFeedProcessor reader.
 * Lease prefix "lv-" — isolated from AVAD reader leases.
 * Gateway mode, preferred region configurable (default: West Central US).
+ */ +public final class LatestVersionReader implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(LatestVersionReader.class); + private static final String LEASE_PREFIX = "lv-"; + + private final TestConfig config; + private final CosmosAsyncClient client; + private final CosmosAsyncContainer feedContainer; + private final CosmosAsyncContainer leaseContainer; + private final ReconciliationWriter reconWriter; + private final List processors = new ArrayList<>(); + + public LatestVersionReader(TestConfig config) throws Exception { + this.config = config; + this.client = new CosmosClientBuilder() + .endpoint(config.readerEndpoint()) + .key(config.key()) + .gatewayMode() + .contentResponseOnWriteEnabled(true) + .preferredRegions(config.preferredRegions()) + .buildAsyncClient(); + + this.feedContainer = client + .getDatabase(config.database()) + .getContainer(config.feedContainer()); + + this.leaseContainer = client + .getDatabase(config.database()) + .getContainer(config.leaseContainer()); + + this.reconWriter = new ReconciliationWriter(client, config.database(), "cfp-lv"); + + log.info("LatestVersionReader initialized: prefix={}, endpoint={}, region={}, workers={}", + LEASE_PREFIX, config.readerEndpoint(), config.preferredRegion(), config.workerCount()); + } + + public void run() throws InterruptedException { + CountDownLatch latch = new CountDownLatch(1); + int workers = config.workerCount(); + + for (int i = 0; i < workers; i++) { + final int workerIdx = i; + ChangeFeedProcessorOptions options = new ChangeFeedProcessorOptions(); + options.setLeasePrefix(LEASE_PREFIX); + options.setFeedPollDelay(Duration.ofMillis(100)); + options.setMaxItemCount(1000); + + ChangeFeedProcessor processor = new ChangeFeedProcessorBuilder() + .hostName("lv-host-" + ManagementFactory.getRuntimeMXBean().getName() + "-w" + i) + .feedContainer(feedContainer) + .leaseContainer(leaseContainer) + .options(options) + 
.handleLatestVersionChanges(this::handleChanges) + .buildChangeFeedProcessor(); + + processor.start() + .doOnSuccess(v -> log.info("LV ChangeFeedProcessor worker-{} started", workerIdx)) + .doOnError(e -> log.error("Failed to start LV CFP worker-{}", workerIdx, e)) + .block(); + + processors.add(processor); + } + + log.info("All {} LV workers started", workers); + + Runtime.getRuntime().addShutdownHook(new Thread(() -> { + log.info("Shutdown signal, stopping {} LV CFP workers...", processors.size()); + for (ChangeFeedProcessor p : processors) { + try { p.stop().block(Duration.ofSeconds(30)); } catch (Exception e) { /* ignore */ } + } + latch.countDown(); + })); + + latch.await(); + } + + private void handleChanges(List items) { + for (ChangeFeedProcessorItem item : items) { + JsonNode current = item.getCurrent(); + if (current == null || current.isNull()) continue; // LV mode shouldn't get null current + + ChangeFeedMetaData metadata = item.getChangeFeedMetaData(); + String eventId = getTextOrEmpty(current, "eventId"); + long seqNo = current.has("seqNo") ? current.get("seqNo").asLong() : -1; + String opType = getTextOrEmpty(current, "operationType"); + String pk = getTextOrEmpty(current, "tenantId"); + String timestamp = getTextOrEmpty(current, "timestamp"); + long lsn = metadata != null ? metadata.getLogSequenceNumber() : -1; + + reconWriter.record(eventId, seqNo, opType, pk, lsn, false, -1); + } + + reconWriter.flush(); + } + + private static String getTextOrEmpty(JsonNode node, String field) { + return node.has(field) ? 
            node.get(field).asText() : "";
    }

    /** Stop all CFP workers, then close writer and client. */
    @Override
    public void close() {
        log.info("Closing LatestVersionReader...");
        for (ChangeFeedProcessor p : processors) {
            try { p.stop().block(Duration.ofSeconds(30)); } catch (Exception e) { /* ignore */ }
        }
        reconWriter.close();
        client.close();
        log.info("LatestVersionReader closed ({} workers)", processors.size());
    }
}
diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reconciliation/Reconciler.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reconciliation/Reconciler.java new file mode 100644 index 000000000000..08b64970e86e --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reconciliation/Reconciler.java @@ -0,0 +1,342 @@
package com.azure.cosmos.avadtest.reconciliation;

import com.azure.cosmos.CosmosAsyncClient;
import com.azure.cosmos.CosmosAsyncContainer;
import com.azure.cosmos.CosmosClientBuilder;
import com.azure.cosmos.avadtest.config.TestConfig;
import com.azure.cosmos.models.CosmosQueryRequestOptions;
import com.azure.cosmos.models.SqlParameter;
import com.azure.cosmos.models.SqlQuerySpec;
import com.fasterxml.jackson.databind.JsonNode;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;

import java.time.Duration;
import java.util.*;
import java.util.stream.Collectors;

/**
 * Reconciler that queries the shared "reconciliation" Cosmos container
 * to detect gaps, ordering violations, and missing previousImage across
 * all source types (ingestor, cfp-lv, cfp-avad, spark-lv, spark-avad).
 */
public final class Reconciler implements AutoCloseable {

    private static final Logger log = LoggerFactory.getLogger(Reconciler.class);
    private static final Duration QUERY_TIMEOUT = Duration.ofSeconds(60);

    private final CosmosAsyncClient client;
    private final CosmosAsyncContainer container;

    /** Builds a gateway-mode client pointed at the shared "reconciliation" container. */
    public Reconciler(TestConfig config) {
        this.client = new CosmosClientBuilder()
            .endpoint(config.endpoint())
            .key(config.key())
            .gatewayMode()
            .contentResponseOnWriteEnabled(true)
            .preferredRegions(config.preferredRegions())
            .buildAsyncClient();

        this.container = client
            .getDatabase(config.database())
            .getContainer("reconciliation");

        log.info("Reconciler initialized: endpoint={}, db={}", config.endpoint(), config.database());
    }

    /** Run all reconciliation checks across all source pairs. */
    public int runFullSuite() {
        log.info("=== Full Reconciliation Suite ===");
        int failures = 0;

        logSummary();

        // Gap detection: ingestor → each consumer
        failures += checkGaps("ingestor", "cfp-lv", "Ingestor → CFP LV");
        failures += checkGaps("ingestor", "cfp-avad", "Ingestor → CFP AVAD");
        failures += checkGaps("ingestor", "spark-lv", "Ingestor → Spark LV");
        failures += checkGaps("ingestor", "spark-avad", "Ingestor → Spark AVAD");

        // Parity: LV ⊆ AVAD
        failures += checkGaps("cfp-lv", "cfp-avad", "CFP Parity (AVAD ⊇ LV)");
        failures += checkGaps("spark-lv", "spark-avad", "Spark Parity (AVAD ⊇ LV)");

        // Cross-engine
        failures += checkGaps("cfp-lv", "spark-lv", "Cross-engine LV");
        failures += checkGaps("cfp-avad", "spark-avad", "Cross-engine AVAD");

        // LSN ordering
        for (String s : new String[]{"cfp-lv", "cfp-avad", "spark-lv", "spark-avad"}) {
            failures += checkLsnOrdering(s);
        }

        // CRTS ordering (AVAD only)
        failures += checkCrtsOrdering("cfp-avad");
        failures += checkCrtsOrdering("spark-avad");

        // previousImage (AVAD only)
        failures += checkPreviousImage("cfp-avad");
        failures +=
            checkPreviousImage("spark-avad");

        logDuplicates();

        log.info("=== Suite Complete: {} failures ===", failures);
        // Collapse the failure count to a process exit code: 0 clean, 1 any failure.
        return failures > 0 ? 1 : 0;
    }

    /** Reconcile a single source pair. Auto-selects checks by source types. */
    public int reconcilePair(String source, String against) {
        log.info("=== Reconcile: {} → {} ===", source, against);
        int failures = 0;

        failures += checkGaps(source, against, source + " → " + against);

        // LSN ordering on the consumer side
        if (!against.equals("ingestor")) {
            failures += checkLsnOrdering(against);
        }

        // AVAD-specific checks
        if (against.endsWith("-avad")) {
            failures += checkCrtsOrdering(against);
            failures += checkPreviousImage(against);
        }

        return failures > 0 ? 1 : 0;
    }

    /** Q1: Summary dashboard — count, unique, min/max seq/lsn per source */
    private void logSummary() {
        log.info("=== Summary Dashboard ===");
        String query = "SELECT c.source, COUNT(1) AS total, " +
            "COUNT(DISTINCT c.correlationId) AS uniqueIds, " +
            "MIN(c.seqNo) AS minSeq, MAX(c.seqNo) AS maxSeq, " +
            "MIN(c.lsn) AS minLsn, MAX(c.lsn) AS maxLsn " +
            "FROM c GROUP BY c.source";

        container.queryItems(query, new CosmosQueryRequestOptions(), JsonNode.class)
            .byPage(100)
            .timeout(QUERY_TIMEOUT)
            .toIterable()
            .forEach(page -> {
                for (JsonNode row : page.getResults()) {
                    log.info(" source={}, total={}, unique={}, seq=[{},{}], lsn=[{},{}]",
                        row.path("source").asText(),
                        row.path("total").asLong(),
                        row.path("uniqueIds").asLong(),
                        row.path("minSeq").asLong(),
                        row.path("maxSeq").asLong(),
                        row.path("minLsn").asLong(),
                        row.path("maxLsn").asLong());
                }
            });
    }

    /** Q2/Q3/Q4: Gap detection — every correlationId in sourceA must exist in sourceB */
    private int checkGaps(String sourceA, String sourceB, String label) {
        log.info("=== Gap Check: {} ===", label);

        Set idsA = loadCorrelationIds(sourceA);
        Set idsB = loadCorrelationIds(sourceB);

        // An empty side means that workload hasn't produced yet — skip, don't fail.
        if (idsA.isEmpty()) {
            log.info(" SKIP: {} has
no data yet", sourceA); + return 0; + } + if (idsB.isEmpty()) { + log.info(" SKIP: {} has no data yet", sourceB); + return 0; + } + + Set missing = new HashSet<>(idsA); + missing.removeAll(idsB); + + log.info(" {} ids={}, {} ids={}, missing={}", sourceA, idsA.size(), sourceB, idsB.size(), missing.size()); + + if (!missing.isEmpty()) { + log.error("❌ {} GAPS DETECTED:", label); + missing.stream().limit(50).forEach(id -> log.error(" missing: {}", id)); + if (missing.size() > 50) { + log.error(" ... and {} more", missing.size() - 50); + } + } else { + log.info("✅ {} — no gaps", label); + } + + return missing.size(); + } + + /** Q5: LSN ordering — per partition, sorted by seqNo, LSN must be non-decreasing */ + private int checkLsnOrdering(String source) { + log.info("=== LSN Ordering: {} ===", source); + Map> events = loadEventsForOrdering(source, "lsn"); + + if (events.isEmpty()) { + log.info(" SKIP: {} has no LSN data", source); + return 0; + } + + int violations = 0; + for (Map.Entry> entry : events.entrySet()) { + String pk = entry.getKey(); + List records = entry.getValue(); + records.sort(Comparator.comparingLong(r -> r[0])); + + long prev = -1; + for (long[] record : records) { + if (prev > 0 && record[1] < prev) { + violations++; + if (violations <= 10) { + log.warn(" LSN violation: pk={}, seqNo={}, prevLsn={}, currLsn={}", + pk, record[0], prev, record[1]); + } + } + prev = record[1]; + } + } + + log.info(" LSN violations: {} (across {} partitions)", violations, events.size()); + return violations; + } + + /** Q6: CRTS ordering — per partition, sorted by seqNo, CRTS must be non-decreasing */ + private int checkCrtsOrdering(String source) { + log.info("=== CRTS Ordering: {} ===", source); + Map> events = loadEventsForOrdering(source, "crts"); + + if (events.isEmpty()) { + log.info(" SKIP: {} has no CRTS data", source); + return 0; + } + + int violations = 0; + for (Map.Entry> entry : events.entrySet()) { + String pk = entry.getKey(); + List records = 
                entry.getValue();
            records.sort(Comparator.comparingLong(r -> r[0]));

            long prev = -1;
            for (long[] record : records) {
                if (prev > 0 && record[1] < prev) {
                    violations++;
                    // Cap detailed logging at 10 violations to avoid log flooding.
                    if (violations <= 10) {
                        log.warn(" CRTS violation: pk={}, seqNo={}, prevCrts={}, currCrts={}",
                            pk, record[0], prev, record[1]);
                    }
                }
                prev = record[1];
            }
        }

        log.info(" CRTS violations: {} (across {} partitions)", violations, events.size());
        return violations;
    }

    /** Q7: previousImage — replace/delete with hasPreviousImage=false must be 0 */
    private int checkPreviousImage(String source) {
        log.info("=== Previous Image Check: {} ===", source);

        SqlQuerySpec querySpec = new SqlQuerySpec(
            "SELECT VALUE COUNT(1) FROM c WHERE c.source = @source " +
            "AND c.opType IN ('replace', 'delete') AND c.hasPreviousImage = false",
            Collections.singletonList(new SqlParameter("@source", source)));

        // NOTE(review): iterator().next() assumes a VALUE COUNT query always yields at
        // least one page/result — confirm; an empty response would throw here.
        long count = container.queryItems(querySpec, new CosmosQueryRequestOptions(), Long.class)
            .byPage(1)
            .timeout(QUERY_TIMEOUT)
            .toIterable()
            .iterator().next()
            .getResults()
            .stream()
            .findFirst()
            .orElse(0L);

        if (count > 0) {
            log.error("❌ {} missing previousImage on {} replace/delete events", source, count);
        } else {
            log.info("✅ {} — all replace/delete have previousImage", source);
        }

        return (int) count;
    }

    /** Q8: Duplicate detection — total vs unique correlationIds per source */
    private void logDuplicates() {
        log.info("=== Duplicate Detection ===");
        String query = "SELECT c.source, COUNT(1) AS total, " +
            "COUNT(DISTINCT c.correlationId) AS uniqueIds " +
            "FROM c GROUP BY c.source";

        container.queryItems(query, new CosmosQueryRequestOptions(), JsonNode.class)
            .byPage(100)
            .timeout(QUERY_TIMEOUT)
            .toIterable()
            .forEach(page -> {
                for (JsonNode row : page.getResults()) {
                    long total = row.path("total").asLong();
                    long unique = row.path("uniqueIds").asLong();
                    long duplicates = total - unique;
                    log.info(" source={},
total={}, unique={}, duplicates={}", + row.path("source").asText(), total, unique, duplicates); + } + }); + } + + /** Helper: load all distinct correlationIds for a source */ + private Set loadCorrelationIds(String source) { + SqlQuerySpec querySpec = new SqlQuerySpec( + "SELECT DISTINCT c.correlationId FROM c WHERE c.source = @source", + Collections.singletonList(new SqlParameter("@source", source))); + + Set ids = new HashSet<>(); + + container.queryItems(querySpec, new CosmosQueryRequestOptions(), JsonNode.class) + .byPage(1000) + .timeout(QUERY_TIMEOUT) + .toIterable() + .forEach(page -> { + for (JsonNode row : page.getResults()) { + String cid = row.path("correlationId").asText(""); + if (!cid.isEmpty()) { + ids.add(cid); + } + } + }); + + return ids; + } + + /** Helper: load events for ordering checks */ + private Map> loadEventsForOrdering(String source, String field) { + String query = "SELECT c.seqNo, c." + field + ", c.partitionKey FROM c " + + "WHERE c.source = @source AND c." 
+ field + " >= 0"; + + SqlQuerySpec querySpec = new SqlQuerySpec(query, + Collections.singletonList(new SqlParameter("@source", source))); + + Map> result = new HashMap<>(); + + container.queryItems(querySpec, new CosmosQueryRequestOptions(), JsonNode.class) + .byPage(1000) + .timeout(QUERY_TIMEOUT) + .toIterable() + .forEach(page -> { + for (JsonNode row : page.getResults()) { + String pk = row.path("partitionKey").asText(""); + long seqNo = row.path("seqNo").asLong(); + long fieldValue = row.path(field).asLong(); + + result.computeIfAbsent(pk, k -> new ArrayList<>()) + .add(new long[]{seqNo, fieldValue}); + } + }); + + return result; + } + + @Override + public void close() { + client.close(); + log.info("Reconciler closed"); + } +} diff --git a/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reconciliation/ReconciliationWriter.java b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reconciliation/ReconciliationWriter.java new file mode 100644 index 000000000000..c96d602364f6 --- /dev/null +++ b/sdk/cosmos/azure-cosmos-benchmark/src/main/java/com/azure/cosmos/avadtest/reconciliation/ReconciliationWriter.java @@ -0,0 +1,124 @@ +package com.azure.cosmos.avadtest.reconciliation; + +import com.azure.cosmos.CosmosAsyncClient; +import com.azure.cosmos.CosmosAsyncContainer; +import com.azure.cosmos.CosmosException; +import com.azure.cosmos.models.CosmosBulkExecutionOptions; +import com.azure.cosmos.models.CosmosBulkOperationResponse; +import com.azure.cosmos.models.CosmosBulkOperations; +import com.azure.cosmos.models.CosmosItemOperation; +import com.azure.cosmos.models.PartitionKey; +import com.fasterxml.jackson.databind.node.JsonNodeFactory; +import com.fasterxml.jackson.databind.node.ObjectNode; +import org.slf4j.Logger; +import org.slf4j.LoggerFactory; +import reactor.core.publisher.Flux; + +import java.time.Instant; +import java.util.ArrayList; +import java.util.List; +import java.util.concurrent.atomic.LongAdder; + 
+/** + * Writes reconciliation events to a shared Cosmos container using bulk upsert. + * Collects events via {@link #add}, then flushes them as a single bulk batch + * via {@link #flush}. The caller must flush after processing each CFP batch + * to ensure all recon records are persisted before the lease checkpoints. + * + * Container: "reconciliation" in same database + * Partition key: /correlationId + */ +public final class ReconciliationWriter implements AutoCloseable { + + private static final Logger log = LoggerFactory.getLogger(ReconciliationWriter.class); + private static final String RECONCILIATION_CONTAINER = "reconciliation"; + + private final String source; + private final CosmosAsyncContainer container; + private final CosmosBulkExecutionOptions bulkOptions = new CosmosBulkExecutionOptions(); + private final LongAdder writeCount = new LongAdder(); + private final LongAdder errorCount = new LongAdder(); + + // Batch buffer — not thread-safe; callers (handleChanges) are single-threaded per partition + private final List pending = new ArrayList<>(); + + /** + * @param client shared CosmosAsyncClient — caller owns lifecycle + * @param database database name + * @param source source identifier for reconciliation docs + */ + public ReconciliationWriter(CosmosAsyncClient client, String database, String source) { + this.source = source; + + this.container = client + .getDatabase(database) + .getContainer(RECONCILIATION_CONTAINER); + + log.info("ReconciliationWriter initialized: source={}, container={}", + source, RECONCILIATION_CONTAINER); + } + + /** + * Buffer a reconciliation event for the next bulk flush. + * Does not write to Cosmos — call {@link #flush} after the batch. 
+ */ + public void record(String eventId, long seqNo, String opType, + String partitionKey, long lsn, boolean hasPreviousImage, long crts) { + ObjectNode doc = JsonNodeFactory.instance.objectNode(); + doc.put("id", source + "-" + eventId); + doc.put("correlationId", eventId); + doc.put("source", source); + doc.put("seqNo", seqNo); + doc.put("opType", opType); + doc.put("partitionKey", partitionKey); + doc.put("lsn", lsn); + doc.put("hasPreviousImage", hasPreviousImage); + doc.put("crts", crts); + doc.put("timestamp", Instant.now().toString()); + + pending.add(CosmosBulkOperations.getUpsertItemOperation(doc, new PartitionKey(eventId))); + } + + /** + * Flush all buffered events to the reconciliation container via bulk upsert. + * Blocks until all writes complete. Throws on any permanent failure so + * CFP's handleChanges fails and the lease does not advance. + */ + public void flush() { + if (pending.isEmpty()) return; + + List batch = new ArrayList<>(pending); + pending.clear(); + + List failures = new ArrayList<>(); + + container.executeBulkOperations(Flux.fromIterable(batch), bulkOptions) + .toStream() + .forEach(response -> { + if (response.getResponse() != null && response.getResponse().isSuccessStatusCode()) { + writeCount.increment(); + } else { + errorCount.increment(); + int status = response.getResponse() != null ? response.getResponse().getStatusCode() : -1; + String id = response.getOperation().getId(); + failures.add("id=" + id + " status=" + status); + } + }); + + if (!failures.isEmpty()) { + throw new RuntimeException( + "Reconciliation bulk flush failed: " + failures.size() + " errors. 
 First: " + failures.get(0));
        }
    }

    // Cumulative counters for end-of-run reporting.
    public long getWriteCount() { return writeCount.sum(); }
    public long getErrorCount() { return errorCount.sum(); }

    /** Best-effort final flush, then log totals. Does not close the shared client. */
    @Override
    public void close() {
        // Flush any remaining buffered events
        try { flush(); } catch (Exception e) { log.warn("Final flush failed: {}", e.getMessage()); }
        log.info("ReconciliationWriter closed: source={}, writes={}, errors={}",
            source, writeCount.sum(), errorCount.sum());
    }
}