-
Notifications
You must be signed in to change notification settings - Fork 29.2k
[SPARK-57537][SS] Add metrics support for deleteRange operation in RocksDB #56598
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -1583,6 +1583,18 @@ class RocksDB( | |
| originalEndKey | ||
| } | ||
|
|
||
| // Count the keys that are about to be deleted (gated on trackTotalNumberOfRows). | ||
| if (conf.trackTotalNumberOfRows) { | ||
| val (deletedKeys, deletedInternalKeys) = countKeysInRange( | ||
| beginKeyWithPrefix, | ||
| endKeyWithPrefix, | ||
| cfName, | ||
| includesPrefix | ||
| ) | ||
| numKeysOnWritingVersion -= deletedKeys | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Correctness concern: The counters are decremented before |
||
| numInternalKeysOnWritingVersion -= deletedInternalKeys | ||
| } | ||
|
|
||
| db.deleteRange(writeOptions, beginKeyWithPrefix, endKeyWithPrefix) | ||
| changelogWriter.foreach { writer => | ||
| val endKeyForChangelog = if (conf.rowChecksumEnabled) { | ||
|
|
@@ -1593,7 +1605,63 @@ class RocksDB( | |
| } | ||
| writer.deleteRange(beginKeyWithPrefix, endKeyForChangelog) | ||
| } | ||
| // TODO: Add metrics update for deleteRange operations | ||
| } | ||
|
|
||
| /** Count the keys currently present in `[beginKeyWithPrefix, | ||
| * endKeyWithPrefix)`, split into non-internal vs internal column-family | ||
| * buckets. | ||
| * | ||
| * @param beginKeyWithPrefix | ||
| * Already-prefixed start key (inclusive) | ||
| * @param endKeyWithPrefix | ||
| * Already-prefixed end key (exclusive); used as the iterator upper bound | ||
| * so we never touch keys beyond it. | ||
| * @param cfName | ||
| * Caller-supplied column family. Used when `includesPrefix` is false; | ||
| * @param includesPrefix | ||
| * When true, the keys already carry their cfId prefix. | ||
| * @return | ||
| * (numKeys, numInternalKeys) - the counts to subtract from | ||
| * `numKeysOnWritingVersion` and `numInternalKeysOnWritingVersion` | ||
| * respectively. | ||
| */ | ||
| private def countKeysInRange( | ||
| beginKeyWithPrefix: Array[Byte], | ||
| endKeyWithPrefix: Array[Byte], | ||
| cfName: String, | ||
| includesPrefix: Boolean | ||
| ): (Long, Long) = { | ||
| val upperBoundSlice = new Slice(endKeyWithPrefix) | ||
| val rangeReadOptions = new ReadOptions() | ||
| rangeReadOptions.setIterateUpperBound(upperBoundSlice) | ||
| val iter = db.newIterator(rangeReadOptions) | ||
| try { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. nit: |
||
| // Resolve the CF once for the whole range. When the keys carry their cfId prefix, that | ||
| // on-key prefix is authoritative; otherwise trust the caller-supplied cfName. | ||
| val cfIsInternal = if (useColumnFamilies) { | ||
| val resolvedCfName = if (includesPrefix) { | ||
| decodeStateRowWithPrefix(beginKeyWithPrefix)._2 | ||
| } else { | ||
| cfName | ||
| } | ||
| getColumnFamilyInfo(resolvedCfName).isInternal | ||
| } else { | ||
| false | ||
| } | ||
|
|
||
| iter.seek(beginKeyWithPrefix) | ||
| var count = 0L | ||
| while (iter.isValid) { | ||
| count += 1 | ||
| iter.next() | ||
| } | ||
|
|
||
| if (cfIsInternal) (0L, count) else (count, 0L) | ||
| } finally { | ||
| iter.close() | ||
| rangeReadOptions.close() | ||
| upperBoundSlice.close() | ||
| } | ||
| } | ||
|
|
||
| /** | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
deleteRangein RocksDB is O(1) - it writes a single range tombstone without touching individual keys. Scanning the full range with an iterator to count keys makes this O(n). For TTL-based state expiration (the primarydeleteRangeuse case in structured streaming), ranges can contain millions of expired keys. WhentrackTotalNumberOfRowsis enabled (the default), everydeleteRangenow iterates the full range before deleting it. Worth documenting this cost tradeoff, or considering RocksDB's approximate-count APIs (GetApproximateSizes) as an alternative for large ranges.