Skip to content

Commit ba02f8f

Browse files
committed
Update WriteBackSink.java
1 parent 129d06a commit ba02f8f

1 file changed

Lines changed: 41 additions & 16 deletions

File tree

  • iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback

iotdb-core/datanode/src/main/java/org/apache/iotdb/db/pipe/sink/protocol/writeback/WriteBackSink.java

Lines changed: 41 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -127,8 +127,6 @@ public class WriteBackSink implements PipeConnector {
127127

128128
// Simulate the behavior of the client-to-server communication
129129
// for correctly handling data insertion in IoTDBReceiverAgent#receive method
130-
private static final Coordinator COORDINATOR = Coordinator.getInstance();
131-
private static final SessionManager SESSION_MANAGER = SessionManager.getInstance();
132130
public static final AtomicLong id = new AtomicLong();
133131
private InternalClientSession session;
134132

@@ -140,10 +138,34 @@ public class WriteBackSink implements PipeConnector {
140138
private String invalidTargetTableModelDatabaseName;
141139
private String targetTreeModelDatabaseName;
142140

143-
private static final SqlParser RELATIONAL_SQL_PARSER = new SqlParser();
144-
145141
private static final Set<String> ALREADY_CREATED_DATABASES = ConcurrentHashMap.newKeySet();
146142

143+
private static SessionManager getSessionManager() {
144+
return SessionManagerHolder.INSTANCE;
145+
}
146+
147+
private static SqlParser getRelationalSqlParser() {
148+
return SqlParserHolder.INSTANCE;
149+
}
150+
151+
private static class SessionManagerHolder {
152+
153+
private static final SessionManager INSTANCE = SessionManager.getInstance();
154+
155+
private SessionManagerHolder() {
156+
// empty constructor
157+
}
158+
}
159+
160+
private static class SqlParserHolder {
161+
162+
private static final SqlParser INSTANCE = new SqlParser();
163+
164+
private SqlParserHolder() {
165+
// empty constructor
166+
}
167+
}
168+
147169
@Override
148170
public void validate(final PipeParameterValidator validator) throws Exception {
149171
validator.validateSynonymAttributes(
@@ -279,13 +301,14 @@ public void customize(
279301
customizeTargetDatabase(targetDatabase);
280302
}
281303

282-
if (SESSION_MANAGER.getCurrSession() == null) {
283-
SESSION_MANAGER.registerSession(session);
304+
final SessionManager sessionManager = getSessionManager();
305+
if (sessionManager.getCurrSession() == null) {
306+
sessionManager.registerSession(session);
284307
}
285308

286309
// Check the password and its expiration
287310
if (Objects.nonNull(passwordString)
288-
&& SESSION_MANAGER
311+
&& sessionManager
289312
.login(
290313
session,
291314
usernameString,
@@ -683,7 +706,9 @@ private static void throwWriteBackExceptionIfNecessary(
683706
@Override
684707
public void close() throws Exception {
685708
if (session != null) {
686-
SESSION_MANAGER.closeSession(session, COORDINATOR::cleanupQueryExecution, false);
709+
getSessionManager()
710+
.closeSession(
711+
session, queryId -> Coordinator.getInstance().cleanupQueryExecution(queryId), false);
687712
}
688713
}
689714

@@ -700,10 +725,10 @@ private TSStatus executeStatementForTableModel(
700725
return Coordinator.getInstance()
701726
.executeForTableModel(
702727
new PipeEnrichedStatement(statement),
703-
RELATIONAL_SQL_PARSER,
728+
getRelationalSqlParser(),
704729
session,
705-
SESSION_MANAGER.requestQueryId(),
706-
SESSION_MANAGER.getSessionInfoOfPipeReceiver(session, dataBaseName),
730+
getSessionManager().requestQueryId(),
731+
getSessionManager().getSessionInfoOfPipeReceiver(session, dataBaseName),
707732
"",
708733
LocalExecutionPlanner.getInstance().metadata,
709734
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
@@ -733,10 +758,10 @@ private TSStatus executeStatementForTableModel(
733758
return Coordinator.getInstance()
734759
.executeForTableModel(
735760
new PipeEnrichedStatement(statement),
736-
RELATIONAL_SQL_PARSER,
761+
getRelationalSqlParser(),
737762
session,
738-
SESSION_MANAGER.requestQueryId(),
739-
SESSION_MANAGER.getSessionInfo(session),
763+
getSessionManager().requestQueryId(),
764+
getSessionManager().getSessionInfo(session),
740765
"",
741766
LocalExecutionPlanner.getInstance().metadata,
742767
IoTDBDescriptor.getInstance().getConfig().getQueryTimeoutThreshold())
@@ -821,8 +846,8 @@ private TSStatus executeStatementForTreeModel(final Statement statement, final S
821846
return Coordinator.getInstance()
822847
.executeForTreeModel(
823848
new PipeEnrichedStatement(statement),
824-
SESSION_MANAGER.requestQueryId(),
825-
SESSION_MANAGER.getSessionInfo(session),
849+
getSessionManager().requestQueryId(),
850+
getSessionManager().getSessionInfo(session),
826851
"",
827852
ClusterPartitionFetcher.getInstance(),
828853
ClusterSchemaFetcher.getInstance(),

0 commit comments

Comments
 (0)