Skip to content

MessageInStream_8cpp

github-actions edited this page Mar 15, 2026 · 3 revisions

title: src/Messenger/MessageInStream.cpp


src/Messenger/MessageInStream.cpp

Namespaces

Name
aasdk
aasdk::messenger

Attributes

Name
bool enabled
bool videoOnly
int sampleEvery

Attributes Documentation

variable enabled

bool enabled {false};

variable videoOnly

bool videoOnly {true};

variable sampleEvery

int sampleEvery {1};

Source code

// This file is part of aasdk library project.
// Copyright (C) 2018 f1x.studio (Michal Szwaj)
// Copyright (C) 2024 CubeOne (Simon Dean - simon.dean@cubeone.co.uk)
// Copyright (C) 2026 OpenCarDev (Matthew Hilton - matthilton2005@gmail.com)
//
// aasdk is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// aasdk is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with aasdk. If not, see <http://www.gnu.org/licenses/>.

#include <aasdk/Messenger/MessageInStream.hpp>
#include <aasdk/Messenger/MessageId.hpp>
#include <aasdk/Error/Error.hpp>
#include <aasdk/Common/Log.hpp>
#include <aasdk/Common/ModernLogger.hpp>
#include <aap_protobuf/service/control/ControlMessageType.pb.h>
#include <algorithm>
#include <atomic>
#include <chrono>
#include <cstdlib>
#include <iostream>
#include <string>


namespace aasdk::messenger {

  namespace {

    struct MessageTraceConfig {
      bool enabled{false};
      bool videoOnly{true};
      int sampleEvery{1};
    };

    static auto parseEnvBool(const char* value, bool defaultValue) -> bool {
      if (value == nullptr) {
        return defaultValue;
      }

      const std::string token(value);
      if (token == "1" || token == "true" || token == "TRUE" || token == "on" ||
          token == "ON" || token == "yes" || token == "YES") {
        return true;
      }

      if (token == "0" || token == "false" || token == "FALSE" || token == "off" ||
          token == "OFF" || token == "no" || token == "NO") {
        return false;
      }

      return defaultValue;
    }

    static auto parseEnvInt(const char* value, int defaultValue, int minValue, int maxValue) -> int {
      if (value == nullptr) {
        return defaultValue;
      }

      try {
        const int parsed = std::stoi(value);
        return std::max(minValue, std::min(maxValue, parsed));
      } catch (...) {
        return defaultValue;
      }
    }

    static auto readMessageTraceConfig() -> MessageTraceConfig {
      MessageTraceConfig cfg;
      cfg.enabled = parseEnvBool(std::getenv("AASDK_TRACE_MESSAGE"), false);
      cfg.videoOnly = parseEnvBool(std::getenv("AASDK_TRACE_MESSAGE_VIDEO_ONLY"), true);
      cfg.sampleEvery = parseEnvInt(std::getenv("AASDK_TRACE_MESSAGE_SAMPLE_EVERY"), 1, 1, 1000);
      return cfg;
    }

    static auto getMessageTraceConfig() -> MessageTraceConfig {
      static MessageTraceConfig cached = readMessageTraceConfig();
      static auto lastRefresh = std::chrono::steady_clock::now();

      const auto now = std::chrono::steady_clock::now();
      if (now - lastRefresh > std::chrono::seconds(1)) {
        cached = readMessageTraceConfig();
        lastRefresh = now;
      }

      return cached;
    }

    static auto shouldTraceMessage(ChannelId channelId) -> bool {
      static std::atomic<uint64_t> counter{0};
      const MessageTraceConfig cfg = getMessageTraceConfig();
      if (!cfg.enabled) {
        return false;
      }

      if (cfg.videoOnly && channelId != ChannelId::MEDIA_SINK_VIDEO) {
        return false;
      }

      const uint64_t current = ++counter;
      return (current % static_cast<uint64_t>(cfg.sampleEvery)) == 0;
    }

  } // namespace

  MessageInStream::MessageInStream(boost::asio::io_service &ioService, transport::ITransport::Pointer transport,
                                   ICryptor::Pointer cryptor)
      : strand_(ioService), transport_(std::move(transport)), cryptor_(std::move(cryptor)) {

  }

