Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
7 changes: 5 additions & 2 deletions src/core/cm/launcher/launcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -127,7 +127,7 @@ Error Launcher::Start()
UpdateInstanceStatuses();

if (auto err = mBalancer.LoadSMDataForActiveInstances(); !err.IsNone()) {
return AOS_ERROR_WRAP(err);
LOG_ERR() << "Can't load SM data for active instances" << Log::Field(err);
}

if (auto err = mWorkerThread.Run([this](void*) { ProcessUpdate(); }); !err.IsNone()) {
Expand Down Expand Up @@ -464,7 +464,7 @@ void Launcher::ProcessUpdate()
void Launcher::WaitAllNodesConnected(UniqueLock<Mutex>& lock)
{
auto allNodesConnected = [this]() {
auto notConnected = [](const Node& node) { return !node.GetInfo().mIsConnected; };
auto notConnected = [](const Node& node) { return !node.IsConnected(); };

return !mNodeManager.GetNodes().ContainsIf(notConnected) || !mIsRunning;
};
Expand Down Expand Up @@ -615,6 +615,9 @@ Error Launcher::OnNodeInstancesStatusesReceived(const String& nodeID, const Arra
}

mProcessUpdatesCondVar.NotifyAll();
// Node is not considered connected until its instance statuses are received.
// So, we need to notify threads waiting for all nodes to be connected after the statuses are handled.
mAllNodesConnectedCondVar.NotifyAll();

return ErrorEnum::eNone;
}
Expand Down
12 changes: 12 additions & 0 deletions src/core/cm/launcher/node.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -182,13 +182,25 @@ void Node::UpdateMonitoringData(const monitoring::NodeMonitoringData& monitoring

bool Node::UpdateInfo(const UnitNodeInfo& info)
{
    // When the connection status flips, take over the incoming resources and runtimes up front so that
    // they do not take part in the change detection below.
    const bool connectionToggled = mInfo.mIsConnected != info.mIsConnected;
    if (connectionToggled) {
        mInfo.mResources = info.mResources;
        mInfo.mRuntimes  = info.mRuntimes;
    }

    // The connection status itself is synced unconditionally and therefore never reported as a change.
    mInfo.mIsConnected = info.mIsConnected;

    // Whatever still differs after the adjustments above is a real node info change.
    const bool hasChanged = mInfo != info;
    if (hasChanged) {
        mInfo = info;
    }

    // The status-received flag is cleared whenever the node is reported as disconnected;
    // for a connected node it keeps its current value.
    mIsNodeStatusReceived = mIsNodeStatusReceived && info.mIsConnected;

    return hasChanged;
}

Expand Down
16 changes: 15 additions & 1 deletion src/core/cm/launcher/node.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -149,6 +149,18 @@ class Node : public NodeItf {
*/
void UpdateConfig();

/**
 * Indicates whether node is connected.
 *
 * The node is reported as connected only when both conditions hold: the node info marks it as connected
 * (mInfo.mIsConnected) and the status-received flag (mIsNodeStatusReceived) is set.
 *
 * @return bool true if both conditions hold, false otherwise.
 */
bool IsConnected() const { return mInfo.mIsConnected && mIsNodeStatusReceived; }

/**
 * Notifies the node that its instance status has been received.
 *
 * Sets the status-received flag (mIsNodeStatusReceived), which IsConnected() requires in addition to the
 * connected flag stored in the node info.
 */
void NotifyInstanceStatusReceived() { mIsNodeStatusReceived = true; }

private:
// Returns CPU usage without Aos service instances.
size_t GetSystemCPUUsage(const monitoring::NodeMonitoringData& monitoringData) const;
Expand All @@ -165,7 +177,9 @@ class Node : public NodeItf {
InstanceRunnerItf* mInstanceRunner {};

UnitNodeInfo mInfo {};
bool mNeedBalancing {};
bool mIsNodeStatusReceived {};

bool mNeedBalancing {};

size_t mTotalCPUUsage {};
size_t mTotalRAMUsage {};
Expand Down
4 changes: 3 additions & 1 deletion src/core/cm/launcher/nodemanager.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -143,7 +143,9 @@ Error NodeManager::NotifyNodeStatusReceived(const String& nodeID)
return AOS_ERROR_WRAP(Error(ErrorEnum::eNotFound, "node not found"));
}

if (node->GetInfo().mIsConnected && node->GetInfo().mState == NodeStateEnum::eProvisioned) {
node->NotifyInstanceStatusReceived();

if (node->IsConnected() && node->GetInfo().mState == NodeStateEnum::eProvisioned) {
if (mNodesExpectedToSendStatus.Remove(nodeID) != 0) {
mStatusUpdateCondVar.NotifyAll();
}
Expand Down
28 changes: 28 additions & 0 deletions src/core/cm/launcher/tests/launcher.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -682,6 +682,10 @@ TEST_F(CMLauncherTest, CacheInstances)

ASSERT_TRUE(mLauncher.Start().IsNone());

for (const auto& nodeID : {cNodeIDLocalSM, cNodeIDRemoteSM1, cNodeIDRemoteSM2}) {
mInstanceRunner.SendInitialStatuses(nodeID);
}

// Run instances 1
auto runRequest1 = std::make_unique<StaticArray<RunInstanceRequest, cMaxNumInstances>>();

Expand Down Expand Up @@ -779,6 +783,10 @@ TEST_F(CMLauncherTest, Components)

ASSERT_TRUE(mLauncher.Start().IsNone());

for (const auto& nodeID : {cNodeIDLocalSM, cNodeIDRemoteSM1}) {
mInstanceRunner.SendInitialStatuses(nodeID);
}

auto instanceStatusListener = std::make_unique<InstanceStatusListenerStub>();

mLauncher.SubscribeListener(*instanceStatusListener);
Expand Down Expand Up @@ -1475,6 +1483,10 @@ TEST_F(CMLauncherTest, Balancing)

ASSERT_TRUE(mLauncher.Start().IsNone());

for (const auto& nodeID : nodeIDs) {
mInstanceRunner.SendInitialStatuses(nodeID);
}

// Run instances
auto runStatuses = std::make_unique<StaticArray<InstanceStatus, cMaxNumInstances>>();
ASSERT_TRUE(mLauncher.RunInstances(testItem.mRunRequests, *runStatuses).IsNone());
Expand Down Expand Up @@ -1578,6 +1590,10 @@ TEST_F(CMLauncherTest, PlatformFiltering)

ASSERT_TRUE(mLauncher.Start().IsNone());

for (const auto& nodeID : {cNodeIDLocalSM, cNodeIDRemoteSM1, cNodeIDRemoteSM2}) {
mInstanceRunner.SendInitialStatuses(nodeID);
}

InstanceStatusListenerStub instanceStatusListener;
mLauncher.SubscribeListener(instanceStatusListener);

Expand Down Expand Up @@ -1672,6 +1688,8 @@ TEST_F(CMLauncherTest, ResendInstancesOnMismatchedNodeStatus)

ASSERT_TRUE(mLauncher.Start().IsNone());

mInstanceRunner.SendInitialStatuses(cNodeIDLocalSM);

InstanceStatusListenerStub instanceStatusListener;
mLauncher.SubscribeListener(instanceStatusListener);

Expand Down Expand Up @@ -1752,6 +1770,8 @@ TEST_F(CMLauncherTest, SubjectChanged)

ASSERT_TRUE(mLauncher.Start().IsNone());

mInstanceRunner.SendInitialStatuses(cNodeIDLocalSM);

// 1) Run a single instance with a single subject.
auto runRequest = std::make_unique<StaticArray<RunInstanceRequest, cMaxNumInstances>>();
runRequest->PushBack(CreateRunRequest(cService1, cSubject1, 50, 1));
Expand Down Expand Up @@ -1824,6 +1844,8 @@ TEST_F(CMLauncherTest, PrepareNetworkParamsFails)

ASSERT_TRUE(mLauncher.Start().IsNone());

mInstanceRunner.SendInitialStatuses(cNodeIDLocalSM);

// Run a single instance.
auto runRequest = std::make_unique<StaticArray<RunInstanceRequest, cMaxNumInstances>>();
runRequest->PushBack(CreateRunRequest(cService1, cSubject1, 50, 1));
Expand Down Expand Up @@ -1896,6 +1918,8 @@ TEST_F(CMLauncherTest, TestSentInstanceInfo)

ASSERT_TRUE(mLauncher.Start().IsNone());

mInstanceRunner.SendInitialStatuses(cNodeIDLocalSM);

InstanceStatusListenerStub instanceStatusListener;
mLauncher.SubscribeListener(instanceStatusListener);

Expand Down Expand Up @@ -1980,6 +2004,8 @@ TEST_F(CMLauncherTest, PreinstalledComponents)

ASSERT_TRUE(mLauncher.Start().IsNone());

mInstanceRunner.SendInitialStatuses(cNodeIDLocalSM);

InstanceStatusListenerStub instanceStatusListener;
mLauncher.SubscribeListener(instanceStatusListener);

Expand Down Expand Up @@ -2075,6 +2101,8 @@ TEST_F(CMLauncherTest, SetStatusOnStart)

ASSERT_TRUE(mLauncher.Start().IsNone());

mInstanceRunner.SendInitialStatuses(cNodeIDLocalSM);

// Verify that both instances are activating
InstanceStatus expectedStatus1 = CreateInstanceStatus(CreateInstanceIdent(cService1, cSubject1, 0), cNodeIDLocalSM,
cRunnerRunc, aos::InstanceStateEnum::eActivating, ErrorEnum::eNone, "1.0.0", false, manifestDigest.CStr());
Expand Down
7 changes: 7 additions & 0 deletions src/core/cm/launcher/tests/stubs/instancerunnerstub.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -70,6 +70,13 @@ class InstanceRunnerStub : public InstanceRunnerItf {
mPreinstalledComponents = preinstalledComponents;
}

/**
 * Reports an empty instance status list for the given node to the status receiver.
 *
 * Does nothing if no status receiver is set. The result returned by the receiver is not checked.
 *
 * @param nodeID node identifier the statuses are reported for.
 */
void SendInitialStatuses(const String& nodeID)
{
    // Without a receiver there is nobody to deliver the statuses to.
    if (mStatusReceiver == nullptr) {
        return;
    }

    mStatusReceiver->OnNodeInstancesStatusesReceived(nodeID, Array<InstanceStatus>());
}

MOCK_METHOD(void, OnRunRequest, ());

// InstanceRunnerItf
Expand Down
Loading