Skip to content

Pipe: Fix potential resource leaks in pipe lifecycle cleanup#17666

Open
Caideyipi wants to merge 4 commits into
masterfrom
potential-leak
Open

Pipe: Fix potential resource leaks in pipe lifecycle cleanup#17666
Caideyipi wants to merge 4 commits into
masterfrom
potential-leak

Conversation

@Caideyipi

@Caideyipi Caideyipi commented May 14, 2026

Copy link
Copy Markdown
Collaborator

Description

This PR fixes several potential memory/resource leaks in Pipe runtime lifecycle cleanup.

Previously, some passive paths could create or retain per-schema-region or per-pipe runtime state during leader notification, snapshot handling, queue cleanup, metric updates, pipe drop, and schema-region deletion. This could leave stale listeners, metrics, committers, rate limiters, aggregate runtime state, or floating memory counters after a pipe/schema region was closed, dropped, altered, or deleted.

Main changes:

  • Avoid creating schema-region pipe listeners from passive paths.

    • Add schemaListenerIfPresent(...) and use it in snapshot/load/write/delete and queue cleanup paths.
    • Keep schema-region leader-ready state separate from listener state, so leader notifications no longer instantiate SchemaRegionListeningQueue and its metrics.
    • Cleanup unused closed schema listeners and deregister listener metrics.
    • Clear listener and leader-ready state when deleting a schema region.
  • Cleanup stale per-pipe runtime state on pipe drop/discard.

    • Clear PipeEventCommitManager committer and restart-time state when a pipe is dropped.
    • Remove endpoint rate limiter entries when data-region sinks discard events of a dropped pipe.
    • Ensure pipe metrics deregistration removes internal maps/gauges/counters, including when the metric service is not bound.
    • Avoid recreating remaining-event metric operators for stale pipe instances during metric updates.
  • Make floating memory accounting safe for late event releases.

    • Track floating memory by pipeName_creationTime instead of relying only on live PipeMeta.
    • Keep counters after pipe drop/alter while late releases may still arrive.
    • Cleanup counters only after usage returns to zero and the matching pipe instance is no longer alive.
  • Fix aggregate processor shared-state cleanup.

    • Decrease per-pipe reference count safely.
    • Remove and clear per-timeseries runtime state only when the last processor reference closes.

Tests

Added unit tests for:

  • PipeSchemaRegionListenerManagerTest: verifies leader state does not create listeners and unused closed listeners are cleaned up.
  • PipeTaskAgentTest: verifies floating memory usage remains tracked after pipe drop until late release, and zero counters are cleaned after drop.

This PR has:

  • been self-reviewed.
    • concurrent read
    • concurrent write
    • concurrent read and write
  • added documentation for new or modified features or behaviors.
  • added Javadocs for most classes and all non-trivial methods.
  • added or updated version, license, or notice information
  • added comments explaining the why and the intent of the code wherever would not be obvious for an unfamiliar reader.
  • added unit tests or modified existing tests to cover new code paths, ensuring the threshold for code coverage.
  • added integration tests.
  • been tested in a test IoTDB cluster.

Key changed/added classes (or packages if there are too many classes) in this PR
  • PipeSchemaRegionListenerManager
  • PipeDataNodeRuntimeAgent
  • SchemaExecutionVisitor
  • SchemaRegionStateMachine
  • DataNodeRegionManager
  • PipeDataNodeTaskAgent
  • PipeEventCommitManager
  • IoTDBSink
  • PipeDataNodeSinglePipeMetrics
  • PipeTsFileToTabletsMetrics
  • PipeTaskAgent
  • AggregateProcessor

@sonarqubecloud

Copy link
Copy Markdown

@codecov

codecov Bot commented May 14, 2026

Copy link
Copy Markdown

Codecov Report

❌ Patch coverage is 48.34123% with 109 lines in your changes missing coverage. Please review.
✅ Project coverage is 41.42%. Comparing base (fcd52d7) to head (74e6f9e).

Files with missing lines Patch % Lines
...metric/overview/PipeDataNodeSinglePipeMetrics.java 50.94% 26 Missing ⚠️
...machine/schemaregion/SchemaRegionStateMachine.java 11.11% 16 Missing ⚠️
...temachine/schemaregion/SchemaExecutionVisitor.java 0.00% 11 Missing ⚠️
...pe/metric/overview/PipeTsFileToTabletsMetrics.java 0.00% 11 Missing ⚠️
...b/pipe/processor/aggregate/AggregateProcessor.java 0.00% 10 Missing ⚠️
...pe/agent/task/progress/PipeEventCommitManager.java 0.00% 10 Missing ⚠️
...agent/runtime/PipeSchemaRegionListenerManager.java 75.00% 7 Missing ⚠️
...otdb/db/pipe/agent/task/PipeDataNodeTaskAgent.java 0.00% 7 Missing ⚠️
...b/pipe/agent/runtime/PipeDataNodeRuntimeAgent.java 20.00% 4 Missing ⚠️
...db/protocol/thrift/impl/DataNodeRegionManager.java 0.00% 4 Missing ⚠️
... and 2 more
Additional details and impacted files
@@             Coverage Diff              @@
##             master   #17666      +/-   ##
============================================
+ Coverage     41.40%   41.42%   +0.02%     
  Complexity      318      318              
============================================
  Files          5286     5286              
  Lines        369547   369640      +93     
  Branches      47815    47848      +33     
============================================
+ Hits         153008   153134     +126     
+ Misses       216539   216506      -33     

☔ View full report in Codecov by Harness.
📢 Have feedback on the report? Share it here.

🚀 New features to boost your workflow:
  • ❄️ Test Analytics: Detect flaky tests, report on failures, and find test suite problems.

@Caideyipi Caideyipi changed the title Pipe: Fixed multiple potential memory leaks Pipe: Fix potential resource leaks in pipe lifecycle cleanup Jun 8, 2026
# Conflicts:
#	iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaExecutionVisitor.java
#	iotdb-core/datanode/src/main/java/org/apache/iotdb/db/consensus/statemachine/schemaregion/SchemaRegionStateMachine.java
#	iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/async/IoTDBDataRegionAsyncSink.java
#	iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/thrift/sync/IoTDBDataRegionSyncSink.java
@sonarqubecloud

Copy link
Copy Markdown

Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant