From 2dbe50f94d948075188a99120708ecc688c5a26c Mon Sep 17 00:00:00 2001
From: Trisha Jain
Date: Thu, 18 Jun 2026 18:07:44 +0000
Subject: [PATCH] [SPARK-57537][SS] Add metrics support for deleteRange operation in RocksDB

---
 .../execution/streaming/state/RocksDB.scala   |  82 ++++++++++-
 .../state/RocksDBStateStoreSuite.scala        | 132 ++++++++++++++++++
 2 files changed, 213 insertions(+), 1 deletion(-)

diff --git a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
index bd479ffc6d2a8..f65f0e12a6804 100644
--- a/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
+++ b/sql/core/src/main/scala/org/apache/spark/sql/execution/streaming/state/RocksDB.scala
@@ -1583,6 +1583,19 @@ class RocksDB(
       originalEndKey
     }
 
+    // Count the keys that are about to be deleted (gated on trackTotalNumberOfRows). This has
+    // to happen before db.deleteRange, while the keys are still visible to the iterator.
+    if (conf.trackTotalNumberOfRows) {
+      val (deletedKeys, deletedInternalKeys) = countKeysInRange(
+        beginKeyWithPrefix,
+        endKeyWithPrefix,
+        cfName,
+        includesPrefix
+      )
+      numKeysOnWritingVersion -= deletedKeys
+      numInternalKeysOnWritingVersion -= deletedInternalKeys
+    }
+
     db.deleteRange(writeOptions, beginKeyWithPrefix, endKeyWithPrefix)
     changelogWriter.foreach { writer =>
       val endKeyForChangelog = if (conf.rowChecksumEnabled) {
@@ -1593,7 +1606,74 @@ class RocksDB(
       }
       writer.deleteRange(beginKeyWithPrefix, endKeyForChangelog)
     }
-    // TODO: Add metrics update for deleteRange operations
+  }
+
+  /**
+   * Counts the keys currently present in `[beginKeyWithPrefix, endKeyWithPrefix)` and reports
+   * them in the bucket (non-internal vs. internal) of the column family the range belongs to.
+   *
+   * The whole range is attributed to a single column family, so at most one of the two
+   * returned counts is non-zero.
+   *
+   * @param beginKeyWithPrefix Already-prefixed start key (inclusive).
+   * @param endKeyWithPrefix Already-prefixed end key (exclusive); used as the iterator upper
+   *                         bound so we never touch keys beyond it.
+   * @param cfName Caller-supplied column family; only consulted when `includesPrefix` is
+   *               false.
+   * @param includesPrefix When true, the keys already carry their cfId prefix and the column
+   *                       family is decoded from `beginKeyWithPrefix` instead.
+   * @return (numKeys, numInternalKeys) - the counts to subtract from
+   *         `numKeysOnWritingVersion` and `numInternalKeysOnWritingVersion` respectively.
+   */
+  private def countKeysInRange(
+      beginKeyWithPrefix: Array[Byte],
+      endKeyWithPrefix: Array[Byte],
+      cfName: String,
+      includesPrefix: Boolean
+  ): (Long, Long) = {
+    // Resolve the CF once for the whole range, before allocating any native handle. When the
+    // keys carry their cfId prefix, that on-key prefix is authoritative; otherwise trust the
+    // caller-supplied cfName.
+    val cfIsInternal = if (useColumnFamilies) {
+      val resolvedCfName = if (includesPrefix) {
+        decodeStateRowWithPrefix(beginKeyWithPrefix)._2
+      } else {
+        cfName
+      }
+      getColumnFamilyInfo(resolvedCfName).isInternal
+    } else {
+      false
+    }
+
+    // Every native handle gets its own try/finally so that a failure while creating a later
+    // one cannot leak those already allocated.
+    val upperBoundSlice = new Slice(endKeyWithPrefix)
+    try {
+      val rangeReadOptions = new ReadOptions()
+      try {
+        rangeReadOptions.setIterateUpperBound(upperBoundSlice)
+        val iter = db.newIterator(rangeReadOptions)
+        try {
+          iter.seek(beginKeyWithPrefix)
+          var count = 0L
+          while (iter.isValid) {
+            count += 1
+            iter.next()
+          }
+          // isValid also turns false when the scan fails; status() rethrows that error so a
+          // broken scan cannot silently under-count the deleted keys.
+          iter.status()
+
+          if (cfIsInternal) (0L, count) else (count, 0L)
+        } finally {
+          iter.close()
+        }
+      } finally {
+        rangeReadOptions.close()
+      }
+    } finally {
+      upperBoundSlice.close()
+    }
   }
 
   /**
diff --git a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
index e501366b7f98f..42649272891eb 100644
--- a/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
+++ b/sql/core/src/test/scala/org/apache/spark/sql/execution/streaming/state/RocksDBStateStoreSuite.scala
@@ -2993,6 +2993,138 @@ class RocksDBStateStoreSuite extends StateStoreSuiteBase[RocksDBStateStoreProvid
     }
   }
 
+  test("deleteRange - numKeys metric is decremented by the number of keys removed") {
+    tryWithProviderResource(
+      newStoreProvider(
+        keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+        useColumnFamilies = true)) { provider =>
+      val store = provider.getStore(0)
+      try {
+        // Insert 5 keys
+        store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10))
+        store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20))
+        store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30))
+        store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40))
+        store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50))
+
+        // deleteRange [2, 4) removes 2 keys (ts=2 and ts=3) -> numKeys should drop to 3
+        val beginKey = dataToKeyRowWithRangeScan(2L, "")
+        val endKey = dataToKeyRowWithRangeScan(4L, "")
+        store.deleteRange(beginKey, endKey)
+        assert(store.commit() === 1)
+
+        assert(store.metrics.numKeys === 3,
+          "numKeys should reflect the 3 keys remaining after deleteRange removed 2 of 5")
+      } finally {
+        if (!store.hasCommitted) store.abort()
+      }
+    }
+  }
+
+  test("deleteRange - empty range does not change numKeys metric") {
+    tryWithProviderResource(
+      newStoreProvider(
+        keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+        useColumnFamilies = true)) { provider =>
+      val store = provider.getStore(0)
+      try {
+        store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10))
+        store.put(dataToKeyRowWithRangeScan(5L, "e"), dataToValueRow(50))
+
+        // deleteRange [2, 4) covers no existing keys -> numKeys stays at 2
+        store.deleteRange(
+          dataToKeyRowWithRangeScan(2L, ""),
+          dataToKeyRowWithRangeScan(4L, ""))
+        assert(store.commit() === 1)
+
+        assert(store.metrics.numKeys === 2,
+          "Empty-range deleteRange should not decrement the numKeys metric")
+      } finally {
+        if (!store.hasCommitted) store.abort()
+      }
+    }
+  }
+
+  test("deleteRange - internal column family keys are tracked separately") {
+    val cfName = "testColFamily"
+    val internalCfName = "$internalCf"
+
+    tryWithProviderResource(
+      newStoreProvider(
+        keySchemaWithRangeScan,
+        RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+        useColumnFamilies = true)) { provider =>
+      val store = provider.getStore(0)
+      try {
+        store.createColFamilyIfAbsent(cfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)))
+        store.createColFamilyIfAbsent(internalCfName,
+          keySchemaWithRangeScan, valueSchema,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+          isInternal = true)
+
+        // 4 keys in the non-internal cf, 3 in the internal cf
+        store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10), cfName)
+        store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20), cfName)
+        store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30), cfName)
+        store.put(dataToKeyRowWithRangeScan(4L, "d"), dataToValueRow(40), cfName)
+
+        store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(11), internalCfName)
+        store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(22), internalCfName)
+        store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(33), internalCfName)
+
+        // Delete [2, 4) on the internal cf only -> 2 internal keys gone, non-internal untouched
+        store.deleteRange(
+          dataToKeyRowWithRangeScan(2L, ""),
+          dataToKeyRowWithRangeScan(4L, ""),
+          internalCfName)
+        assert(store.commit() === 1)
+
+        assert(store.metrics.numKeys === 4,
+          "Non-internal numKeys should be unchanged when deleteRange targets an internal CF")
+        val internalMetric = store.metrics.customMetrics
+          .find(_._1.name == "rocksdbNumInternalColFamiliesKeys")
+        assert(internalMetric.isDefined && internalMetric.get._2 === 1,
+          "Internal CF key counter should drop from 3 to 1 after deleting 2 internal keys")
+      } finally {
+        if (!store.hasCommitted) store.abort()
+      }
+    }
+  }
+
+  test("deleteRange - metric is not updated when trackTotalNumberOfRows is disabled") {
+    withSQLConf(
+      RocksDBConf.ROCKSDB_SQL_CONF_NAME_PREFIX + ".trackTotalNumberOfRows" -> "false") {
+      tryWithProviderResource(
+        newStoreProvider(
+          keySchemaWithRangeScan,
+          RangeKeyScanStateEncoderSpec(keySchemaWithRangeScan, Seq(0)),
+          useColumnFamilies = true)) { provider =>
+        val store = provider.getStore(0)
+        try {
+          store.put(dataToKeyRowWithRangeScan(1L, "a"), dataToValueRow(10))
+          store.put(dataToKeyRowWithRangeScan(2L, "b"), dataToValueRow(20))
+          store.put(dataToKeyRowWithRangeScan(3L, "c"), dataToValueRow(30))
+
+          // Should complete without iterating the range (and without touching the counter,
+          // which is held at -1 when row-count tracking is disabled).
+          store.deleteRange(
+            dataToKeyRowWithRangeScan(2L, ""),
+            dataToKeyRowWithRangeScan(4L, ""))
+          assert(store.commit() === 1)
+
+          assert(store.metrics.numKeys === -1,
+            "numKeys should remain at -1 (untracked) when trackTotalNumberOfRows is off")
+        } finally {
+          if (!store.hasCommitted) store.abort()
+        }
+      }
+    }
+  }
+
   test("Rocks DB task completion listener does not double unlock acquireThread") {
     // This test verifies that a thread that locks then unlocks the db and then
     // fires a completion listener (Thread 1) does not unlock the lock validly