|
23 | 23 | import org.apache.fluss.config.TableConfig; |
24 | 24 | import org.apache.fluss.exception.FencedLeaderEpochException; |
25 | 25 | import org.apache.fluss.exception.InvalidColumnProjectionException; |
| 26 | +import org.apache.fluss.exception.InvalidTableException; |
26 | 27 | import org.apache.fluss.exception.InvalidTimestampException; |
27 | 28 | import org.apache.fluss.exception.InvalidUpdateVersionException; |
28 | 29 | import org.apache.fluss.exception.KvStorageException; |
@@ -549,11 +550,20 @@ private void onBecomeNewLeader() { |
549 | 550 | private void registerLakeTieringMetrics() { |
550 | 551 | lakeTieringMetricGroup = bucketMetricGroup.addGroup("lakeTiering"); |
551 | 552 | lakeTieringMetricGroup.gauge( |
552 | | - MetricNames.LOG_LAKE_PENDING_RECORDS, |
553 | | - () -> |
554 | | - getLakeLogEndOffset() < 0L |
555 | | - ? getLogHighWatermark() - getLogStartOffset() |
556 | | - : getLogHighWatermark() - getLakeLogEndOffset()); |
| 553 | + MetricNames.LAKE_PENDING_RECORDS, |
| 554 | + () -> { |
| 555 | + long lakeLogEndOffset = getLakeLogEndOffset(); |
| 556 | + if (lakeLogEndOffset < 0L) { |
| 557 | + try { |
| 558 | + return getRowCount(); |
| 559 | + } catch (InvalidTableException e) { |
| 560 | + // WAL mode or v0.9 old table: row count disabled, |
| 561 | + // fall back to log-level pending count |
| 562 | + return getLogHighWatermark() - getLogStartOffset(); |
| 563 | + } |
| 564 | + } |
| 565 | + return getLogHighWatermark() - lakeLogEndOffset; |
| 566 | + }); |
557 | 567 | lakeTieringMetricGroup.gauge( |
558 | 568 | MetricNames.LOG_LAKE_TIMESTAMP_LAG, |
559 | 569 | () -> |
|
0 commit comments