Extend TransactionalBulkWriter with Additional Write Strategies #48422

Open
dibahlfi wants to merge 10 commits into main from users/dibahl/transactional-bulk-support
Conversation

@dibahlfi (Member)

This PR extends TransactionalBulkWriter in the Azure Cosmos DB Spark Connector to support additional write strategies (up from only ItemOverwrite) and introduces a batch marker document mechanism to eliminate retry ambiguity — preventing silent data loss (false positives) and spurious failures (false negatives) on transient error retries.

New Write Strategies
Previously, TransactionalBulkWriter only supported ItemOverwrite (upsert). This PR adds:

| Strategy | Cosmos DB Operation | shouldIgnore on Retry |
| --- | --- | --- |
| ItemOverwrite | upsertItemOperation | None (upsert always succeeds) |
| ItemAppend | createItemOperation | 409 Conflict |
| ItemDelete | deleteItemOperation | 404 Not Found |
| ItemDeleteIfNotModified | deleteItemOperation + ETag | 404 Not Found (412 excluded) |
| ItemOverwriteIfNotModified | replaceItemOperation + ETag (or createItemOperation if no ETag) | 409 Conflict, 404 Not Found (412 excluded) |
| ItemPatch | patchItemOperation | None |
| ItemPatchIfExists | patchItemOperation | 404 Not Found |

ItemBulkUpdate is deferred to the next PR — it requires ReadMany infrastructure to fetch current ETags before building the batch.
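The strategy-to-ignorable-status mapping in the table above can be sketched as follows. This is an illustrative Scala model, not the connector's actual code; the type and method names are assumptions. Note that 412 is absent from every set, matching the "412 excluded" rule discussed under Trade-offs.

```scala
// Hypothetical sketch of the per-strategy shouldIgnore status codes.
// All names here are illustrative, not the connector's real identifiers.
object WriteStrategyRetrySketch {
  sealed trait Strategy
  case object ItemOverwrite extends Strategy
  case object ItemAppend extends Strategy
  case object ItemDelete extends Strategy
  case object ItemDeleteIfNotModified extends Strategy
  case object ItemOverwriteIfNotModified extends Strategy
  case object ItemPatch extends Strategy
  case object ItemPatchIfExists extends Strategy

  // 412 Precondition Failed is deliberately never ignorable: it always FAILs.
  def ignorableStatusCodes(strategy: Strategy): Set[Int] = strategy match {
    case ItemOverwrite | ItemPatch => Set.empty // always apply, nothing to ignore
    case ItemAppend                => Set(409)  // item already exists
    case ItemDelete | ItemDeleteIfNotModified | ItemPatchIfExists =>
      Set(404)                                  // item already gone / absent
    case ItemOverwriteIfNotModified => Set(409, 404)
  }
}
```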

Batch Marker Document — Eliminating Retry Ambiguity
The problem: After a transient error (408 timeout, 500, 503), the client cannot know whether the server committed the batch. On retry, a shouldIgnore-eligible response (e.g., 409 "item already exists" for ItemAppend) is ambiguous — it could mean our original batch committed (the item exists because we created it) or an external process created the item (our batch never committed). Getting this wrong has serious consequences:

False positive (infer SUCCESS when batch never committed): All N items in the batch are silently lost.
Example: Batch contains [create doc1, create doc2, create doc3] with ItemAppend. Attempt 1 gets a 408 timeout — the batch never committed. On retry, an external process happened to create doc1 independently. The retry sees 409 on doc1 and infers "our batch already committed" → reports SUCCESS. But doc2 and doc3 were never written. Silent data loss of 2 items.

False negative (infer FAIL when batch actually committed): The customer sees a spurious error even though the data was written correctly.
Example: Same batch [create doc1, create doc2, create doc3]. Attempt 1 gets a 408 timeout — but the batch actually DID commit on the server. On retry, doc1 returns 409. Without verification, the connector conservatively FAILs. The customer's job reports an error even though all 3 documents were successfully written.

The solution: Every transactional batch includes a marker document as the last upsert operation. The marker is a minimal ~100-byte document (id + partition key fields + ttl) with a deterministic ID (`__tbw:{jobRunId}:{sparkPartitionId}:{batchSeq}`). On ambiguous retry, the connector performs a point-read of the marker:

- Marker present → batch committed → SUCCESS (then delete marker)
- Marker absent → batch did NOT commit → FAIL
- Marker read fails (transient) → consume a retry attempt
This approach is deterministic — only our batch creates the marker, so an external process cannot produce a false signal.
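The deterministic marker ID and the three-way verification decision can be sketched in Scala. All identifiers below are assumptions for illustration; only the `__tbw:{jobRunId}:{sparkPartitionId}:{batchSeq}` ID format comes from the PR description.

```scala
// Illustrative sketch: marker id construction and the decision applied after
// an ambiguous retry, driven by a point-read of the marker document.
// Names are hypothetical, not the connector's actual code.
object MarkerVerificationSketch {
  // Deterministic ID: only our batch can have created this document.
  def markerId(jobRunId: String, sparkPartitionId: Int, batchSeq: Long): String =
    s"__tbw:$jobRunId:$sparkPartitionId:$batchSeq"

  sealed trait MarkerRead
  case object Present extends MarkerRead          // point-read returned the marker
  case object Absent extends MarkerRead           // 404 on point-read
  case object TransientFailure extends MarkerRead // 408/500/503 on point-read

  sealed trait Outcome
  case object Success extends Outcome            // batch committed; delete marker
  case object Fail extends Outcome               // batch never committed
  case object RetryVerification extends Outcome  // consume a retry attempt

  def resolve(read: MarkerRead): Outcome = read match {
    case Present          => Success
    case Absent           => Fail
    case TransientFailure => RetryVerification
  }
}
```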

RU cost: The marker adds minimal overhead:

- Every batch: The marker upsert is included inside the transactional batch request — it consumes 1 of the 100 operation slots (effective limit: 99 business operations per batch) but does NOT cost an additional network round-trip or separate RU charge. The batch RU cost increases by ~5-10 RU for the extra ~100-byte document.
- Every successful batch: 1 point-delete for cleanup.
- Ambiguous retries only (rare): 1 point-read for verification + 1 point-delete for cleanup.

TTL and Marker Cleanup
Primary cleanup: Active deletion after every terminal outcome (SUCCESS or FAIL). The marker is deleted via a best-effort container.deleteItem() call — fire-and-forget, no Spark-level retry (the Java SDK still applies its own internal retry policies for 429/410).
The marker carries a ttl field (default 24 hours, configurable via spark.cosmos.write.bulk.transactional.marker.ttlSeconds). If the container has TTL enabled, orphan markers from crashed runs auto-expire.
If TTL is not enabled: The connector logs an INFO message at startup. Active deletion is the primary cleanup mechanism — TTL is not required for correctness. If a marker's active delete fails (rare), it persists as a ~100-byte orphan. No correctness impact — orphan markers are never read by subsequent runs (new runs create new marker IDs).
100-item edge case: If a batch already has 100 business items (Cosmos DB server limit), the marker is skipped. That batch falls back to shouldIgnore-only inference (index 0 check) without marker verification.
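The marker document shape and the 100-item guard described above can be sketched as follows. This is a hypothetical model (a plain `Map` stands in for the connector's JSON document type); only the field set, the TTL default, and the 100-operation server limit come from the PR description.

```scala
// Sketch (assumed names): the minimal marker document and the guard that
// skips the marker when a batch already holds 100 business operations.
object MarkerDocumentSketch {
  val MaxBatchOperations = 100 // Cosmos DB transactional batch server limit

  // id + partition key fields + ttl: roughly the ~100-byte document the PR
  // describes. A plain Map stands in for the real JSON node type.
  def markerDocument(id: String,
                     pkFields: Map[String, Any],
                     ttlSeconds: Int = 86400): Map[String, Any] =
    Map("id" -> id, "ttl" -> ttlSeconds) ++ pkFields

  // The marker consumes one of the 100 slots, so it is only added when the
  // batch carries at most 99 business operations; at exactly 100, the batch
  // falls back to shouldIgnore-only inference.
  def shouldIncludeMarker(businessItemCount: Int): Boolean =
    businessItemCount < MaxBatchOperations
}
```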

Bug Fixes

| Issue | Fix |
| --- | --- |
| `transactionalBatchPartitionKeyScheduled` duplicate PK guard was a no-op because `PartitionKey.hashCode()` uses `Object.hashCode()` (identity-based, not value-based) | Changed the `ConcurrentHashMap` key from `PartitionKey` to `String` using `PartitionKey.toString()` |
| `CosmosPatchHelper.isAllowedProperty` produced false positives with HPK — substring match on concatenated PK paths blocked legitimate fields like `/tenant` (substring of `/tenantId`) | Replaced `StringUtils.join(...).contains(path)` with `partitionKeyDefinition.getPaths().contains(path)` (exact match per path) |
| `flushAndClose` re-enqueue could cause double-execution of non-idempotent operations (e.g., increment patch) | Added `isIdempotent` flag to `OperationContext`; re-enqueue skipped for non-idempotent batches |
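The duplicate-PK-guard bug can be reproduced with plain Scala. The class below is a stand-in for the SDK's `PartitionKey` (which does not override `hashCode`/`equals`); the guard names are hypothetical.

```scala
// Stand-in for the SDK's PartitionKey: value-based toString, but identity-based
// hashCode/equals, so two instances wrapping the same value never collide in a
// hash set. Not the real SDK class.
final class IdentityKey(val value: String) {
  override def toString: String = value
}

object DuplicateGuardSketch {
  // Keying by the object itself (the original bug): distinct instances of the
  // same logical key are never detected as duplicates.
  def buggyGuard(keys: Seq[IdentityKey]): Boolean = {
    val seen = scala.collection.mutable.HashSet[IdentityKey]()
    keys.exists(k => !seen.add(k)) // add returns false on duplicate
  }

  // Keying by the string form (the fix): duplicates are detected by value.
  def fixedGuard(keys: Seq[IdentityKey]): Boolean = {
    val seen = scala.collection.mutable.HashSet[String]()
    keys.exists(k => !seen.add(k.toString))
  }
}
```

Two separate instances of the same logical partition key slip past the identity-keyed guard but are caught once the map is keyed by the string representation.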

Trade-offs
Marker visibility in queries: Between batch commit and active deletion (~milliseconds), marker documents exist in the container and could appear in broad queries (SELECT * FROM c). The marker has a __tbw: prefix in its id field. Customers running unfiltered queries during active writes may see these transient documents. In practice, this window is extremely short and the markers are deleted immediately after each batch completes.

Orphan markers from crashes: If a Spark task is killed (OOM, YARN preemption) before the active delete runs, in-flight markers become orphans. Worst case with HPK high-cardinality workloads: 50,000 batches × 4 Spark task retries × 3 no-progress recovery cycles = 600,000 orphans (~120 MB) — but only if every task crashes AND TTL is disabled. In practice, most batches succeed and markers are deleted immediately.

412 excluded from shouldIgnore: For strategies that use ETags (ItemDeleteIfNotModified, ItemOverwriteIfNotModified), 412 Precondition Failed is never treated as a shouldIgnore-eligible status code — it always results in FAIL. For normal batches (with marker), this is defense-in-depth — if 412 were included, the marker verification would resolve the ambiguity correctly, but we exclude it anyway for simplicity and consistency. For batches with exactly 100 items (no marker), the exclusion is required for correctness — without a marker, there is no way to distinguish "our batch committed and the item now has a new ETag" from "an external process modified the item." The trade-off: some retriable 412 scenarios produce a spurious failure that would succeed on Spark task retry.

Non-idempotent patch (increment) double-application: On transient error retry, an increment patch may be applied twice if the original batch committed but the response was lost. This matches BulkWriter's existing behavior.

Configuration

| Property | Default | Description |
| --- | --- | --- |
| `spark.cosmos.write.bulk.transactional.marker.ttlSeconds` | 86400 (24h) | TTL for marker documents. Primary cleanup is active deletion; TTL acts as defense-in-depth to clean up orphaned marker documents. |

@dibahlfi dibahlfi requested review from a team and kirankumarkolli as code owners March 16, 2026 01:31
Copilot AI review requested due to automatic review settings March 16, 2026 01:31
Copilot AI (Contributor) left a comment

Pull request overview

This PR extends the Cosmos Spark connector’s transactional bulk support by adding more write strategies beyond pure upsert, and introduces a marker-document mechanism to disambiguate ambiguous retry outcomes for transactional batches. It also adjusts patch immutability checks and expands test coverage (unit + integration/e2e).

Changes:

  • Add transactional support for additional write strategies (append/create, delete, conditional delete/replace, patch) and enhance retry/ignore handling.
  • Introduce per-batch marker documents (with TTL + best-effort cleanup) to verify commit vs rollback on ambiguous retries.
  • Update/expand integration and E2E tests to cover the newly supported strategies and marker behavior.

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.

| File | Description |
| --- | --- |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala | Adds multi-strategy transactional batch construction, marker-based commit verification, recovery changes, and retry/ignore logic updates. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosWriterBase.scala | Passes PartitionKeyDefinition into TransactionalBulkWriter for marker construction/patch behavior. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosPatchHelper.scala | Changes PK-path immutability check to use exact match rather than substring match. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala | Adds config for marker TTL and relaxes transactional write-strategy validation. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala | Updates integration tests to assert transactional acceptance for ItemAppend/Delete/OverwriteIfNotModified. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2ETransactionalBulkWriterITest.scala | Adds E2E transactional tests per strategy + atomicity/error cases + marker cleanup verification. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBulkWriterSpec.scala | Adds a large unit-test-style spec covering strategy mapping, retry/ignore patterns, marker patterns, and PK keying behavior assumptions. |

