[client] Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation. #3026

Open

loserwang1024 wants to merge 4 commits into apache:main from loserwang1024:arrow-memory-manager

Conversation

@loserwang1024
Contributor


Purpose

Linked issue: close #3025



Copilot AI left a comment


Pull request overview

This PR addresses Arrow memory fragmentation/OOM during high-column-count decompression by replacing Netty-backed pooled allocations with a bump-pointer, chunk-recycling allocation strategy and wiring it into client read/write paths.

Changes:

  • Introduces ChunkedAllocationManager (bump-pointer sub-allocation within reusable native chunks) for Arrow allocations.
  • Adds allocator construction utilities (AllocatorUtil) and a custom rounding policy (FlussRoundingPolicy) used by the new allocator setup.
  • Updates client read/write components to use the new allocation manager (e.g., LogRecordReadContext, RecordAccumulator, LogFetcher, LimitBatchScanner).

Reviewed changes

Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.

Show a summary per file
File Description
fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java Centralizes creation of a RootAllocator configured with a custom AllocationManager.Factory.
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java Allows injecting an AllocationManager.Factory and defaults Arrow read contexts to ChunkedFactory.
fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java Adds a custom rounding policy used in allocator configuration.
fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java Implements the new chunked bump-pointer allocation manager + factory/pool.
fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java Switches Arrow allocator used for writing to the new chunked allocator.
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java Shares a ChunkedFactory across local/remote read contexts for scanning.
fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java Uses chunked allocator in read context for limit scans (Arrow path).


Comment on lines +297 to +307
    // Align to 8 bytes for safe direct-memory access.
    long alignedSize = (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1);

    if (activeChunk == null || !activeChunk.hasRoom(alignedSize)) {
        // Current chunk is full or doesn't exist — obtain a recycled or new chunk.
        activeChunk = obtainChunk();
    }

    long offset = activeChunk.bumpAllocate(alignedSize);
    return new ChunkedAllocationManager(accountingAllocator, activeChunk, offset, size);
}
Copilot AI Apr 11, 2026


ChunkedFactory.create() aligns allocations to 8 bytes (alignedSize) but the created ChunkedAllocationManager reports allocatedSize = size (unaligned). This can make Arrow's allocator accounting inconsistent with the actual bytes carved out of the chunk (e.g., size=1/2/4 becomes 8 bytes reserved) and may also return a buffer size that doesn't reflect the backing slice. Consider tracking and reporting the aligned size (or otherwise ensuring accounting matches the real reserved bytes).
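The mismatch can be seen with a small standalone sketch. The `ALIGNMENT` constant and the align step mirror the snippet quoted above; the class name and everything else are illustrative:

```java
// Sketch (not the PR's actual code): 8-byte alignment as used by the
// bump-pointer path, and why accounting should use the aligned size.
public class AlignmentSketch {
    static final long ALIGNMENT = 8;

    static long align(long size) {
        return (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1);
    }

    public static void main(String[] args) {
        // A 1-byte request actually reserves 8 bytes of the chunk.
        System.out.println("align(1) = " + align(1));   // 8
        System.out.println("align(8) = " + align(8));   // 8
        System.out.println("align(9) = " + align(9));   // 16
        // If the manager reports the unaligned size, the allocator's
        // accounting undercounts by (align(size) - size) bytes per allocation.
    }
}
```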

Comment on lines +354 to +366
/**
 * Closes this factory, freeing all cached chunks. Active chunks with outstanding
 * sub-allocations will be freed when their last ArrowBuf is released.
 */
public synchronized void close() {
    while (!freeChunks.isEmpty()) {
        freeChunks.poll().destroy();
    }
    if (activeChunk != null && activeChunk.subAllocCount.get() == 0) {
        activeChunk.destroy();
    }
    activeChunk = null;
}
Copilot AI Apr 11, 2026


ChunkedFactory allocates and caches native chunks (Unsafe.allocateMemory) and provides a close() method to free cached chunks, but no call sites invoke ChunkedFactory.close() (and it is not an @Override). If Arrow/RootAllocator does not automatically close the factory, this will retain native memory even after BufferAllocator.close(). Please wire factory closure into the owning component lifecycle (e.g., keep a reference and close it alongside the allocator) or implement the appropriate closeable interface expected by Arrow so it is closed automatically.
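One possible wiring, sketched with stand-ins (`ChunkedFactoryStub` and `Owner` are hypothetical names, not the PR's API): the owning component retains the factory and closes it in its own close(), after the allocator has released outstanding buffers.

```java
import java.util.ArrayDeque;
import java.util.Queue;

public class FactoryLifecycleSketch {
    /** Stand-in for ChunkedFactory: caches "chunks" that must be freed. */
    static class ChunkedFactoryStub implements AutoCloseable {
        private final Queue<long[]> freeChunks = new ArrayDeque<>();
        boolean closed = false;
        void cache(long[] chunk) { freeChunks.add(chunk); }
        @Override public synchronized void close() {
            freeChunks.clear(); // real code would Unsafe.freeMemory each chunk
            closed = true;
        }
    }

    /** Stand-in for the component that owns both allocator and factory. */
    static class Owner implements AutoCloseable {
        final ChunkedFactoryStub factory = new ChunkedFactoryStub();
        @Override public void close() {
            // Real code: close the BufferAllocator first so outstanding
            // ArrowBufs are released, then close the factory to free
            // any native chunks it still caches.
            factory.close();
        }
    }

    public static void main(String[] args) {
        Owner owner = new Owner();
        owner.factory.cache(new long[1]);
        owner.close();
        System.out.println("factory closed: " + owner.factory.closed);
    }
}
```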

* @param chunkSize maximum size of each chunk (bytes). Allocations >= this go direct.
* @param maxFreeChunks maximum number of empty chunks to keep cached for reuse.
*/
public ChunkedFactory(long chunkSize, int maxFreeChunks) {
Copilot AI Apr 11, 2026


ChunkedFactory(long chunkSize, int maxFreeChunks) does not validate its inputs. Passing chunkSize <= 0 or maxFreeChunks < 0 can lead to Unsafe.allocateMemory errors or a factory that never recycles/frees chunks as intended. Please add argument validation (and fail fast with a clear exception).

Suggested change
public ChunkedFactory(long chunkSize, int maxFreeChunks) {
public ChunkedFactory(long chunkSize, int maxFreeChunks) {
if (chunkSize <= 0) {
throw new IllegalArgumentException(
"chunkSize must be > 0, but was " + chunkSize);
}
if (maxFreeChunks < 0) {
throw new IllegalArgumentException(
"maxFreeChunks must be >= 0, but was " + maxFreeChunks);
}

}

private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) {
if (maxOrder > 14) {
Copilot AI Apr 11, 2026


validateAndCalculateChunkSize rejects maxOrder > 14 but does not reject negative values, even though the error message says the expected range is 0-14. This can silently produce an invalid chunkSize when -Dorg.apache.memory.allocator.maxOrder is negative. Please validate maxOrder >= 0 as well.

Suggested change
if (maxOrder > 14) {
if (maxOrder < 0 || maxOrder > 14) {

Comment on lines +27 to +32
* A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty
* 4.1+ memory allocation behavior.
*
* <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's
* maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9.
*
Copilot AI Apr 11, 2026


The JavaDoc for FlussRoundingPolicy is now misleading in the context of this PR: it describes aligning Arrow to Netty’s chunking behavior and “reducing Arrow’s chunk size”, but the PR is explicitly replacing Netty pooling with ChunkedAllocationManager. Please update the documentation to reflect the new allocator strategy (or, if this rounding policy is still needed, explain why it’s still relevant without Netty).

Suggested change
* A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty
* 4.1+ memory allocation behavior.
*
* <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's
* maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9.
*
* A custom rounding policy that uses Fluss's configured Arrow chunk size instead of relying on
* Arrow's historical default chunk sizing.
*
* <p>By default, Arrow uses {@code maxOrder=11}, which yields 16MB chunks for the default page
* size. Fluss defaults {@code org.apache.memory.allocator.maxOrder} to {@code 9}, which yields
* 4MB chunks and keeps rounding behavior aligned with the chunk size used by Fluss's allocation
* strategy.
*
* <p>This policy is still needed without Netty because requests smaller than the configured chunk
* size are rounded to the next power of two, while larger requests are left unchanged. Using the
* configured chunk size here avoids Arrow rounding small allocations as if the chunk were larger
* than the allocator actually uses.
*

Comment on lines +18 to +36
package org.apache.fluss.shaded.arrow.org.apache.arrow.memory;

import org.apache.fluss.record.FlussRoundingPolicy;

import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.configBuilder;

/** Util to create UNSAFE_ALLOCATOR. */
public class AllocatorUtil {

private AllocatorUtil() {}

public static BufferAllocator createBufferAllocator(
AllocationManager.Factory allocationManagerFactory) {
return new RootAllocator(
configBuilder()
.listener(AllocationListener.NOOP)
.maxAllocation(Long.MAX_VALUE)
.roundingPolicy(FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY)
.allocationManagerFactory(allocationManagerFactory)
Copilot AI Apr 11, 2026


AllocatorUtil is placed under the shaded Arrow package but imports org.apache.fluss.record.FlussRoundingPolicy. This introduces a dependency from shaded Arrow code back into Fluss packages (unlike the other shaded Arrow sources under org/apache/fluss/shaded/arrow/...), which can undermine the purpose of shading and complicate dependency isolation. Consider moving this utility to a non-shaded Fluss package, or keep it in the shaded package but avoid referencing Fluss classes directly (e.g., accept a RoundingPolicy/config as a parameter).

Comment on lines 167 to 174
 LogRecordReadContext readContext =
-        LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter);
+        LogRecordReadContext.createReadContext(
+                tableInfo,
+                false,
+                null,
+                schemaGetter,
+                new ChunkedAllocationManager.ChunkedFactory());
 LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer);
Copilot AI Apr 11, 2026


In parseLimitScanResponse, the newly created LogRecordReadContext is never closed. Since LogRecordReadContext owns Arrow resources (VectorSchemaRoots + BufferAllocator), this will leak direct memory and native buffers. Please wrap the LogRecordReadContext in a try-with-resources (or otherwise ensure it’s closed) around the iteration over batches/records.
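The suggested shape of the fix, sketched with a stand-in `ReadContextStub` (hypothetical; the real LogRecordReadContext owns Arrow allocators and vector roots):

```java
public class TryWithResourcesSketch {
    /** Stand-in for LogRecordReadContext: an AutoCloseable resource owner. */
    static class ReadContextStub implements AutoCloseable {
        boolean closed = false;
        @Override public void close() { closed = true; }
    }

    public static void main(String[] args) {
        ReadContextStub observed;
        // try-with-resources guarantees close() runs even if iterating the
        // batches/records throws, so Arrow buffers are always released.
        try (ReadContextStub ctx = new ReadContextStub()) {
            observed = ctx;
            // ... iterate batches/records here ...
        }
        System.out.println("closed: " + observed.closed);
    }
}
```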

Comment on lines +127 to +134
 ChunkedAllocationManager.ChunkedFactory chunkedFactory =
         new ChunkedAllocationManager.ChunkedFactory();
 this.readContext =
-        LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter);
+        LogRecordReadContext.createReadContext(
+                tableInfo, false, projection, schemaGetter, chunkedFactory);
 this.remoteReadContext =
-        LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
+        LogRecordReadContext.createReadContext(
+                tableInfo, true, projection, schemaGetter, chunkedFactory);
Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

LogFetcher creates a ChunkedFactory instance and shares it across two LogRecordReadContexts, but it is not retained as a field and never closed. If ChunkedFactory retains native chunks (as its close() method suggests), those chunks may remain allocated after readContext/remoteReadContext are closed. Consider storing the factory as a member and closing it in LogFetcher.close() (or ensure allocator shutdown closes the factory automatically).

Comment on lines +114 to +123
public class ChunkedAllocationManager extends AllocationManager {

/** 8-byte alignment for all sub-allocations within a chunk. */
private static final long ALIGNMENT = 8;

/** Default chunk size: 4MB (matches Netty 4.1+ maxOrder=9). */
private static final long DEFAULT_CHUNK_SIZE = 4L * 1024 * 1024;

/** Default maximum number of empty chunks to keep in the free-list. */
private static final int DEFAULT_MAX_FREE_CHUNKS = 3;
Copilot AI Apr 11, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

This PR is titled as a [client] change, but it introduces new allocator infrastructure in fluss-common (e.g., ChunkedAllocationManager, FlussRoundingPolicy, AllocatorUtil) in addition to the client wiring. Please consider adjusting the PR title/component to reflect the broader scope, or split common vs client wiring if that matches project norms.

Comment on lines +248 to +258
/**
* Factory that creates {@link ChunkedAllocationManager} instances.
*
* <p>Small allocations (< chunkSize) are packed into the current active chunk via bump-pointer.
* When the active chunk is full, a recycled or freshly-allocated chunk is used. Large
* allocations (>= chunkSize) get their own dedicated memory region.
*
* <p>This factory is thread-safe: {@link #create} and {@link #onChunkDrained} are {@code
* synchronized}.
*/
public static class ChunkedFactory implements AllocationManager.Factory {
Copilot AI Apr 11, 2026


ChunkedAllocationManager is a new, complex native-memory allocator (chunk pooling, ref-counting, concurrency). There are existing tests under fluss-common/src/test/java/org/apache/fluss/compression/* but no tests exercising this allocator’s core behaviors (chunk reuse, drain/recycle, direct-allocation path, and concurrent release race described in onChunkDrained). Adding focused unit tests here would help prevent regressions and validate the OOM fix scenario from #3025.
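As a rough illustration of what such a unit test could assert, here is a self-contained miniature of the bump-pointer/recycle behavior (heap arrays stand in for native chunks; all names are illustrative, not the PR's API):

```java
import java.util.ArrayDeque;

public class MiniBumpAllocator {
    static class Chunk {
        final byte[] data;
        long pos = 0;   // bump pointer
        int live = 0;   // outstanding sub-allocations
        Chunk(int size) { data = new byte[size]; }
        boolean hasRoom(long n) { return pos + n <= data.length; }
    }

    final int chunkSize;
    final ArrayDeque<Chunk> freeChunks = new ArrayDeque<>();
    Chunk active;
    int chunksCreated = 0;

    MiniBumpAllocator(int chunkSize) { this.chunkSize = chunkSize; }

    synchronized Chunk allocate(long n) {
        if (active == null || !active.hasRoom(n)) {
            // prefer a recycled chunk over a fresh allocation
            active = freeChunks.isEmpty() ? newChunk() : freeChunks.poll();
            active.pos = 0;
        }
        active.pos += n;
        active.live++;
        return active;
    }

    private Chunk newChunk() { chunksCreated++; return new Chunk(chunkSize); }

    synchronized void release(Chunk c) {
        if (--c.live == 0 && c != active) {
            freeChunks.add(c); // fully drained chunk goes back to the pool
        }
    }

    public static void main(String[] args) {
        MiniBumpAllocator a = new MiniBumpAllocator(64);
        Chunk c1 = a.allocate(64);  // fills chunk 1
        a.allocate(64);             // forces a second chunk
        a.release(c1);              // chunk 1 drained -> recycled
        Chunk c3 = a.allocate(64);  // a test would assert chunk 1 is reused
        System.out.println("created=" + a.chunksCreated + " reused=" + (c3 == c1));
    }
}
```

A real test would exercise the same transitions (drain, recycle, direct-allocation path, and the concurrent-release race) against ChunkedFactory itself.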

Member


+1 please add UT for this class

Member

@wuchong wuchong left a comment


I remain concerned about modifying such a low-level memory allocator, especially given its complexity and the lack of unit test coverage. Could we investigate whether existing configuration options in the current allocator could mitigate this issue? Additionally, have we evaluated Arrow’s UnsafeAllocationManager and NettyAllocationManager as alternatives?

Furthermore, please note that KvManager still initializes RootAllocator with Long.MAX_VALUE.

* <p>For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump
* pointer), behaving identically to {@link UnsafeAllocationManager}.
*/
public class ChunkedAllocationManager extends AllocationManager {
Member


move this class to package org.apache.fluss.row.arrow.memory, along with FlussRoundingPolicy and AllocatorUtil

Comment on lines +248 to +258
/**
* Factory that creates {@link ChunkedAllocationManager} instances.
*
* <p>Small allocations (< chunkSize) are packed into the current active chunk via bump-pointer.
* When the active chunk is full, a recycled or freshly-allocated chunk is used. Large
* allocations (>= chunkSize) get their own dedicated memory region.
*
* <p>This factory is thread-safe: {@link #create} and {@link #onChunkDrained} are {@code
* synchronized}.
*/
public static class ChunkedFactory implements AllocationManager.Factory {
Member


+1 please add UT for this class

@loserwang1024
Contributor Author

loserwang1024 commented Apr 13, 2026

Additionally, have we evaluated Arrow’s UnsafeAllocationManager and NettyAllocationManager as alternatives?

The NettyAllocationManager corresponds to our current allocation strategy. However, when decompressing data with over 1000 columns, it causes significant memory amplification (or memory bloat).

I have also tested UnsafeAllocationManager, and it indeed avoids the OOM. However, since it allocates and frees native memory on every request, I am concerned about the overhead: with 1000 columns, each batch would require at least 2000 allocations and deallocations.

That's why I chose this intermediate approach to reuse memory as much as possible.

please note that KvManager still initializes RootAllocator with Long.MAX_VALUE.

In our current production environment, off-heap memory on the server side is sufficient. Therefore, to minimize impact and reduce risk, we decided not to change it.

* Closes this factory, freeing all cached chunks. Active chunks with outstanding
* sub-allocations will be freed when their last ArrowBuf is released.
*/
public synchronized void close() {
Contributor


If close() is called while ArrowBufs are still alive, the active chunk is set to null but not destroyed.
When those ArrowBufs are eventually released, onChunkDrained adds the chunk to freeChunks, but nothing will ever drain it, so the native memory is leaked.

Contributor Author

@loserwang1024 loserwang1024 Apr 14, 2026


You are right. Currently I only close the ChunkedAllocationManager after all data has been fully read and released, which is why I didn't catch this during production testing.

* it is still the active chunk). The guard {@code chunk.subAllocCount.get() > 0} handles
* this race.
*/
synchronized void onChunkDrained(Chunk chunk) {
Contributor


The factory is claimed to be thread-safe, but onChunkDrained can be called twice for the same chunk, causing a double-recycle or use-after-free.

The race:
Thread A calls releaseSubAllocation(), subAllocCount hits 0, but hasn't entered onChunkDrained yet (no lock).
Meanwhile Thread B calls create() (synchronized), allocates from the same chunk (subAllocCount 0 to 1), then immediately releases (subAllocCount 1 to 0), and enters onChunkDrained first, recycling the chunk to freeChunks.
Thread A then enters onChunkDrained and subAllocCount is still 0, the > 0 guard passes, and the chunk is recycled/destroyed a second time.
Double entry in freeChunks -> overlapping allocations -> native memory corruption.

Reproduced with: 8 threads sharing one ChunkedFactory (1KB chunks, max 2 free), each thread doing allocator.buffer(64) -> close() in a loop, 500 times. JVM crashes with exit code 134.

I think some generation counter can help with it, shouldn't be very hard to add
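A sketch of the generation-counter idea (hypothetical names; the real fix would live in ChunkedFactory): each chunk carries a generation that is bumped when the chunk is recycled, and a drained notification carrying a stale generation is ignored.

```java
import java.util.concurrent.atomic.AtomicInteger;

public class GenerationGuardSketch {
    static class Chunk {
        final AtomicInteger subAllocCount = new AtomicInteger();
        int generation = 0; // guarded by the factory lock
    }

    static class Factory {
        int recycled = 0;

        synchronized void onChunkDrained(Chunk chunk, int observedGeneration) {
            // Stale notification: the chunk was already recycled (its
            // generation moved on) by a faster release on another thread.
            if (chunk.generation != observedGeneration
                    || chunk.subAllocCount.get() > 0) {
                return;
            }
            chunk.generation++; // invalidate other in-flight notifications
            recycled++;         // real code: add chunk to freeChunks here
        }
    }

    public static void main(String[] args) {
        Factory f = new Factory();
        Chunk c = new Chunk();
        int genSeenByThreadA = c.generation;
        f.onChunkDrained(c, genSeenByThreadA); // first drain: recycled once
        f.onChunkDrained(c, genSeenByThreadA); // duplicate from race: ignored
        System.out.println("recycled: " + f.recycled);
    }
}
```

Callers would capture the generation at release time, so the late-arriving Thread A in the race above presents a stale value and is rejected.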

Contributor

@fresh-borzoni fresh-borzoni left a comment


@loserwang1024 Ty, very interesting PR, I really enjoyed reading the code.
Left some comments with my findings, PTAL

LogRecordReadContext.createReadContext(
tableInfo, false, projection, schemaGetter, chunkedFactory);
this.remoteReadContext =
LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter);
Contributor

@fresh-borzoni fresh-borzoni Apr 14, 2026

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Was the write path affected by the OOM? Bump-pointer is ideal for reads (exact-size allocations, batch release), but writes perform grow-and-copy reallocation, and the dead space stays in the chunk until it fully drains.

Might be worth keeping the default allocator here, though we are talking about peak allocations only
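A back-of-envelope illustration of the dead-space concern (hypothetical sizes): growing a buffer from 64B to 1KB by doubling carves nearly 2KB out of a bump-pointer chunk, of which only the final 1KB stays live.

```java
public class GrowAndCopyDeadSpace {
    public static void main(String[] args) {
        long carved = 0; // bytes bumped out of the chunk
        long live = 0;   // bytes still referenced at the end
        // grow from 64B to 1KB by doubling: 64, 128, 256, 512, 1024
        for (long size = 64; size <= 1024; size *= 2) {
            carved += size; // each growth step takes a fresh slice
            live = size;    // only the final slice remains live
        }
        long dead = carved - live;
        System.out.println("carved=" + carved + " live=" + live + " dead=" + dead);
        // roughly half of the carved bytes are dead until the chunk drains
    }
}
```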

Contributor Author

@loserwang1024 loserwang1024 Apr 14, 2026


Was the write path affected by the OOM?

Yes. When I tested with 1000+ columns, compression also needed many small off-heap allocations for each column (ArrowWriter.initFieldVector). The error log is:

2026-04-08 10:20:27
java.io.IOException: Failed to send record to table fluss.table1(p=2026040810). Writer state: running
	at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:162)
	at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
	at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:246)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:217)
	at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:169)
	at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:706)
	at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1202)
	at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1146)
	at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:976)
	at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:955)
	at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:768)
	at org.apache.flink.runtime.taskmanager.Task.run(Task.java:580)
	at java.base/java.lang.Thread.run(Thread.java:991)
Caused by: org.apache.fluss.exception.FlussRuntimeException: Failed to send record to table fluss.t_geely_2x_realtime_signal(p=2026040810). Writer state: running
	at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:217)
	at org.apache.fluss.client.write.WriterClient.send(WriterClient.java:140)
	at org.apache.fluss.client.table.writer.AbstractTableWriter.send(AbstractTableWriter.java:66)
	at org.apache.fluss.client.table.writer.AppendWriterImpl.append(AppendWriterImpl.java:87)
	at org.apache.fluss.flink.sink.writer.AppendSinkWriter.writeRow(AppendSinkWriter.java:62)
	at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:146)
	... 14 more
Caused by: org.apache.fluss.shaded.arrow.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:355)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:350)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:338)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:316)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:280)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateBytes(BaseVariableWidthVector.java:462)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:420)
	at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:429)
	at org.apache.fluss.row.arrow.ArrowWriter.initFieldVector(ArrowWriter.java:309)
	at org.apache.fluss.row.arrow.ArrowWriter.<init>(ArrowWriter.java:130)
	at org.apache.fluss.row.arrow.ArrowWriterPool.lambda$getOrCreateWriter$3(ArrowWriterPool.java:107)
	at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
	at org.apache.fluss.row.arrow.ArrowWriterPool.getOrCreateWriter(ArrowWriterPool.java:93)
	at org.apache.fluss.client.write.RecordAccumulator.appendNewBatch(RecordAccumulator.java:599)
	at org.apache.fluss.client.write.RecordAccumulator.append(RecordAccumulator.java:210)
	at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:205)
	... 19 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
	at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
	at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
	at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:177)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:134)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:178)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:211)
	at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
	... 40 more

@loserwang1024
Contributor Author

@fresh-borzoni thanks for your advice, I learned a lot! Just modified it.

Contributor

@fresh-borzoni fresh-borzoni left a comment


@loserwang1024 TY for the nice code, LGTM 👍



Development

Successfully merging this pull request may close these issues.

Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation
