From ae319465542ff1880e7e810f8f7163278ac15613 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 21 Mar 2025 16:46:26 +0800 Subject: [PATCH 1/5] Enhance --- .../db/protocol/thrift/impl/DataNodeRegionManager.java | 8 ++++++++ .../execution/executor/RegionWriteExecutor.java | 3 +-- .../org/apache/iotdb/db/schemaengine/SchemaEngine.java | 3 +-- 3 files changed, 10 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java index 4fc2cfbff9c68..e82639dfae277 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/protocol/thrift/impl/DataNodeRegionManager.java @@ -47,6 +47,7 @@ import java.util.ArrayList; import java.util.List; import java.util.Map; +import java.util.Objects; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.locks.ReentrantReadWriteLock; @@ -103,6 +104,13 @@ public ReentrantReadWriteLock getRegionLock(ConsensusGroupId consensusGroupId) { : schemaRegionLockMap.get(consensusGroupId); } + public boolean isRegionExists(final ConsensusGroupId consensusGroupId) { + return Objects.nonNull( + consensusGroupId instanceof DataRegionId + ? 
storageEngine.getDataRegion((DataRegionId) consensusGroupId) + : schemaEngine.getSchemaRegion((SchemaRegionId) consensusGroupId)); + } + public TSStatus createSchemaRegion( final TRegionReplicaSet regionReplicaSet, final String storageGroup) { TSStatus tsStatus; diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java index fc3013b6240a2..726261e8f33fb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/queryengine/execution/executor/RegionWriteExecutor.java @@ -88,7 +88,6 @@ import java.util.Iterator; import java.util.List; import java.util.Map; -import java.util.Objects; import java.util.concurrent.locks.ReentrantReadWriteLock; public class RegionWriteExecutor { @@ -160,7 +159,7 @@ public RegionExecutionResult execute(final ConsensusGroupId groupId, final PlanN return planNode.accept(executionVisitor, new WritePlanNodeExecutionContext(groupId, lock)); } catch (final Throwable e) { // Detect problems caused by removed region - if (Objects.isNull(regionManager.getRegionLock(groupId))) { + if (!regionManager.isRegionExists(groupId)) { final String errorMsg = "Exception " + e.getClass().getSimpleName() diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 4608e52c29a24..72e0624ba590f 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -314,14 +314,13 @@ private ISchemaRegion createSchemaRegionWithoutExistenceCheck( public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId) 
throws MetadataException { - ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId); + ISchemaRegion schemaRegion = schemaRegionMap.remove(schemaRegionId); if (schemaRegion == null) { logger.warn("SchemaRegion(id = {}) has been deleted, skiped", schemaRegionId); return; } schemaRegion.deleteSchemaRegion(); schemaMetricManager.removeSchemaRegionMetric(schemaRegionId.getId()); - schemaRegionMap.remove(schemaRegionId); // check whether the sg dir is empty File sgDir = new File(config.getSchemaDir(), schemaRegion.getDatabaseFullPath()); From d9a791be884a72fb738898ae33940e75512e6b13 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 21 Mar 2025 17:04:12 +0800 Subject: [PATCH 2/5] Update SchemaEngine.java --- .../java/org/apache/iotdb/db/schemaengine/SchemaEngine.java | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 72e0624ba590f..6e6bfc269c734 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -316,7 +316,7 @@ public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId) throws MetadataException { ISchemaRegion schemaRegion = schemaRegionMap.remove(schemaRegionId); if (schemaRegion == null) { - logger.warn("SchemaRegion(id = {}) has been deleted, skiped", schemaRegionId); + logger.warn("SchemaRegion(id = {}) has been deleted, skipped", schemaRegionId); return; } schemaRegion.deleteSchemaRegion(); From 526d3d3c12382cfee554a0e3992c19e7fd39768e Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 21 Mar 2025 17:19:02 +0800 Subject: [PATCH 3/5] Update SchemaEngine.java --- .../org/apache/iotdb/db/schemaengine/SchemaEngine.java | 9 
++++++++- 1 file changed, 8 insertions(+), 1 deletion(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 6e6bfc269c734..1999a5b070d41 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -319,7 +319,14 @@ public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId) logger.warn("SchemaRegion(id = {}) has been deleted, skipped", schemaRegionId); return; } - schemaRegion.deleteSchemaRegion(); + try { + schemaRegion.deleteSchemaRegion(); + } catch (final Throwable e) { + // If any error occurs, we should restore the schema region for next + // retries to avoid resource leakage + schemaRegionMap.put(schemaRegionId, schemaRegion); + throw e; + } schemaMetricManager.removeSchemaRegionMetric(schemaRegionId.getId()); // check whether the sg dir is empty From 52f90f36f0cbb0e0f4d3e8ecd1efbc169ce78297 Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 21 Mar 2025 17:20:14 +0800 Subject: [PATCH 4/5] Update SchemaEngine.java --- .../org/apache/iotdb/db/schemaengine/SchemaEngine.java | 9 +++++---- 1 file changed, 5 insertions(+), 4 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index 1999a5b070d41..a58a83c2ef1bb 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -312,9 +312,10 @@ private ISchemaRegion createSchemaRegionWithoutExistenceCheck( return schemaRegion; } - public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId) + public 
synchronized void deleteSchemaRegion(final SchemaRegionId schemaRegionId) throws MetadataException { - ISchemaRegion schemaRegion = schemaRegionMap.remove(schemaRegionId); + // Remove the region first for concurrent exceptions detection + final ISchemaRegion schemaRegion = schemaRegionMap.remove(schemaRegionId); if (schemaRegion == null) { logger.warn("SchemaRegion(id = {}) has been deleted, skipped", schemaRegionId); return; @@ -330,8 +331,8 @@ public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId) schemaMetricManager.removeSchemaRegionMetric(schemaRegionId.getId()); // check whether the sg dir is empty - File sgDir = new File(config.getSchemaDir(), schemaRegion.getDatabaseFullPath()); - File[] regionDirList = + final File sgDir = new File(config.getSchemaDir(), schemaRegion.getDatabaseFullPath()); + final File[] regionDirList = sgDir.listFiles( (dir, name) -> { try { From a2bd1e34610191633cb5f42866fe6eea80c216fe Mon Sep 17 00:00:00 2001 From: Caideyipi <87789683+Caideyipi@users.noreply.github.com> Date: Fri, 21 Mar 2025 17:32:19 +0800 Subject: [PATCH 5/5] Update SchemaEngine.java --- .../iotdb/db/schemaengine/SchemaEngine.java | 26 ++++++++++++------- 1 file changed, 17 insertions(+), 9 deletions(-) diff --git a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java index a58a83c2ef1bb..768c55f2576de 100644 --- a/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java +++ b/iotdb-core/datanode/src/main/java/org/apache/iotdb/db/schemaengine/SchemaEngine.java @@ -67,6 +67,7 @@ import java.util.concurrent.Future; import java.util.concurrent.ScheduledExecutorService; import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicReference; // manage all the schemaRegion in this dataNode public class SchemaEngine { @@ -314,19 +315,26 @@ private ISchemaRegion 
createSchemaRegionWithoutExistenceCheck( public synchronized void deleteSchemaRegion(final SchemaRegionId schemaRegionId) throws MetadataException { - // Remove the region first for concurrent exceptions detection - final ISchemaRegion schemaRegion = schemaRegionMap.remove(schemaRegionId); + final ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId); if (schemaRegion == null) { logger.warn("SchemaRegion(id = {}) has been deleted, skipped", schemaRegionId); return; } - try { - schemaRegion.deleteSchemaRegion(); - } catch (final Throwable e) { - // If any error occurs, we should restore the schema region for next - // retries to avoid resource leakage - schemaRegionMap.put(schemaRegionId, schemaRegion); - throw e; + final AtomicReference<MetadataException> lastException = new AtomicReference<>(); + // Synchronize it for region deletion detection when error occurs + schemaRegionMap.compute( + schemaRegionId, + (regionId, iSchemaRegion) -> { + try { + schemaRegion.deleteSchemaRegion(); + } catch (final MetadataException e) { + lastException.set(e); + return iSchemaRegion; + } + return null; + }); + if (Objects.nonNull(lastException.get())) { + throw lastException.get(); } schemaMetricManager.removeSchemaRegionMetric(schemaRegionId.getId());