Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -37,6 +37,7 @@
import org.apache.fluss.rpc.gateway.TabletServerGateway;
import org.apache.fluss.rpc.messages.LimitScanRequest;
import org.apache.fluss.rpc.messages.LimitScanResponse;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager;
import org.apache.fluss.types.RowType;
import org.apache.fluss.utils.CloseableIterator;
import org.apache.fluss.utils.SchemaUtil;
Expand Down Expand Up @@ -66,6 +67,8 @@ public class LimitBatchScanner implements BatchScanner {
private final SchemaGetter schemaGetter;
private final KvFormat kvFormat;
private final int targetSchemaId;
/** The chunked allocation manager factory to reuse off-heap memory when reading arrow log batches during limit scan. */
private final ChunkedAllocationManager.ChunkedFactory chunkedFactory;

/**
* A cache for schema projection mapping from source schema to target. Use HashMap here, because
Expand Down Expand Up @@ -116,6 +119,7 @@ public LimitBatchScanner(

this.kvFormat = tableInfo.getTableConfig().getKvFormat();
this.endOfInput = false;
this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory();
}

@Nullable
Expand Down Expand Up @@ -164,7 +168,8 @@ private List<InternalRow> parseLimitScanResponse(LimitScanResponse limitScanResp
}
} else {
LogRecordReadContext readContext =
LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter);
LogRecordReadContext.createReadContext(
tableInfo, false, null, schemaGetter, chunkedFactory);
LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);
Comment on lines 170 to 173
Copy link

Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In parseLimitScanResponse, the newly created LogRecordReadContext is never closed. Since LogRecordReadContext owns Arrow resources (VectorSchemaRoots + BufferAllocator), this will leak direct memory and native buffers. Please wrap the LogRecordReadContext in a try-with-resources (or otherwise ensure it’s closed) around the iteration over batches/records.

Copilot uses AI. Check for mistakes.
for (LogRecordBatch logRecordBatch : records.batches()) {
// A batch of log record maybe little more than limit, thus we need slice the
Expand Down Expand Up @@ -203,5 +208,7 @@ private InternalRow maybeProject(InternalRow originRow) {
@Override
public void close() throws IOException {
    try {
        // Stop the in-flight limit scan, interrupting it if it is still running.
        scanFuture.cancel(true);
    } finally {
        // Always release the off-heap memory held by the chunked allocation manager
        // factory, even if cancelling the scan future fails unexpectedly; otherwise
        // the native chunks would leak.
        chunkedFactory.close();
    }
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -53,6 +53,7 @@
import org.apache.fluss.rpc.protocol.ApiError;
import org.apache.fluss.rpc.protocol.Errors;
import org.apache.fluss.rpc.util.PredicateMessageUtils;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager;
import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
import org.apache.fluss.utils.IOUtils;
import org.apache.fluss.utils.Projection;
Expand Down Expand Up @@ -94,6 +95,7 @@ public class LogFetcher implements Closeable {
// currently can only do project when generate scanRecord instead of doing project while read
// bytes from remote file.
private final LogRecordReadContext remoteReadContext;
private final ChunkedAllocationManager.ChunkedFactory chunkedFactory;
@Nullable private final Projection projection;
@Nullable private final org.apache.fluss.rpc.messages.PbPredicate cachedPbPredicate;
private final int filterSchemaId;
Expand Down Expand Up @@ -128,10 +130,13 @@ public LogFetcher(
SchemaGetter schemaGetter) {
this.tablePath = tableInfo.getTablePath();
this.isPartitioned = tableInfo.isPartitioned();
this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory();
this.readContext =
LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter);
LogRecordReadContext.createReadContext(
tableInfo, false, projection, schemaGetter, chunkedFactory);
this.remoteReadContext =
LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
Copy link
Copy Markdown
Contributor

@fresh-borzoni fresh-borzoni Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the write path affected by the OOM? Bump-pointer is ideal for read (exact-size, batch-release), but writes do grow-and-copy reallocation and dead space stays in the chunk until full drain.

Might be worth keeping the default allocator here, though we are talking about peak allocations only

Copy link
Copy Markdown
Contributor Author

@loserwang1024 loserwang1024 Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the write path affected by the OOM?

Yes, when I test 1000+ columns, compress also need many small off-heap allocation for each column(ArrowWriter.initFieldVector). The error log is:

2026-04-08 10:20:27
java.io.IOException: Failed to send record to table fluss.table1(p=2026040810). Writer state: running
	at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:162)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:246)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:217)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:169)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:706)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1202)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1146)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:976)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:955)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:768)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:580)
	at java.base/java.lang.Thread.run(Thread.java:991)
Caused by: org.apache.fluss.exception.FlussRuntimeException: Failed to send record to table fluss.t_geely_2x_realtime_signal(p=2026040810). Writer state: running
	at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:217)
	at org.apache.fluss.client.write.WriterClient.send(WriterClient.java:140)
	at org.apache.fluss.client.table.writer.AbstractTableWriter.send(AbstractTableWriter.java:66)
	at org.apache.fluss.client.table.writer.AppendWriterImpl.append(AppendWriterImpl.java:87)
	at org.apache.fluss.flink.sink.writer.AppendSinkWriter.writeRow(AppendSinkWriter.java:62)
	at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:146)
	... 14 more
Caused by: org.apache.fluss.shaded.arrow.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:355)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:350)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:338)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:316)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:280)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateBytes(BaseVariableWidthVector.java:462)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:420)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:429)
	at org.apache.fluss.row.arrow.ArrowWriter.initFieldVector(ArrowWriter.java:309)
	at org.apache.fluss.row.arrow.ArrowWriter.<init>(ArrowWriter.java:130)
	at org.apache.fluss.row.arrow.ArrowWriterPool.lambda$getOrCreateWriter$3(ArrowWriterPool.java:107)
	at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
	at org.apache.fluss.row.arrow.ArrowWriterPool.getOrCreateWriter(ArrowWriterPool.java:93)
	at org.apache.fluss.client.write.RecordAccumulator.appendNewBatch(RecordAccumulator.java:599)
	at org.apache.fluss.client.write.RecordAccumulator.append(RecordAccumulator.java:210)
	at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:205)
	... 19 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
	at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:177)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:134)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:178)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:211)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
	... 40 more

LogRecordReadContext.createReadContext(
tableInfo, true, projection, schemaGetter, chunkedFactory);
this.projection = projection;
this.cachedPbPredicate =
recordBatchFilter != null
Expand Down Expand Up @@ -603,6 +608,7 @@ public synchronized void close() throws IOException {
IOUtils.closeQuietly(remoteLogDownloader, "remoteLogDownloader");
readContext.close();
remoteReadContext.close();
chunkedFactory.close();
isClosed = true;
LOG.info("Fetcher for {} is closed.", tablePath);
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -35,8 +35,8 @@
import org.apache.fluss.record.LogRecordBatchStatisticsCollector;
import org.apache.fluss.row.arrow.ArrowWriter;
import org.apache.fluss.row.arrow.ArrowWriterPool;
import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager;
import org.apache.fluss.utils.CopyOnWriteMap;
import org.apache.fluss.utils.MathUtils;
import org.apache.fluss.utils.clock.Clock;
Expand All @@ -63,6 +63,7 @@

import static org.apache.fluss.record.LogRecordBatchFormat.NO_BATCH_SEQUENCE;
import static org.apache.fluss.record.LogRecordBatchFormat.NO_WRITER_ID;
import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil.createBufferAllocator;
import static org.apache.fluss.utils.Preconditions.checkNotNull;

/* This file is based on source code of Apache Kafka Project (https://kafka.apache.org/), licensed by the Apache
Expand Down Expand Up @@ -99,6 +100,9 @@ public final class RecordAccumulator {
/** The arrow buffer allocator to allocate memory for arrow log write batch. */
private final BufferAllocator bufferAllocator;

/** The chunked allocation manager factory, stored for explicit native memory release. */
private final ChunkedAllocationManager.ChunkedFactory chunkedFactory;

/** The pool of lazily created arrow {@link ArrowWriter}s for arrow log write batch. */
private final ArrowWriterPool arrowWriterPool;

Expand Down Expand Up @@ -134,7 +138,8 @@ public final class RecordAccumulator {
Math.max(1, (int) conf.get(ConfigOptions.CLIENT_WRITER_BATCH_SIZE).getBytes());

this.writerBufferPool = LazyMemorySegmentPool.createWriterBufferPool(conf);
this.bufferAllocator = BufferAllocatorUtil.createBufferAllocator();
this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory();
this.bufferAllocator = createBufferAllocator(chunkedFactory);
this.arrowWriterPool = new ArrowWriterPool(bufferAllocator);
this.incomplete = new IncompleteBatches();
this.nodesDrainIndex = new HashMap<>();
Expand Down Expand Up @@ -964,6 +969,8 @@ public void close() {
// Release all the memory segments.
bufferAllocator.releaseBytes(bufferAllocator.getAllocatedMemory());
bufferAllocator.close();
// Release native memory held by the chunked allocation manager factory.
chunkedFactory.close();
}

/** Per table bucket and write batches. */
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -25,8 +25,10 @@
import org.apache.fluss.row.InternalRow;
import org.apache.fluss.row.InternalRow.FieldGetter;
import org.apache.fluss.row.ProjectedRow;
import org.apache.fluss.row.arrow.memory.BufferAllocatorUtil;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationManager;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocatorUtil;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager;
import org.apache.fluss.shaded.arrow.org.apache.arrow.vector.VectorSchemaRoot;
import org.apache.fluss.types.DataType;
import org.apache.fluss.types.RowType;
Expand Down Expand Up @@ -68,6 +70,29 @@ public static LogRecordReadContext createReadContext(
boolean readFromRemote,
@Nullable Projection projection,
SchemaGetter schemaGetter) {
return createReadContext(
tableInfo,
readFromRemote,
projection,
schemaGetter,
new ChunkedAllocationManager.ChunkedFactory());
}

/**
* Creates a {@link LogRecordReadContext} with a custom {@link AllocationManager.Factory}.
*
* @param tableInfo the table info of the table to read
* @param readFromRemote whether the data is read from remote storage
* @param projection the projection to apply, or null for all fields
* @param schemaGetter the schema getter to resolve schema by id
* @param allocationManagerFactory the factory for creating Arrow memory allocations
*/
public static LogRecordReadContext createReadContext(
TableInfo tableInfo,
boolean readFromRemote,
@Nullable Projection projection,
SchemaGetter schemaGetter,
AllocationManager.Factory allocationManagerFactory) {
RowType rowType = tableInfo.getRowType();
LogFormat logFormat = tableInfo.getTableConfig().getLogFormat();
// only for arrow log format, the projection can be push downed to the server side
Expand All @@ -84,7 +109,12 @@ public static LogRecordReadContext createReadContext(
// so set the rowType as is.
int[] selectedFields = projection.getProjection();
return createArrowReadContext(
rowType, schemaId, selectedFields, false, schemaGetter);
rowType,
schemaId,
selectedFields,
false,
schemaGetter,
allocationManagerFactory);
} else {
// arrow data that returned from server has been projected (in order)
RowType projectedRowType = projection.projectInOrder(rowType);
Expand All @@ -95,7 +125,8 @@ public static LogRecordReadContext createReadContext(
schemaId,
selectedFields,
projectionPushDowned,
schemaGetter);
schemaGetter,
allocationManagerFactory);
}
} else if (logFormat == LogFormat.INDEXED) {
int[] selectedFields = projection.getProjection();
Expand All @@ -113,9 +144,11 @@ private static LogRecordReadContext createArrowReadContext(
int schemaId,
int[] selectedFields,
boolean projectionPushDowned,
SchemaGetter schemaGetter) {
SchemaGetter schemaGetter,
AllocationManager.Factory allocationManagerFactory) {
// TODO: use a more reasonable memory limit
BufferAllocator allocator = BufferAllocatorUtil.createBufferAllocator();
BufferAllocator allocator =
BufferAllocatorUtil.createBufferAllocator(allocationManagerFactory);
FieldGetter[] fieldGetters = buildProjectedFieldGetters(dataRowType, selectedFields);
return new LogRecordReadContext(
LogFormat.ARROW,
Expand All @@ -139,7 +172,13 @@ private static LogRecordReadContext createArrowReadContext(
public static LogRecordReadContext createArrowReadContext(
RowType rowType, int schemaId, SchemaGetter schemaGetter) {
int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray();
return createArrowReadContext(rowType, schemaId, selectedFields, false, schemaGetter);
return createArrowReadContext(
rowType,
schemaId,
selectedFields,
false,
schemaGetter,
new ChunkedAllocationManager.ChunkedFactory());
Comment thread
loserwang1024 marked this conversation as resolved.
}

@VisibleForTesting
Expand All @@ -150,7 +189,12 @@ public static LogRecordReadContext createArrowReadContext(
boolean projectionPushDowned) {
int[] selectedFields = IntStream.range(0, rowType.getFieldCount()).toArray();
return createArrowReadContext(
rowType, schemaId, selectedFields, projectionPushDowned, schemaGetter);
rowType,
schemaId,
selectedFields,
projectionPushDowned,
schemaGetter,
new ChunkedAllocationManager.ChunkedFactory());
}

/**
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,7 +17,7 @@
* under the License.
*/

package org.apache.fluss.row.arrow.memory;
package org.apache.fluss.shaded.arrow.org.apache.arrow.memory;

import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.rounding.RoundingPolicy;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.util.CommonUtil;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -17,19 +17,26 @@
* under the License.
*/

package org.apache.fluss.row.arrow.memory;
package org.apache.fluss.shaded.arrow.org.apache.arrow.memory;

import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.AllocationListener;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BufferAllocator;
import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator;
import javax.annotation.Nullable;

import static org.apache.fluss.row.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY;
import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ArrowRoundingPolicy.ARROW_ROUNDING_POLICY;

/** Utility class for creating Arrow BufferAllocators with the custom ArrowRoundingPolicy. */
public class BufferAllocatorUtil {

/** Creates a {@link BufferAllocator} configured with the {@link ArrowRoundingPolicy}. */
public static BufferAllocator createBufferAllocator() {
return new RootAllocator(AllocationListener.NOOP, Long.MAX_VALUE, ARROW_ROUNDING_POLICY);
public static BufferAllocator createBufferAllocator(
@Nullable AllocationManager.Factory allocationManagerFactory) {
ImmutableConfig.Builder builder =
ImmutableConfig.builder()
.listener(AllocationListener.NOOP)
.maxAllocation(Long.MAX_VALUE)
.roundingPolicy(ARROW_ROUNDING_POLICY);
if (allocationManagerFactory != null) {
builder.allocationManagerFactory(allocationManagerFactory);
}
return new RootAllocator(builder.build());
}
}
Loading