Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,7 @@
import org.zstack.core.workflow.FlowChainBuilder;
import org.zstack.core.workflow.ShareFlow;
import org.zstack.header.core.Completion;
import org.zstack.header.core.NoErrorCompletion;
import org.zstack.header.core.NopeCompletion;
import org.zstack.header.core.ReturnValueCompletion;
import org.zstack.header.core.WhileDoneCompletion;
Expand Down Expand Up @@ -172,7 +173,31 @@ protected void handleApiMessage(APIMessage msg) {
}
}

private void handle(APIUpdateExternalPrimaryStorageMsg msg) {
private void handle(final APIUpdateExternalPrimaryStorageMsg msg) {
thdf.chainSubmit(new ChainTask(msg) {
@Override
public String getSyncSignature() {
return String.format("update-external-primary-storage-%s", msg.getUuid());
}

@Override
public void run(SyncTaskChain chain) {
doUpdateExternalPrimaryStorageInQueue(msg, new NoErrorCompletion(chain) {
@Override
public void done() {
chain.next();
}
});
}

@Override
public String getName() {
return getSyncSignature();
}
});
}

private void doUpdateExternalPrimaryStorageInQueue(APIUpdateExternalPrimaryStorageMsg msg, NoErrorCompletion completion) {
APIUpdateExternalPrimaryStorageEvent evt = new APIUpdateExternalPrimaryStorageEvent(msg.getId());
if (msg.getName() != null) {
externalVO.setName(msg.getName());
Expand All @@ -188,7 +213,7 @@ private void handle(APIUpdateExternalPrimaryStorageMsg msg) {
}
boolean needReconnect = false;
String oldConfig = externalVO.getConfig();
if (msg.getConfig() != null) {
if (msg.getConfig() != null && !msg.getConfig().equals(oldConfig)) {
String config = controller.validateConfig(msg.getConfig());
externalVO.setConfig(config);
needReconnect = true;
Expand Down Expand Up @@ -216,13 +241,15 @@ public void run(MessageReply reply) {
}

bus.publish(evt);
completion.done();
}
});
return;
}

evt.setInventory(externalVO.toInventory());
bus.publish(evt);
completion.done();
}

@Override
Expand Down
Original file line number Diff line number Diff line change
@@ -1,9 +1,11 @@
package org.zstack.test.integration.storage.primary.addon.zbs

import org.springframework.http.HttpEntity
import org.zstack.core.cloudbus.CloudBus
import org.zstack.core.cloudbus.EventCallback
import org.zstack.core.cloudbus.EventFacade
import org.zstack.core.db.Q
import org.zstack.core.thread.ThreadFacadeImpl
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageSpaceVO
import org.zstack.header.storage.addon.primary.ExternalPrimaryStorageVO_
Expand All @@ -13,6 +15,8 @@ import org.zstack.header.storage.primary.PrimaryStorageCapacityVO_
import org.zstack.header.storage.primary.PrimaryStorageHostRefVO
import org.zstack.header.storage.primary.PrimaryStorageHostRefVO_
import org.zstack.header.storage.primary.PrimaryStorageStatus
import org.zstack.header.storage.primary.ReconnectPrimaryStorageMsg
import org.zstack.header.storage.primary.ReconnectPrimaryStorageReply
import org.zstack.storage.zbs.MdsUri
import org.zstack.sdk.*
import org.zstack.storage.addon.primary.ExternalPrimaryStorageSystemTags
Expand Down Expand Up @@ -47,6 +51,8 @@ class ZbsPrimaryStorageCase extends SubCase {
VolumeInventory vol, vol2
KVMHostInventory kvm
EventFacade evtf
ThreadFacadeImpl thdf
AtomicInteger reconnectMsgCount = new AtomicInteger(0)

@Override
void clean() {
Expand Down Expand Up @@ -165,6 +171,7 @@ class ZbsPrimaryStorageCase extends SubCase {
diskOffering = env.inventoryByName("diskOffering") as DiskOfferingInventory
kvm = env.inventoryByName("kvm-1") as KVMHostInventory
evtf = bean(EventFacade.class)
thdf = bean(ThreadFacadeImpl.class)

testSyncPrimaryStorageCapacityConcurrently()
testDefaultConfig()
Expand Down Expand Up @@ -294,6 +301,53 @@ class ZbsPrimaryStorageCase extends SubCase {
assert Q.New(ExternalPrimaryStorageSpaceVO.class)
.eq(ExternalPrimaryStorageSpaceVO_.primaryStorageUuid, ps.uuid)
.count() == 1

def signature = String.format("update-external-primary-storage-%s", ps.uuid)
def run = thdf.getChainTaskInfo(signature).getRunningTask().size()
retryInSecs {
assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0
assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0
}
env.message(ReconnectPrimaryStorageMsg.class) { ReconnectPrimaryStorageMsg msg, CloudBus bus ->
if (ps != null && msg.getPrimaryStorageUuid() == ps.uuid) {
reconnectMsgCount.incrementAndGet()
run = thdf.getChainTaskInfo(signature).getRunningTask().size()
assert run == 1
}
def reply = new ReconnectPrimaryStorageReply()
bus.reply(msg, reply)
}
def oldReconnectMsgCount=reconnectMsgCount.get()
String oldConfig = Q.New(ExternalPrimaryStorageVO.class)
.select(ExternalPrimaryStorageVO_.config)
.eq(ExternalPrimaryStorageVO_.uuid, ps.uuid)
.findValue()
Thread.start {
updateExternalPrimaryStorage {
uuid = ps.uuid
config = oldConfig
}
}
sleep(1000)
Thread.start {
updateExternalPrimaryStorage {
uuid = ps.uuid
config ="{\"mdsUrls\":[\"root:password@127.0.1.4\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}"
}
}
Thread.start {
updateExternalPrimaryStorage {
uuid = ps.uuid
config ="{\"mdsUrls\":[\"root:password@127.0.1.5\",\"root:password@127.0.1.2\",\"root:password@127.0.1.3\"],\"logicalPoolName\":\"lpool1\"}"
}
}
retryInSecs {
assert reconnectMsgCount.get() >=1
assert thdf.getChainTaskInfo(signature).getRunningTask().size() == 0
assert thdf.getChainTaskInfo(signature).getPendingTask().size() == 0
}
assert reconnectMsgCount.get() == oldReconnectMsgCount + 2
env.revokeMessage(ReconnectPrimaryStorageMsg.class, null)
// update multi pools
// Config.Pool
updateExternalPrimaryStorage {
Expand Down