9393import com .cloud .utils .Pair ;
9494import com .cloud .utils .component .ManagerBase ;
9595import com .cloud .utils .exception .CloudRuntimeException ;
96+ import org .apache .logging .log4j .ThreadContext ;
9697
9798public class StorageOrchestrator extends ManagerBase implements StorageOrchestrationService , Configurable {
9899
100+ private static final String LOGCONTEXTID = "logcontextid" ;
101+
99102 @ Inject
100103 SnapshotDataStoreDao snapshotDataStoreDao ;
101104 @ Inject
@@ -137,6 +140,8 @@ public class StorageOrchestrator extends ManagerBase implements StorageOrchestra
137140 Integer numConcurrentCopyTasksPerSSVM = 2 ;
138141
139142 private final Map <Long , ThreadPoolExecutor > zoneExecutorMap = new ConcurrentHashMap <>();
143+ private final Map <Long , Integer > zonePendingWorkCountMap = new ConcurrentHashMap <>();
144+
140145 private final Map <Long , ExecutorService > zoneKvmIncrementalExecutorMap = new ConcurrentHashMap <>();
141146
142147 @ Override
@@ -513,12 +518,18 @@ private void createMigrateDataTask(DataObject chosenFileForMigration, Map<DataOb
513518 }
514519
515520 protected <T > Future <T > submit (Long zoneId , Callable <T > task ) {
516- if (!zoneExecutorMap .containsKey (zoneId )) {
517- zoneExecutorMap .put (zoneId , new ThreadPoolExecutor (numConcurrentCopyTasksPerSSVM , numConcurrentCopyTasksPerSSVM ,
518- 30 , TimeUnit .MINUTES , new MigrateBlockingQueue <>(numConcurrentCopyTasksPerSSVM )));
521+ ThreadPoolExecutor executor ;
522+ synchronized (this ) {
523+ if (!zoneExecutorMap .containsKey (zoneId )) {
524+ zoneExecutorMap .put (zoneId , new ThreadPoolExecutor (numConcurrentCopyTasksPerSSVM , numConcurrentCopyTasksPerSSVM ,
525+ 30 , TimeUnit .MINUTES , new MigrateBlockingQueue <>(numConcurrentCopyTasksPerSSVM )));
526+ zonePendingWorkCountMap .put (zoneId , 0 );
527+ }
528+ zonePendingWorkCountMap .merge (zoneId , 1 , Integer ::sum );
529+ scaleExecutorIfNecessary (zoneId );
530+ executor = zoneExecutorMap .get (zoneId );
519531 }
520- scaleExecutorIfNecessary (zoneId );
521- return zoneExecutorMap .get (zoneId ).submit (task );
532+ return executor .submit (task );
522533 }
523534
524535 protected <T > Future <T > submitKvmIncrementalMigration (Long zoneId , Callable <T > task ) {
@@ -540,24 +551,23 @@ protected void scaleExecutorIfNecessary(Long zoneId) {
540551 }
541552 }
542553
543- protected void tryCleaningUpExecutor (Long zoneId ) {
554+ protected synchronized void tryCleaningUpExecutor (Long zoneId ) {
544555 if (!zoneExecutorMap .containsKey (zoneId )) {
545556 logger .debug ("No executor exists for zone [{}]." , zoneId );
546557 return ;
547558 }
548559
549- ThreadPoolExecutor executor = zoneExecutorMap .get (zoneId );
550- synchronized (executor ) {
551- int activeTasks = executor .getActiveCount ();
552- if (activeTasks > 1 ) {
553- logger .debug ("Not cleaning executor of zone [{}] yet, as there are [{}] active tasks." , zoneId , activeTasks );
554- return ;
555- }
556-
557- logger .debug ("Cleaning executor of zone [{}]." , zoneId );
558- zoneExecutorMap .remove (zoneId );
559- executor .shutdown ();
560+ zonePendingWorkCountMap .merge (zoneId , -1 , Integer ::sum );
561+ Integer pendingWorkCount = zonePendingWorkCountMap .get (zoneId );
562+ if (pendingWorkCount > 0 ) {
563+ logger .debug ("Not cleaning executor of zone [{}] yet, as there is [{}] pending work." , zoneId , pendingWorkCount );
564+ return ;
560565 }
566+
567+ logger .debug ("Cleaning executor of zone [{}]." , zoneId );
568+ ThreadPoolExecutor executor = zoneExecutorMap .get (zoneId );
569+ zoneExecutorMap .remove (zoneId );
570+ executor .shutdown ();
561571 }
562572
563573 private MigrationResponse handleResponse (List <Future <DataObjectResult >> futures , MigrationPolicy migrationPolicy , String message , boolean success ) {
@@ -733,10 +743,13 @@ private class MigrateDataTask implements Callable<DataObjectResult> {
733743 private DataStore destDataStore ;
734744 private Map <DataObject , Pair <List <SnapshotInfo >, Long >> snapshotChain ;
735745 private Map <DataObject , Pair <List <TemplateInfo >, Long >> templateChain ;
746+ private String logid ;
747+
736748 public MigrateDataTask (DataObject file , DataStore srcDataStore , DataStore destDataStore ) {
737749 this .file = file ;
738750 this .srcDataStore = srcDataStore ;
739751 this .destDataStore = destDataStore ;
752+ this .logid = ThreadContext .get (LOGCONTEXTID );
740753 }
741754
742755 public void setSnapshotChains (Map <DataObject , Pair <List <SnapshotInfo >, Long >> snapshotChain ) {
@@ -758,6 +771,8 @@ public DataObject getFile() {
758771
759772 @ Override
760773 public DataObjectResult call () {
774+ ThreadContext .put (LOGCONTEXTID , logid );
775+
761776 DataObjectResult result ;
762777 AsyncCallFuture <DataObjectResult > future = secStgSrv .migrateData (file , srcDataStore , destDataStore , snapshotChain , templateChain );
763778 try {
@@ -767,22 +782,29 @@ public DataObjectResult call() {
767782 result = new DataObjectResult (file );
768783 result .setResult (e .toString ());
769784 }
785+
770786 tryCleaningUpExecutor (srcDataStore .getScope ().getScopeId ());
787+ ThreadContext .clearAll ();
788+
771789 return result ;
772790 }
773791 }
774792
775793 private class CopyTemplateTask implements Callable <TemplateApiResult > {
776794 private TemplateInfo sourceTmpl ;
777795 private DataStore destStore ;
796+ private String logid ;
778797
779798 public CopyTemplateTask (TemplateInfo sourceTmpl , DataStore destStore ) {
780799 this .sourceTmpl = sourceTmpl ;
781800 this .destStore = destStore ;
801+ this .logid = ThreadContext .get (LOGCONTEXTID );
782802 }
783803
784804 @ Override
785805 public TemplateApiResult call () {
806+ ThreadContext .put (LOGCONTEXTID , logid );
807+
786808 TemplateApiResult result ;
787809 AsyncCallFuture <TemplateApiResult > future = templateService .copyTemplateToImageStore (sourceTmpl , destStore );
788810 try {
@@ -793,7 +815,10 @@ public TemplateApiResult call() {
793815 result = new TemplateApiResult (sourceTmpl );
794816 result .setResult (e .getMessage ());
795817 }
818+
796819 tryCleaningUpExecutor (destStore .getScope ().getScopeId ());
820+ ThreadContext .clearAll ();
821+
797822 return result ;
798823 }
799824 }
0 commit comments