[client] Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation. #3026
base: main
```diff
@@ -53,6 +53,7 @@
 import org.apache.fluss.rpc.protocol.ApiError;
 import org.apache.fluss.rpc.protocol.Errors;
 import org.apache.fluss.rpc.util.PredicateMessageUtils;
+import org.apache.fluss.shaded.arrow.org.apache.arrow.memory.ChunkedAllocationManager;
 import org.apache.fluss.shaded.netty4.io.netty.buffer.ByteBuf;
 import org.apache.fluss.utils.IOUtils;
 import org.apache.fluss.utils.Projection;
```
```diff
@@ -94,6 +95,7 @@ public class LogFetcher implements Closeable {
     // currently can only do project when generate scanRecord instead of doing project while read
     // bytes from remote file.
     private final LogRecordReadContext remoteReadContext;
+    private final ChunkedAllocationManager.ChunkedFactory chunkedFactory;
     @Nullable private final Projection projection;
     @Nullable private final org.apache.fluss.rpc.messages.PbPredicate cachedPbPredicate;
     private final int filterSchemaId;
```
```diff
@@ -128,10 +130,13 @@ public LogFetcher(
             SchemaGetter schemaGetter) {
         this.tablePath = tableInfo.getTablePath();
         this.isPartitioned = tableInfo.isPartitioned();
+        this.chunkedFactory = new ChunkedAllocationManager.ChunkedFactory();
         this.readContext =
-                LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter);
+                LogRecordReadContext.createReadContext(
+                        tableInfo, false, projection, schemaGetter, chunkedFactory);
         this.remoteReadContext =
-                LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
```
Contributor:

Was the write path affected by the OOM? Bump-pointer allocation is ideal for the read path (exact-size allocations, batch release), but the write path does grow-and-copy reallocation, and the dead space stays in the chunk until it is fully drained. It might be worth keeping the default allocator here, though we are only talking about peak allocations.

Contributor (Author):
Yes. When I tested with 1000+ columns, compression also needed many small off-heap allocations for each column (ArrowWriter.initFieldVector). The error log is:

2026-04-08 10:20:27
java.io.IOException: Failed to send record to table fluss.table1(p=2026040810). Writer state: running
at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:162)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:246)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:217)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:169)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:706)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1146)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:976)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:768)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:580)
at java.base/java.lang.Thread.run(Thread.java:991)
Caused by: org.apache.fluss.exception.FlussRuntimeException: Failed to send record to table fluss.t_geely_2x_realtime_signal(p=2026040810). Writer state: running
at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:217)
at org.apache.fluss.client.write.WriterClient.send(WriterClient.java:140)
at org.apache.fluss.client.table.writer.AbstractTableWriter.send(AbstractTableWriter.java:66)
at org.apache.fluss.client.table.writer.AppendWriterImpl.append(AppendWriterImpl.java:87)
at org.apache.fluss.flink.sink.writer.AppendSinkWriter.writeRow(AppendSinkWriter.java:62)
at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:146)
... 14 more
Caused by: org.apache.fluss.shaded.arrow.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:355)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:350)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:338)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:316)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:280)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateBytes(BaseVariableWidthVector.java:462)
at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:420)
at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:429)
at org.apache.fluss.row.arrow.ArrowWriter.initFieldVector(ArrowWriter.java:309)
at org.apache.fluss.row.arrow.ArrowWriter.<init>(ArrowWriter.java:130)
at org.apache.fluss.row.arrow.ArrowWriterPool.lambda$getOrCreateWriter$3(ArrowWriterPool.java:107)
at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
at org.apache.fluss.row.arrow.ArrowWriterPool.getOrCreateWriter(ArrowWriterPool.java:93)
at org.apache.fluss.client.write.RecordAccumulator.appendNewBatch(RecordAccumulator.java:599)
at org.apache.fluss.client.write.RecordAccumulator.append(RecordAccumulator.java:210)
at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:205)
... 19 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:177)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:134)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:178)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:211)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
... 40 more
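The trade-off discussed above (cheap exact-size allocation and batch release on the read path, but grow-and-copy dead space on the write path) can be illustrated with a minimal bump-pointer arena. This is a hypothetical sketch, not Fluss's actual ChunkedAllocationManager:

```java
import java.nio.ByteBuffer;

/**
 * Minimal bump-pointer arena sketch (hypothetical; not Fluss's actual
 * ChunkedAllocationManager). Allocation is a pointer increment; memory is
 * reclaimed only when the whole arena is reset ("batch release").
 */
class BumpArena {
    private final ByteBuffer chunk; // one pre-allocated chunk
    private int offset;             // the bump pointer

    BumpArena(int capacity) {
        this.chunk = ByteBuffer.allocateDirect(capacity);
    }

    /** Returns an exact-size slice of the chunk, or null if exhausted. */
    ByteBuffer allocate(int size) {
        if (offset + size > chunk.capacity()) {
            return null; // no per-allocation free list to fall back on
        }
        ByteBuffer dup = chunk.duplicate();
        dup.position(offset);
        dup.limit(offset + size);
        offset += size;
        return dup.slice();
    }

    /**
     * Grow-and-copy, as on the write path: the old allocation becomes
     * dead space that stays in the chunk until reset().
     */
    ByteBuffer reallocate(ByteBuffer old, int newSize) {
        ByteBuffer bigger = allocate(newSize);
        if (bigger != null) {
            bigger.put(old.duplicate()); // copy; old bytes are now dead space
            bigger.flip();
        }
        return bigger;
    }

    /** Batch release: everything is freed at once by rewinding the pointer. */
    void reset() {
        offset = 0;
    }

    int used() {
        return offset;
    }
}
```

Note how `reallocate` consumes new space without freeing the old slice, which is exactly the write-path concern the reviewer raises; `reset` is the cheap batch release the read path benefits from.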
```diff
+                LogRecordReadContext.createReadContext(
+                        tableInfo, true, projection, schemaGetter, chunkedFactory);
         this.projection = projection;
         this.cachedPbPredicate =
                 recordBatchFilter != null
```
```diff
@@ -603,6 +608,7 @@ public synchronized void close() throws IOException {
         IOUtils.closeQuietly(remoteLogDownloader, "remoteLogDownloader");
         readContext.close();
         remoteReadContext.close();
+        chunkedFactory.close();
         isClosed = true;
         LOG.info("Fetcher for {} is closed.", tablePath);
     }
```
In `parseLimitScanResponse`, the newly created `LogRecordReadContext` is never closed. Since `LogRecordReadContext` owns Arrow resources (`VectorSchemaRoot`s + a `BufferAllocator`), this will leak direct memory and native buffers. Please wrap the `LogRecordReadContext` in a try-with-resources (or otherwise ensure it is closed) around the iteration over batches/records.
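The suggested fix can be sketched as follows. All names here are hypothetical stand-ins (`ReadContext` models any `AutoCloseable` that owns Arrow buffers); this is not the actual Fluss code:

```java
// Hypothetical sketch of the try-with-resources suggestion above.
// ReadContext stands in for LogRecordReadContext: an AutoCloseable that
// owns native resources and must be closed even if iteration throws.
class LimitScanSketch {
    static final class ReadContext implements AutoCloseable {
        boolean closed;

        @Override
        public void close() {
            closed = true; // release VectorSchemaRoots / BufferAllocator here
        }
    }

    static ReadContext lastContext; // exposed only so the sketch is testable

    static int parseLimitScanResponse(int[] batchSizes) {
        int records = 0;
        // try-with-resources guarantees close() runs even on exceptions,
        // so direct memory and native buffers are not leaked.
        try (ReadContext ctx = new ReadContext()) {
            lastContext = ctx;
            for (int batch : batchSizes) {
                records += batch; // iterate over batches/records
            }
        }
        return records;
    }
}
```

The key point is that the context's lifetime is scoped to the iteration: once the loop finishes (or throws), `close()` runs and the Arrow-owned direct memory is returned.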