Extend TransactionalBulkWriter with Additional Write Strategies#48422
Pull request overview
This PR extends the Cosmos Spark connector’s transactional bulk support by adding more write strategies beyond pure upsert, and introduces a marker-document mechanism to disambiguate ambiguous retry outcomes for transactional batches. It also adjusts patch immutability checks and expands test coverage (unit + integration/e2e).
Changes:
- Add transactional support for additional write strategies (append/create, delete, conditional delete/replace, patch) and enhance retry/ignore handling.
- Introduce per-batch marker documents (with TTL + best-effort cleanup) to verify commit vs rollback on ambiguous retries.
- Update/expand integration and E2E tests to cover the newly supported strategies and marker behavior.
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 9 comments.
| File | Description |
|---|---|
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/TransactionalBulkWriter.scala | Adds multi-strategy transactional batch construction, marker-based commit verification, recovery changes, and retry/ignore logic updates. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosWriterBase.scala | Passes PartitionKeyDefinition into TransactionalBulkWriter for marker construction/patch behavior. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosPatchHelper.scala | Changes PK-path immutability check to use exact match rather than substring match. |
| sdk/cosmos/azure-cosmos-spark_3/src/main/scala/com/azure/cosmos/spark/CosmosConfig.scala | Adds config for marker TTL and relaxes transactional write-strategy validation. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBatchITest.scala | Updates integration tests to assert transactional acceptance for ItemAppend/Delete/OverwriteIfNotModified. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/SparkE2ETransactionalBulkWriterITest.scala | Adds E2E transactional tests per strategy + atomicity/error cases + marker cleanup verification. |
| sdk/cosmos/azure-cosmos-spark_3/src/test/scala/com/azure/cosmos/spark/TransactionalBulkWriterSpec.scala | Adds a large unit-test-style spec covering strategy mapping, retry/ignore patterns, marker patterns, and PK keying behavior assumptions. |
This PR extends TransactionalBulkWriter in the Azure Cosmos DB Spark Connector to support additional write strategies (up from only ItemOverwrite) and introduces a batch marker document mechanism to eliminate retry ambiguity — preventing silent data loss (false positives) and spurious failures (false negatives) on transient error retries.
New Write Strategies
Previously, TransactionalBulkWriter supported only ItemOverwrite (upsert). This PR adds transactional support for:
- ItemAppend (create; fails if the id already exists)
- ItemDelete
- ItemDeleteIfNotModified (ETag-conditioned delete)
- ItemOverwriteIfNotModified (ETag-conditioned replace)
- ItemPatch

ItemBulkUpdate is deferred to the next PR — it requires ReadMany infrastructure to fetch current ETags before building the batch.
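As a rough sketch of how the strategies above map onto transactional-batch operations. Note that `BatchOp`, `toBatchOp`, and `requireEtag` are illustrative names, not the connector's internal API; only the strategy names and their semantics come from this PR:

```scala
// Illustrative only: the ETag-conditioned strategies carry the expected
// ETag as an if-match precondition on the batch operation.
sealed trait BatchOp
case object Create extends BatchOp                             // ItemAppend: 409 if id exists
case object Upsert extends BatchOp                             // ItemOverwrite
case object Delete extends BatchOp                             // ItemDelete
final case class DeleteIfMatch(etag: String) extends BatchOp   // ItemDeleteIfNotModified
final case class ReplaceIfMatch(etag: String) extends BatchOp  // ItemOverwriteIfNotModified
case object Patch extends BatchOp                              // ItemPatch

def requireEtag(etag: Option[String]): String =
  etag.getOrElse(throw new IllegalArgumentException("ETag required for conditional strategies"))

def toBatchOp(strategy: String, etag: Option[String]): BatchOp = strategy match {
  case "ItemAppend"                 => Create
  case "ItemOverwrite"              => Upsert
  case "ItemDelete"                 => Delete
  case "ItemDeleteIfNotModified"    => DeleteIfMatch(requireEtag(etag))
  case "ItemOverwriteIfNotModified" => ReplaceIfMatch(requireEtag(etag))
  case "ItemPatch"                  => Patch
  case other                        => throw new IllegalArgumentException(s"Unsupported strategy: $other")
}
```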
Batch Marker Document — Eliminating Retry Ambiguity
The problem: After a transient error (408 timeout, 500, 503), the client cannot know whether the server committed the batch. On retry, a shouldIgnore-eligible response (e.g., 409 "item already exists" for ItemAppend) is ambiguous — it could mean our original batch committed (the item exists because we created it) or an external process created the item (our batch never committed). Getting this wrong has serious consequences:
False positive (infer SUCCESS when batch never committed): All N items in the batch are silently lost.
Example: Batch contains [create doc1, create doc2, create doc3] with ItemAppend. Attempt 1 gets a 408 timeout — the batch never committed. On retry, an external process happened to create doc1 independently. The retry sees 409 on doc1 and infers "our batch already committed" → reports SUCCESS. But doc2 and doc3 were never written. Silent data loss of 2 items.
False negative (infer FAIL when batch actually committed): The customer sees a spurious error even though the data was written correctly.
Example: Same batch [create doc1, create doc2, create doc3]. Attempt 1 gets a 408 timeout — but the batch actually DID commit on the server. On retry, doc1 returns 409. Without verification, the connector conservatively FAILs. The customer's job reports an error even though all 3 documents were successfully written.
The solution: Every transactional batch includes a marker document as the last upsert operation. The marker is a minimal ~100-byte document (id + partition key fields + ttl) with a deterministic ID (__tbw:{jobRunId}:{sparkPartitionId}:{batchSeq}). On ambiguous retry, the connector performs a point-read of the marker:
Marker present -> batch committed -> SUCCESS (then delete marker)
Marker absent -> batch did NOT commit -> FAIL
Marker read fails (transient) -> consume a retry attempt
This approach is deterministic — only our batch creates the marker, so an external process cannot produce a false signal.
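The decision rule above can be sketched as follows. The `markerId` format follows the deterministic ID described in this PR; `resolve` and the outcome names are illustrative stand-ins, not the connector's actual types:

```scala
sealed trait Outcome
case object BatchCommitted extends Outcome     // SUCCESS; then delete the marker
case object BatchNotCommitted extends Outcome  // FAIL
case object RetryVerification extends Outcome  // consume a retry attempt

// Deterministic marker id: only this batch can have created this document,
// so its presence is an unambiguous commit signal.
def markerId(jobRunId: String, sparkPartitionId: Int, batchSeq: Long): String =
  s"__tbw:$jobRunId:$sparkPartitionId:$batchSeq"

// markerRead models the point-read result:
//   Some(true)  -> marker found (200)
//   Some(false) -> marker absent (404)
//   None        -> the read itself failed transiently
def resolve(markerRead: Option[Boolean]): Outcome = markerRead match {
  case Some(true)  => BatchCommitted
  case Some(false) => BatchNotCommitted
  case None        => RetryVerification
}
```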
RU cost: The marker adds minimal overhead:
Every batch: The marker upsert is included inside the transactional batch request — it consumes 1 of the 100 operation slots (effective limit: 99 business operations per batch) but does NOT cost an additional network round-trip or separate RU charge. The batch RU cost increases by ~5-10 RU for the extra ~100-byte document.
Every successful batch: 1 point-delete for cleanup.
Ambiguous retries only (rare): 1 point-read for verification + 1 point-delete for cleanup.
TTL and Marker Cleanup
Primary cleanup: Active deletion after every terminal outcome (SUCCESS or FAIL). The marker is deleted via a best-effort container.deleteItem() call — fire-and-forget, no Spark-level retry (the Java SDK still applies its own internal retry policies for 429/410).
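A minimal sketch of the best-effort semantics described above. For simplicity this version is synchronous; the actual connector issues the delete through the async Java SDK, which still applies its own internal retry policy for 429/410 before giving up:

```scala
import scala.util.control.NonFatal

// Best-effort sketch: attempt the delete, swallow any failure, log a warning.
// No Spark-level retry is attempted.
def deleteMarkerBestEffort(delete: () => Unit, logWarning: String => Unit): Unit =
  try delete()
  catch {
    case NonFatal(e) =>
      // A failed delete leaves a ~100-byte orphan; the marker's ttl field
      // expires it later if container TTL is enabled.
      logWarning(s"best-effort marker delete failed: ${e.getMessage}")
  }
```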
The marker carries a ttl field (default 24 hours, configurable via spark.cosmos.write.bulk.transactional.marker.ttlSeconds). If the container has TTL enabled, orphan markers from crashed runs auto-expire.
If TTL is not enabled: The connector logs an INFO message at startup. Active deletion is the primary cleanup mechanism — TTL is not required for correctness. If a marker's active delete fails (rare), it persists as a ~100-byte orphan. No correctness impact — orphan markers are never read by subsequent runs (new runs create new marker IDs).
100-item edge case: If a batch already has 100 business items (Cosmos DB server limit), the marker is skipped. That batch falls back to shouldIgnore-only inference (index 0 check) without marker verification.
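The slot accounting for this edge case is simple to state. The 100-operation cap is the Cosmos DB server limit mentioned above; the helper name is illustrative:

```scala
val MaxBatchOperations = 100  // Cosmos DB transactional batch server limit

// The marker occupies one operation slot, so it fits only when the batch
// carries at most 99 business operations.
def includesMarker(businessItemCount: Int): Boolean =
  businessItemCount < MaxBatchOperations

// 99 business items  -> marker appended as the 100th operation
// 100 business items -> marker skipped; shouldIgnore-only inference (index 0 check)
```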
Bug Fixes
- `PartitionKey.hashCode()` uses `Object.hashCode()` (identity-based, not value-based); changed the `ConcurrentHashMap` key from `PartitionKey` to `String` using `PartitionKey.toString()`.
- `CosmosPatchHelper.isAllowedProperty` produced false positives with HPK — substring match on concatenated PK paths blocked legitimate fields like `/tenant` (substring of `/tenantId`); replaced `StringUtils.join(...).contains(path)` with `partitionKeyDefinition.getPaths().contains(path)` (exact match per path).
- `flushAndClose` re-enqueue could cause double-execution of non-idempotent operations (e.g., increment patch); added an `isIdempotent` flag to `OperationContext` so re-enqueue is skipped for non-idempotent batches.

Trade-offs
Marker visibility in queries: Between batch commit and active deletion (~milliseconds), marker documents exist in the container and could appear in broad queries (SELECT * FROM c). The marker has a __tbw: prefix in its id field. Customers running unfiltered queries during active writes may see these transient documents. In practice, this window is extremely short and the markers are deleted immediately after each batch completes.
Orphan markers from crashes: If a Spark task is killed (OOM, YARN preemption) before the active delete runs, in-flight markers become orphans. Worst case with HPK high-cardinality workloads: 50,000 batches × 4 Spark task retries × 3 no-progress recovery cycles = 600,000 orphans (~120 MB) — but only if every task crashes AND TTL is disabled. In practice, most batches succeed and markers are deleted immediately.
412 excluded from shouldIgnore: For strategies that use ETags (ItemDeleteIfNotModified, ItemOverwriteIfNotModified), 412 Precondition Failed is never treated as a shouldIgnore-eligible status code — it always results in FAIL. For normal batches (with marker), this is defense-in-depth — if 412 were included, the marker verification would resolve the ambiguity correctly, but we exclude it anyway for simplicity and consistency. For batches with exactly 100 items (no marker), the exclusion is required for correctness — without a marker, there is no way to distinguish "our batch committed and the item now has a new ETag" from "an external process modified the item." The trade-off: some retriable 412 scenarios produce a spurious failure that would succeed on Spark task retry.
Non-idempotent patch (increment) double-application: On transient error retry, an increment patch may be applied twice if the original batch committed but the response was lost. This matches BulkWriter's existing behavior.
Configuration
| Config | Default |
|---|---|
| `spark.cosmos.write.bulk.transactional.marker.ttlSeconds` | `86400` (24h) |
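For illustration, a write-options map using the new key might look like this. `spark.cosmos.write.strategy` and `spark.cosmos.write.bulk.enabled` are standard connector options; whichever option enables transactional bulk mode is omitted here, since it is not named in this description:

```scala
// The new key added by this PR, with its default value (86400 s = 24 h).
val markerTtlKey = "spark.cosmos.write.bulk.transactional.marker.ttlSeconds"

val writeOptions = Map(
  "spark.cosmos.write.strategy"     -> "ItemAppend", // one of the newly supported strategies
  "spark.cosmos.write.bulk.enabled" -> "true",
  markerTtlKey                      -> "86400"       // orphan markers auto-expire if container TTL is on
)

// Typical usage (requires a SparkSession and DataFrame, not shown here):
// df.write.format("cosmos.oltp").options(writeOptions).mode("Append").save()
```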