Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -1583,6 +1583,18 @@ class RocksDB(
originalEndKey
}

// Count the keys that are about to be deleted (gated on trackTotalNumberOfRows).
if (conf.trackTotalNumberOfRows) {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

deleteRange in RocksDB is O(1) - it writes a single range tombstone without touching individual keys. Scanning the full range with an iterator to count keys makes this O(n). For TTL-based state expiration (the primary deleteRange use case in structured streaming), ranges can contain millions of expired keys. When trackTotalNumberOfRows is enabled (the default), every deleteRange now iterates the full range before deleting it. Worth documenting this cost tradeoff, or considering RocksDB's approximate-count APIs (GetApproximateSizes) as an alternative for large ranges.

val (deletedKeys, deletedInternalKeys) = countKeysInRange(
beginKeyWithPrefix,
endKeyWithPrefix,
cfName,
includesPrefix
)
numKeysOnWritingVersion -= deletedKeys

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Correctness concern: The counters are decremented before db.deleteRange() executes. If deleteRange throws (RocksDB write stall, disk full, or a closed-DB exception), the keys still exist but the counters have already been reduced - permanently under-counting for the rest of this version. Move the decrement after the successful db.deleteRange() call.

numInternalKeysOnWritingVersion -= deletedInternalKeys
}

db.deleteRange(writeOptions, beginKeyWithPrefix, endKeyWithPrefix)
changelogWriter.foreach { writer =>
val endKeyForChangelog = if (conf.rowChecksumEnabled) {
Expand All @@ -1593,7 +1605,63 @@ class RocksDB(
}
writer.deleteRange(beginKeyWithPrefix, endKeyForChangelog)
}
// TODO: Add metrics update for deleteRange operations
}

/** Count the keys currently present in `[beginKeyWithPrefix,
* endKeyWithPrefix)`, split into non-internal vs internal column-family
* buckets.
*
* @param beginKeyWithPrefix
* Already-prefixed start key (inclusive)
* @param endKeyWithPrefix
* Already-prefixed end key (exclusive); used as the iterator upper bound
* so we never touch keys beyond it.
* @param cfName
* Caller-supplied column family. Used when `includesPrefix` is false;
* @param includesPrefix
* When true, the keys already carry their cfId prefix.
* @return
* (numKeys, numInternalKeys) - the counts to subtract from
* `numKeysOnWritingVersion` and `numInternalKeysOnWritingVersion`
* respectively.
*/
private def countKeysInRange(
beginKeyWithPrefix: Array[Byte],
endKeyWithPrefix: Array[Byte],
cfName: String,
includesPrefix: Boolean
): (Long, Long) = {
val upperBoundSlice = new Slice(endKeyWithPrefix)
val rangeReadOptions = new ReadOptions()
rangeReadOptions.setIterateUpperBound(upperBoundSlice)
val iter = db.newIterator(rangeReadOptions)
try {

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

nit: upperBoundSlice and rangeReadOptions are allocated before the try block. If db.newIterator(rangeReadOptions) throws, neither native resource is closed - the finally only runs after the try starts. Consider wrapping each in its own try-finally, or moving the allocations inside the try with a null-check in finally.

// Resolve the CF once for the whole range. When the keys carry their cfId prefix, that
// on-key prefix is authoritative; otherwise trust the caller-supplied cfName.
val cfIsInternal = if (useColumnFamilies) {
val resolvedCfName = if (includesPrefix) {
decodeStateRowWithPrefix(beginKeyWithPrefix)._2
} else {
cfName
}
getColumnFamilyInfo(resolvedCfName).isInternal
} else {
false
}

iter.seek(beginKeyWithPrefix)
var count = 0L
while (iter.isValid) {
count += 1
iter.next()
}

if (cfIsInternal) (0L, count) else (count, 0L)
} finally {
iter.close()
rangeReadOptions.close()
upperBoundSlice.close()
}
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -2993,6 +2993,138 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
}
}

