Improve pipe runtime memory allocation #18051
Codecov Report
❌ Patch coverage is
Additional details and impacted files
@@ Coverage Diff @@
## master #18051 +/- ##
============================================
+ Coverage 41.68% 41.75% +0.07%
Complexity 318 318
============================================
Files 5296 5296
Lines 371819 371946 +127
Branches 48106 48109 +3
============================================
+ Hits 154984 155317 +333
+ Misses 216835 216629 -206
    final Tablet tablet = getNextTablet();
    // Record tablet metrics
    recordTabletMetrics(tablet);
    releaseTabletMemoryBlock();
Where is the memory allocated?
Consider moving the release to the first line; otherwise, it looks like you are freeing the newly retrieved Tablet.
Applied in c940475: the previous parser-owned tablet memory is released before fetching the next tablet, so the ownership/order is clearer.
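The ordering agreed on in this thread can be illustrated with a small, self-contained sketch. It is not the code from c940475: `SketchPool` and `SketchParser` are hypothetical stand-ins, and a tablet is represented only by its size in bytes. The only thing taken from the discussion is the order of operations: release the memory held for the previous tablet first, then fetch the next one.

```java
import java.util.ArrayDeque;
import java.util.Deque;
import java.util.List;

// Hypothetical stand-in for a memory pool that tracks bytes in use.
class SketchPool {
  private long used;
  private long peak;

  void allocate(long bytes) {
    used += bytes;
    peak = Math.max(peak, used);
  }

  void release(long bytes) {
    used -= bytes;
  }

  long used() {
    return used;
  }

  long peak() {
    return peak;
  }
}

// Hypothetical parser that owns the memory of at most one tablet at a time.
class SketchParser {
  private final SketchPool pool;
  private final Deque<Long> pendingTabletSizes = new ArrayDeque<>();
  private long heldBytes; // memory held for the previously returned tablet

  SketchParser(SketchPool pool, List<Long> tabletSizes) {
    this.pool = pool;
    this.pendingTabletSizes.addAll(tabletSizes);
  }

  boolean hasNext() {
    return !pendingTabletSizes.isEmpty();
  }

  // Returns the size of the next tablet (standing in for the tablet itself).
  long next() {
    // 1. Release what was held for the previous tablet ...
    releasePrevious();
    // 2. ... and only then allocate for, and fetch, the next one.
    long size = pendingTabletSizes.poll();
    pool.allocate(size);
    heldBytes = size;
    return size;
  }

  void releasePrevious() {
    if (heldBytes > 0) {
      pool.release(heldBytes);
      heldBytes = 0;
    }
  }
}
```

With this order the pool never holds two tablets' worth of parser memory at once, so its peak usage equals the largest single tablet rather than the sum of two consecutive ones.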
    final long newTotalBufferSize = totalBufferSize + bufferSize;
    PipeDataNodeResourceManager.memory()
        .forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, maxBatchSizeInBytes));
    totalBufferSize = newTotalBufferSize;
newTotalBufferSize = Math.min(totalBufferSize + bufferSize, maxBatchSizeInBytes)?
Applied in c940475: totalBufferSize is clamped with Math.min(totalBufferSize + bufferSize, maxBatchSizeInBytes) before resizing.
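A minimal sketch of the clamped version, under stated assumptions: `SketchBlock`, `BatchBufferSketch`, and the local `forceResize` helper are hypothetical and merely stand in for the real memory block and for `PipeDataNodeResourceManager.memory().forceResize(...)`. The point it illustrates is that clamping once, before the resize, keeps the tracked total and the block size identical.

```java
// Hypothetical stand-in for a resizable memory block.
class SketchBlock {
  long sizeInBytes;
}

class BatchBufferSketch {
  private final long maxBatchSizeInBytes;
  private final SketchBlock allocatedMemoryBlock = new SketchBlock();
  private long totalBufferSize;

  BatchBufferSketch(long maxBatchSizeInBytes) {
    this.maxBatchSizeInBytes = maxBatchSizeInBytes;
  }

  void onBufferAdded(long bufferSize) {
    // Clamp first, so the same value is used for the resize and the bookkeeping.
    final long newTotalBufferSize =
        Math.min(totalBufferSize + bufferSize, maxBatchSizeInBytes);
    forceResize(allocatedMemoryBlock, newTotalBufferSize);
    totalBufferSize = newTotalBufferSize;
  }

  // Assumption: stands in for the real resize call; it only records the target size.
  private static void forceResize(SketchBlock block, long targetSizeInBytes) {
    block.sizeInBytes = targetSizeInBytes;
  }

  long totalBufferSize() {
    return totalBufferSize;
  }

  long blockSize() {
    return allocatedMemoryBlock.sizeInBytes;
  }
}
```

In the pre-change snippet the block was resized to the clamped value while `totalBufferSize` kept the unclamped sum, so the two could drift apart once the cap was reached.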
    private void rollbackTo(
        final int previousEventsSize,
        final int previousRequestCommitIdsSize,
        final int previousBatchReqsSize) {
      while (events.size() > previousEventsSize) {
        events.remove(events.size() - 1);
      }
      while (requestCommitIds.size() > previousRequestCommitIdsSize) {
        requestCommitIds.remove(requestCommitIds.size() - 1);
      }
      while (batchReqs.size() > previousBatchReqsSize) {
        batchReqs.remove(batchReqs.size() - 1);
      }
    }
Applied in c940475: rollback now uses subList(...).clear() for events, commit ids, and batch requests.
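For reference, the `subList(...).clear()` idiom mentioned in the reply looks roughly like this. `RollbackSketch` and `truncateTo` are hypothetical names used only for illustration; the actual change in c940475 may be structured differently.

```java
import java.util.List;

class RollbackSketch {
  // Removes every element at index >= previousSize, in place.
  // subList returns a view backed by the original list, so clearing the view
  // removes that range from the list itself. The list must support removal
  // (for example an ArrayList).
  static <T> void truncateTo(List<T> list, int previousSize) {
    if (list.size() > previousSize) {
      list.subList(previousSize, list.size()).clear();
    }
  }
}
```

A rollback over several lists then becomes one `truncateTo` call per list (events, commit ids, batch requests), replacing the three `while` loops with a single range removal each.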



Summary
Root cause fixed
In the failing path, PipeTsFileInsertionEvent.consumeTabletInsertionEventsWithRetry caught PipeRuntimeOutOfMemoryCriticalException and retried the same parsed tablet indefinitely. The parser still held temporary tablet/chunk/page memory while the consumer was trying to allocate memory for the downstream PipeRawTabletInsertionEvent. With multiple pipes parsing large TsFiles concurrently, this could saturate the tablet memory pool and form a live-lock: the pipe stays RUNNING, but sub_remain_event does not decrease and no tablet is transferred.
Implementation details
- [...] Tablet to a raw tablet event.
- [...] lastEvent and retries later.
- [...] AtomicReference (PipeProcessorSubtask, DownSamplingProcessor, AggregateProcessor).
- [...] PipeMemoryManager.resize(..., 0) releases memory.
- [...] allocatedBlocks; also register zero-size blocks expanded through tryAllocate(block, ...).
Tests
- TsFileInsertionEventParserTest:
  - testScanParserReleasesTabletMemoryAfterRawTabletGenerated
  - testConsumeTabletInsertionEventsWithRetryReleasesParserOnOutOfMemory
- mvn spotless:apply -pl iotdb-core/datanode
- git diff --check
- mvn -pl iotdb-core/datanode '-Dtest=TsFileInsertionEventParserTest#testScanParserReleasesTabletMemoryAfterRawTabletGenerated+testConsumeTabletInsertionEventsWithRetryReleasesParserOnOutOfMemory' '-Dspotless.check.skip=true' '-Dcheckstyle.skip=true' test
- [...] ConsensusSubscriptionSetupHandler: missing TopicConstant.COLUMN_KEY and TopicConstant.COLUMN_DEFAULT_VALUE.
- mvn -pl iotdb-core/datanode -DskipTests '-Dspotless.check.skip=true' '-Dcheckstyle.skip=true' compile
- [...] TopicConstant.COLUMN_* compile errors.

No new IT is added because this is a low-level parser/memory-manager recovery path and the deterministic UTs cover the parser-memory release and OOM cleanup behavior directly. The original 10-pipe/2kw scenario should be used as the regression workload for end-to-end validation.
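
The live-lock from the root-cause section, and the kind of recovery the OOM unit test name points at, can be illustrated with a toy model. Everything here is an assumption made for illustration: `BoundedPool`, `SketchOutOfMemoryException`, and `RetrySketch` are hypothetical, the retry is bounded only to keep the example finite, and the real recovery path in this PR may differ. The sketch shows why retrying without releasing parser memory can never succeed, and why releasing it first lets the next attempt go through.

```java
// Hypothetical exception standing in for an out-of-memory signal from the pool.
class SketchOutOfMemoryException extends RuntimeException {
  SketchOutOfMemoryException(String message) {
    super(message);
  }
}

// Hypothetical fixed-capacity memory pool.
class BoundedPool {
  private final long capacity;
  private long used;

  BoundedPool(long capacity) {
    this.capacity = capacity;
  }

  void allocate(long bytes) {
    if (used + bytes > capacity) {
      throw new SketchOutOfMemoryException("need " + bytes + ", used " + used);
    }
    used += bytes;
  }

  void release(long bytes) {
    used -= bytes;
  }

  long used() {
    return used;
  }
}

class RetrySketch {
  // Tries to allocate memory for a downstream event while the parser still
  // holds parserHeldBytes in the same pool. Returns the attempt that succeeded.
  static int consumeWithRetry(
      BoundedPool pool, long parserHeldBytes, long eventBytes, int maxAttempts) {
    long held = parserHeldBytes;
    for (int attempt = 1; attempt <= maxAttempts; attempt++) {
      try {
        pool.allocate(eventBytes);
        return attempt;
      } catch (SketchOutOfMemoryException e) {
        // Without this release, every retry would hit the same full pool.
        if (held > 0) {
          pool.release(held);
          held = 0;
        }
      }
    }
    throw new SketchOutOfMemoryException("gave up after " + maxAttempts + " attempts");
  }
}
```

For example, with a pool of 100 bytes in which the parser already holds 80, a 50-byte event fails on the first attempt, the parser memory is released, and the second attempt succeeds.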