Skip to content

feat(flink): Add metrics for RocksDB index backend in bucket assigner#18484

Open
cshuo wants to merge 2 commits intoapache:masterfrom
cshuo:add_rocksdb_metrics
Open

feat(flink): Add metrics for RocksDB index backend in bucket assigner#18484
cshuo wants to merge 2 commits intoapache:masterfrom
cshuo:add_rocksdb_metrics

Conversation

@cshuo
Copy link
Copy Markdown
Collaborator

@cshuo cshuo commented Apr 9, 2026

Describe the issue this Pull Request addresses

The Flink bucket assigner uses a RocksDB-backed index backend but does not expose metrics to monitor its disk usage, block cache, or memtable state. This makes it harder to understand index growth and runtime behavior.

Summary and Changelog

  • Add FlinkRocksDBIndexMetrics for RocksDB index disk, block cache, and memtable metrics.
  • Register backend-specific metrics through IndexBackend.registerMetrics(...).
  • Update RocksDBIndexBackend and RocksDBDAO to expose the RocksDB properties and tickers needed by these metrics.

Impact

Adds observability for the RocksDB-backed Flink index backend.

Risk Level

low

Documentation Update

Contributor's checklist

  • Read through contributor's guide
  • Enough context is provided in the sections above
  • Adequate tests were added if applicable

@cshuo cshuo force-pushed the add_rocksdb_metrics branch from 6902d19 to 1b6b7e6 Compare April 9, 2026 03:45
Copy link
Copy Markdown
Contributor

@yihua yihua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Thanks for contributing! Adding observability to the RocksDB index backend is genuinely useful, and the overall design is clean — the metrics abstraction, the ratio computation's divide-by-zero guard, and the idempotent registerMetrics check are all well done. There's one concurrency issue worth addressing before merge: a potential use-after-free on the native Statistics object when the metric reporter thread and the shutdown thread race.

@github-actions github-actions bot added the size:L PR with lines of changes in (300, 1000] label Apr 9, 2026
@cshuo cshuo force-pushed the add_rocksdb_metrics branch from 1b6b7e6 to 4c19cc1 Compare April 9, 2026 04:27
@cshuo cshuo requested a review from danny0405 April 9, 2026 07:25
@cshuo
Copy link
Copy Markdown
Collaborator Author

cshuo commented Apr 9, 2026

cc @suryaprasanna

Copy link
Copy Markdown
Contributor

@yihua yihua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

LGTM — nice addition of observability for the RocksDB index backend. The design is clean, metrics are well-chosen, and tests cover the important scenarios. Just a couple of thread-safety notes in the inline comments worth considering.

try {
return this.rocksDBDAO.getLongProperty(property);
} catch (RocksDBException | RuntimeException e) {
log.debug("Failed to read RocksDB metric property {}", property, e);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The two getTickerCount calls are not atomic — the lock is released between them. On shutdown, this could produce a brief 100% hit ratio (e.g., first call returns a real hit count, then close() runs, second call returns 0 for misses). Is this acceptable for your use case, or would it be worth adding a single synchronized method on RocksDBDAO that fetches both tickers atomically?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Copy link
Copy Markdown
Collaborator Author

@cshuo cshuo Apr 10, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

That's acceptable for metrics, since it only happens when the job is being shutdown.

@Override
public void initializeState(FunctionInitializationContext context) throws Exception {
this.indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());
this.indexBackend.registerMetrics(getRuntimeContext().getMetricGroup());
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Seems like we are emitting the metrics only at the intialization. Can we emit these metrics as part of every checkpoint and reset the hit and miss counters?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These are registered as Flink gauges, so they are not emitted only once during initializeState(). After registration, the reporter keeps polling the current values periodically.

processRecord(value, ctx.getCurrentKey(), out);
}

protected void processRecord(HoodieFlinkInternalRow record, String recordKey, Collector<HoodieFlinkInternalRow> out) throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is there a way we can time the lookup latencies without impacting the job runtime? Especially, avg time taken to tag a record.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, this should be possible. Since this is a hot path, I would avoid per-record timing and instead use sampling (for example, once every N records) so we can get a reasonable latency signal without adding noticeable overhead.

}

@Override
public HoodieRecordGlobalLocation get(String recordKey) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Is it possible to get some statistics around this call without increasing job runtime?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This metric is semantically similar to the previously mentioned lookup metric, so we can keep just one.

}

private long getTotalSstFilesSize() {
return rocksDBIndexBackend.getLongMetric(TOTAL_SST_FILES_SIZE_PROPERTY);
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

All these metrics on RocksDB do they belong to index_cache column family or does it include all the column families?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

These metrics are not limited to the index_cache column family. The long properties are read via getAggregatedLongProperty(), and the ticker/statistics values are maintained at the RocksDB instance level. But in practice that still reflects this index backend, since the RocksDB instance is dedicated to it.

processRecord(value, ctx.getCurrentKey(), out);
}