test("deleteRange - numKeys metric is decremented by the number of keys removed") {
tryWithProviderResource(
newStoreProvider(
keySchemaWithRangeScan,
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
useColumnFamilies = true)) { provider =>
val store = provider.getStore(0)
try {
// Insert 5 keys
store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10))
store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20))
store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30))
store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40))
store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50))

// deleteRange [2, 4) removes 2 keys (ts=2 and ts=3) -> numKeys should drop to 3
val beginKey = dataToKeyRowWithRangeScan(2L, "")
val endKey = dataToKeyRowWithRangeScan(4L, "")
store.deleteRange(beginKey, endKey)
assert(store.commit() === 1)

assert(store.metrics.numKeys === 3,
"numKeys should reflect the 3 keys remaining after deleteRange removed 2 of 5")
} finally {
if (!store.hasCommitted) store.abort()
}
}
}

test("deleteRange - empty range does not change numKeys metric") {
tryWithProviderResource(
newStoreProvider(
keySchemaWithRangeScan,
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
useColumnFamilies = true)) { provider =>
val store = provider.getStore(0)
try {
store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10))
store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50))

// deleteRange [2, 4) covers no existing keys -> numKeys stays at 2
store.deleteRange(
dataToKeyRowWithRangeScan(2L, ""),
dataToKeyRowWithRangeScan(4L, ""))
assert(store.commit() === 1)

assert(store.metrics.numKeys === 2,
"Empty-range deleteRange should not decrement the numKeys metric")
} finally {
if (!store.hasCommitted) store.abort()
}
}
}

test("deleteRange - internal column family keys are tracked separately") {
val cfName = "testColFamily"
val internalCfName = "$internalCf"

tryWithProviderResource(
newStoreProvider(
keySchemaWithRangeScan,
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
useColumnFamilies = true)) { provider =>
val store = provider.getStore(0)
try {
store.createColFamilyIfAbsent(cfName,
keySchemaWithRangeScan, valueSchema,
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
store.createColFamilyIfAbsent(internalCfName,
keySchemaWithRangeScan, valueSchema,
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
isInternal = true)

// 4 keys in the non-internal cf, 3 in the internal cf
store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10), cfName)
store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20), cfName)
store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30), cfName)
store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40), cfName)

store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(11), internalCfName)
store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(22), internalCfName)
store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(33), internalCfName)

// Delete [2, 4) on the internal cf only -> 2 internal keys gone, non-internal untouched
store.deleteRange(
dataToKeyRowWithRangeScan(2L, ""),
dataToKeyRowWithRangeScan(4L, ""),
internalCfName)
assert(store.commit() === 1)

assert(store.metrics.numKeys === 4,
"Non-internal numKeys should be unchanged when deleteRange targets an internal CF")
val internalMetric = store.metrics.customMetrics
.find(_._1.name == "rocksdbNumInternalColFamiliesKeys")
assert(internalMetric.isDefined && internalMetric.get._2 === 1,
"Internal CF key counter should drop from 3 to 1 after deleting 2 internal keys")
} finally {
if (!store.hasCommitted) store.abort()
}
}
}

test("deleteRange - metric is not updated when trackTotalNumberOfRows is disabled") {
withSQLConf(
RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".trackTotalNumberOfRows" -> "false") {
tryWithProviderResource(
newStoreProvider(
keySchemaWithRangeScan,
RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
useColumnFamilies = true)) { provider =>
val store = provider.getStore(0)
try {
store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10))
store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20))
store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30))

// Should complete without iterating the range (and without touching the counter,
// which is held at -1 when row-count tracking is disabled).
store.deleteRange(
dataToKeyRowWithRangeScan(2L, ""),
dataToKeyRowWithRangeScan(4L, ""))
assert(store.commit() === 1)

assert(store.metrics.numKeys === -1,
"numKeys should remain at -1 (untracked) when trackTotalNumberOfRows is off")
} finally {
if (!store.hasCommitted) store.abort()
}
}
}
}

test("Rocks DB task completion listener does not double unlock acquireThread") {
// This test verifies that a thread that locks then unlocks the db and then
// fires a completion listener (Thread 1) does not unlock the lock validly
Expand Down