feat(flink): Add metrics for RocksDB index backend in bucket assigner #18484
cshuo wants to merge 2 commits into apache:master
Conversation
Force-pushed from 6902d19 to 1b6b7e6
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Thanks for contributing! Adding observability to the RocksDB index backend is genuinely useful, and the overall design is clean — the metrics abstraction, the ratio computation's divide-by-zero guard, and the idempotent registerMetrics check are all well done. There's one concurrency issue worth addressing before merge: a potential use-after-free on the native Statistics object when the metric reporter thread and the shutdown thread race.
hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
Force-pushed from 1b6b7e6 to 4c19cc1
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
LGTM — nice addition of observability for the RocksDB index backend. The design is clean, metrics are well-chosen, and tests cover the important scenarios. Just a couple of thread-safety notes in the inline comments worth considering.
hudi-common/src/main/java/org/apache/hudi/common/util/collection/RocksDBDAO.java
```java
try {
  return this.rocksDBDAO.getLongProperty(property);
} catch (RocksDBException | RuntimeException e) {
  log.debug("Failed to read RocksDB metric property {}", property, e);
```
🤖 The two getTickerCount calls are not atomic — the lock is released between them. On shutdown, this could produce a brief 100% hit ratio (e.g., first call returns a real hit count, then close() runs, second call returns 0 for misses). Is this acceptable for your use case, or would it be worth adding a single synchronized method on RocksDBDAO that fetches both tickers atomically?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
That's acceptable for metrics, since it only happens when the job is being shut down.
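For reference, the atomic fetch the reviewer suggests could look roughly like the sketch below. `Statistics` here is a local stand-in for `org.rocksdb.Statistics`, and the class and method names are illustrative, not the PR's actual API; the point is that both ticker reads and `close()` share one lock, so the reporter thread can never observe one real count and one post-close zero.

```java
import java.util.concurrent.locks.ReentrantLock;

// Sketch: fetch both tickers under a single lock so a concurrent close()
// cannot interleave between the two reads. Statistics is a stand-in for
// org.rocksdb.Statistics; all names here are hypothetical.
class TickerSnapshotSketch {
  interface Statistics {
    long getTickerCount(String tickerType);
  }

  private final ReentrantLock lock = new ReentrantLock();
  private Statistics stats; // set to null once closed

  TickerSnapshotSketch(Statistics stats) {
    this.stats = stats;
  }

  /** Returns {hits, misses} read atomically, or {0, 0} if already closed. */
  long[] getHitMissCounts(String hitTicker, String missTicker) {
    lock.lock();
    try {
      if (stats == null) {
        return new long[] {0L, 0L};
      }
      return new long[] {stats.getTickerCount(hitTicker), stats.getTickerCount(missTicker)};
    } finally {
      lock.unlock();
    }
  }

  void close() {
    lock.lock();
    try {
      stats = null; // the native handle would be released here
    } finally {
      lock.unlock();
    }
  }
}
```

With this shape, the worst a racing shutdown can produce is a consistent `{0, 0}` snapshot rather than a misleading 100% hit ratio.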
```java
@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
  this.indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());
  this.indexBackend.registerMetrics(getRuntimeContext().getMetricGroup());
```
Seems like we are emitting the metrics only at initialization. Can we emit these metrics as part of every checkpoint and reset the hit and miss counters?
These are registered as Flink gauges, so they are not emitted only once during initializeState(). After registration, the reporter keeps polling the current values periodically.
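To illustrate the gauge semantics described above, here is a toy model of how a gauge registry behaves. `Gauge` is a local stand-in for `org.apache.flink.metrics.Gauge`; all names are illustrative. Registration stores a callback once, and each reporter cycle re-invokes it, so the reported value tracks the counter as it changes after `initializeState()`.

```java
import java.util.HashMap;
import java.util.Map;

// Toy model of gauge semantics: register() stores a callback once, and the
// reporter re-invokes it on every poll, so values stay current. Gauge is a
// stand-in for org.apache.flink.metrics.Gauge; names are hypothetical.
class GaugePollingSketch {
  interface Gauge<T> {
    T getValue();
  }

  private final Map<String, Gauge<?>> gauges = new HashMap<>();
  private long hitCount; // mutated by the hot path after registration

  void register() {
    // Registered once (e.g., during initialization), polled repeatedly.
    gauges.put("hitCount", (Gauge<Long>) () -> hitCount);
  }

  void recordHit() {
    hitCount++;
  }

  /** Simulates one reporter poll of a registered gauge. */
  Object poll(String name) {
    return gauges.get(name).getValue();
  }
}
```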
```java
  processRecord(value, ctx.getCurrentKey(), out);
}

protected void processRecord(HoodieFlinkInternalRow record, String recordKey, Collector<HoodieFlinkInternalRow> out) throws Exception {
```
Is there a way we can time the lookup latencies without impacting the job runtime? Especially, avg time taken to tag a record.
Yes, this should be possible. Since this is a hot path, I would avoid per-record timing and instead use sampling (for example, once every N records) so we can get a reasonable latency signal without adding noticeable overhead.
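A minimal sketch of that sampling idea, under the assumption that only every N-th lookup pays for two clock reads (the class and method names are hypothetical, not the PR's API):

```java
import java.util.concurrent.atomic.AtomicLong;
import java.util.function.Supplier;

// Sketch of sampled lookup timing: time only 1-in-N lookups so the hot path
// usually pays just one counter increment. All names are illustrative.
class SampledLatencySketch {
  private final int sampleEvery;
  private final AtomicLong lookupCount = new AtomicLong();
  private volatile long lastSampledNanos;

  SampledLatencySketch(int sampleEvery) {
    this.sampleEvery = sampleEvery;
  }

  /** True only for every N-th lookup. */
  boolean shouldSample() {
    return lookupCount.incrementAndGet() % sampleEvery == 0;
  }

  <T> T timedLookup(Supplier<T> lookup) {
    if (!shouldSample()) {
      return lookup.get(); // fast path: no System.nanoTime() calls
    }
    long start = System.nanoTime();
    try {
      return lookup.get();
    } finally {
      lastSampledNanos = System.nanoTime() - start;
    }
  }

  long getLastSampledNanos() {
    return lastSampledNanos;
  }
}
```

The sampled durations could then feed a gauge or histogram; with a reasonably large N, the clock overhead is amortized away while the latency signal remains usable.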
```java
}

@Override
public HoodieRecordGlobalLocation get(String recordKey) {
```
Is it possible to get some statistics around this call without increasing job runtime?
This metric is semantically similar to the previously mentioned lookup metric, so we can keep just one.
```java
}

private long getTotalSstFilesSize() {
  return rocksDBIndexBackend.getLongMetric(TOTAL_SST_FILES_SIZE_PROPERTY);
```
All these metrics on RocksDB do they belong to index_cache column family or does it include all the column families?
These metrics are not limited to the index_cache column family. The long properties are read via getAggregatedLongProperty(), and the ticker/statistics values are maintained at the RocksDB instance level. But in practice that still reflects this index backend, since the RocksDB instance is dedicated to it.
```java
  processRecord(value, ctx.getCurrentKey(), out);
}

protected void processRecord(HoodieFlinkInternalRow record, String recordKey, Collector<HoodieFlinkInternalRow> out) throws Exception {
```
Can we also include insert vs update count metric? I am not sure if this is emitted in any other code path.
Yes, we can include those metrics.
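One way to count inserts vs updates, sketched below under the assumption that a record whose key already has an index location is an update and one without is an insert (the class name and the null-location heuristic are illustrative; the PR's actual tagging logic may differ):

```java
// Sketch of an insert-vs-update metric: a record whose key already has an
// index location counts as an update, otherwise as an insert. The location
// argument stands in for the index backend's get() result; names are
// hypothetical.
class InsertUpdateCounterSketch {
  private long insertCount;
  private long updateCount;

  void tagRecord(Object existingLocation) {
    if (existingLocation == null) {
      insertCount++; // key not seen before
    } else {
      updateCount++; // key already mapped to a file group
    }
  }

  long getInsertCount() {
    return insertCount;
  }

  long getUpdateCount() {
    return updateCount;
  }
}
```

Both counters would then be exposed as gauges alongside the existing RocksDB metrics.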
Force-pushed from 4c19cc1 to 12bfc6e
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
Style & Readability Review — Overall code looks clean and well-structured. One minor naming suggestion regarding a method that has a side effect but uses a predicate-style name.
```java
  metricGroup.gauge(UPDATE_COUNT, (Gauge<Long>) () -> updateCount);
}

public boolean shouldSampleLookup() {
```
🤖 nit: shouldSampleLookup() has a side effect (increments lookupCount) but uses a predicate-style name, which typically implies a pure query. Consider renaming to reflect that it mutates state, e.g., recordLookupAndShouldSample() or split into separate concerns.
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
yihua left a comment
🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
LGTM — clean implementation that adds useful RocksDB observability to the Flink bucket assigner, with solid test coverage and proper metrics lifecycle management. One minor consideration: windowed hit ratios might be more operationally useful than the current cumulative ones computed from tickers since DB open.
```java
  }
}

public double getRatioMetric(TickerType hitTicker, TickerType missTicker) {
```
🤖 The hit ratios here are computed from cumulative tickers since DB open, so they reflect the all-time ratio rather than a recent window. For long-running Flink jobs, this means the metric becomes very stable over time and won't surface recent changes in cache behavior. Have you considered computing a delta-based ratio (comparing ticker values between consecutive gauge polls) to make the metric more responsive?
- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.
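The delta-based ratio the review suggests could be sketched as follows: remember the cumulative ticker values seen at the previous gauge poll and compute the ratio over just that interval, so the metric reflects recent cache behavior instead of the all-time average since DB open. The class name and single-threaded state handling are illustrative assumptions.

```java
// Sketch of a windowed (delta-based) hit ratio: compare cumulative tickers
// between consecutive gauge polls. Names are hypothetical; a real version
// would need to handle concurrent polls or ticker resets.
class WindowedRatioSketch {
  private long lastHits;
  private long lastMisses;

  /** Hit ratio over the window since the previous poll; 0 if no activity. */
  double pollRatio(long cumulativeHits, long cumulativeMisses) {
    long deltaHits = cumulativeHits - lastHits;
    long deltaMisses = cumulativeMisses - lastMisses;
    lastHits = cumulativeHits;
    lastMisses = cumulativeMisses;
    long total = deltaHits + deltaMisses;
    return total == 0 ? 0.0 : (double) deltaHits / total; // divide-by-zero guard
  }
}
```

On a long-running job this stays responsive: a sudden drop in cache effectiveness shows up in the next window rather than being drowned out by months of accumulated hits.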
Codecov Report
❌ Patch coverage is
Additional details and impacted files:

```
@@            Coverage Diff             @@
##            master   #18484     +/-  ##
=========================================
  Coverage    68.83%   68.84%
- Complexity   28171    28210     +39
=========================================
  Files         2459     2461      +2
  Lines       135095   135189     +94
  Branches     16378    16381      +3
=========================================
+ Hits         92992    93065     +73
- Misses       34737    34759     +22
+ Partials      7366     7365      -1
```

Flags with carried forward coverage won't be shown.
Describe the issue this Pull Request addresses
The Flink bucket assigner uses a RocksDB-backed index backend but does not expose metrics to monitor its disk usage, block cache, or memtable state. This makes it harder to understand index growth and runtime behavior.
Summary and Changelog
Impact
Adds observability for the RocksDB-backed Flink index backend.
Risk Level
low
Documentation Update
Contributor's checklist