protected void processRecord(HoodieFlinkInternalRow record, String recordKey, Collector<HoodieFlinkInternalRow> out) throws Exception {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Can we also include insert vs update count metric? I am not sure if this is emitted in any other code path.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Yes, we can include those metrics.

@cshuo cshuo force-pushed the add_rocksdb_metrics branch from 4c19cc1 to 12bfc6e Compare April 13, 2026 13:11
Copy link
Copy Markdown
Contributor

@yihua yihua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Style & Readability Review — Overall code looks clean and well-structured. One minor naming suggestion regarding a method that has a side effect but uses a predicate-style name.

metricGroup.gauge(UPDATE_COUNT, (Gauge<Long>) () -> updateCount);
}

public boolean shouldSampleLookup() {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 nit: shouldSampleLookup() has a side effect (increments lookupCount) but uses a predicate-style name, which typically implies a pure query. Consider renaming to reflect that it mutates state, e.g., recordLookupAndShouldSample() or split into separate concerns.

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

Copy link
Copy Markdown
Contributor

@yihua yihua left a comment

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 This review was generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

LGTM — clean implementation that adds useful RocksDB observability to the Flink bucket assigner, with solid test coverage and proper metrics lifecycle management. One minor consideration: windowed hit ratios might be more operationally useful than the current cumulative ones computed from tickers since DB open.

}
}

public double getRatioMetric(TickerType hitTicker, TickerType missTicker) {
Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

🤖 The hit ratios here are computed from cumulative tickers since DB open, so they reflect the all-time ratio rather than a recent window. For long-running Flink jobs, this means the metric becomes very stable over time and won't surface recent changes in cache behavior. Have you considered computing a delta-based ratio (comparing ticker values between consecutive gauge polls) to make the metric more responsive?

- Generated by an AI agent and may contain mistakes. Please verify any suggestions before applying.

@codecov-commenter
Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 89.58333% with 10 lines in your changes missing coverage. Please review.
✅ Project coverage is 68.84%. Comparing base (fc7f303) to head (12bfc6e).

Files with missing lines Patch % Lines
...di/sink/partitioner/index/RocksDBIndexBackend.java 68.42% 6 Missing ⚠️
...apache/hudi/common/util/collection/RocksDBDAO.java 71.42% 2 Missing and 2 partials ⚠️
Additional details and impacted files
@@            Coverage Diff            @@
##             master   #18484   +/-   ##
=========================================
  Coverage     68.83%   68.84%           
- Complexity    28171    28210   +39     
=========================================
  Files          2459     2461    +2     
  Lines        135095   135189   +94     
  Branches      16378    16381    +3     
=========================================
+ Hits          92992    93065   +73     
- Misses        34737    34759   +22     
+ Partials       7366     7365    -1     
Flag Coverage Δ
common-and-other-modules 44.56% <89.58%> (+0.03%) ⬆️
hadoop-mr-java-client 44.83% <35.71%> (-0.01%) ⬇️
spark-client-hadoop-common 48.43% <35.71%> (-0.01%) ⬇️
spark-java-tests 48.88% <35.71%> (-0.04%) ⬇️
spark-scala-tests 45.48% <0.00%> (-0.01%) ⬇️
utilities 38.21% <0.00%> (+0.01%) ⬆️

Flags with carried forward coverage won't be shown. Click here to find out more.

Files with missing lines Coverage Δ
...va/org/apache/hudi/configuration/FlinkOptions.java 99.78% <100.00%> (+<0.01%) ⬆️
.../apache/hudi/metrics/FlinkBucketAssignMetrics.java 100.00% <100.00%> (ø)
.../apache/hudi/metrics/FlinkRocksDBIndexMetrics.java 100.00% <100.00%> (ø)
...he/hudi/sink/partitioner/BucketAssignFunction.java 94.31% <100.00%> (+0.98%) ⬆️
...ache/hudi/sink/partitioner/index/IndexBackend.java 100.00% <100.00%> (ø)
...apache/hudi/common/util/collection/RocksDBDAO.java 71.61% <71.42%> (-0.15%) ⬇️
...di/sink/partitioner/index/RocksDBIndexBackend.java 80.00% <68.42%> (-20.00%) ⬇️

... and 9 files with indirect coverage changes

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@hudi-bot
Copy link
Copy Markdown
Collaborator

CI report:

Bot commands @hudi-bot supports the following commands:
  • @hudi-bot run azure re-run the last Azure build

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

size:L PR with lines of changes in (300, 1000]

Projects

None yet

Development

Successfully merging this pull request may close these issues.

5 participants