[client] Replace Netty PooledByteBufAllocator with bump-pointer ChunkedAllocationManager for Arrow memory allocation.#3026
Conversation
There was a problem hiding this comment.
Pull request overview
This PR addresses Arrow memory fragmentation/OOM during high-column-count decompression by replacing Netty-backed pooled allocations with a bump-pointer, chunk-recycling allocation strategy and wiring it into client read/write paths.
Changes:
- Introduces
ChunkedAllocationManager(bump-pointer sub-allocation within reusable native chunks) for Arrow allocations. - Adds allocator construction utilities (
AllocatorUtil) and a custom rounding policy (FlussRoundingPolicy) used by the new allocator setup. - Updates client read/write components to use the new allocation manager (e.g.,
LogRecordReadContext,RecordAccumulator,LogFetcher,LimitBatchScanner).
Reviewed changes
Copilot reviewed 7 out of 7 changed files in this pull request and generated 10 comments.
Show a summary per file
| File | Description |
|---|---|
| fluss-common/src/main/java/org/apache/fluss/shaded/arrow/org/apache/arrow/memory/AllocatorUtil.java | Centralizes creation of a RootAllocator configured with a custom AllocationManager.Factory. |
| fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java | Allows injecting an AllocationManager.Factory and defaults Arrow read contexts to ChunkedFactory. |
| fluss-common/src/main/java/org/apache/fluss/record/FlussRoundingPolicy.java | Adds a custom rounding policy used in allocator configuration. |
| fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java | Implements the new chunked bump-pointer allocation manager + factory/pool. |
| fluss-client/src/main/java/org/apache/fluss/client/write/RecordAccumulator.java | Switches Arrow allocator used for writing to the new chunked allocator. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/log/LogFetcher.java | Shares a ChunkedFactory across local/remote read contexts for scanning. |
| fluss-client/src/main/java/org/apache/fluss/client/table/scanner/batch/LimitBatchScanner.java | Uses chunked allocator in read context for limit scans (Arrow path). |
💡 Add Copilot custom instructions for smarter, more guided reviews. Learn how to get started.
| // Align to 8 bytes for safe direct-memory access. | ||
| long alignedSize = (size + ALIGNMENT - 1) & ~(ALIGNMENT - 1); | ||
|
|
||
| if (activeChunk == null || !activeChunk.hasRoom(alignedSize)) { | ||
| // Current chunk is full or doesn't exist — obtain a recycled or new chunk. | ||
| activeChunk = obtainChunk(); | ||
| } | ||
|
|
||
| long offset = activeChunk.bumpAllocate(alignedSize); | ||
| return new ChunkedAllocationManager(accountingAllocator, activeChunk, offset, size); | ||
| } |
There was a problem hiding this comment.
ChunkedFactory.create() aligns allocations to 8 bytes (alignedSize) but the created ChunkedAllocationManager reports allocatedSize = size (unaligned). This can make Arrow's allocator accounting inconsistent with the actual bytes carved out of the chunk (e.g., size=1/2/4 becomes 8 bytes reserved) and may also return a buffer size that doesn't reflect the backing slice. Consider tracking and reporting the aligned size (or otherwise ensuring accounting matches the real reserved bytes).
| /** | ||
| * Closes this factory, freeing all cached chunks. Active chunks with outstanding | ||
| * sub-allocations will be freed when their last ArrowBuf is released. | ||
| */ | ||
| public synchronized void close() { | ||
| while (!freeChunks.isEmpty()) { | ||
| freeChunks.poll().destroy(); | ||
| } | ||
| if (activeChunk != null && activeChunk.subAllocCount.get() == 0) { | ||
| activeChunk.destroy(); | ||
| } | ||
| activeChunk = null; | ||
| } |
There was a problem hiding this comment.
ChunkedFactory allocates and caches native chunks (Unsafe.allocateMemory) and provides a close() method to free cached chunks, but no call sites invoke ChunkedFactory.close() (and it is not an @Override). If Arrow/RootAllocator does not automatically close the factory, this will retain native memory even after BufferAllocator.close(). Please wire factory closure into the owning component lifecycle (e.g., keep a reference and close it alongside the allocator) or implement the appropriate closeable interface expected by Arrow so it is closed automatically.
| * @param chunkSize maximum size of each chunk (bytes). Allocations >= this go direct. | ||
| * @param maxFreeChunks maximum number of empty chunks to keep cached for reuse. | ||
| */ | ||
| public ChunkedFactory(long chunkSize, int maxFreeChunks) { |
There was a problem hiding this comment.
ChunkedFactory(long chunkSize, int maxFreeChunks) does not validate its inputs. Passing chunkSize <= 0 or maxFreeChunks < 0 can lead to Unsafe.allocateMemory errors or a factory that never recycles/frees chunks as intended. Please add argument validation (and fail fast with a clear exception).
| public ChunkedFactory(long chunkSize, int maxFreeChunks) { | |
| public ChunkedFactory(long chunkSize, int maxFreeChunks) { | |
| if (chunkSize <= 0) { | |
| throw new IllegalArgumentException( | |
| "chunkSize must be > 0, but was " + chunkSize); | |
| } | |
| if (maxFreeChunks < 0) { | |
| throw new IllegalArgumentException( | |
| "maxFreeChunks must be >= 0, but was " + maxFreeChunks); | |
| } |
| } | ||
|
|
||
| private static long validateAndCalculateChunkSize(long pageSize, int maxOrder) { | ||
| if (maxOrder > 14) { |
There was a problem hiding this comment.
validateAndCalculateChunkSize rejects maxOrder > 14 but does not reject negative values, even though the error message says the expected range is 0-14. This can silently produce an invalid chunkSize when -Dorg.apache.memory.allocator.maxOrder is negative. Please validate maxOrder >= 0 as well.
| if (maxOrder > 14) { | |
| if (maxOrder < 0 || maxOrder > 14) { |
| * A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty | ||
| * 4.1+ memory allocation behavior. | ||
| * | ||
| * <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's | ||
| * maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9. | ||
| * |
There was a problem hiding this comment.
The JavaDoc for FlussRoundingPolicy is now misleading in the context of this PR: it describes aligning Arrow to Netty’s chunking behavior and “reducing Arrow’s chunk size”, but the PR is explicitly replacing Netty pooling with ChunkedAllocationManager. Please update the documentation to reflect the new allocator strategy (or, if this rounding policy is still needed, explain why it’s still relevant without Netty).
| * A custom rounding policy that reduces Arrow's chunk size from 16MB to 4MB to align with Netty | |
| * 4.1+ memory allocation behavior. | |
| * | |
| * <p>Arrow's default maxOrder=11 (16MB chunks) can cause memory inefficiency when used with Netty's | |
| * maxOrder=9 (4MB chunks). This class patches the default by using maxOrder=9. | |
| * | |
| * A custom rounding policy that uses Fluss's configured Arrow chunk size instead of relying on | |
| * Arrow's historical default chunk sizing. | |
| * | |
| * <p>By default, Arrow uses {@code maxOrder=11}, which yields 16MB chunks for the default page | |
| * size. Fluss defaults {@code org.apache.memory.allocator.maxOrder} to {@code 9}, which yields | |
| * 4MB chunks and keeps rounding behavior aligned with the chunk size used by Fluss's allocation | |
| * strategy. | |
| * | |
| * <p>This policy is still needed without Netty because requests smaller than the configured chunk | |
| * size are rounded to the next power of two, while larger requests are left unchanged. Using the | |
| * configured chunk size here avoids Arrow rounding small allocations as if the chunk were larger | |
| * than the allocator actually uses. | |
| * |
| package org.apache.fluss.shaded.arrow.org.apache.arrow.memory; | ||
|
|
||
| import org.apache.fluss.record.FlussRoundingPolicy; | ||
|
|
||
| import static org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.configBuilder; | ||
|
|
||
| /** Util to create UNSAFE_ALLOCATOR. */ | ||
| public class AllocatorUtil { | ||
|
|
||
| private AllocatorUtil() {} | ||
|
|
||
| public static BufferAllocator createBufferAllocator( | ||
| AllocationManager.Factory allocationManagerFactory) { | ||
| return new RootAllocator( | ||
| configBuilder() | ||
| .listener(AllocationListener.NOOP) | ||
| .maxAllocation(Long.MAX_VALUE) | ||
| .roundingPolicy(FlussRoundingPolicy.DEFAULT_ROUNDING_POLICY) | ||
| .allocationManagerFactory(allocationManagerFactory) |
There was a problem hiding this comment.
AllocatorUtil is placed under the shaded Arrow package but imports org.apache.fluss.record.FlussRoundingPolicy. This introduces a dependency from shaded Arrow code back into Fluss packages (unlike the other shaded Arrow sources under org/apache/fluss/shaded/arrow/...), which can undermine the purpose of shading and complicate dependency isolation. Consider moving this utility to a non-shaded Fluss package, or keep it in the shaded package but avoid referencing Fluss classes directly (e.g., accept a RoundingPolicy/config as a parameter).
| LogRecordReadContext readContext = | ||
| LogRecordReadContext.createReadContext(tableInfo, false, null, schemaGetter); | ||
| LogRecordReadContext.createReadContext( | ||
| tableInfo, | ||
| false, | ||
| null, | ||
| schemaGetter, | ||
| new ChunkedAllocationManager.ChunkedFactory()); | ||
| LogRecords records = MemoryLogRecords.pointToByteBuffer(recordsBuffer); |
There was a problem hiding this comment.
In parseLimitScanResponse, the newly created LogRecordReadContext is never closed. Since LogRecordReadContext owns Arrow resources (VectorSchemaRoots + BufferAllocator), this will leak direct memory and native buffers. Please wrap the LogRecordReadContext in a try-with-resources (or otherwise ensure it’s closed) around the iteration over batches/records.
| ChunkedAllocationManager.ChunkedFactory chunkedFactory = | ||
| new ChunkedAllocationManager.ChunkedFactory(); | ||
| this.readContext = | ||
| LogRecordReadContext.createReadContext(tableInfo, false, projection, schemaGetter); | ||
| LogRecordReadContext.createReadContext( | ||
| tableInfo, false, projection, schemaGetter, chunkedFactory); | ||
| this.remoteReadContext = | ||
| LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter); | ||
| LogRecordReadContext.createReadContext( | ||
| tableInfo, true, projection, schemaGetter, chunkedFactory); |
There was a problem hiding this comment.
LogFetcher creates a ChunkedFactory instance and shares it across two LogRecordReadContexts, but it is not retained as a field and never closed. If ChunkedFactory retains native chunks (as its close() method suggests), those chunks may remain allocated after readContext/remoteReadContext are closed. Consider storing the factory as a member and closing it in LogFetcher.close() (or ensure allocator shutdown closes the factory automatically).
| public class ChunkedAllocationManager extends AllocationManager { | ||
|
|
||
| /** 8-byte alignment for all sub-allocations within a chunk. */ | ||
| private static final long ALIGNMENT = 8; | ||
|
|
||
| /** Default chunk size: 4MB (matches Netty 4.1+ maxOrder=9). */ | ||
| private static final long DEFAULT_CHUNK_SIZE = 4L * 1024 * 1024; | ||
|
|
||
| /** Default maximum number of empty chunks to keep in the free-list. */ | ||
| private static final int DEFAULT_MAX_FREE_CHUNKS = 3; |
There was a problem hiding this comment.
This PR is titled as a [client] change, but it introduces new allocator infrastructure in fluss-common (e.g., ChunkedAllocationManager, FlussRoundingPolicy, AllocatorUtil) in addition to the client wiring. Please consider adjusting the PR title/component to reflect the broader scope, or split common vs client wiring if that matches project norms.
| /** | ||
| * Factory that creates {@link ChunkedAllocationManager} instances. | ||
| * | ||
| * <p>Small allocations (< chunkSize) are packed into the current active chunk via bump-pointer. | ||
| * When the active chunk is full, a recycled or freshly-allocated chunk is used. Large | ||
| * allocations (>= chunkSize) get their own dedicated memory region. | ||
| * | ||
| * <p>This factory is thread-safe: {@link #create} and {@link #onChunkDrained} are {@code | ||
| * synchronized}. | ||
| */ | ||
| public static class ChunkedFactory implements AllocationManager.Factory { |
There was a problem hiding this comment.
ChunkedAllocationManager is a new, complex native-memory allocator (chunk pooling, ref-counting, concurrency). There are existing tests under fluss-common/src/test/java/org/apache/fluss/compression/* but no tests exercising this allocator’s core behaviors (chunk reuse, drain/recycle, direct-allocation path, and concurrent release race described in onChunkDrained). Adding focused unit tests here would help prevent regressions and validate the OOM fix scenario from #3025.
There was a problem hiding this comment.
+1 please add UT for this class
wuchong
left a comment
There was a problem hiding this comment.
I remain concerned about modifying such a low-level memory allocator, especially given its complexity and the lack of unit test coverage. Could we investigate whether existing configuration options in the current allocator could mitigate this issue? Additionally, have we evaluated Arrow’s UnsafeAllocationManager and NettyAllocationManager as alternatives?
Furthermore, please note that KvManager still initializes RootAllocator with Long.MAX_VALUE.
| * <p>For allocations >= chunkSize, a dedicated memory region is allocated directly (no bump | ||
| * pointer), behaving identically to {@link UnsafeAllocationManager}. | ||
| */ | ||
| public class ChunkedAllocationManager extends AllocationManager { |
There was a problem hiding this comment.
move this class to package org.apache.fluss.row.arrow.memory, along with FlussRoundingPolicy and AllocatorUtil
| /** | ||
| * Factory that creates {@link ChunkedAllocationManager} instances. | ||
| * | ||
| * <p>Small allocations (< chunkSize) are packed into the current active chunk via bump-pointer. | ||
| * When the active chunk is full, a recycled or freshly-allocated chunk is used. Large | ||
| * allocations (>= chunkSize) get their own dedicated memory region. | ||
| * | ||
| * <p>This factory is thread-safe: {@link #create} and {@link #onChunkDrained} are {@code | ||
| * synchronized}. | ||
| */ | ||
| public static class ChunkedFactory implements AllocationManager.Factory { |
There was a problem hiding this comment.
+1 please add UT for this class
fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/compression/ChunkedAllocationManager.java
Outdated
Show resolved
Hide resolved
fluss-common/src/main/java/org/apache/fluss/record/LogRecordReadContext.java
Show resolved
Hide resolved
…edAllocationManager for Arrow memory allocation.
The NettyAllocationManager corresponds to our current allocation strategy. However, when decompressing data with over 1000 columns, it causes significant memory amplification (or memory bloat). I also have tested UnsafeAllocationManager, and it indeed avoids OOM. However, since it requires allocating and freeing memory every time, I am concerned about the high overhead. For example, for 1000 columns, at least 2000 allocations and deallocations would be required for each batch. That's why I chose this intermediate approach to reuse memory as much as possible.
n our current production environment, off-heap memory on the server side is sufficient. Therefore, to minimize impact (or reduce risk), we decided not to make any changes. |
cc550c9 to
1bc6699
Compare
| * Closes this factory, freeing all cached chunks. Active chunks with outstanding | ||
| * sub-allocations will be freed when their last ArrowBuf is released. | ||
| */ | ||
| public synchronized void close() { |
There was a problem hiding this comment.
If close() is called while ArrowBufs are still alive, the active chunk is set to null but not destroyed.
When those ArrowBufs are eventually released, onChunkDrained adds the chunk to freeChunks, but nobody will ever drain it, so native memor leaked
There was a problem hiding this comment.
You are right. Currently, Only close the ChunkedAllocationManager after all data has been fully read and released, which is why I didn't catch this during production testing.
| * it is still the active chunk). The guard {@code chunk.subAllocCount.get() > 0} handles | ||
| * this race. | ||
| */ | ||
| synchronized void onChunkDrained(Chunk chunk) { |
There was a problem hiding this comment.
The factory is claimed to be thread-safe, but onChunkDrained can be called twice for the same chunk, causing double-recycle or use-after-free
The race:
Thread A calls releaseSubAllocation(), subAllocCount hits 0, but hasn't entered onChunkDrained yet (no lock).
Meanwhile Thread B calls create() (synchronized), allocates from the same chunk (subAllocCount 0 to 1), then immediately releases (subAllocCount 1 to 0), and enters onChunkDrained first, recycling the chunk to freeChunks.
Thread A then enters onChunkDrained and subAllocCount is still 0, the > 0 guard passes, and the chunk is recycled/destroyed a second time.
Double entry in freeChunks -> overlapping allocations -> native memory corruption.
Reproduced with: 8 threads sharing one ChunkedFactory (1KB chunks, max 2 free), each thread doing allocator.buffer(64) -> close() in a loop, 500 times. JVM crashes with exit code 134.
I think some generation counter can help with it, shouldn't be very hard to add
fresh-borzoni
left a comment
There was a problem hiding this comment.
@loserwang1024 Ty, very interesting PR, I really enjoyed reading the code.
Left some comments with my findings, PTAL
| LogRecordReadContext.createReadContext( | ||
| tableInfo, false, projection, schemaGetter, chunkedFactory); | ||
| this.remoteReadContext = | ||
| LogRecordReadContext.createReadContext(tableInfo, true, projection, schemaGetter); |
There was a problem hiding this comment.
Was the write path affected by the OOM? Bump-pointer is ideal for read (exact-size, batch-release), but writes do grow-and-copy reallocation and dead space stays in the chunk until full drain.
Might be worth keeping the default allocator here, though we are talking about peak allocations only
There was a problem hiding this comment.
Was the write path affected by the OOM?
Yes, when I test 1000+ columns, compress also need many small off-heap allocation for each column(ArrowWriter.initFieldVector). The error log is:
2026-04-08 10:20:27
java.io.IOException: Failed to send record to table fluss.table1(p=2026040810). Writer state: running
at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:162)
at org.apache.flink.streaming.runtime.operators.sink.SinkWriterOperator.processElement(SinkWriterOperator.java:203)
at org.apache.flink.streaming.runtime.tasks.OneInputStreamTask$StreamTaskNetworkOutput.emitRecord(OneInputStreamTask.java:246)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.processElement(AbstractStreamTaskNetworkInput.java:217)
at org.apache.flink.streaming.runtime.io.AbstractStreamTaskNetworkInput.emitNext(AbstractStreamTaskNetworkInput.java:169)
at org.apache.flink.streaming.runtime.io.StreamOneInputProcessor.processInput(StreamOneInputProcessor.java:68)
at org.apache.flink.streaming.runtime.tasks.StreamTask.processInput(StreamTask.java:706)
at org.apache.flink.streaming.runtime.tasks.mailbox.MailboxProcessor.runMailboxLoop(MailboxProcessor.java:231)
at org.apache.flink.streaming.runtime.tasks.StreamTask.runMailboxLoop(StreamTask.java:1202)
at org.apache.flink.streaming.runtime.tasks.StreamTask.invoke(StreamTask.java:1146)
at org.apache.flink.runtime.taskmanager.Task.runWithSystemExitMonitoring(Task.java:976)
at org.apache.flink.runtime.taskmanager.Task.restoreAndInvoke(Task.java:955)
at org.apache.flink.runtime.taskmanager.Task.doRun(Task.java:768)
at org.apache.flink.runtime.taskmanager.Task.run(Task.java:580)
at java.base/java.lang.Thread.run(Thread.java:991)
Caused by: org.apache.fluss.exception.FlussRuntimeException: Failed to send record to table fluss.t_geely_2x_realtime_signal(p=2026040810). Writer state: running
at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:217)
at org.apache.fluss.client.write.WriterClient.send(WriterClient.java:140)
at org.apache.fluss.client.table.writer.AbstractTableWriter.send(AbstractTableWriter.java:66)
at org.apache.fluss.client.table.writer.AppendWriterImpl.append(AppendWriterImpl.java:87)
at org.apache.fluss.flink.sink.writer.AppendSinkWriter.writeRow(AppendSinkWriter.java:62)
at org.apache.fluss.flink.sink.writer.FlinkSinkWriter.write(FlinkSinkWriter.java:146)
... 14 more
Caused by: org.apache.fluss.shaded.arrow.org.apache.arrow.memory.OutOfMemoryException: Failure allocating buffer.
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:67)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:77)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager.<init>(NettyAllocationManager.java:84)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.NettyAllocationManager$1.create(NettyAllocationManager.java:34)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:355)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.newAllocationManager(BaseAllocator.java:350)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.bufferWithoutReservation(BaseAllocator.java:338)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:316)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.BaseAllocator.buffer(BaseAllocator.java:280)
at org.apache.fluss.shaded.arrow.org.apache.arrow.memory.RootAllocator.buffer(RootAllocator.java:29)
at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateBytes(BaseVariableWidthVector.java:462)
at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:420)
at org.apache.fluss.shaded.arrow.org.apache.arrow.vector.BaseVariableWidthVector.allocateNew(BaseVariableWidthVector.java:429)
at org.apache.fluss.row.arrow.ArrowWriter.initFieldVector(ArrowWriter.java:309)
at org.apache.fluss.row.arrow.ArrowWriter.<init>(ArrowWriter.java:130)
at org.apache.fluss.row.arrow.ArrowWriterPool.lambda$getOrCreateWriter$3(ArrowWriterPool.java:107)
at org.apache.fluss.utils.concurrent.LockUtils.inLock(LockUtils.java:42)
at org.apache.fluss.row.arrow.ArrowWriterPool.getOrCreateWriter(ArrowWriterPool.java:93)
at org.apache.fluss.client.write.RecordAccumulator.appendNewBatch(RecordAccumulator.java:599)
at org.apache.fluss.client.write.RecordAccumulator.append(RecordAccumulator.java:210)
at org.apache.fluss.client.write.WriterClient.doSend(WriterClient.java:205)
... 19 more
Caused by: java.lang.OutOfMemoryError: Direct buffer memory. The direct out-of-memory error has occurred. This can mean two things: either job(s) require(s) a larger size of JVM direct memory or there is a direct memory leak. The direct memory can be allocated by user code or some of its dependencies. In this case 'taskmanager.memory.task.off-heap.size' configuration option should be increased. Flink framework and its dependencies also consume the direct memory, mostly for network communication. The most of network memory is managed by Flink and should not result in out-of-memory error. In certain special cases, in particular for jobs with high parallelism, the framework may require more direct memory which is not managed by Flink. In this case 'taskmanager.memory.framework.off-heap.size' configuration option should be increased. If the error persists then there is probably a direct memory leak in user code or some of its dependencies which has to be investigated and fixed. The task executor has to be shutdown...
at java.base/java.nio.Bits.reserveMemory(Bits.java:175)
at java.base/java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:118)
at java.base/java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:317)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.allocateDirect(PoolArena.java:710)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena$DirectArena.newChunk(PoolArena.java:685)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocateNormal(PoolArena.java:212)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.tcacheAllocateSmall(PoolArena.java:177)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:134)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PoolArena.allocate(PoolArena.java:126)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.newDirectBufferL(PooledByteBufAllocatorL.java:178)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL$InnerAllocator.directBuffer(PooledByteBufAllocatorL.java:211)
at org.apache.fluss.shaded.netty4.io.netty.buffer.PooledByteBufAllocatorL.allocate(PooledByteBufAllocatorL.java:58)
... 40 more|
@fresh-borzoni thanks for your advice, I do learn a lot! Just modify it. |
4e24fe4 to
9f2dd02
Compare
fresh-borzoni
left a comment
There was a problem hiding this comment.
@loserwang1024 TY for the nice code, LGTM 👍
(The sections below can be removed for hotfixes or typos)
-->
Purpose
Linked issue: close #3025
Brief change log
Tests
API and Format
Documentation