diff --git a/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java b/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java index 689c2510d9..b2db21cce5 100644 --- a/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java +++ b/storage/src/main/java/org/zstack/storage/addon/primary/ExternalPrimaryStorage.java @@ -18,6 +18,7 @@ import org.zstack.core.workflow.FlowChainBuilder; import org.zstack.core.workflow.ShareFlow; import org.zstack.header.core.Completion; +import org.zstack.header.core.NoErrorCompletion; import org.zstack.header.core.NopeCompletion; import org.zstack.header.core.ReturnValueCompletion; import org.zstack.header.core.WhileDoneCompletion; @@ -172,7 +173,31 @@ protected void handleApiMessage(APIMessage msg) { } } - private void handle(APIUpdateExternalPrimaryStorageMsg msg) { + private void handle(final APIUpdateExternalPrimaryStorageMsg msg) { + thdf.chainSubmit(new ChainTask(msg) { + @Override + public String getSyncSignature() { + return String.format("update-external-primary-storage-%s", msg.getUuid()); + } + + @Override + public void run(SyncTaskChain chain) { + doUpdateExternalPrimaryStorageInQueue(msg, new NoErrorCompletion(chain) { + @Override + public void done() { + chain.next(); + } + }); + } + + @Override + public String getName() { + return getSyncSignature(); + } + }); + } + + private void doUpdateExternalPrimaryStorageInQueue(APIUpdateExternalPrimaryStorageMsg msg, NoErrorCompletion completion) { APIUpdateExternalPrimaryStorageEvent evt = new APIUpdateExternalPrimaryStorageEvent(msg.getId()); if (msg.getName() != null) { externalVO.setName(msg.getName()); @@ -188,7 +213,7 @@ private void handle(APIUpdateExternalPrimaryStorageMsg msg) { } boolean needReconnect = false; String oldConfig = externalVO.getConfig(); - if (msg.getConfig() != null) { + if (msg.getConfig() != null && !msg.getConfig().equals(oldConfig)) { String config = controller.validateConfig(msg.getConfig()); externalVO.setConfig(config); needReconnect = true; @@ -216,6 +241,7 @@ public void run(MessageReply reply) { } bus.publish(evt); + completion.done(); } }); return; @@ -223,6 +249,7 @@ public void run(MessageReply reply) { evt.setInventory(externalVO.toInventory()); bus.publish(evt); + completion.done(); } @Override diff --git a/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy b/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy index d085ecec66..9b92f70e3e 100644 --- a/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy +++ b/test/src/test/groovy/org/zstack/test/integration/storage/primary/addon/zbs/ZbsPrimaryStorageCase.groovy @@ -1,9 +1,11 @@ package org.zstack.test.integration.storage.primary.addon.zbs import org.springframework.http.HttpEntity +import org.zstack.core.cloudbus.CloudBus import org.zstack.core.cloudbus.EventCallback import org.zstack.core.cloudbus.EventFacade import org.zstack.core.db.Q +import org.zstack.core.thread.ThreadFacadeImpl import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageSpaceVO import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO_ @@ -13,6 +15,8 @@ import org.zstack.header.storage.primary.PrimaryStorageCapacityVO_ import org.zstack.header.storage.primary.PrimaryStorageHostRefVO import org.zstack.header.storage.primary.PrimaryStorageHostRefVO_ import org.zstack.header.storage.primary.PrimaryStorageStatus +import org.zstack.header.storage.primary.ReconnectPrimaryStorageMsg +import org.zstack.header.storage.primary.ReconnectPrimaryStorageReply import org.zstack.storage.zbs.MdsUri import org.zstack.sdk.* import org.zstack.storage.addon.primary.ExternalPrimaryStorageSystemTags @@ -47,6 +51,8 @@ class ZbsPrimaryStorageCase extends SubCase { VolumeInventory vol, vol2 KVMHostInventory kvm EventFacade evtf + ThreadFacadeImpl thdf + AtomicInteger reconnectMsgCount = new AtomicInteger(0) @Override void clean() { @@ -165,6 +171,7 @@ class ZbsPrimaryStorageCase extends SubCase { diskOffering = env.inventoryByName("diskOffering") as DiskOfferingInventory kvm = env.inventoryByName("kvm-1") as KVMHostInventory evtf = bean(EventFacade.class) + thdf = bean(ThreadFacadeImpl.class) testSyncPrimaryStorageCapacityConcurrently() testDefaultConfig() @@ -294,6 +301,53 @@ class ZbsPrimaryStorageCase extends SubCase { assert Q.New(ExternalPrimaryStorageSpaceVO.class) .eq(ExternalPrimaryStorageSpaceVO_.primaryStorageUuid, ps.uuid) .count() == 1 + + def signature = String.format("update-external-primary-storage-%s", ps.uuid) + def run = thdf.getChainTaskInfo(signature).getRunningTask().size() + retryInSecs { + assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0 + assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0 + } + env.message(ReconnectPrimaryStorageMsg.class) { ReconnectPrimaryStorageMsg msg, CloudBus bus -> + if (ps != null && msg.getPrimaryStorageUuid() == ps.uuid) { + reconnectMsgCount.incrementAndGet() + run = thdf.getChainTaskInfo(signature).getRunningTask().size() + assert run == 1 + } + def reply = new ReconnectPrimaryStorageReply() + bus.reply(msg, reply) + } + def oldReconnectMsgCount=reconnectMsgCount.get() + String oldConfig = Q.New(ExternalPrimaryStorageVO.class) + .select(ExternalPrimaryStorageVO_.config) + .eq(ExternalPrimaryStorageVO_.uuid, ps.uuid) + .findValue() + Thread.start { + updateExternalPrimaryStorage { + uuid = ps.uuid + config = oldConfig + } + } + sleep(1000) + Thread.start { + updateExternalPrimaryStorage { + uuid = ps.uuid + config ="{\"mdsUrls\":[\"root:password@127.0.1.4\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}" + } + } + Thread.start { + updateExternalPrimaryStorage { + uuid = ps.uuid + config ="{\"mdsUrls\":[\"root:password@127.0.1.5\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}" + } + } + retryInSecs { + assert reconnectMsgCount.get() >=1 + assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0 + assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0 + } + assert reconnectMsgCount.get() == oldReconnectMsgCount + 2 + env.revokeMessage(ReconnectPrimaryStorageMsg.class, null) // update multi pools // Config.Pool updateExternalPrimaryStorage {