Skip to content

Improve pipe runtime memory allocation#18051

Open
Caideyipi wants to merge 9 commits into
masterfrom
pipe-dynamic-memory-allocation
Open

Improve pipe runtime memory allocation#18051
Caideyipi wants to merge 9 commits into
masterfrom
pipe-dynamic-memory-allocation

Conversation

@Caideyipi

@Caideyipi Caideyipi commented Jun 29, 2026

Copy link
Copy Markdown
Collaborator

Summary

  • Keep PR 18051's dynamic pipe memory allocation changes for TsFile parser, sink batch, and TsFile read buffers.
  • Fix the TsFile -> Tablet parser memory self-lock seen in multi-pipe large TsFile sync: when tablet memory allocation fails, the same parser no longer keeps retrying while holding parser chunk/page/tablet memory.
  • Bound the OOM recovery path so the TsFile event is retried by the outer pipe subtask after parser memory is released, instead of spinning forever inside one parsed tablet.

Root cause fixed

In the failing path, PipeTsFileInsertionEvent.consumeTabletInsertionEventsWithRetry caught PipeRuntimeOutOfMemoryCriticalException and retried the same parsed tablet indefinitely. The parser still held temporary tablet/chunk/page memory while the consumer was trying to allocate memory for the downstream PipeRawTabletInsertionEvent. With multiple pipes parsing large TsFiles concurrently, this could saturate the tablet memory pool and form a live-lock: pipe stays RUNNING, but sub_remain_event does not decrease and no tablet is transferred.

Implementation details

  • Release parser temporary tablet memory after scan/query/table parsers hand a Tablet to a raw tablet event.
  • On pipe OOM during TsFile tablet consumption:
    • release the parsed raw tablet event if it has not been queued/referenced;
    • close the current TsFile parser to release parser-side tablet/chunk/page/modification memory;
    • rethrow OOM so the outer pipe subtask keeps the original TsFile event as lastEvent and retries later.
  • Ensure OOM is not hidden by processor lambdas that temporarily store exceptions in AtomicReference (PipeProcessorSubtask, DownSamplingProcessor, AggregateProcessor).
  • Wake memory waiters when PipeMemoryManager.resize(..., 0) releases memory.
  • Fix zero-size memory block registration so a failed expansion does not leave an unallocated block in allocatedBlocks; also register zero-size blocks expanded through tryAllocate(block, ...).
  • Update the OOM log message to describe the new release-and-retry behavior instead of infinite in-place retry.

Tests

  • Added UT coverage in TsFileInsertionEventParserTest:
    • testScanParserReleasesTabletMemoryAfterRawTabletGenerated
    • testConsumeTabletInsertionEventsWithRetryReleasesParserOnOutOfMemory
  • mvn spotless:apply -pl iotdb-core/datanode
  • git diff --check
  • Attempted targeted UT:
    • mvn -pl iotdb-core/datanode '-Dtest=TsFileInsertionEventParserTest#testScanParserReleasesTabletMemoryAfterRawTabletGenerated+testConsumeTabletInsertionEventsWithRetryReleasesParserOnOutOfMemory' '-Dspotless.check.skip=true' '-Dcheckstyle.skip=true' test
    • blocked before test execution by existing unrelated main compile errors in ConsensusSubscriptionSetupHandler: missing TopicConstant.COLUMN_KEY and TopicConstant.COLUMN_DEFAULT_VALUE.
  • Attempted module compile with reduced JVM memory pressure:
    • mvn -pl iotdb-core/datanode -DskipTests '-Dspotless.check.skip=true' '-Dcheckstyle.skip=true' compile
    • blocked by the same unrelated TopicConstant.COLUMN_* compile errors.

No new IT is added because this is a low-level parser/memory-manager recovery path and the deterministic UTs cover the parser-memory release and OOM cleanup behavior directly. The original 10-pipe/2kw scenario should be used as the regression workload for end-to-end validation.

@codecov

codecov Bot commented Jun 29, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 21.94357% with 249 lines in your changes missing coverage. Please review.
✅ Project coverage is 41.75%. Comparing base (3eddf08) to head (06393dd).
⚠️ Report is 2 commits behind head on master.

Files with missing lines Patch % Lines
.../protocol/thrift/sync/IoTDBDataRegionSyncSink.java 0.00% 53 Missing ⚠️
...otdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java 0.00% 48 Missing ⚠️
...builder/IoTConsensusV2TransferBatchReqBuilder.java 0.00% 47 Missing ⚠️
...oad/evolvable/batch/PipeTabletEventPlainBatch.java 14.28% 18 Missing ⚠️
...ler/IoTConsensusV2TsFileInsertionEventHandler.java 0.00% 18 Missing ⚠️
...hrift/async/handler/PipeTransferTsFileHandler.java 0.00% 18 Missing ⚠️
.../payload/evolvable/batch/PipeTabletEventBatch.java 60.60% 13 Missing ⚠️
...rotocol/iotconsensusv2/IoTConsensusV2SyncSink.java 0.00% 10 Missing ⚠️
...pipe/sink/protocol/legacy/IoTDBLegacyPipeSink.java 0.00% 9 Missing ⚠️
...ad/evolvable/batch/PipeTabletEventTsFileBatch.java 44.44% 5 Missing ⚠️
... and 5 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #18051      +/-   ##
============================================
+ Coverage     41.68%   41.75%   +0.07%     
  Complexity      318      318              
============================================
  Files          5296     5296              
  Lines        371819   371946     +127     
  Branches      48106    48109       +3     
============================================
+ Hits         154984   155317     +333     
+ Misses       216835   216629     -206     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

final Tablet tablet = getNextTablet();
// Record tablet metrics
recordTabletMetrics(tablet);
releaseTabletMemoryBlock();

@jt2594838 jt2594838 Jul 3, 2026

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Where is the memory allocated?
May move it to the first line; otherwise, it looks like you are freeing the newly retrieved Tablet.

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in c940475: the previous parser-owned tablet memory is released before fetching the next tablet, so the ownership/order is clearer.

Comment on lines +137 to +140
final long newTotalBufferSize = totalBufferSize + bufferSize;
PipeDataNodeResourceManager.memory()
.forceResize(allocatedMemoryBlock, Math.min(newTotalBufferSize, maxBatchSizeInBytes));
totalBufferSize = newTotalBufferSize;

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

newTotalBufferSize = Math.min(totalBufferSize + bufferSize, maxBatchSizeInBytes)?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in c940475: totalBufferSize is clamped with Math.min(totalBufferSize + bufferSize, maxBatchSizeInBytes) before resizing.

Comment on lines +171 to +184
private void rollbackTo(
final int previousEventsSize,
final int previousRequestCommitIdsSize,
final int previousBatchReqsSize) {
while (events.size() > previousEventsSize) {
events.remove(events.size() - 1);
}
while (requestCommitIds.size() > previousRequestCommitIdsSize) {
requestCommitIds.remove(requestCommitIds.size() - 1);
}
while (batchReqs.size() > previousBatchReqsSize) {
batchReqs.remove(batchReqs.size() - 1);
}
}

Copy link
Copy Markdown
Contributor

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

subList.clear()?

Copy link
Copy Markdown
Collaborator Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Applied in c940475: rollback now uses subList(...).clear() for events, commit ids, and batch requests.

@sonarqubecloud

sonarqubecloud Bot commented Jul 3, 2026

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

2 participants