Skip to content

Add generic MqttClient trait with multi-backend support#83

Open
MathiasKoch wants to merge 12 commits intofeature/kvstore-shadow-persistencefrom
feature/mqtt-trait
Open

Add generic MqttClient trait with multi-backend support#83
MathiasKoch wants to merge 12 commits intofeature/kvstore-shadow-persistencefrom
feature/mqtt-trait

Conversation

@MathiasKoch
Copy link
Member

Summary

Introduces a generic MqttClient trait that decouples all rustot services from concrete MQTT implementations. Four backend implementations are provided: embedded-mqtt (no_std/embassy), rumqttc (std/tokio), AWS Greengrass IPC, and a mock client for testing. All services — shadows, OTA, provisioning, defender metrics, and jobs — are migrated to accept any MqttClient implementation via generics.

This enables the same application code to run across bare-metal embedded devices, Linux gateways, and Greengrass components by swapping the MQTT backend at compile time.

Note: This is PR 2 of 4 in a stacked series. Depends on #82 (KV-based shadow persistence).

Design

MqttClient Trait (src/mqtt/mod.rs)

pub trait MqttClient {
    type Subscription<'m, const N: usize>: MqttSubscription where Self: 'm;
    type Error: Debug;

    fn client_id(&self) -> &str;
    async fn publish<P: ToPayload>(&self, topic: &str, payload: P) -> Result<(), Self::Error>;
    async fn subscribe<const N: usize>(&self, topics: &[(&str, QoS); N]) -> Result<Self::Subscription<'_, N>, Self::Error>;
}

Key design decisions:

  • &self methods — Interior mutability allows sharing across async tasks without &mut exclusivity
  • Const generic topic countsubscribe<const N: usize> enables zero-cost static dispatch
  • ToPayload / DeferredPayload — Lazy serialization directly into the transmit buffer, avoiding intermediate allocations
  • Associated error type — Each backend surfaces its native error type

Backend Implementations

Backend Feature flag Environment File
embedded-mqtt embedded_mqtt no_std / embassy src/mqtt/embedded.rs
rumqttc rumqttc std / tokio src/mqtt/rumqttc.rs
Greengrass IPC greengrass std / tokio src/mqtt/greengrass.rs
Mock (always available) testing src/mqtt/mock.rs
  • rumqttc: Spawns background EventLoop poller, routes messages to subscriptions via mpsc channels with MQTT wildcard matching (+/#)
  • Greengrass: Merges per-topic native subscriptions via futures::stream::SelectAll
  • embedded-mqtt: Zero-copy, direct trait mapping with PayloadBridge adapter

Service Migration

All services now accept C: MqttClient as a generic parameter instead of depending on a specific MQTT implementation. The Mqtt<C> newtype wrapper provides service-specific trait implementations.

Changelog

  • Add MqttClient, MqttSubscription, MqttMessage, and ToPayload traits in src/mqtt/
  • Add embedded-mqtt, rumqttc, greengrass, and mock backend implementations
  • Migrate shadow cloud service to generic MqttClient
  • Migrate OTA control/data interfaces to generic MqttClient
  • Migrate provisioning service to generic MqttClient
  • Migrate defender metrics service to generic MqttClient
  • Replace ShadowTestOnly with MockMqttClient in shadow tests
  • Fix Greengrass multi-topic subscription using SelectAll
  • Loosen 'static requirement on topic string references

MathiasKoch and others added 12 commits February 2, 2026 20:52
…ss impls

Introduces MqttClient, MqttSubscription, and MqttMessage traits that
abstract over different MQTT backends without lifetime parameters on the
traits themselves. GATs with 'm lifetime tie subscriptions to client
borrows, resolving the previous lifetime mismatch with embedded-mqtt.

Also bumps nightly toolchain to 2025-12-01 for zmij compatibility.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace direct embedded_mqtt::MqttClient usage with crate::mqtt::MqttClient
trait bounds via Mqtt<C> newtype wrapper. Implement ToPayload directly on
jobs structs (Describe, Update, GetPending, StartNext) to enable zero-copy
serialization through generic publish methods, and use MqttTransfer<S>
wrapper to avoid HRTB issues with BlockTransfer.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Replace direct embedded_mqtt usage in the shadows module with the
MqttClient trait abstraction, following the same pattern as the OTA
migration. Also fixes broken Error::MqttError references in
defender_metrics caused by the error enum change.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
- Add mock MQTT implementation for testing (src/mqtt/mock.rs)
- Update shadow tests to use real Shadow struct with MockMqttClient
- Remove ShadowTestOnly struct (~60 lines)

Tests now use production code, enabling future unit tests for
cloud methods (wait_delta, update, etc.).

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
Update integration test to use inferred type parameter instead of
NoopRawMutex, and remove duplicate unused test fixture structs left
over from rebase.

Co-Authored-By: Claude Opus 4.5 <noreply@anthropic.com>
subscribe() was only using the first topic and silently dropping the
rest. Callers like shadows and provisioning subscribe to accepted/rejected
pairs and expect messages from both. Create N individual StreamOperations
and merge them via futures::stream::SelectAll.
Sign up for free to join this conversation on GitHub. Already have an account? Sign in to comment

Labels

None yet

Projects

None yet

Development

Successfully merging this pull request may close these issues.

1 participant

Comments