-
Notifications
You must be signed in to change notification settings - Fork 47
Transport_8cpp
title: src/Transport/Transport.cpp summary: Base transport layer for frame-level communication with Android device.
Base transport layer for frame-level communication with Android device. More...
| Name |
|---|
| aasdk |
| aasdk::transport |
Base transport layer for frame-level communication with Android device.
This implementation provides the low-level frame receive/send buffering:
- Manages receive buffer and distributes available data to waiting requests
- Queues send operations for transmission by subclass implementations
- Handles strand-based serialisation for thread-safe buffer access
- Provides size-based flow control (wait until N bytes available)
Subclasses (USBTransport, TCPTransport) implement enqueueReceive/enqueueSend to handle physical transmission (USB transfers, TCP sockets).
Design: CircularBuffer holds received frames; receive() waits for N bytes before resolving. Multiple receive operations can be pending with different size requirements (e.g., one waiting for 4-byte header, another for 1024-byte payload). sendQueue_ ensures sends complete before starting next one.
Typical scenario (frame reception):
- Messenger calls receive(4) to read frame header
- Queued on receiveStrand_; check buffer - needs more data
- Call enqueueReceive(1500) to read from USB/TCP
- 1500 bytes arrive in receivedDataSink_
- receiveHandler() distributes: 4 bytes to header request, 1496 to next
- Header promise resolves; Messenger parses size field
- Messenger calls receive(size) for payload
- Payload data already in buffer; resolve immediately
// This file is part of aasdk library project.
// Copyright (C) 2018 f1x.studio (Michal Szwaj)
// Copyright (C) 2024 CubeOne (Simon Dean - simon.dean@cubeone.co.uk)
// Copyright (C) 2026 OpenCarDev (Matthew Hilton - matthilton2005@gmail.com)
//
// aasdk is free software: you can redistribute it and/or modify
// it under the terms of the GNU General Public License as published by
// the Free Software Foundation; either version 3 of the License, or
// (at your option) any later version.
//
// aasdk is distributed in the hope that it will be useful,
// but WITHOUT ANY WARRANTY; without even the implied warranty of
// MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
// GNU General Public License for more details.
//
// You should have received a copy of the GNU General Public License
// along with aasdk. If not, see <http://www.gnu.org/licenses/>.
#include <aasdk/Common/Log.hpp>
#include <aasdk/Common/ModernLogger.hpp>
#include <aasdk/Transport/Transport.hpp>
namespace aasdk {
namespace transport {
Transport::Transport(boost::asio::io_service &ioService)
: receiveStrand_(ioService), sendStrand_(ioService) {}
void Transport::receive(size_t size, ReceivePromise::Pointer promise) {
AASDK_LOG_TRANSPORT(debug, "receive()");
receiveStrand_.dispatch([this, self = this->shared_from_this(), size, promise = std::move(promise)]() mutable {
receiveQueue_.emplace_back(size, std::move(promise));
if (receiveQueue_.size() == 1) {
try {
AASDK_LOG_TRANSPORT(debug, "Distribute received data.");
this->distributeReceivedData();
}
catch (const error::Error &e) {
// Due to the design of the messaging system, we don't really need to raise an error - debug it is
AASDK_LOG_TRANSPORT(debug, "Reject receive promise.");
this->rejectReceivePromises(e);
}
}
});
}
void Transport::receiveHandler(size_t bytesTransferred) {
try {
AASDK_LOG_TRANSPORT(debug, "receiveHandler()");
receivedDataSink_.commit(bytesTransferred);
this->distributeReceivedData();
}
catch (const error::Error &e) {
// Due to the design of the messaging system, we don't really need to raise an error - debug it is
AASDK_LOG_TRANSPORT(debug, "Rejecting promise.");
this->rejectReceivePromises(e);
}
}
void Transport::distributeReceivedData() {
AASDK_LOG_TRANSPORT(debug, "distributeReceivedData()");
for (auto queueElement = receiveQueue_.begin(); queueElement != receiveQueue_.end();) {
if (receivedDataSink_.getAvailableSize() < queueElement->first) {
AASDK_LOG_TRANSPORT(debug, "Receiving from buffer.");
auto buffer = receivedDataSink_.fill();
this->enqueueReceive(std::move(buffer));
break;
} else {
auto data(receivedDataSink_.consume(queueElement->first));
AASDK_LOG_TRANSPORT(debug, "Resolve and clear message.");
queueElement->second->resolve(std::move(data));
queueElement = receiveQueue_.erase(queueElement);
}
}
}
void Transport::rejectReceivePromises(const error::Error &e) {
for (auto &queueElement: receiveQueue_) {
queueElement.second->reject(e);
}
receiveQueue_.clear();
}
void Transport::send(common::Data data, SendPromise::Pointer promise) {
sendStrand_.dispatch(
[this, self = this->shared_from_this(), data = std::move(data), promise = std::move(promise)]() mutable {
sendQueue_.emplace_back(std::move(data), std::move(promise));
if (sendQueue_.size() == 1) {
this->enqueueSend(sendQueue_.begin());
}
});
}
}
}Updated on 2026-03-15 at 09:02:41 +0000