  void MessageInStream::startReceive(ReceivePromise::Pointer promise) {
    AASDK_LOG_MESSENGER(debug, "startReceiveCalled()");
    strand_.dispatch([this, self = this->shared_from_this(), promise = std::move(promise)]() mutable {
      if (promise_ == nullptr) {
        promise_ = std::move(promise);
        auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
        transportPromise->then(
            [this, self = this->shared_from_this()](common::Data data) mutable {
              this->receiveFrameHeaderHandler(common::DataConstBuffer(data));
            },
            [this, self = this->shared_from_this()](const error::Error &e) mutable {
              AASDK_LOG_MESSENGER(debug, "Rejecting message.");
              promise_->reject(e);
              promise_.reset();
            });

        transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise));
      } else {
        promise_.reset();
        AASDK_LOG_MESSENGER(debug, "Already Handling Promise");
        promise->reject(error::Error(error::ErrorCode::OPERATION_IN_PROGRESS));
      }
    });
  }

  void MessageInStream::receiveFrameHeaderHandler(const common::DataConstBuffer &buffer) {
    FrameHeader frameHeader(buffer);

    AASDK_LOG(debug) << "[MessageInStream] Processing Frame Header: Ch "
                     << channelIdToString(frameHeader.getChannelId()) << " Fr "
                     << frameTypeToString(frameHeader.getType())
                     << " Enc " << (frameHeader.getEncryptionType() == EncryptionType::ENCRYPTED ? "ENCRYPTED" : "PLAIN")
                     << " Msg " << (frameHeader.getMessageType() == MessageType::CONTROL ? "CONTROL" : "SPECIFIC")
                     << " Raw[0]=0x" << std::hex << static_cast<int>(buffer.cdata[0])
                     << " Raw[1]=0x" << static_cast<int>(buffer.cdata[1])
                     << std::dec;

    isValidFrame_ = true;

    auto bufferedMessage = messageBuffer_.find(frameHeader.getChannelId());
    if (bufferedMessage != messageBuffer_.end()) {
      // We have found a message...
      message_ = std::move(bufferedMessage->second);
      messageBuffer_.erase(bufferedMessage);

      AASDK_LOG_MESSENGER(debug, "Found existing message.");

      if (frameHeader.getType() == FrameType::FIRST || frameHeader.getType() == FrameType::BULK) {
        // If it's first or bulk, we need to override the message anyhow, so we will start again.
        // Need to start a new message anyhow
        message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(),
                                             frameHeader.getMessageType());
      }
    } else {
      AASDK_LOG_MESSENGER(debug, "Could not find existing message.");
      // No Message Found in Buffers and this is a middle or last frame, this an error.
      // Still need to process the frame, but we will not resolve at the end.
      message_ = std::make_shared<Message>(frameHeader.getChannelId(), frameHeader.getEncryptionType(),
                                           frameHeader.getMessageType());
      if (frameHeader.getType() == FrameType::MIDDLE || frameHeader.getType() == FrameType::LAST) {
        // This is an error
        isValidFrame_ = false;
      }
    }

    thisFrameType_ = frameHeader.getType();
    const size_t frameSize = FrameSize::getSizeOf(
        frameHeader.getType() == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT);

    auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
    transportPromise->then(
        [this, self = this->shared_from_this()](common::Data data) mutable {
          this->receiveFrameSizeHandler(common::DataConstBuffer(data));
        },
        [this, self = this->shared_from_this()](const error::Error &e) mutable {
          AASDK_LOG_MESSENGER(debug, "Rejecting message.");
          message_.reset();
          promise_->reject(e);
          promise_.reset();
        });

    transport_->receive(frameSize, std::move(transportPromise));
  }

  void MessageInStream::receiveFrameSizeHandler(const common::DataConstBuffer &buffer) {
    auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
    transportPromise->then(
        [this, self = this->shared_from_this()](common::Data data) mutable {
          this->receiveFramePayloadHandler(common::DataConstBuffer(data));
        },
        [this, self = this->shared_from_this()](const error::Error &e) mutable {
          AASDK_LOG_MESSENGER(debug, "Rejecting message.");
          message_.reset();
          promise_->reject(e);
          promise_.reset();
        });

    FrameSize frameSize(buffer);
    frameSize_ = (int) frameSize.getFrameSize();
    AASDK_LOG(debug) << "[MessageInStream] Frame size parsed: frameSize=" << frameSize.getFrameSize()
                     << " totalSize=" << frameSize.getTotalSize();
    transport_->receive(frameSize.getFrameSize(), std::move(transportPromise));
  }

  void MessageInStream::receiveFramePayloadHandler(const common::DataConstBuffer &buffer) {
    const ChannelId channelId = message_->getChannelId();
    const bool traceMessage = shouldTraceMessage(channelId);
    const size_t payloadSizeBefore = message_->getPayload().size();

    AASDK_LOG(debug) << "[MessageInStream] Payload handler: ch=" << channelIdToString(message_->getChannelId())
                     << " enc=" << (message_->getEncryptionType() == EncryptionType::ENCRYPTED ? "ENCRYPTED" : "PLAIN")
                     << " msg=" << (message_->getType() == MessageType::CONTROL ? "CONTROL" : "SPECIFIC")
                     << " frameType=" << frameTypeToString(thisFrameType_)
                     << " frameSize=" << frameSize_
                     << " payloadBytes=" << buffer.size
                     << " cryptorActive=" << (cryptor_->isActive() ? "true" : "false");

    if (message_->getEncryptionType() == EncryptionType::ENCRYPTED) {
      if (!cryptor_->isActive()) {
        // Some devices deliver raw TLS records on control before cryptor activation.
        // Only synthesize ENCAPSULATED_SSL for TLS-looking records to avoid
        // misclassifying regular control payloads (e.g. version responses).
        const bool looksLikeTlsRecord =
            (buffer.size >= 2) &&
            (buffer.cdata[0] >= 0x14 && buffer.cdata[0] <= 0x17) &&
            (buffer.cdata[1] == 0x03);

        if (message_->getChannelId() == ChannelId::CONTROL && looksLikeTlsRecord) {
          message_->insertPayload(messenger::MessageId(
              aap_protobuf::service::control::message::ControlMessageType::MESSAGE_ENCAPSULATED_SSL).getData());
        }

        message_->insertPayload(buffer);
        if (traceMessage) {
          AASDK_LOG(debug) << "[MessageTrace] encrypted-pass-through"
                          << " ch=" << channelIdToString(channelId)
                          << " payloadBytes=" << buffer.size
                          << " payloadSizeAfter=" << message_->getPayload().size();
        }
      } else {
        try {
          const size_t decryptedBytes = cryptor_->decrypt(message_->getPayload(), buffer, frameSize_);
          if (traceMessage) {
            AASDK_LOG(debug) << "[MessageTrace] decrypt"
                            << " ch=" << channelIdToString(channelId)
                            << " frameSize=" << frameSize_
                            << " encryptedBytes=" << buffer.size
                            << " decryptedBytes=" << decryptedBytes
                            << " payloadSizeBefore=" << payloadSizeBefore
                            << " payloadSizeAfter=" << message_->getPayload().size();
          }
        }
        catch (const error::Error &e) {
          AASDK_LOG_MESSENGER(debug, "Rejecting message.");
          message_.reset();
          promise_->reject(e);
          promise_.reset();
          return;
        }
      }
    } else {
      message_->insertPayload(buffer);
    }

    bool isResolved = false;

    // If this is the LAST frame or a BULK frame...
    if ((thisFrameType_ == FrameType::BULK || thisFrameType_ == FrameType::LAST) && isValidFrame_) {
      AASDK_LOG_MESSENGER(debug, "Resolving message.");
      if (traceMessage) {
        AASDK_LOG(debug) << "[MessageTrace] resolve"
                        << " ch=" << channelIdToString(channelId)
                        << " frameType=" << frameTypeToString(thisFrameType_)
                        << " totalPayloadBytes=" << message_->getPayload().size();
      }
      promise_->resolve(std::move(message_));
      promise_.reset();
      isResolved = true;

    } else {
      // First or Middle message, we'll store in our buffer...
      messageBuffer_[message_->getChannelId()] = std::move(message_);
    }

    // If the main promise isn't resolved, then carry on retrieving frame headers.
    if (!isResolved) {
      auto transportPromise = transport::ITransport::ReceivePromise::defer(strand_);
      transportPromise->then(
          [this, self = this->shared_from_this()](common::Data data) mutable {
            this->receiveFrameHeaderHandler(common::DataConstBuffer(data));
          },
          [this, self = this->shared_from_this()](const error::Error &e) mutable {
            message_.reset();
            AASDK_LOG_MESSENGER(debug, "Rejecting message.");
            promise_->reject(e);
            promise_.reset();
          });

      transport_->receive(FrameHeader::getSizeOf(), std::move(transportPromise));
    }
  }

}

Updated on 2026-03-15 at 09:02:41 +0000

Clone this wiki locally