Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,7 @@ private[spark] object CosmosConfigNames {
// Configuration key names for write settings (bulk, transactional bulk, point and patch writes).
val WriteBulkInitialBatchSize = "spark.cosmos.write.bulk.initialBatchSize"
val WriteBulkTransactionalMaxOperationsConcurrency = "spark.cosmos.write.bulk.transactional.maxOperationsConcurrency"
val WriteBulkTransactionalMaxBatchesConcurrency = "spark.cosmos.write.bulk.transactional.maxBatchesConcurrency"
// TTL, in seconds, for the batch marker documents used in transactional bulk mode.
val WriteBulkTransactionalMarkerTtlSeconds = "spark.cosmos.write.bulk.transactional.marker.ttlSeconds"
val WritePointMaxConcurrency = "spark.cosmos.write.point.maxConcurrency"
val WritePatchDefaultOperationType = "spark.cosmos.write.patch.defaultOperationType"
val WritePatchColumnConfigs = "spark.cosmos.write.patch.columnConfigs"
Expand Down Expand Up @@ -1509,7 +1510,8 @@ private case class CosmosWriteBulkExecutionConfigs(
private case class CosmosWriteTransactionalBulkExecutionConfigs(
maxConcurrentCosmosPartitions: Option[Int] = None,
maxConcurrentOperations: Option[Int] = None,
maxConcurrentBatches: Option[Int] = None) extends CosmosWriteBulkExecutionConfigsBase
maxConcurrentBatches: Option[Int] = None,
markerTtlSeconds: Option[Int] = None) extends CosmosWriteBulkExecutionConfigsBase

private object CosmosWriteConfig {
private val DefaultMaxRetryCount = 10
Expand Down Expand Up @@ -1601,6 +1603,22 @@ private object CosmosWriteConfig {
helpMessage = "Max concurrent transactional batches per Cosmos partition (1..5). Controls batch-level parallelism; default 5." +
"Each batch may contain multiple operations; tune together with 'spark.cosmos.write.bulk.transactional.maxOperationsConcurrency' to balance throughput and throttling.")

// Config entry for the TTL (in seconds) of the batch marker documents written in
// transactional bulk mode. Optional; the default is 86400 seconds (24 hours).
private val bulkTransactionalMarkerTtlSeconds = CosmosConfigEntry[Int](
  key = CosmosConfigNames.WriteBulkTransactionalMarkerTtlSeconds,
  defaultValue = Some(86400),
  mandatory = false,
  // A non-numeric value fails in `toInt`; zero and negative values are rejected explicitly.
  parseFromStringFunction = rawTtl => rawTtl.toInt match {
    case ttl if ttl > 0 => ttl
    case nonPositive =>
      throw new IllegalArgumentException(
        s"'${CosmosConfigNames.WriteBulkTransactionalMarkerTtlSeconds}' must be a positive number of seconds, but was $nonPositive.")
  },
  helpMessage = "TTL in seconds for batch marker documents used for retry ambiguity resolution in transactional bulk mode. " +
    "Markers are actively deleted after each batch completes; TTL is defense-in-depth for orphan cleanup from crashed runs. " +
    "Default: 86400 (24 hours). Set to a lower value (e.g., 3600) if container storage is constrained.")

private val pointWriteConcurrency = CosmosConfigEntry[Int](key = CosmosConfigNames.WritePointMaxConcurrency,
mandatory = false,
parseFromStringFunction = bulkMaxConcurrencyAsString => bulkMaxConcurrencyAsString.toInt,
Expand Down Expand Up @@ -1844,18 +1862,17 @@ private object CosmosWriteConfig {
if (bulkEnabledOpt.isDefined && bulkEnabledOpt.get) {

if (bulkTransactionalOpt.isDefined && bulkTransactionalOpt.get) {
// Validate write strategy for transactional batches
assert(itemWriteStrategyOpt.get == ItemWriteStrategy.ItemOverwrite,
s"Transactional batches only support ItemOverwrite (upsert) write strategy. Requested: ${itemWriteStrategyOpt.get}")

val maxConcurrentCosmosPartitionsOpt = CosmosConfigEntry.parse(cfg, bulkMaxConcurrentPartitions)
val maxBulkTransactionalOpsConcurrencyOpt = CosmosConfigEntry.parse(cfg, bulkTransactionalMaxOpsConcurrency)
val maxBulkTransactionalBatchesConcurrencyOpt = CosmosConfigEntry.parse(cfg, bulkTransactionalMaxBatchesConcurrency)
val markerTtlSecondsOpt = CosmosConfigEntry.parse(cfg, bulkTransactionalMarkerTtlSeconds)

bulkExecutionConfigsOpt = Some(CosmosWriteTransactionalBulkExecutionConfigs(
maxConcurrentCosmosPartitionsOpt,
maxBulkTransactionalOpsConcurrencyOpt,
maxBulkTransactionalBatchesConcurrencyOpt
maxBulkTransactionalBatchesConcurrencyOpt,
markerTtlSecondsOpt
))

} else {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -42,9 +42,11 @@ private class CosmosPatchHelper(diagnosticsConfig: DiagnosticsConfig,
// Some properties are immutable. These include:
// 1. System properties : _ts, _rid, _etag
// 2. id, and partitionKeyPath
if ((path.startsWith("/") && !systemProperties.contains(path.substring(1)) && IdAttributeName != path.substring(1))
&& !StringUtils.join(partitionKeyDefinition.getPaths, "").contains(path)) {
true
if (path.startsWith("/") && !systemProperties.contains(path.substring(1)) && IdAttributeName != path.substring(1)) {
// Compare against each partition key path individually, using an exact match, to avoid false positives.
// A substring check against the joined paths (e.g., "/tenantId/userId/sessionId") would wrongly
// block "/tenant", because "/tenant" is a substring of "/tenantId".
!partitionKeyDefinition.getPaths.contains(path)
} else {
false
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -69,6 +69,7 @@ private abstract class CosmosWriterBase(
new TransactionalBulkWriter(
container,
cosmosTargetContainerConfig,
partitionKeyDefinition,
cosmosWriteConfig,
diagnosticsConfig,
getOutputMetricsPublisher(),
Expand Down Expand Up @@ -153,6 +154,7 @@ private abstract class CosmosWriterBase(
new TransactionalBulkWriter(
container,
cosmosTargetContainerConfig,
partitionKeyDefinition,
cosmosWriteConfig,
diagnosticsConfig,
getOutputMetricsPublisher(),
Expand Down
Loading
Loading