Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -48,6 +48,7 @@
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.locks.ReentrantReadWriteLock;

Expand Down Expand Up @@ -104,6 +105,13 @@ public ReentrantReadWriteLock getRegionLock(ConsensusGroupId consensusGroupId) {
: schemaRegionLockMap.get(consensusGroupId);
}

public boolean isRegionExists(final ConsensusGroupId consensusGroupId) {
return Objects.nonNull(
consensusGroupId instanceof DataRegionId
? storageEngine.getDataRegion((DataRegionId) consensusGroupId)
: schemaEngine.getSchemaRegion((SchemaRegionId) consensusGroupId));
}

public TSStatus createSchemaRegion(
final TRegionReplicaSet regionReplicaSet, final String storageGroup) {
TSStatus tsStatus;
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -94,7 +94,6 @@
import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Objects;
import java.util.concurrent.locks.ReentrantReadWriteLock;

public class RegionWriteExecutor {
Expand Down Expand Up @@ -166,7 +165,7 @@ public RegionExecutionResult execute(final ConsensusGroupId groupId, final PlanN
return planNode.accept(executionVisitor, new WritePlanNodeExecutionContext(groupId, lock));
} catch (final Throwable e) {
// Detect problems caused by removed region
if (Objects.isNull(regionManager.getRegionLock(groupId))) {
if (!regionManager.isRegionExists(groupId)) {
final String errorMsg =
"Exception "
+ e.getClass().getSimpleName()
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -71,6 +71,7 @@
import java.util.concurrent.Future;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;

// manage all the schemaRegion in this dataNode
public class SchemaEngine {
Expand Down Expand Up @@ -335,20 +336,34 @@ private ISchemaRegion createSchemaRegionWithoutExistenceCheck(
return schemaRegion;
}

public synchronized void deleteSchemaRegion(SchemaRegionId schemaRegionId)
public synchronized void deleteSchemaRegion(final SchemaRegionId schemaRegionId)
throws MetadataException {
ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
final ISchemaRegion schemaRegion = schemaRegionMap.get(schemaRegionId);
if (schemaRegion == null) {
logger.warn(DataNodeSchemaMessages.SCHEMA_REGION_ALREADY_DELETED, schemaRegionId);
return;
}
schemaRegion.deleteSchemaRegion();
final AtomicReference<MetadataException> lastException = new AtomicReference<>();
// Synchronize it for region deletion detection when error occurs
schemaRegionMap.compute(
schemaRegionId,
(regionId, iSchemaRegion) -> {
try {
schemaRegion.deleteSchemaRegion();
} catch (final MetadataException e) {
lastException.set(e);
return iSchemaRegion;
}
return null;
});
if (Objects.nonNull(lastException.get())) {
throw lastException.get();
}
schemaMetricManager.removeSchemaRegionMetric(schemaRegionId.getId());
schemaRegionMap.remove(schemaRegionId);

// check whether the sg dir is empty
File sgDir = new File(config.getSchemaDir(), schemaRegion.getDatabaseFullPath());
File[] regionDirList =
final File sgDir = new File(config.getSchemaDir(), schemaRegion.getDatabaseFullPath());
final File[] regionDirList =
sgDir.listFiles(
(dir, name) -> {
try {
Expand Down
Loading