Skip to content

Commit fbffc5d

Browse files
committed
[metrics] Add pendingTime, freshness, fix pendingRecords for PK tables, and update lake tiering metrics docs
1 parent 0fccd3c commit fbffc5d

4 files changed

Lines changed: 48 additions & 9 deletions

File tree

fluss-common/src/main/java/org/apache/fluss/metrics/MetricNames.java

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -70,6 +70,8 @@ public class MetricNames {
7070
public static final String LAKE_TIERING_TABLE_FAILURES_TOTAL = "failuresTotal";
7171
public static final String LAKE_TIERING_TABLE_FILE_SIZE = "fileSize";
7272
public static final String LAKE_TIERING_TABLE_RECORD_COUNT = "recordCount";
73+
public static final String LAKE_TIERING_TABLE_PENDING_TIME = "pendingTime";
74+
public static final String LAKE_TIERING_TABLE_FRESHNESS = "freshness";
7375

7476
// --------------------------------------------------------------------------------------------
7577
// metrics for tablet server
@@ -222,11 +224,13 @@ public class MetricNames {
222224
// metrics for table bucket
223225
// --------------------------------------------------------------------------------------------
224226

227+
// for tablet
228+
public static final String LAKE_PENDING_RECORDS = "pendingRecords";
229+
225230
// for log tablet
226231
public static final String LOG_NUM_SEGMENTS = "numSegments";
227232
public static final String LOG_END_OFFSET = "endOffset";
228233
public static final String REMOTE_LOG_SIZE = "size";
229-
public static final String LOG_LAKE_PENDING_RECORDS = "pendingRecords";
230234
public static final String LOG_LAKE_TIMESTAMP_LAG = "timestampLag";
231235

232236
// for logic storage

fluss-server/src/main/java/org/apache/fluss/server/coordinator/LakeTableTieringManager.java

Lines changed: 23 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,9 @@ public class LakeTableTieringManager implements AutoCloseable {
141141
// table_id -> start time (ms) of the currently in-progress tiering round
142142
private final Map<Long, Long> currentTieringStartTime;
143143

144+
// table_id -> time (ms) when the table entered pending queue
145+
private final Map<Long, Long> pendingEnterTime;
146+
144147
// the live tables that are tiering,
145148
// from table_id -> last heartbeat time by the tiering service
146149
private final Map<Long, Long> liveTieringTableIds;
@@ -184,6 +187,7 @@ protected LakeTableTieringManager(
184187
this.delayedTieringByTableId = new HashMap<>();
185188
this.tableFailureCounters = new HashMap<>();
186189
this.currentTieringStartTime = new HashMap<>();
190+
this.pendingEnterTime = new HashMap<>();
187191
this.tieringMetricGroup = lakeTieringMetricGroup;
188192
registerMetrics();
189193
}
@@ -279,6 +283,17 @@ private void registerTableMetrics(long tableId, TablePath tablePath) {
279283
MetricNames.LAKE_TIERING_TABLE_TIER_DURATION,
280284
() -> inReadLock(lock, () -> getLastResultField(tableId, r -> r.tierDuration)));
281285

286+
// pendingTime: how long the table has been waiting in the pending queue
287+
tableMetricGroup.gauge(
288+
MetricNames.LAKE_TIERING_TABLE_PENDING_TIME,
289+
() ->
290+
inReadLock(
291+
lock,
292+
() -> {
293+
long enterTime = pendingEnterTime.getOrDefault(tableId, 0L);
294+
return enterTime > 0 ? clock.milliseconds() - enterTime : 0L;
295+
}));
296+
282297
// failuresTotal: total failure count for this table
283298
Counter failuresCounter =
284299
tableMetricGroup.counter(MetricNames.LAKE_TIERING_TABLE_FAILURES_TOTAL);
@@ -293,6 +308,11 @@ private void registerTableMetrics(long tableId, TablePath tablePath) {
293308
tableMetricGroup.gauge(
294309
MetricNames.LAKE_TIERING_TABLE_RECORD_COUNT,
295310
() -> inReadLock(lock, () -> getLastResultField(tableId, r -> r.recordCount)));
311+
312+
// freshness: the user-configured table data freshness interval in milliseconds
313+
tableMetricGroup.gauge(
314+
MetricNames.LAKE_TIERING_TABLE_FRESHNESS,
315+
() -> inReadLock(lock, () -> tableLakeFreshness.getOrDefault(tableId, -1L)));
296316
}
297317

298318
/**
@@ -316,6 +336,7 @@ public void removeLakeTable(long tableId) {
316336
tableLakeFreshness.remove(tableId);
317337
lastTieringResult.remove(tableId);
318338
currentTieringStartTime.remove(tableId);
339+
pendingEnterTime.remove(tableId);
319340
tableFailureCounters.remove(tableId);
320341
// close and remove the metric group to unregister metrics
321342
tieringMetricGroup.removeTableLakeTieringMetricGroup(tableId);
@@ -579,10 +600,12 @@ private void doHandleStateChange(long tableId, TieringState targetState) {
579600
// increase tiering epoch and initialize the heartbeat of the tiering table
580601
tableTierEpoch.computeIfPresent(tableId, (t, v) -> v + 1);
581602
pendingTieringTables.add(tableId);
603+
pendingEnterTime.put(tableId, clock.milliseconds());
582604
break;
583605
case Tiering:
584606
liveTieringTableIds.put(tableId, clock.milliseconds());
585607
currentTieringStartTime.put(tableId, clock.milliseconds());
608+
pendingEnterTime.put(tableId, 0L);
586609
break;
587610
case Tiered:
588611
liveTieringTableIds.remove(tableId);

fluss-server/src/main/java/org/apache/fluss/server/replica/Replica.java

Lines changed: 7 additions & 5 deletions
Original file line numberDiff line numberDiff line change
@@ -549,11 +549,13 @@ private void onBecomeNewLeader() {
549549
private void registerLakeTieringMetrics() {
550550
lakeTieringMetricGroup = bucketMetricGroup.addGroup("lakeTiering");
551551
lakeTieringMetricGroup.gauge(
552-
MetricNames.LOG_LAKE_PENDING_RECORDS,
553-
() ->
554-
getLakeLogEndOffset() < 0L
555-
? getLogHighWatermark() - getLogStartOffset()
556-
: getLogHighWatermark() - getLakeLogEndOffset());
552+
MetricNames.LAKE_PENDING_RECORDS,
553+
() -> {
554+
long lakeLogEndOffset = getLakeLogEndOffset();
555+
return lakeLogEndOffset < 0L
556+
? getRowCount()
557+
: getLogHighWatermark() - lakeLogEndOffset;
558+
});
557559
lakeTieringMetricGroup.gauge(
558560
MetricNames.LOG_LAKE_TIMESTAMP_LAG,
559561
() ->

website/docs/maintenance/observability/monitor-metrics.md

Lines changed: 13 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -294,7 +294,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
294294
</thead>
295295
<tbody>
296296
<tr>
297-
<th rowspan="25"><strong>coordinator</strong></th>
297+
<th rowspan="27"><strong>coordinator</strong></th>
298298
<td style={{textAlign: 'center', verticalAlign: 'middle' }} rowspan="10">-</td>
299299
<td>activeCoordinatorCount</td>
300300
<td>The number of active CoordinatorServer (only leader) in this cluster.</td>
@@ -401,7 +401,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
401401
<td>Gauge</td>
402402
</tr>
403403
<tr>
404-
<td rowspan="5">lakeTiering_table</td>
404+
<td rowspan="7">lakeTiering_table</td>
405405
<td>tierLag</td>
406406
<td>Time in milliseconds since the last successful tiering operation for this table. For newly registered tables that have never completed a tiering round, the lag is measured from the time the table was registered.</td>
407407
<td>Gauge</td>
@@ -426,6 +426,16 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
426426
<td>Cumulative total record count of the lake table after the last tiering round. Returns -1 if no tiering has completed yet.</td>
427427
<td>Gauge</td>
428428
</tr>
429+
<tr>
430+
<td>pendingTime</td>
431+
<td>How long (in milliseconds) the table has been waiting in the pending queue for tiering. Returns 0 when the table is not currently pending.</td>
432+
<td>Gauge</td>
433+
</tr>
434+
<tr>
435+
<td>freshness</td>
436+
<td>The user-configured data freshness interval (in milliseconds) for this table.</td>
437+
<td>Gauge</td>
438+
</tr>
429439
</tbody>
430440
</table>
431441

@@ -855,7 +865,7 @@ Some metrics might not be exposed when using other JVM implementations (e.g. IBM
855865
</tr>
856866
<tr>
857867
<td>timestampLag</td>
858-
<td>The timestamp lag between local log and remote log for this table bucket in milliseconds.</td>
868+
<td>The timestamp lag between the latest log record and the latest record already tiered to the lake for this table bucket, in milliseconds.</td>
859869
<td>Gauge</td>
860870
</tr>
861871
<tr>

0 commit comments

Comments
 (0)