From a1a6171ef921159f5ab7acdfde9bf6a69733b4dd Mon Sep 17 00:00:00 2001 From: Mykola Solianko Date: Tue, 10 Feb 2026 14:13:59 +0200 Subject: [PATCH 1/5] common: iamclient: cancel RegisterNode stream on reconnect Signed-off-by: Mykola Solianko --- src/common/iamclient/publicnodeservice.cpp | 4 ++++ 1 file changed, 4 insertions(+) diff --git a/src/common/iamclient/publicnodeservice.cpp b/src/common/iamclient/publicnodeservice.cpp index 9ee9e4f3..61616c03 100644 --- a/src/common/iamclient/publicnodeservice.cpp +++ b/src/common/iamclient/publicnodeservice.cpp @@ -75,6 +75,10 @@ Error PublicNodesService::Reconnect() mSubscriptionManager->Reconnect(mStub.get()); } + if (mRegisterNodeCtx) { + mRegisterNodeCtx->TryCancel(); + } + return ErrorEnum::eNone; } From 201565f7a0d51c10d1487435125fa7fa77facdca Mon Sep 17 00:00:00 2001 From: Mykola Solianko Date: Tue, 10 Feb 2026 14:16:21 +0200 Subject: [PATCH 2/5] mp: iamclient: refactor to use PublicNodesService Signed-off-by: Mykola Solianko --- src/mp/iamclient/CMakeLists.txt | 4 +- src/mp/iamclient/iamclient.cpp | 190 +++++++++ src/mp/iamclient/iamclient.hpp | 96 +++++ src/mp/iamclient/publicnodeclient.cpp | 362 ------------------ src/mp/iamclient/publicnodeclient.hpp | 152 -------- src/mp/iamclient/tests/iamclient.cpp | 63 +-- src/mp/iamclient/tests/mocks/certprovider.hpp | 31 -- src/mp/iamclient/tests/stubs/iamserver.hpp | 18 +- 8 files changed, 335 insertions(+), 581 deletions(-) create mode 100644 src/mp/iamclient/iamclient.cpp create mode 100644 src/mp/iamclient/iamclient.hpp delete mode 100644 src/mp/iamclient/publicnodeclient.cpp delete mode 100644 src/mp/iamclient/publicnodeclient.hpp delete mode 100644 src/mp/iamclient/tests/mocks/certprovider.hpp diff --git a/src/mp/iamclient/CMakeLists.txt b/src/mp/iamclient/CMakeLists.txt index 0d743c4c..f689fe1d 100644 --- a/src/mp/iamclient/CMakeLists.txt +++ b/src/mp/iamclient/CMakeLists.txt @@ -14,13 +14,13 @@ set(TARGET_NAME iamclient) # Sources # ###################################################################################################################### -set(SOURCES publicnodeclient.cpp) +set(SOURCES iamclient.cpp) # ###################################################################################################################### # Libraries # ###################################################################################################################### -set(LIBRARIES aos::mp::communication aos::common::logger aos::common::iamclient) +set(LIBRARIES aos::common::utils aos::common::iamclient) # ###################################################################################################################### # Target diff --git a/src/mp/iamclient/iamclient.cpp b/src/mp/iamclient/iamclient.cpp new file mode 100644 index 00000000..3c2970c1 --- /dev/null +++ b/src/mp/iamclient/iamclient.cpp @@ -0,0 +1,190 @@ +/* + * Copyright (C) 2025 EPAM Systems, Inc. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#include + +#include "iamclient.hpp" + +namespace aos::mp::iamclient { + +/*********************************************************************************************************************** + * Public + **********************************************************************************************************************/ + +Error IAMClient::Init(const config::IAMConfig& cfg, aos::iamclient::CertProviderItf& certProvider, + common::iamclient::TLSCredentialsItf& tlsCredentials, bool provisioningMode) +{ + mCertProvider = &certProvider; + mCertStorage = cfg.mCertStorage; + + const bool publicServer = mCertStorage.empty(); + + LOG_DBG() << "Init IAM client: publicServer=" << publicServer << ", provisioningMode=" << provisioningMode; + + return PublicNodesService::Init(publicServer ? cfg.mIAMMainPublicServerURL : cfg.mIAMMainProtectedServerURL, + tlsCredentials, provisioningMode, publicServer, mCertStorage); +} + +Error IAMClient::Start() +{ + std::lock_guard lock {mMutex}; + + LOG_DBG() << "Start IAM client"; + + if (!mCertStorage.empty()) { + if (auto err = mCertProvider->SubscribeListener(String(mCertStorage.c_str()), *this); !err.IsNone()) { + return AOS_ERROR_WRAP(err); + } + } + + mStarted = true; + + mOutgoingMsgThread = std::thread(&IAMClient::ProcessOutgoingMessages, this); + + return PublicNodesService::Start(); +} + +void IAMClient::Stop() +{ + LOG_DBG() << "Stop IAM client"; + + { + std::lock_guard lock {mMutex}; + + if (!mStarted) { + return; + } + + mStarted = false; + + mOutgoingMsgChannel.Close(); + mIncomingMsgChannel.Close(); + } + + mCV.notify_all(); + + PublicNodesService::Stop(); + + if (mOutgoingMsgThread.joinable()) { + mOutgoingMsgThread.join(); + } + + if (!mCertStorage.empty()) { + mCertProvider->UnsubscribeListener(*this); + } +} + +Error IAMClient::SendMessages(std::vector messages) +{ + LOG_DBG() << "Send message"; + + return mOutgoingMsgChannel.Send(std::move(messages)); +} + +RetWithError> IAMClient::ReceiveMessages() +{ + LOG_DBG() << "Receive message"; + + return mIncomingMsgChannel.Receive(); +} + +/*********************************************************************************************************************** + * Protected + **********************************************************************************************************************/ + +Error IAMClient::ReceiveMessage(const iamanager::v6::IAMIncomingMessages& msg) +{ + std::vector message(msg.ByteSizeLong()); + + LOG_DBG() << "Received message" << Log::Field("msg", msg.DebugString()); + + if (!msg.SerializeToArray(message.data(), message.size())) { + return Error(ErrorEnum::eRuntime, "failed to serialize message"); + } + + if (auto err = mIncomingMsgChannel.Send(std::move(message)); !err.IsNone()) { + return AOS_ERROR_WRAP(err); + } + + return ErrorEnum::eNone; +} + +void IAMClient::OnConnected() +{ + LOG_DBG() << "IAM client connected"; + + std::lock_guard lock {mMutex}; + + mConnected = true; + mCV.notify_all(); +} + +void IAMClient::OnDisconnected() +{ + LOG_DBG() << "IAM client disconnected"; + + std::lock_guard lock {mMutex}; + + mConnected = false; +} + +/*********************************************************************************************************************** + * Private + **********************************************************************************************************************/ + +void IAMClient::OnCertChanged([[maybe_unused]] const CertInfo& info) +{ + LOG_INF() << "Certificate changed, reconnecting"; + + if (auto err = Reconnect(); !err.IsNone()) { + LOG_ERR() << "Failed to reconnect" << Log::Field(err); + } +} + +void IAMClient::ProcessOutgoingMessages() +{ + LOG_DBG() << "Processing outgoing messages"; + + while (mStarted) { + auto [msg, err] = mOutgoingMsgChannel.Receive(); + if (!err.IsNone()) { + LOG_ERR() << "Failed to receive message" << Log::Field(err); + + return; + } + + { + std::unique_lock lock {mMutex}; + + LOG_DBG() << "Received message from channel"; + + mCV.wait(lock, [this] { return mConnected || !mStarted; }); + + if (!mStarted) { + return; + } + + iamanager::v6::IAMOutgoingMessages outgoingMsg; + if (!outgoingMsg.ParseFromArray(msg.data(), static_cast(msg.size()))) { + LOG_ERR() << "Failed to parse outgoing message"; + + continue; + } + + LOG_DBG() << "Sending message: msg=" << outgoingMsg.DebugString().c_str(); + + if (auto sendErr = SendMessage(outgoingMsg); !sendErr.IsNone()) { + LOG_ERR() << "Failed to send message" << Log::Field(sendErr); + + mMessageCache.push(outgoingMsg); + + continue; + } + } + } +} + +} // namespace aos::mp::iamclient diff --git a/src/mp/iamclient/iamclient.hpp b/src/mp/iamclient/iamclient.hpp new file mode 100644 index 00000000..d8ce4bed --- /dev/null +++ b/src/mp/iamclient/iamclient.hpp @@ -0,0 +1,96 @@ +/* + * Copyright (C) 2025 EPAM Systems, Inc. + * + * SPDX-License-Identifier: Apache-2.0 + */ + +#ifndef AOS_MP_IAMCLIENT_IAMCLIENT_HPP_ +#define AOS_MP_IAMCLIENT_IAMCLIENT_HPP_ + +#include +#include +#include +#include + +#include +#include + +#include + +#include +#include +#include + +namespace aos::mp::iamclient { + +/** + * IAM client. + */ +class IAMClient : public common::iamclient::PublicNodesService, private aos::iamclient::CertListenerItf { +public: + /** + * Initializes the client. + * + * @param cfg Configuration. + * @param certProvider Certificate provider. + * @param tlsCredentials TLS credentials. + * @param provisioningMode Whether the node is in provisioning mode (uses insecure connection). + * @return Error error code. + */ + Error Init(const config::IAMConfig& cfg, aos::iamclient::CertProviderItf& certProvider, + common::iamclient::TLSCredentialsItf& tlsCredentials, bool provisioningMode = false); + + /** + * Starts the client. + * + * @return Error error code. + */ + Error Start(); + + /** + * Stops the client. + */ + void Stop(); + + /** + * Sends messages. + * + * @param messages Messages. + * @return Error error code. + */ + Error SendMessages(std::vector messages); + + /** + * Receives messages. + * + * @return Messages. + */ + RetWithError> ReceiveMessages(); + +protected: + Error ReceiveMessage(const iamanager::v6::IAMIncomingMessages& msg) override; + void OnConnected() override; + void OnDisconnected() override; + +private: + void OnCertChanged(const CertInfo& info) override; + void ProcessOutgoingMessages(); + + aos::iamclient::CertProviderItf* mCertProvider {}; + std::string mCertStorage; + + std::thread mOutgoingMsgThread; + std::mutex mMutex; + std::condition_variable mCV; + std::atomic mStarted {}; + bool mConnected {}; + + common::utils::Channel> mOutgoingMsgChannel; + common::utils::Channel> mIncomingMsgChannel; + + std::queue mMessageCache; +}; + +} // namespace aos::mp::iamclient + +#endif diff --git a/src/mp/iamclient/publicnodeclient.cpp b/src/mp/iamclient/publicnodeclient.cpp deleted file mode 100644 index e7673a6d..00000000 --- a/src/mp/iamclient/publicnodeclient.cpp +++ /dev/null @@ -1,362 +0,0 @@ -/* - * Copyright (C) 2024 EPAM Systems, Inc. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#include -#include - -#include "publicnodeclient.hpp" - -namespace aos::mp::iamclient { - -/*********************************************************************************************************************** - * Public - **********************************************************************************************************************/ - -Error PublicNodeClient::Init( - const config::IAMConfig& cfg, common::iamclient::TLSCredentialsItf& certProvider, bool publicServer) -{ - LOG_DBG() << "Init public node client: publicServer=" << publicServer; - - mCertStorage = cfg.mCertStorage; - mCertProvider = &certProvider; - mPublicServer = publicServer; - - if (auto err = CreateCredentials(); !err.IsNone()) { - return err; - } - - mUrl = publicServer ? cfg.mIAMMainPublicServerURL : cfg.mIAMMainProtectedServerURL; - - return ErrorEnum::eNone; -} - -Error PublicNodeClient::SendMessages(std::vector messages) -{ - LOG_DBG() << "Sending messages"; - - return mOutgoingMsgChannel.Send(std::move(messages)); -} - -RetWithError> PublicNodeClient::ReceiveMessages() -{ - LOG_DBG() << "Receiving messages"; - - return mIncomingMsgChannel.Receive(); -} - -void PublicNodeClient::OnConnected() -{ - std::lock_guard lock {mMutex}; - - if (!mNotifyConnected) { - mNotifyConnected = true; - - mConnectionThread = std::thread(&PublicNodeClient::ConnectionLoop, this, mUrl); - mHandlerOutgoingMsgsThread = std::thread(&PublicNodeClient::ProcessOutgoingIAMMessages, this); - } -} - -void PublicNodeClient::OnDisconnected() -{ - Close(); -} - -void PublicNodeClient::OnCertChanged([[maybe_unused]] const CertInfo& info) -{ - LOG_DBG() << "Certificate changed"; - - auto task = [this] { - { - std::lock_guard lock {mMutex}; - - if (mPublicServer) { - LOG_DBG() << "Skipping certificate change for public server"; - - return; - } - - mConnected = false; - - if (mRegisterNodeCtx) { - mRegisterNodeCtx->TryCancel(); - } - } - - while (!mShutdown) { - auto res = mCertProvider->GetMTLSClientCredentials(mCertStorage.c_str()); - if (!res.mError.IsNone()) { - std::unique_lock lock {mMutex}; - - LOG_ERR() << "Failed to get mTLS config: error=" << res.mError.Message(); - - mCV.wait_for(lock, cReconnectInterval, [this] { return mShutdown.load(); }); - - continue; - } - - if (mCredentialList.empty()) { - mCredentialList.push_back(res.mValue); - - return; - } - - mCredentialList.back() = res.mValue; - - return; - } - }; - - try { - mThreadPool.start(*makeRunnable(std::move(task))); - } catch (const Poco::Exception& e) { - LOG_ERR() << "Failed to start cert change task: error=" << e.displayText().c_str(); - } catch (const std::exception& e) { - LOG_ERR() << "Failed to start cert change task: error=" << e.what(); - } -} - -/*********************************************************************************************************************** - * Private - **********************************************************************************************************************/ - -void PublicNodeClient::Close() -{ - LOG_INF() << "Destroying public node client"; - - { - std::unique_lock lock {mMutex}; - - if (mShutdown) { - return; - } - - mOutgoingMsgChannel.Close(); - mIncomingMsgChannel.Close(); - - if (!mNotifyConnected) { - return; - } - - mShutdown = true; - mNotifyConnected = false; - - if (mRegisterNodeCtx) { - mRegisterNodeCtx->TryCancel(); - } - } - - mCV.notify_all(); - - mThreadPool.joinAll(); - - if (mConnectionThread.joinable()) { - mConnectionThread.join(); - } - - if (mHandlerOutgoingMsgsThread.joinable()) { - mHandlerOutgoingMsgsThread.join(); - } -} - -Error PublicNodeClient::CreateCredentials() -{ - if (mPublicServer) { - mCredentialList.push_back(grpc::InsecureChannelCredentials()); - - auto res = mCertProvider->GetTLSClientCredentials(); - if (!res.mError.IsNone()) { - return AOS_ERROR_WRAP(res.mError); - } - - mCredentialList.push_back(res.mValue); - - return ErrorEnum::eNone; - } - - auto res = mCertProvider->GetMTLSClientCredentials(mCertStorage.c_str()); - if (!res.mError.IsNone()) { - return AOS_ERROR_WRAP(res.mError); - } - - mCredentialList.push_back(res.mValue); - - return ErrorEnum::eNone; -} - -void PublicNodeClient::ConnectionLoop(const std::string& url) noexcept -{ - LOG_DBG() << "public node client connection loop started"; - - while (!mShutdown) { - try { - if (auto err = RegisterNode(url); !err.IsNone()) { - LOG_ERR() << "Failed to register node: error=" << err.Message(); - } - } catch (const std::exception& e) { - LOG_WRN() << "Failed to connect: error=" << e.what(); - } - - std::unique_lock lock {mMutex}; - - mCV.wait_for(lock, cReconnectInterval, [this]() { return mShutdown.load(); }); - } - - LOG_DBG() << "public node client connection loop stopped"; -} - -Error PublicNodeClient::RegisterNode(const std::string& url) -{ - std::unique_lock lock {mMutex}; - - LOG_DBG() << "Registering node: url=" << url.c_str(); - - for (const auto& credentials : mCredentialList) { - mConnected = false; - - if (mShutdown) { - return ErrorEnum::eNone; - } - - auto channel = grpc::CreateCustomChannel(url, credentials, grpc::ChannelArguments()); - if (!channel) { - LOG_ERR() << "Failed to create channel"; - - continue; - } - - mPublicNodeServiceStub = PublicNodeService::NewStub(channel); - if (!mPublicNodeServiceStub) { - LOG_ERR() << "Failed to create stub"; - - continue; - } - - mRegisterNodeCtx = std::make_unique(); - mStream = mPublicNodeServiceStub->RegisterNode(mRegisterNodeCtx.get()); - if (!mStream) { - LOG_ERR() << "Failed to create stream"; - - continue; - } - - LOG_DBG() << "Connection established"; - - if (auto err = SendCachedMessages(); !err.IsNone()) { - LOG_ERR() << "Failed to send cached messages: error=" << err.Message(); - - continue; - } - - mConnected = true; - lock.unlock(); - - mCV.notify_all(); - - LOG_DBG() << "Try handling incoming messages url=" << url.c_str(); - - if (auto err = HandleIncomingMessages(); !err.IsNone()) { - LOG_ERR() << "Failed to handle incoming messages: error=" << err.Message(); - } - - lock.lock(); - } - - return Error(ErrorEnum::eRuntime, "failed to register node"); -} - -Error PublicNodeClient::HandleIncomingMessages() -{ - iamanager::v5::IAMIncomingMessages incomingMsg; - - LOG_DBG() << "Handle incoming messages"; - - while (mStream->Read(&incomingMsg)) { - std::vector message(incomingMsg.ByteSizeLong()); - - LOG_DBG() << "Received message: msg=" << incomingMsg.DebugString().c_str(); - - if (!incomingMsg.SerializeToArray(message.data(), message.size())) { - LOG_ERR() << "Failed to serialize message"; - - continue; - } - - if (auto err = mIncomingMsgChannel.Send(std::move(message)); !err.IsNone()) { - return Error(ErrorEnum::eRuntime, "failed to send message"); - } - } - - return ErrorEnum::eNone; -} - -void PublicNodeClient::ProcessOutgoingIAMMessages() -{ - LOG_DBG() << "Processing outgoing IAM messages"; - - while (!mShutdown) { - auto [msg, err] = mOutgoingMsgChannel.Receive(); - if (!err.IsNone()) { - LOG_ERR() << "Failed to receive message: error=" << err; - - return; - } - - { - std::unique_lock lock {mMutex}; - - LOG_DBG() << "Received message from IAM"; - - mCV.wait(lock, [this] { return mConnected || mShutdown; }); - - if (mShutdown) { - return; - } - - iamanager::v5::IAMOutgoingMessages outgoingMsg; - if (!outgoingMsg.ParseFromArray(msg.data(), static_cast(msg.size()))) { - LOG_ERR() << "Failed to parse outgoing message"; - - continue; - } - - LOG_DBG() << "Sending message to IAM: msg=" << outgoingMsg.DebugString().c_str(); - - if (!mStream->Write(outgoingMsg)) { - LOG_ERR() << "Failed to send message"; - - CacheMessage(outgoingMsg); - - continue; - } - } - } -} - -void PublicNodeClient::CacheMessage(const iamanager::v5::IAMOutgoingMessages& message) -{ - LOG_DBG() << "Caching message"; - - mMessageCache.push(message); -} - -Error PublicNodeClient::SendCachedMessages() -{ - while (!mMessageCache.empty()) { - const auto& message = mMessageCache.front(); - - if (!mStream->Write(message)) { - return Error(ErrorEnum::eRuntime, "failed to send cached message"); - } - - mMessageCache.pop(); - - LOG_DBG() << "Cached message sent"; - } - - return ErrorEnum::eNone; -} - -} // namespace aos::mp::iamclient diff --git a/src/mp/iamclient/publicnodeclient.hpp b/src/mp/iamclient/publicnodeclient.hpp deleted file mode 100644 index 8dfe7cf8..00000000 --- a/src/mp/iamclient/publicnodeclient.hpp +++ /dev/null @@ -1,152 +0,0 @@ -/* - * Copyright (C) 2024 EPAM Systems, Inc. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#ifndef AOS_MP_IAMCLIENT_PUBLICNODECLIENT_HPP_ -#define AOS_MP_IAMCLIENT_PUBLICNODECLIENT_HPP_ - -#include -#include -#include -#include -#include -#include - -#include -#include -#include -#include - -#include - -#include - -#include -#include -#include -#include -#include - -namespace aos::mp::iamclient { - -/** - * Public node client interface. - */ -class PublicNodeClient : public communication::HandlerItf, public aos::iamclient::CertListenerItf { -public: - /** - * Constructor. - */ - PublicNodeClient() = default; - - /** - * Initializes the client. - * - * @param cfg Configuration. - * @param certProvider Certificate provider. - * @param publicServer Public server. - * @return Error error code. - */ - Error Init( - const config::IAMConfig& cfg, common::iamclient::TLSCredentialsItf& certProvider, bool publicServer = true); - - /** - * Notifies that connection is established. - */ - void OnConnected() override; - - /** - * Notifies that connection is lost. - */ - void OnDisconnected() override; - - /** - * Sends messages. - * - * @param messages Messages. - * @return Error error code. - */ - Error SendMessages(std::vector messages) override; - - /** - * Receives messages. - * - * @return Messages. - */ - RetWithError> ReceiveMessages() override; - - /** - * Subscribes to certificate changes. - * - * @param certType Certificate type. - */ - void OnCertChanged(const CertInfo& info) override; - -private: - using StreamPtr = std::unique_ptr< - grpc::ClientReaderWriterInterface>; - using PublicNodeService = iamanager::v5::IAMPublicNodesService; - using PublicNodeServiceStubPtr = std::unique_ptr; - using HandlerFunc = std::function; - - template - class RunnableWrapper : public Poco::Runnable { - F mFunc; - - public: - explicit RunnableWrapper(F&& func) - : mFunc(std::move(func)) - { - } - - void run() override { mFunc(); } - }; - - template - static auto makeRunnable(F&& f) - { - return new RunnableWrapper(std::forward(f)); - } - - static constexpr auto cReconnectInterval = std::chrono::seconds(3); - - Error CreateCredentials(); - void ConnectionLoop(const std::string& url) noexcept; - Error HandleIncomingMessages(); - Error RegisterNode(const std::string& url); - void InitializeHandlers(); - void ProcessOutgoingIAMMessages(); - void Close(); - void CacheMessage(const iamanager::v5::IAMOutgoingMessages& message); - Error SendCachedMessages(); - - std::vector> mCredentialList; - std::string mCertStorage; - common::iamclient::TLSCredentialsItf* mCertProvider {}; - - std::unique_ptr mRegisterNodeCtx; - StreamPtr mStream; - PublicNodeServiceStubPtr mPublicNodeServiceStub; - Poco::ThreadPool mThreadPool; - - std::thread mConnectionThread; - std::thread mHandlerOutgoingMsgsThread; - std::condition_variable mCV; - std::atomic mShutdown {}; - bool mConnected {}; - bool mNotifyConnected {}; - std::mutex mMutex; - std::string mUrl; - bool mPublicServer {}; - - common::utils::Channel> mOutgoingMsgChannel; - common::utils::Channel> mIncomingMsgChannel; - - std::queue mMessageCache; -}; - -} // namespace aos::mp::iamclient - -#endif diff --git a/src/mp/iamclient/tests/iamclient.cpp b/src/mp/iamclient/tests/iamclient.cpp index cd9d6785..5a5fd750 100644 --- a/src/mp/iamclient/tests/iamclient.cpp +++ b/src/mp/iamclient/tests/iamclient.cpp @@ -9,11 +9,13 @@ #include +#include #include -#include +#include + +#include -#include "mocks/certprovider.hpp" #include "stubs/iamserver.hpp" using namespace testing; @@ -42,11 +44,13 @@ class IamClientTest : public Test { mClient.emplace(); } + void TearDown() override { mClient->Stop(); } + std::optional mIAMServerStub; - std::optional mClient; - crypto::x509::ProviderItf* mCryptoProvider {}; - config::Config mConfig {}; + std::optional mClient; + crypto::x509::ProviderItf* mCryptoProvider {}; + config::Config mConfig {}; }; /*********************************************************************************************************************** @@ -55,16 +59,18 @@ class IamClientTest : public Test { TEST_F(IamClientTest, RegisterNodeOutgoingMessages) { - common::iamclient::MockCertProvider certProvider {}; - EXPECT_CALL(certProvider, GetTLSClientCredentials()).WillOnce(testing::Return(grpc::InsecureChannelCredentials())); + aos::iamclient::CertProviderMock certProvider {}; + TLSCredentialsMock tlsCredentials {}; + + auto err = mClient->Init(mConfig.mIAMConfig, certProvider, tlsCredentials, true); + ASSERT_EQ(err, ErrorEnum::eNone); - auto err = mClient->Init(mConfig.mIAMConfig, certProvider); + err = mClient->Start(); ASSERT_EQ(err, ErrorEnum::eNone); - mClient->OnConnected(); EXPECT_TRUE(mIAMServerStub->WaitForConnection()); - iamanager::v5::IAMOutgoingMessages outgoingMsg; + iamanager::v6::IAMOutgoingMessages outgoingMsg; // send StartProvisioningResponse outgoingMsg.mutable_start_provisioning_response(); @@ -137,22 +143,22 @@ TEST_F(IamClientTest, RegisterNodeOutgoingMessages) mIAMServerStub->WaitResponse(); outgoingMsg = mIAMServerStub->GetOutgoingMessage(); EXPECT_TRUE(outgoingMsg.has_cert_types_response()); - - mClient->OnDisconnected(); } TEST_F(IamClientTest, RegisterNodeIncomingMessages) { - common::iamclient::MockCertProvider certProvider {}; - EXPECT_CALL(certProvider, GetTLSClientCredentials()).WillOnce(testing::Return(grpc::InsecureChannelCredentials())); + aos::iamclient::CertProviderMock certProvider {}; + TLSCredentialsMock tlsCredentials {}; + + auto err = mClient->Init(mConfig.mIAMConfig, certProvider, tlsCredentials, true); + ASSERT_EQ(err, ErrorEnum::eNone); - auto err = mClient->Init(mConfig.mIAMConfig, certProvider); + err = mClient->Start(); ASSERT_EQ(err, ErrorEnum::eNone); - mClient->OnConnected(); EXPECT_TRUE(mIAMServerStub->WaitForConnection()); - iamanager::v5::IAMIncomingMessages incomingMsg; + iamanager::v6::IAMIncomingMessages incomingMsg; // receive StartProvisioningRequest incomingMsg.mutable_start_provisioning_request(); @@ -217,28 +223,35 @@ TEST_F(IamClientTest, RegisterNodeIncomingMessages) EXPECT_EQ(res.mError, ErrorEnum::eNone); EXPECT_TRUE(incomingMsg.ParseFromArray(res.mValue.data(), res.mValue.size())); EXPECT_TRUE(incomingMsg.has_apply_cert_request()); - - mClient->OnDisconnected(); } TEST_F(IamClientTest, CertChanged) { - common::iamclient::MockCertProvider certProvider {}; - EXPECT_CALL(certProvider, GetMTLSClientCredentials(_)) + aos::iamclient::CertProviderMock certProvider {}; + TLSCredentialsMock tlsCredentials {}; + EXPECT_CALL(tlsCredentials, GetMTLSClientCredentials(_)) .Times(2) .WillRepeatedly(testing::Return(std::shared_ptr(grpc::InsecureChannelCredentials()))); - auto err = mClient->Init(mConfig.mIAMConfig, certProvider, false); + EXPECT_CALL(certProvider, SubscribeListener(_, _)).WillOnce(testing::Return(ErrorEnum::eNone)); + EXPECT_CALL(certProvider, UnsubscribeListener(_)).WillOnce(testing::Return(ErrorEnum::eNone)); + + mConfig.mIAMConfig.mCertStorage = "iam"; + + auto err = mClient->Init(mConfig.mIAMConfig, certProvider, tlsCredentials); + ASSERT_EQ(err, ErrorEnum::eNone); + + err = mClient->Start(); ASSERT_EQ(err, ErrorEnum::eNone); - mClient->OnConnected(); EXPECT_TRUE(mIAMServerStub->WaitForConnection()); - mClient->OnCertChanged(CertInfo {}); + err = mClient->Reconnect(); + EXPECT_EQ(err, ErrorEnum::eNone); EXPECT_TRUE(mIAMServerStub->WaitForDisconnection()); EXPECT_TRUE(mIAMServerStub->WaitForConnection()); - mClient->OnDisconnected(); + mClient->Stop(); } } // namespace aos::mp::iamclient diff --git a/src/mp/iamclient/tests/mocks/certprovider.hpp b/src/mp/iamclient/tests/mocks/certprovider.hpp deleted file mode 100644 index 5d1a1d4d..00000000 --- a/src/mp/iamclient/tests/mocks/certprovider.hpp +++ /dev/null @@ -1,31 +0,0 @@ -/* - * Copyright (C) 2024 Renesas Electronics Corporation. - * Copyright (C) 2024 EPAM Systems, Inc. - * - * SPDX-License-Identifier: Apache-2.0 - */ - -#ifndef AOS_MP_IAMCLIENT_CERTPROVIDER_HPP_ -#define AOS_MP_IAMCLIENT_CERTPROVIDER_HPP_ - -#include -#include - -#include - -namespace aos::common::iamclient { - -class MockCertProvider : public TLSCredentialsItf { -public: - MOCK_METHOD( - RetWithError>, GetMTLSClientCredentials, (const String&), (override)); - MOCK_METHOD(RetWithError>, GetTLSClientCredentials, (), (override)); - MOCK_METHOD( - Error, GetCert, (const String&, const Array&, const Array&, CertInfo&), (const, override)); - MOCK_METHOD(Error, SubscribeListener, (const String&, aos::iamclient::CertListenerItf&), (override)); - MOCK_METHOD(Error, UnsubscribeListener, (aos::iamclient::CertListenerItf&), (override)); -}; - -} // namespace aos::common::iamclient - -#endif diff --git a/src/mp/iamclient/tests/stubs/iamserver.hpp b/src/mp/iamclient/tests/stubs/iamserver.hpp index 55edc192..91729b0e 100644 --- a/src/mp/iamclient/tests/stubs/iamserver.hpp +++ b/src/mp/iamclient/tests/stubs/iamserver.hpp @@ -11,14 +11,14 @@ #include #include -#include +#include #include /** * Test IAM server. */ -class TestIAMServer final : public iamanager::v5::IAMPublicNodesService::Service { +class TestIAMServer final : public iamanager::v6::IAMPublicNodesService::Service { public: /** * Constructor. @@ -38,7 +38,7 @@ class TestIAMServer final : public iamanager::v5::IAMPublicNodesService::Service * @param message Message. * @return True if success. */ - bool SendIncomingMessage(const iamanager::v5::IAMIncomingMessages& message) { return mStream->Write(message); } + bool SendIncomingMessage(const iamanager::v6::IAMIncomingMessages& message) { return mStream->Write(message); } /** * Wait for connection. @@ -85,7 +85,7 @@ class TestIAMServer final : public iamanager::v5::IAMPublicNodesService::Service * * @return Outgoing message. */ - iamanager::v5::IAMOutgoingMessages GetOutgoingMessage() const { return mOutgoingMsg; } + iamanager::v6::IAMOutgoingMessages GetOutgoingMessage() const { return mOutgoingMsg; } private: constexpr static std::chrono::seconds kTimeout = std::chrono::seconds(5); @@ -94,18 +94,18 @@ class TestIAMServer final : public iamanager::v5::IAMPublicNodesService::Service { grpc::ServerBuilder builder; builder.AddListeningPort("localhost:8002", grpc::InsecureServerCredentials()); - builder.RegisterService(static_cast(this)); + builder.RegisterService(static_cast(this)); return builder.BuildAndStart(); } grpc::Status RegisterNode(grpc::ServerContext*, - grpc::ServerReaderWriter* stream) + grpc::ServerReaderWriter* stream) { try { mStream = stream; - iamanager::v5::IAMOutgoingMessages incomingMsg; + iamanager::v6::IAMOutgoingMessages incomingMsg; mConnected = true; mCV.notify_all(); @@ -126,9 +126,9 @@ class TestIAMServer final : public iamanager::v5::IAMPublicNodesService::Service std::unique_ptr mServer; std::string mCertType; aos::CertInfo mCertInfo; - iamanager::v5::IAMOutgoingMessages mOutgoingMsg; + iamanager::v6::IAMOutgoingMessages mOutgoingMsg; - grpc::ServerReaderWriter* mStream {}; + grpc::ServerReaderWriter* mStream {}; std::mutex mLock; std::condition_variable mCV; bool mConnected = false; From 61cdd9b68782d94ec2d89f8e540ab8112f93bbd7 Mon Sep 17 00:00:00 2001 From: Mykola Solianko Date: Tue, 10 Feb 2026 14:29:17 +0200 Subject: [PATCH 3/5] mp: config: rename logprovider to logging Signed-off-by: Mykola Solianko --- src/mp/communication/cmconnection.cpp | 2 +- .../tests/communicationsecure.cpp | 20 +++++++------- src/mp/config/config.cpp | 26 +++++++++---------- src/mp/config/config.hpp | 20 +++++++------- 4 files changed, 34 insertions(+), 34 deletions(-) diff --git a/src/mp/communication/cmconnection.cpp b/src/mp/communication/cmconnection.cpp index 45b419f6..ec8f0f6d 100644 --- a/src/mp/communication/cmconnection.cpp +++ b/src/mp/communication/cmconnection.cpp @@ -61,7 +61,7 @@ Error CMConnection::Init(const config::Config& cfg, HandlerItf& handler, Communi mHandler = &handler; - if (auto err = mArchiveManager.Init(*this, cfg.mLogProviderConfig); !err.IsNone()) { + if (auto err = mArchiveManager.Init(*this, cfg.mLogConfig); !err.IsNone()) { return err; } diff --git a/src/mp/communication/tests/communicationsecure.cpp b/src/mp/communication/tests/communicationsecure.cpp index b6392cc9..e4c171fa 100644 --- a/src/mp/communication/tests/communicationsecure.cpp +++ b/src/mp/communication/tests/communicationsecure.cpp @@ -113,16 +113,16 @@ class CommunicationSecureManagerTest : public ::testing::Test { std::filesystem::create_directories(mTmpDir); - mConfig.mIAMConfig.mOpenPort = 8081; - mConfig.mIAMConfig.mSecurePort = 8080; - mConfig.mVChan.mIAMCertStorage = "server"; - mConfig.mVChan.mSMCertStorage = "server"; - mConfig.mDownload.mDownloadDir = "download"; - mConfig.mImageStoreDir = "images"; - mConfig.mCMConfig.mOpenPort = 30001; - mConfig.mCMConfig.mSecurePort = 30002; - mConfig.mLogProviderConfig.mMaxPartSize = 1024; - mConfig.mLogProviderConfig.mMaxPartCount = 10; + mConfig.mIAMConfig.mOpenPort = 8081; + mConfig.mIAMConfig.mSecurePort = 8080; + mConfig.mVChan.mIAMCertStorage = "server"; + mConfig.mVChan.mSMCertStorage = "server"; + mConfig.mDownload.mDownloadDir = "download"; + mConfig.mImageStoreDir = "images"; + mConfig.mCMConfig.mOpenPort = 30001; + mConfig.mCMConfig.mSecurePort = 30002; + mConfig.mLogConfig.mMaxPartSize = 1024; + mConfig.mLogConfig.mMaxPartCount = 10; mConfig.mCACert = CERTIFICATES_MP_DIR "/ca.cer"; diff --git a/src/mp/config/config.cpp b/src/mp/config/config.cpp index eae78fca..4b5da7a4 100644 --- a/src/mp/config/config.cpp +++ b/src/mp/config/config.cpp @@ -6,8 +6,8 @@ #include -#include #include +#include #include "config.hpp" @@ -71,10 +71,10 @@ IAMConfig ParseIAMConfig(const common::utils::CaseInsensitiveObjectWrapper& obje }; } -aos::logprovider::Config ParseLogProviderConfig(const common::utils::CaseInsensitiveObjectWrapper& object) +aos::logging::Config ParseLogProviderConfig(const common::utils::CaseInsensitiveObjectWrapper& object) { if (!object.Has("LogProvider")) { - return aos::logprovider::Config { + return aos::logging::Config { cDefaultMaxLogPartSize, cDefaultMaxLogPartCount, }; @@ -82,7 +82,7 @@ aos::logprovider::Config ParseLogProviderConfig(const common::utils::CaseInsensi auto logProviderObject = object.GetObject("LogProvider"); - return aos::logprovider::Config { + return aos::logging::Config { logProviderObject.GetValue("MaxPartSize", cDefaultMaxLogPartSize), logProviderObject.GetValue("MaxPartCount", cDefaultMaxLogPartCount), }; @@ -123,15 +123,15 @@ RetWithError ParseConfig(const std::string& filename) try { common::utils::CaseInsensitiveObjectWrapper object(result.mValue.extract()); - config.mWorkingDir = object.GetValue("WorkingDir"); - config.mVChan = ParseVChanConfig(object.GetObject("VChan")); - config.mCMConfig = ParseCMConfig(object.GetObject("CMConfig")); - config.mCertStorage = object.GetValue("CertStorage"); - config.mCACert = object.GetValue("CACert"); - config.mImageStoreDir = object.GetValue("ImageStoreDir"); - config.mDownload = ParseDownloader(object.GetObject("Downloader")); - config.mIAMConfig = ParseIAMConfig(object.GetObject("IAMConfig")); - config.mLogProviderConfig = ParseLogProviderConfig(object); + config.mWorkingDir = object.GetValue("WorkingDir"); + config.mVChan = ParseVChanConfig(object.GetObject("VChan")); + config.mCMConfig = ParseCMConfig(object.GetObject("CMConfig")); + config.mCertStorage = object.GetValue("CertStorage"); + config.mCACert = object.GetValue("CACert"); + config.mImageStoreDir = object.GetValue("ImageStoreDir"); + config.mDownload = ParseDownloader(object.GetObject("Downloader")); + config.mIAMConfig = ParseIAMConfig(object.GetObject("IAMConfig")); + config.mLogConfig = ParseLogProviderConfig(object); } catch (const std::exception& e) { return {config, Error(ErrorEnum::eFailed, e.what())}; } diff --git a/src/mp/config/config.hpp b/src/mp/config/config.hpp index 6568f311..487098f6 100644 --- a/src/mp/config/config.hpp +++ b/src/mp/config/config.hpp @@ -9,7 +9,7 @@ #include -#include +#include #include #include @@ -66,15 +66,15 @@ struct CMConfig { * Configuration. */ struct Config { - std::string mWorkingDir; - VChanConfig mVChan; - CMConfig mCMConfig; - std::string mCertStorage; - std::string mCACert; - std::string mImageStoreDir; - Download mDownload; - IAMConfig mIAMConfig; - aos::logprovider::Config mLogProviderConfig; + std::string mWorkingDir; + VChanConfig mVChan; + CMConfig mCMConfig; + std::string mCertStorage; + std::string mCACert; + std::string mImageStoreDir; + Download mDownload; + IAMConfig mIAMConfig; + aos::logging::Config mLogConfig; }; /*********************************************************************************************************************** From 87251b5bb53c9e2c52b98aacb98e36f213cf46a4 Mon Sep 17 00:00:00 2001 From: Mykola Solianko Date: Tue, 10 Feb 2026 14:30:07 +0200 Subject: [PATCH 4/5] build.sh: enable MP build Signed-off-by: Mykola Solianko --- build.sh | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/build.sh b/build.sh index 66d4edb0..38920f0e 100755 --- a/build.sh +++ b/build.sh @@ -60,7 +60,7 @@ conan_setup() { build_project() { local with_cm="ON" local with_iam="ON" - local with_mp="OFF" + local with_mp="ON" local with_sm="ON" if [[ -n "$ARG_AOS_SERVICES" ]]; then From 6c566a1aecaa1652cf921cef70c06a94a1925ca8 Mon Sep 17 00:00:00 2001 From: Mykola Solianko Date: Tue, 10 Feb 2026 14:31:31 +0200 Subject: [PATCH 5/5] mp: temporarily disable uncompilable modules Signed-off-by: Mykola Solianko --- src/mp/CMakeLists.txt | 6 ------ 1 file changed, 6 deletions(-) diff --git a/src/mp/CMakeLists.txt b/src/mp/CMakeLists.txt index db16fab0..ccd5c572 100644 --- a/src/mp/CMakeLists.txt +++ b/src/mp/CMakeLists.txt @@ -52,11 +52,5 @@ endif() # Modules # ###################################################################################################################### -add_subdirectory(app) -add_subdirectory(cmclient) -add_subdirectory(communication) add_subdirectory(config) -add_subdirectory(filechunker) add_subdirectory(iamclient) -add_subdirectory(imageunpacker) -add_subdirectory(logprovider)