Skip to content

Commit cc550c9

Browse files
committed
[client] Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation.
1 parent f6ec0a3 commit cc550c9

6 files changed

Lines changed: 474 additions & 18 deletions

File tree

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -18,6 +18,7 @@
1818
package org.apache.fluss.client.table.scanner.batch;
1919

2020
import org.apache.fluss.client.metadata.MetadataUpdater;
21+
import org.apache.fluss.compression.ChunkedAllocationManager;
2122
import org.apache.fluss.exception.LeaderNotAvailableException;
2223
import org.apache.fluss.metadata.KvFormat;
2324
import org.apache.fluss.metadata.SchemaGetter;
@@ -164,7 +165,12 @@ private List<InternalRow> parseLimitScanResponse(LimitScanResponse limitScanResp
164165
}
165166
} else {
166167
LogRecordReadContext readContext =
167-
LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter);
168+
LogRecordReadContext.createReadContext(
169+
tableInfo,
170+
false,
171+
null,
172+
schemaGetter,
173+
new ChunkedAllocationManager.ChunkedFactory());
168174
LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);
169175
for (LogRecordBatch logRecordBatch : records.batches()) {
170176
// A batch of log record maybe little more than limit, thus we need slice the

fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,7 @@
2323
import org.apache.fluss.client.metrics.ScannerMetricGroup;
2424
import org.apache.fluss.client.table.scanner.RemoteFileDownloader;
2525
import org.apache.fluss.cluster.BucketLocation;
26+
import org.apache.fluss.compression.ChunkedAllocationManager;
2627
import org.apache.fluss.config.ConfigOptions;
2728
import org.apache.fluss.config.Configuration;
2829
import org.apache.fluss.exception.ApiException;
@@ -123,10 +124,14 @@ public LogFetcher(
123124
SchemaGetter schemaGetter) {
124125
this.tablePath = tableInfo.getTablePath();
125126
this.isPartitioned = tableInfo.isPartitioned();
127+
ChunkedAllocationManager.ChunkedFactory chunkedFactory =
128+
new ChunkedAllocationManager.ChunkedFactory();
126129
this.readContext =
127-
LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter);
130+
LogRecordReadContext.createReadContext(
131+
tableInfo, false, projection, schemaGetter, chunkedFactory);
128132
this.remoteReadContext =
129-
LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
133+
LogRecordReadContext.createReadContext(
134+
tableInfo, true, projection, schemaGetter, chunkedFactory);
130135
this.projection = projection;
131136
this.logScannerStatus = logScannerStatus;
132137
this.maxFetchBytes =

fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java

Lines changed: 3 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -22,6 +22,7 @@
2222
import org.apache.fluss.client.metrics.WriterMetricGroup;
2323
import org.apache.fluss.cluster.BucketLocation;
2424
import org.apache.fluss.cluster.Cluster;
25+
import org.apache.fluss.compression.ChunkedAllocationManager;
2526
import org.apache.fluss.config.ConfigOptions;
2627
import org.apache.fluss.config.Configuration;
2728
import org.apache.fluss.exception.FlussRuntimeException;
@@ -36,7 +37,6 @@
3637
import org.apache.fluss.row.arrow.ArrowWriter;
3738
import org.apache.fluss.row.arrow.ArrowWriterPool;
3839
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
39-
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
4040
import org.apache.fluss.utils.CopyOnWriteMap;
4141
import org.apache.fluss.utils.MathUtils;
4242
import org.apache.fluss.utils.clock.Clock;
@@ -63,6 +63,7 @@
6363

6464
import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
6565
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
66+
import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocatorUtil.createBufferAllocator;
6667
import static org.apache.fluss.utils.Preconditions.checkNotNull;
6768

6869
/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
@@ -134,7 +135,7 @@ public final class RecordAccumulator {
134135
Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes());
135136

136137
this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf);
137-
this.bufferAllocator = new RootAllocator(Long.MAX_VALUE);
138+
this.bufferAllocator = createBufferAllocator(new ChunkedAllocationManager.ChunkedFactory());
138139
this.arrowWriterPool = new ArrowWriterPool(bufferAllocator);
139140
this.incomplete = new IncompleteBatches();
140141
this.nodesDrainIndex = new HashMap<>();

0 commit comments

Comments (0)