Skip to content

Commit 12bfc6e

Browse files
committed
fix comments
1 parent 1faecb2 commit 12bfc6e

5 files changed

Lines changed: 201 additions & 14 deletions

File tree

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/configuration/FlinkOptions.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -703,6 +703,15 @@ public class FlinkOptions extends HoodieConfig {
703703
.noDefaultValue()
704704
.withDescription("Parallelism of tasks that do bucket assign, default same as the write task parallelism");
705705

706+
@AdvancedConfig
707+
public static final ConfigOption<Integer> BUCKET_ASSIGN_METRIC_SAMPLING_RATE = ConfigOptions
708+
.key("write.bucket_assign.metric.sampling.rate")
709+
.intType()
710+
.defaultValue(1000)
711+
.withDescription("Sampling interval for bucket assign metrics. "
712+
+ "The bucket assign operator records the latency of every Nth index lookup "
713+
+ "to keep the observability overhead low on the hot path.");
714+
706715
@AdvancedConfig
707716
public static final ConfigOption<Integer> WRITE_TASKS = ConfigOptions
708717
.key("write.tasks")
Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,67 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.metrics;
20+
21+
import org.apache.flink.metrics.Gauge;
22+
import org.apache.flink.metrics.MetricGroup;
23+
24+
/**
25+
* Metrics for the flink bucket assign operator.
26+
*/
27+
public class FlinkBucketAssignMetrics extends HoodieFlinkMetrics {
28+
public static final String INDEX_LOOKUP_DURATION_MS = "indexLookupDurationMs";
29+
public static final String INSERT_COUNT = "insertCount";
30+
public static final String UPDATE_COUNT = "updateCount";
31+
32+
private final int indexLookupSamplingRate;
33+
34+
private long lookupCount;
35+
private long indexLookupDurationMs;
36+
private long insertCount;
37+
private long updateCount;
38+
39+
public FlinkBucketAssignMetrics(MetricGroup metricGroup, int indexLookupSamplingRate) {
40+
super(metricGroup);
41+
this.indexLookupSamplingRate = Math.max(1, indexLookupSamplingRate);
42+
}
43+
44+
@Override
45+
public void registerMetrics() {
46+
metricGroup.gauge(INDEX_LOOKUP_DURATION_MS, (Gauge<Long>) () -> indexLookupDurationMs);
47+
metricGroup.gauge(INSERT_COUNT, (Gauge<Long>) () -> insertCount);
48+
metricGroup.gauge(UPDATE_COUNT, (Gauge<Long>) () -> updateCount);
49+
}
50+
51+
public boolean shouldSampleLookup() {
52+
lookupCount += 1;
53+
return lookupCount % indexLookupSamplingRate == 0;
54+
}
55+
56+
public void recordLookupDuration(long lookupDurationMs) {
57+
this.indexLookupDurationMs = lookupDurationMs;
58+
}
59+
60+
public void markInsert() {
61+
insertCount += 1;
62+
}
63+
64+
public void markUpdate() {
65+
updateCount += 1;
66+
}
67+
}

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/metrics/FlinkRocksDBIndexMetrics.java

Lines changed: 13 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -36,19 +36,19 @@ public class FlinkRocksDBIndexMetrics extends HoodieFlinkMetrics {
3636
private static final String ALL_MEMTABLES_SIZE_PROPERTY = "rocksdb.cur-size-all-mem-tables";
3737
private static final String IMMUTABLE_MEMTABLE_COUNT_PROPERTY = "rocksdb.num-immutable-mem-table";
3838

39-
public static final String ROCKSDB_DISK_TOTAL_SST_FILES_SIZE = "rocksdb.disk.total_sst_files_size";
40-
public static final String ROCKSDB_DISK_LIVE_SST_FILES_SIZE = "rocksdb.disk.live_sst_files_size";
41-
public static final String ROCKSDB_BLOCK_CACHE_CAPACITY = "rocksdb.block_cache.capacity";
42-
public static final String ROCKSDB_BLOCK_CACHE_USAGE = "rocksdb.block_cache.usage";
43-
public static final String ROCKSDB_BLOCK_CACHE_HIT_RATIO = "rocksdb.block_cache.hit_ratio";
44-
public static final String ROCKSDB_BLOCK_CACHE_DATA_HIT_RATIO = "rocksdb.block_cache.data_hit_ratio";
45-
public static final String ROCKSDB_BLOCK_CACHE_INDEX_HIT_RATIO = "rocksdb.block_cache.index_hit_ratio";
46-
public static final String ROCKSDB_BLOCK_CACHE_FILTER_HIT_RATIO = "rocksdb.block_cache.filter_hit_ratio";
47-
48-
public static final String ROCKSDB_MEMTABLE_ACTIVE_SIZE = "rocksdb.memtable.active_size";
49-
public static final String ROCKSDB_MEMTABLE_ALL_SIZE = "rocksdb.memtable.all_size";
50-
public static final String ROCKSDB_MEMTABLE_IMMUTABLE_COUNT = "rocksdb.memtable.immutable_count";
51-
public static final String ROCKSDB_MEMTABLE_HIT_RATIO = "rocksdb.memtable.hit_ratio";
39+
public static final String ROCKSDB_DISK_TOTAL_SST_FILES_SIZE = "rocksdb.disk.totalSstFilesSize";
40+
public static final String ROCKSDB_DISK_LIVE_SST_FILES_SIZE = "rocksdb.disk.liveSstFilesSize";
41+
public static final String ROCKSDB_BLOCK_CACHE_CAPACITY = "rocksdb.blockCache.capacity";
42+
public static final String ROCKSDB_BLOCK_CACHE_USAGE = "rocksdb.blockCache.usage";
43+
public static final String ROCKSDB_BLOCK_CACHE_HIT_RATIO = "rocksdb.blockCache.hitRatio";
44+
public static final String ROCKSDB_BLOCK_CACHE_DATA_HIT_RATIO = "rocksdb.blockCache.dataHitRatio";
45+
public static final String ROCKSDB_BLOCK_CACHE_INDEX_HIT_RATIO = "rocksdb.blockCache.indexHitRatio";
46+
public static final String ROCKSDB_BLOCK_CACHE_FILTER_HIT_RATIO = "rocksdb.blockCache.filterHitRatio";
47+
48+
public static final String ROCKSDB_MEMTABLE_ACTIVE_SIZE = "rocksdb.memtable.activeSize";
49+
public static final String ROCKSDB_MEMTABLE_ALL_SIZE = "rocksdb.memtable.allSize";
50+
public static final String ROCKSDB_MEMTABLE_IMMUTABLE_COUNT = "rocksdb.memtable.immutableCount";
51+
public static final String ROCKSDB_MEMTABLE_HIT_RATIO = "rocksdb.memtable.hitRatio";
5252

5353
private final RocksDBIndexBackend rocksDBIndexBackend;
5454

hudi-flink-datasource/hudi-flink/src/main/java/org/apache/hudi/sink/partitioner/BucketAssignFunction.java

Lines changed: 21 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
import org.apache.hudi.configuration.HadoopConfigurations;
3131
import org.apache.hudi.configuration.OptionsResolver;
3232
import org.apache.hudi.hadoop.fs.HadoopFSUtils;
33+
import org.apache.hudi.metrics.FlinkBucketAssignMetrics;
3334
import org.apache.hudi.sink.event.Correspondent;
3435
import org.apache.hudi.sink.partitioner.index.IndexBackend;
3536
import org.apache.hudi.sink.partitioner.index.IndexBackendFactory;
@@ -50,6 +51,7 @@
5051
import org.apache.flink.types.RowKind;
5152
import org.apache.flink.util.Collector;
5253

54+
import java.io.IOException;
5355
import java.util.Objects;
5456

5557
/**
@@ -86,6 +88,8 @@ public class BucketAssignFunction
8688
@Getter
8789
private transient IndexBackend indexBackend;
8890

91+
private transient FlinkBucketAssignMetrics bucketAssignMetrics;
92+
8993
/**
9094
* Bucket assigner to assign new bucket IDs or reuse existing ones.
9195
*/
@@ -137,6 +141,10 @@ public void open(Configuration parameters) throws Exception {
137141
@Override
138142
public void initializeState(FunctionInitializationContext context) throws Exception {
139143
this.indexBackend = IndexBackendFactory.create(conf, context, getRuntimeContext());
144+
this.bucketAssignMetrics = new FlinkBucketAssignMetrics(
145+
getRuntimeContext().getMetricGroup(),
146+
conf.get(FlinkOptions.BUCKET_ASSIGN_METRIC_SAMPLING_RATE));
147+
this.bucketAssignMetrics.registerMetrics();
140148
this.indexBackend.registerMetrics(getRuntimeContext().getMetricGroup());
141149
}
142150

@@ -166,7 +174,7 @@ protected void processRecord(HoodieFlinkInternalRow record, String recordKey, Co
166174
// Only changing records need looking up the index for the location,
167175
// append only records are always recognized as INSERT.
168176
// Structured as Tuple(partition, fileId, instantTime).
169-
HoodieRecordGlobalLocation oldLoc = indexBackend.get(recordKey);
177+
HoodieRecordGlobalLocation oldLoc = getRecordLocation(recordKey);
170178
if (oldLoc != null) {
171179
// Set up the instant time as "U" to mark the bucket as an update bucket.
172180
String partitionFromState = oldLoc.getPartitionPath();
@@ -191,8 +199,10 @@ protected void processRecord(HoodieFlinkInternalRow record, String recordKey, Co
191199
location = oldLoc.toLocal("U");
192200
this.bucketAssigner.addUpdate(partitionPath, location.getFileId());
193201
}
202+
bucketAssignMetrics.markUpdate();
194203
} else {
195204
location = getNewRecordLocation(partitionPath);
205+
bucketAssignMetrics.markInsert();
196206
}
197207
// refresh the index only when the location is updated.
198208
if (oldLoc == null || !oldLoc.getFileId().equals(location.getFileId())) {
@@ -209,6 +219,16 @@ protected void processRecord(HoodieFlinkInternalRow record, String recordKey, Co
209219
out.collect(record);
210220
}
211221

222+
private HoodieRecordGlobalLocation getRecordLocation(String recordKey) throws IOException {
223+
if (!bucketAssignMetrics.shouldSampleLookup()) {
224+
return indexBackend.get(recordKey);
225+
}
226+
long startMs = System.currentTimeMillis();
227+
HoodieRecordGlobalLocation oldLoc = indexBackend.get(recordKey);
228+
bucketAssignMetrics.recordLookupDuration(System.currentTimeMillis() - startMs);
229+
return oldLoc;
230+
}
231+
212232
protected HoodieRecordLocation getNewRecordLocation(String partitionPath) {
213233
final BucketInfo bucketInfo = this.bucketAssigner.addInsert(partitionPath);
214234
final HoodieRecordLocation location;
Lines changed: 91 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,91 @@
1+
/*
2+
* Licensed to the Apache Software Foundation (ASF) under one
3+
* or more contributor license agreements. See the NOTICE file
4+
* distributed with this work for additional information
5+
* regarding copyright ownership. The ASF licenses this file
6+
* to you under the Apache License, Version 2.0 (the
7+
* "License"); you may not use this file except in compliance
8+
* with the License. You may obtain a copy of the License at
9+
*
10+
* http://www.apache.org/licenses/LICENSE-2.0
11+
*
12+
* Unless required by applicable law or agreed to in writing, software
13+
* distributed under the License is distributed on an "AS IS" BASIS,
14+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
15+
* See the License for the specific language governing permissions and
16+
* limitations under the License.
17+
*/
18+
19+
package org.apache.hudi.metrics;
20+
21+
import org.apache.flink.metrics.Gauge;
22+
import org.apache.flink.metrics.MetricGroup;
23+
import org.junit.jupiter.api.Test;
24+
25+
import java.util.HashMap;
26+
import java.util.Map;
27+
28+
import static org.junit.jupiter.api.Assertions.assertEquals;
29+
import static org.junit.jupiter.api.Assertions.assertFalse;
30+
import static org.junit.jupiter.api.Assertions.assertTrue;
31+
import static org.mockito.ArgumentMatchers.any;
32+
import static org.mockito.ArgumentMatchers.anyString;
33+
import static org.mockito.Mockito.doAnswer;
34+
import static org.mockito.Mockito.mock;
35+
36+
/**
37+
* Test cases for {@link FlinkBucketAssignMetrics}.
38+
*/
39+
public class TestFlinkBucketAssignMetrics {
40+
41+
@Test
42+
void testMetricsRegistrationAndDefaults() {
43+
MetricGroup metricGroup = mock(MetricGroup.class);
44+
Map<String, Gauge<?>> gauges = new HashMap<>();
45+
doAnswer(invocation -> {
46+
String name = invocation.getArgument(0);
47+
Gauge<?> gauge = invocation.getArgument(1);
48+
gauges.put(name, gauge);
49+
return gauge;
50+
}).when(metricGroup).gauge(anyString(), any(Gauge.class));
51+
52+
FlinkBucketAssignMetrics metrics = new FlinkBucketAssignMetrics(metricGroup, 1000);
53+
metrics.registerMetrics();
54+
55+
assertTrue(gauges.containsKey(FlinkBucketAssignMetrics.INDEX_LOOKUP_DURATION_MS));
56+
assertTrue(gauges.containsKey(FlinkBucketAssignMetrics.INSERT_COUNT));
57+
assertTrue(gauges.containsKey(FlinkBucketAssignMetrics.UPDATE_COUNT));
58+
assertEquals(0D, ((Number) gauges.get(FlinkBucketAssignMetrics.INDEX_LOOKUP_DURATION_MS).getValue()).doubleValue());
59+
assertEquals(0L, ((Number) gauges.get(FlinkBucketAssignMetrics.INSERT_COUNT).getValue()).longValue());
60+
assertEquals(0L, ((Number) gauges.get(FlinkBucketAssignMetrics.UPDATE_COUNT).getValue()).longValue());
61+
}
62+
63+
@Test
64+
void testLookupSamplingAndCounts() {
65+
MetricGroup metricGroup = mock(MetricGroup.class);
66+
Map<String, Gauge<?>> gauges = new HashMap<>();
67+
doAnswer(invocation -> {
68+
String name = invocation.getArgument(0);
69+
Gauge<?> gauge = invocation.getArgument(1);
70+
gauges.put(name, gauge);
71+
return gauge;
72+
}).when(metricGroup).gauge(anyString(), any(Gauge.class));
73+
74+
FlinkBucketAssignMetrics metrics = new FlinkBucketAssignMetrics(metricGroup, 3);
75+
metrics.registerMetrics();
76+
77+
assertFalse(metrics.shouldSampleLookup());
78+
assertFalse(metrics.shouldSampleLookup());
79+
assertTrue(metrics.shouldSampleLookup());
80+
81+
metrics.recordLookupDuration(3L);
82+
metrics.recordLookupDuration(5L);
83+
metrics.markInsert();
84+
metrics.markInsert();
85+
metrics.markUpdate();
86+
87+
assertEquals(5D, ((Number) gauges.get(FlinkBucketAssignMetrics.INDEX_LOOKUP_DURATION_MS).getValue()).doubleValue());
88+
assertEquals(2L, ((Number) gauges.get(FlinkBucketAssignMetrics.INSERT_COUNT).getValue()).longValue());
89+
assertEquals(1L, ((Number) gauges.get(FlinkBucketAssignMetrics.UPDATE_COUNT).getValue()).longValue());
90+
}
91+
}

0 commit comments

Comments
 (0)