Skip to content

MessageOutStream_8cpp

github-actions edited this page Mar 15, 2026 · 3 revisions

title: src/Messenger/MessageOutStream.cpp


src/Messenger/MessageOutStream.cpp

Namespaces

Name
aasdk
aasdk::messenger

Source code

// This file is part of aasdk library project.
// Copyright (C) 2018 f1x.studio (Michal Szwaj)
// Copyright (C) 2024 CubeOne (Simon Dean - simon.dean@cubeone.co.uk)
// Copyright (C) 2026 OpenCarDev (Matthew Hilton - matthilton2005@gmail.com)
//
// aasdk is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// aasdk is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with aasdk. If not, see <http://www.gnu.org/licenses/>.

#include <boost/endian/conversion.hpp>
#include <aasdk/IO/PromiseLink.hpp>
#include <aasdk/Messenger/MessageOutStream.hpp>


namespace aasdk {
  namespace messenger {

    MessageOutStream::MessageOutStream(boost::asio::io_service &ioService, transport::ITransport::Pointer transport,
                                       ICryptor::Pointer cryptor)
        : strand_(ioService), transport_(std::move(transport)), cryptor_(std::move(cryptor)), offset_(0),
          remainingSize_(0) {

    }

    void MessageOutStream::stream(Message::Pointer message, SendPromise::Pointer promise) {
      strand_.dispatch([this, self = this->shared_from_this(), message = std::move(message), promise = std::move(
          promise)]() mutable {
        if (promise_ != nullptr) {
          promise->reject(error::Error(error::ErrorCode::OPERATION_IN_PROGRESS));
          return;
        }

        message_ = std::move(message);
        promise_ = std::move(promise);

        if (message_->getPayload().size() >= cMaxFramePayloadSize) {
          offset_ = 0;
          remainingSize_ = message_->getPayload().size();
          this->streamSplittedMessage();
        } else {
          try {
            auto data(this->compoundFrame(FrameType::BULK, common::DataConstBuffer(message_->getPayload())));

            auto transportPromise = transport::ITransport::SendPromise::defer(strand_);
            io::PromiseLink<>::forward(*transportPromise, std::move(promise_));
            transport_->send(std::move(data), std::move(transportPromise));
          }
          catch (const error::Error &e) {
            promise_->reject(e);
            promise_.reset();
          }

          this->reset();
        }
      });
    }

    void MessageOutStream::streamSplittedMessage() {
      try {
        const auto &payload = message_->getPayload();
        auto ptr = &payload[offset_];
        auto size = remainingSize_ < cMaxFramePayloadSize ? remainingSize_ : cMaxFramePayloadSize;

        FrameType frameType =
            offset_ == 0 ? FrameType::FIRST : (remainingSize_ - size > 0 ? FrameType::MIDDLE : FrameType::LAST);
        auto data(this->compoundFrame(frameType, common::DataConstBuffer(ptr, size)));

        auto transportPromise = transport::ITransport::SendPromise::defer(strand_);

        if (frameType == FrameType::LAST) {
          this->reset();
          io::PromiseLink<>::forward(*transportPromise, std::move(promise_));
        } else {
          transportPromise->then([this, self = this->shared_from_this(), size]() mutable {
                                   offset_ += size;
                                   remainingSize_ -= size;
                                   this->streamSplittedMessage();
                                 },
                                 [this, self = this->shared_from_this()](const error::Error &e) mutable {
                                   this->reset();
                                   promise_->reject(e);
                                   promise_.reset();
                                 });
        }

        transport_->send(std::move(data), std::move(transportPromise));
      }
      catch (const error::Error &e) {
        this->reset();
        promise_->reject(e);
        promise_.reset();
      }
    }

    common::Data MessageOutStream::compoundFrame(FrameType frameType, const common::DataConstBuffer &payloadBuffer) {
      const FrameHeader frameHeader(message_->getChannelId(), frameType, message_->getEncryptionType(),
                                    message_->getType());
      common::Data data(frameHeader.getData());
      data.resize(data.size() +
                  FrameSize::getSizeOf(frameType == FrameType::FIRST ? FrameSizeType::EXTENDED : FrameSizeType::SHORT));
      size_t payloadSize = 0;

      if (message_->getEncryptionType() == EncryptionType::ENCRYPTED) {
        payloadSize = cryptor_->encrypt(data, payloadBuffer);
      } else {
        data.insert(data.end(), payloadBuffer.cdata, payloadBuffer.cdata + payloadBuffer.size);
        payloadSize = payloadBuffer.size;
      }

      this->setFrameSize(data, frameType, payloadSize, message_->getPayload().size());
      return data;
    }

    void MessageOutStream::setFrameSize(common::Data &data, FrameType frameType, size_t payloadSize, size_t totalSize) {
      const auto &frameSize =
          frameType == FrameType::FIRST ? FrameSize(payloadSize, totalSize) : FrameSize(payloadSize);
      const auto &frameSizeData = frameSize.getData();
      memcpy(&data[FrameHeader::getSizeOf()], &frameSizeData[0], frameSizeData.size());
    }

    void MessageOutStream::reset() {
      offset_ = 0;
      remainingSize_ = 0;
      message_.reset();
    }

  }
}

Updated on 2026-03-15 at 09:02:41 +0000

Clone this wiki locally