Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion build.sh
Original file line number Diff line number Diff line change
Expand Up @@ -60,7 +60,7 @@ conan_setup() {
build_project() {
local with_cm="ON"
local with_iam="ON"
local with_mp="OFF"
local with_mp="ON"
local with_sm="ON"

if [[ -n "$ARG_AOS_SERVICES" ]]; then
Expand Down
4 changes: 4 additions & 0 deletions src/common/iamclient/publicnodeservice.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -75,6 +75,10 @@ Error PublicNodesService::Reconnect()
mSubscriptionManager->Reconnect(mStub.get());
}

if (mRegisterNodeCtx) {
mRegisterNodeCtx->TryCancel();
}

return ErrorEnum::eNone;
}

Expand Down
6 changes: 0 additions & 6 deletions src/mp/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -52,11 +52,5 @@ endif()
# Modules
# ######################################################################################################################

add_subdirectory(app)
add_subdirectory(cmclient)
add_subdirectory(communication)
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

probably better leave removed.

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done

add_subdirectory(config)
add_subdirectory(filechunker)
add_subdirectory(iamclient)
add_subdirectory(imageunpacker)
add_subdirectory(logprovider)
2 changes: 1 addition & 1 deletion src/mp/communication/cmconnection.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -61,7 +61,7 @@ Error CMConnection::Init(const config::Config& cfg, HandlerItf& handler, Communi

mHandler = &handler;

if (auto err = mArchiveManager.Init(*this, cfg.mLogProviderConfig); !err.IsNone()) {
if (auto err = mArchiveManager.Init(*this, cfg.mLogConfig); !err.IsNone()) {
return err;
}

Expand Down
20 changes: 10 additions & 10 deletions src/mp/communication/tests/communicationsecure.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -113,16 +113,16 @@ class CommunicationSecureManagerTest : public ::testing::Test {

std::filesystem::create_directories(mTmpDir);

mConfig.mIAMConfig.mOpenPort = 8081;
mConfig.mIAMConfig.mSecurePort = 8080;
mConfig.mVChan.mIAMCertStorage = "server";
mConfig.mVChan.mSMCertStorage = "server";
mConfig.mDownload.mDownloadDir = "download";
mConfig.mImageStoreDir = "images";
mConfig.mCMConfig.mOpenPort = 30001;
mConfig.mCMConfig.mSecurePort = 30002;
mConfig.mLogProviderConfig.mMaxPartSize = 1024;
mConfig.mLogProviderConfig.mMaxPartCount = 10;
mConfig.mIAMConfig.mOpenPort = 8081;
mConfig.mIAMConfig.mSecurePort = 8080;
mConfig.mVChan.mIAMCertStorage = "server";
mConfig.mVChan.mSMCertStorage = "server";
mConfig.mDownload.mDownloadDir = "download";
mConfig.mImageStoreDir = "images";
mConfig.mCMConfig.mOpenPort = 30001;
mConfig.mCMConfig.mSecurePort = 30002;
mConfig.mLogConfig.mMaxPartSize = 1024;
mConfig.mLogConfig.mMaxPartCount = 10;

mConfig.mCACert = CERTIFICATES_MP_DIR "/ca.cer";

Expand Down
26 changes: 13 additions & 13 deletions src/mp/config/config.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -6,8 +6,8 @@

#include <fstream>

#include <common/logger/logmodule.hpp>
#include <common/utils/json.hpp>
#include <core/common/tools/logger.hpp>

#include "config.hpp"

Expand Down Expand Up @@ -71,18 +71,18 @@ IAMConfig ParseIAMConfig(const common::utils::CaseInsensitiveObjectWrapper& obje
};
}

aos::logprovider::Config ParseLogProviderConfig(const common::utils::CaseInsensitiveObjectWrapper& object)
aos::logging::Config ParseLogProviderConfig(const common::utils::CaseInsensitiveObjectWrapper& object)
{
if (!object.Has("LogProvider")) {
return aos::logprovider::Config {
return aos::logging::Config {
cDefaultMaxLogPartSize,
cDefaultMaxLogPartCount,
};
}

auto logProviderObject = object.GetObject("LogProvider");

return aos::logprovider::Config {
return aos::logging::Config {
logProviderObject.GetValue<uint64_t>("MaxPartSize", cDefaultMaxLogPartSize),
logProviderObject.GetValue<uint64_t>("MaxPartCount", cDefaultMaxLogPartCount),
};
Expand Down Expand Up @@ -123,15 +123,15 @@ RetWithError<Config> ParseConfig(const std::string& filename)
try {
common::utils::CaseInsensitiveObjectWrapper object(result.mValue.extract<Poco::JSON::Object::Ptr>());

config.mWorkingDir = object.GetValue<std::string>("WorkingDir");
config.mVChan = ParseVChanConfig(object.GetObject("VChan"));
config.mCMConfig = ParseCMConfig(object.GetObject("CMConfig"));
config.mCertStorage = object.GetValue<std::string>("CertStorage");
config.mCACert = object.GetValue<std::string>("CACert");
config.mImageStoreDir = object.GetValue<std::string>("ImageStoreDir");
config.mDownload = ParseDownloader(object.GetObject("Downloader"));
config.mIAMConfig = ParseIAMConfig(object.GetObject("IAMConfig"));
config.mLogProviderConfig = ParseLogProviderConfig(object);
config.mWorkingDir = object.GetValue<std::string>("WorkingDir");
config.mVChan = ParseVChanConfig(object.GetObject("VChan"));
config.mCMConfig = ParseCMConfig(object.GetObject("CMConfig"));
config.mCertStorage = object.GetValue<std::string>("CertStorage");
config.mCACert = object.GetValue<std::string>("CACert");
config.mImageStoreDir = object.GetValue<std::string>("ImageStoreDir");
config.mDownload = ParseDownloader(object.GetObject("Downloader"));
config.mIAMConfig = ParseIAMConfig(object.GetObject("IAMConfig"));
config.mLogConfig = ParseLogProviderConfig(object);
} catch (const std::exception& e) {
return {config, Error(ErrorEnum::eFailed, e.what())};
}
Expand Down
20 changes: 10 additions & 10 deletions src/mp/config/config.hpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,7 +9,7 @@

#include <string>

#include <core/common/logprovider/config.hpp>
#include <core/common/logging/config.hpp>
#include <core/common/tools/error.hpp>

#include <common/utils/time.hpp>
Expand Down Expand Up @@ -66,15 +66,15 @@ struct CMConfig {
* Configuration.
*/
struct Config {
std::string mWorkingDir;
VChanConfig mVChan;
CMConfig mCMConfig;
std::string mCertStorage;
std::string mCACert;
std::string mImageStoreDir;
Download mDownload;
IAMConfig mIAMConfig;
aos::logprovider::Config mLogProviderConfig;
std::string mWorkingDir;
VChanConfig mVChan;
CMConfig mCMConfig;
std::string mCertStorage;
std::string mCACert;
std::string mImageStoreDir;
Download mDownload;
IAMConfig mIAMConfig;
aos::logging::Config mLogConfig;
};

/***********************************************************************************************************************
Expand Down
4 changes: 2 additions & 2 deletions src/mp/iamclient/CMakeLists.txt
Original file line number Diff line number Diff line change
Expand Up @@ -14,13 +14,13 @@ set(TARGET_NAME iamclient)
# Sources
# ######################################################################################################################

set(SOURCES publicnodeclient.cpp)
set(SOURCES iamclient.cpp)

# ######################################################################################################################
# Libraries
# ######################################################################################################################

set(LIBRARIES aos::mp::communication aos::common::logger aos::common::iamclient)
set(LIBRARIES aos::common::utils aos::common::iamclient)

# ######################################################################################################################
# Target
Expand Down
190 changes: 190 additions & 0 deletions src/mp/iamclient/iamclient.cpp
Original file line number Diff line number Diff line change
@@ -0,0 +1,190 @@
/*
* Copyright (C) 2025 EPAM Systems, Inc.
*
* SPDX-License-Identifier: Apache-2.0
*/

#include <core/common/tools/logger.hpp>

#include "iamclient.hpp"

namespace aos::mp::iamclient {

/***********************************************************************************************************************
* Public
**********************************************************************************************************************/

Error IAMClient::Init(const config::IAMConfig& cfg, aos::iamclient::CertProviderItf& certProvider,
common::iamclient::TLSCredentialsItf& tlsCredentials, bool provisioningMode)
{
mCertProvider = &certProvider;
mCertStorage = cfg.mCertStorage;

const bool publicServer = mCertStorage.empty();

LOG_DBG() << "Init IAM client: publicServer=" << publicServer << ", provisioningMode=" << provisioningMode;

return PublicNodesService::Init(publicServer ? cfg.mIAMMainPublicServerURL : cfg.mIAMMainProtectedServerURL,
tlsCredentials, provisioningMode, publicServer, mCertStorage);
}

Error IAMClient::Start()
{
std::lock_guard lock {mMutex};

LOG_DBG() << "Start IAM client";
Copy link
Collaborator

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

lock guard?

Copy link
Author

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Done


if (!mCertStorage.empty()) {
if (auto err = mCertProvider->SubscribeListener(String(mCertStorage.c_str()), *this); !err.IsNone()) {
return AOS_ERROR_WRAP(err);
}
}

mStarted = true;

mOutgoingMsgThread = std::thread(&IAMClient::ProcessOutgoingMessages, this);

return PublicNodesService::Start();
}

void IAMClient::Stop()
{
LOG_DBG() << "Stop IAM client";

{
std::lock_guard lock {mMutex};

if (!mStarted) {
return;
}

mStarted = false;

mOutgoingMsgChannel.Close();
mIncomingMsgChannel.Close();
}

mCV.notify_all();

PublicNodesService::Stop();

if (mOutgoingMsgThread.joinable()) {
mOutgoingMsgThread.join();
}

if (!mCertStorage.empty()) {
mCertProvider->UnsubscribeListener(*this);
}
}

Error IAMClient::SendMessages(std::vector<uint8_t> messages)
{
LOG_DBG() << "Send message";

return mOutgoingMsgChannel.Send(std::move(messages));
}

RetWithError<std::vector<uint8_t>> IAMClient::ReceiveMessages()
{
LOG_DBG() << "Receive message";

return mIncomingMsgChannel.Receive();
}

/***********************************************************************************************************************
* Protected
**********************************************************************************************************************/

Error IAMClient::ReceiveMessage(const iamanager::v6::IAMIncomingMessages& msg)
{
std::vector<uint8_t> message(msg.ByteSizeLong());

LOG_DBG() << "Received message" << Log::Field("msg", msg.DebugString());

if (!msg.SerializeToArray(message.data(), message.size())) {
return Error(ErrorEnum::eRuntime, "failed to serialize message");
}

if (auto err = mIncomingMsgChannel.Send(std::move(message)); !err.IsNone()) {
return AOS_ERROR_WRAP(err);
}

return ErrorEnum::eNone;
}

void IAMClient::OnConnected()
{
LOG_DBG() << "IAM client connected";

std::lock_guard lock {mMutex};

mConnected = true;
mCV.notify_all();
}

void IAMClient::OnDisconnected()
{
LOG_DBG() << "IAM client disconnected";

std::lock_guard lock {mMutex};

mConnected = false;
}

/***********************************************************************************************************************
* Private
**********************************************************************************************************************/

void IAMClient::OnCertChanged([[maybe_unused]] const CertInfo& info)
{
LOG_INF() << "Certificate changed, reconnecting";

if (auto err = Reconnect(); !err.IsNone()) {
LOG_ERR() << "Failed to reconnect" << Log::Field(err);
}
}

void IAMClient::ProcessOutgoingMessages()
{
LOG_DBG() << "Processing outgoing messages";

while (mStarted) {
auto [msg, err] = mOutgoingMsgChannel.Receive();
if (!err.IsNone()) {
LOG_ERR() << "Failed to receive message" << Log::Field(err);

return;
}

{
std::unique_lock lock {mMutex};

LOG_DBG() << "Received message from channel";

mCV.wait(lock, [this] { return mConnected || !mStarted; });

if (!mStarted) {
return;
}

iamanager::v6::IAMOutgoingMessages outgoingMsg;
if (!outgoingMsg.ParseFromArray(msg.data(), static_cast<int>(msg.size()))) {
LOG_ERR() << "Failed to parse outgoing message";

continue;
}

LOG_DBG() << "Sending message: msg=" << outgoingMsg.DebugString().c_str();

if (auto sendErr = SendMessage(outgoingMsg); !sendErr.IsNone()) {
LOG_ERR() << "Failed to send message" << Log::Field(sendErr);

mMessageCache.push(outgoingMsg);

continue;
}
}
}
}

} // namespace aos::mp::iamclient
Loading
Loading