Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions agent/src/mqtt/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,12 @@ pub type SyncDevice = backend_api::models::SyncDevice;
pub type Ping = backend_api::models::Ping;
pub type Pong = backend_api::models::Pong;

/// Subscribes `client` to every device-scoped topic for `device_id`.
///
/// The sync topic is requested first, then the ping topic.
///
/// # Errors
///
/// Returns the first subscription error encountered. A failed sync
/// subscription returns immediately, so the ping subscription is not
/// attempted in that case.
pub async fn subscribe_all(client: &impl ClientI, device_id: &str) -> Result<(), MQTTError> {
    // Sync goes first; `?` bails out before ping is ever requested.
    subscribe_sync(client, device_id).await?;
    // The ping outcome is the overall outcome.
    subscribe_ping(client, device_id).await
}

pub async fn subscribe_sync(client: &impl ClientI, device_id: &str) -> Result<(), MQTTError> {
let topic = device_sync(device_id);
client.subscribe(&topic, QoS::AtLeastOnce).await
Expand Down
14 changes: 9 additions & 5 deletions agent/src/workers/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -178,11 +178,8 @@ async fn init_client<TokenManagerT: TokenManagerExt>(
let (mqtt_client, eventloop) = mqtt::Client::new(&options).await;

// subscribe to device synchronization updates
if let Err(e) = mqtt::device::subscribe_sync(&mqtt_client, device_id).await {
error!("error subscribing to device synchronization updates: {e:?}");
};
if let Err(e) = mqtt::device::subscribe_ping(&mqtt_client, device_id).await {
error!("error subscribing to device ping updates: {e:?}");
if let Err(e) = mqtt::device::subscribe_all(&mqtt_client, device_id).await {
error!("error subscribing to device topic: {e:?}");
};

(mqtt_client, eventloop)
Expand Down Expand Up @@ -228,6 +225,13 @@ pub async fn handle_event<ClientT: ClientI, SyncerT: SyncerExt>(
}
info!("Established connection to mqtt broker");
let _ = device_stor.patch(device::Updates::connected()).await;

// Re-subscribe on every successful (re)connect. rumqttc auto-reconnects
// internally without replaying subscribes, and the broker may have
// discarded session state, so we must restate our subscriptions here.
if let Err(e) = mqtt::device::subscribe_all(mqtt_client, device_id).await {
error!("error subscribing to device topic: {e:?}");
};
}
// update the device connection status on successful disconnections
Event::Incoming(Incoming::Disconnect) => {
Expand Down
71 changes: 71 additions & 0 deletions agent/tests/mqtt/device.rs
Original file line number Diff line number Diff line change
Expand Up @@ -16,6 +16,77 @@ fn mock_error() -> MQTTError {
})
}

/// Tests for `device::subscribe_all`: call order, topics, QoS, and error propagation.
mod subscribe_all {
    use std::sync::atomic::{AtomicUsize, Ordering};
    use std::sync::Arc;

    use super::*;

    /// Both subscribes succeed: sync is recorded first, ping second, each with
    /// `QoS::AtLeastOnce`.
    #[tokio::test]
    async fn happy_path() {
        let mock = MockClient::default();
        device::subscribe_all(&mock, "dvc_123").await.unwrap();

        let recorded = mock.get_calls();
        assert_eq!(recorded.len(), 2);

        let sync_matches = matches!(
            &recorded[0],
            MockCall::Subscribe { topic, qos }
                if topic == "cmd/devices/dvc_123/sync" && *qos == QoS::AtLeastOnce
        );
        assert!(sync_matches);

        let ping_matches = matches!(
            &recorded[1],
            MockCall::Subscribe { topic, qos }
                if topic == "v1/cmd/devices/dvc_123/ping" && *qos == QoS::AtLeastOnce
        );
        assert!(ping_matches);
    }

    /// Every subscribe fails: the sync failure must stop `subscribe_all` before
    /// it reaches the ping subscribe.
    #[tokio::test]
    async fn subscribe_sync_error_short_circuits() {
        let mock = MockClient {
            subscribe_fn: Box::new(|| Err(Box::new(mock_error()))),
            ..Default::default()
        };
        let outcome = device::subscribe_all(&mock, "dvc_123").await;
        assert!(outcome.is_err());

        // Only the sync subscribe may have been attempted.
        let recorded = mock.get_calls();
        assert_eq!(recorded.len(), 1);
        let only_sync = matches!(
            &recorded[0],
            MockCall::Subscribe { topic, .. } if topic == "cmd/devices/dvc_123/sync"
        );
        assert!(only_sync);
    }

    /// First subscribe succeeds, second fails: the ping error must surface to
    /// the caller.
    #[tokio::test]
    async fn subscribe_ping_error_propagates() {
        let attempts = Arc::new(AtomicUsize::new(0));
        let attempts_in_mock = Arc::clone(&attempts);
        let mock = MockClient {
            // Attempt 0 (sync) succeeds; every later attempt (ping) fails.
            subscribe_fn: Box::new(move || {
                match attempts_in_mock.fetch_add(1, Ordering::SeqCst) {
                    0 => Ok(()),
                    _ => Err(Box::new(mock_error())),
                }
            }),
            ..Default::default()
        };
        let outcome = device::subscribe_all(&mock, "dvc_123").await;
        assert!(outcome.is_err());

        // Sync succeeded and ping was attempted, so both calls are recorded.
        let recorded = mock.get_calls();
        assert_eq!(recorded.len(), 2);
        let second_is_ping = matches!(
            &recorded[1],
            MockCall::Subscribe { topic, .. } if topic == "v1/cmd/devices/dvc_123/ping"
        );
        assert!(second_is_ping);
    }
}

mod subscribe_sync {
use super::*;

Expand Down
87 changes: 87 additions & 0 deletions agent/tests/workers/mqtt.rs
Original file line number Diff line number Diff line change
Expand Up @@ -113,6 +113,93 @@ pub mod handle_connection_events {
assert!(device.last_connected_at <= Utc::now());
}

/// A successful ConnAck must produce exactly one subscribe to the device sync
/// topic and one to the device ping topic.
#[tokio::test]
async fn connack_success_resubscribes_to_sync_and_ping() {
    // Device storage backed by a throwaway directory.
    let temp_dir = filesys::Dir::create_temp_dir("testing").await.unwrap();
    let layout = Layout::new(temp_dir);
    let spawned =
        storage::Device::spawn_with_default(64, layout.device(), Device::default()).await;
    let (device_file, _) = spawned.unwrap();

    let connack = ConnAck {
        code: ConnectReturnCode::Success,
        session_present: false,
    };
    let event = Event::Incoming(Incoming::ConnAck(connack));
    let mqtt_client = MockClient::default();
    let syncer = MockSyncer::default();
    let _ = handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await;

    let sync_topic = topics::device_sync("device_id");
    assert_eq!(mqtt_client.num_subscribe_calls_to(&sync_topic), 1);

    let ping_topic = topics::device_ping("device_id");
    assert_eq!(mqtt_client.num_subscribe_calls_to(&ping_topic), 1);
}

/// A rejected ConnAck (bad credentials) must not issue any subscribe and must
/// report an error streak of zero.
#[tokio::test]
async fn connack_non_success_does_not_subscribe() {
    // Device storage backed by a throwaway directory.
    let temp_dir = filesys::Dir::create_temp_dir("testing").await.unwrap();
    let layout = Layout::new(temp_dir);
    let spawned =
        storage::Device::spawn_with_default(64, layout.device(), Device::default()).await;
    let (device_file, _) = spawned.unwrap();

    let connack = ConnAck {
        code: ConnectReturnCode::BadUserNamePassword,
        session_present: false,
    };
    let event = Event::Incoming(Incoming::ConnAck(connack));
    let mqtt_client = MockClient::default();
    let syncer = MockSyncer::default();

    let err_streak =
        handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await;
    assert_eq!(err_streak, 0);

    // Neither device topic may have been subscribed to.
    let sync_topic = topics::device_sync("device_id");
    assert_eq!(mqtt_client.num_subscribe_calls_to(&sync_topic), 0);

    let ping_topic = topics::device_ping("device_id");
    assert_eq!(mqtt_client.num_subscribe_calls_to(&ping_topic), 0);
}

#[tokio::test]
async fn connack_subscribe_error_does_not_change_err_streak() {
let dir = filesys::Dir::create_temp_dir("testing").await.unwrap();
let layout = Layout::new(dir);

let (device_file, _) =
storage::Device::spawn_with_default(64, layout.device(), Device::default())
.await
.unwrap();

let event = Event::Incoming(Incoming::ConnAck(ConnAck {
code: ConnectReturnCode::Success,
session_present: false,
}));
let mqtt_client = MockClient {
subscribe_fn: Box::new(|| {
Err(Box::new(MQTTError::MockErr(MockErr {
is_authentication_error: false,
is_network_conn_err: false,
})))
}),
..Default::default()
};
let syncer = MockSyncer::default();
let err_streak =
handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await;
assert_eq!(err_streak, 0);
}

#[tokio::test]
async fn disconnect_event() {
let dir = filesys::Dir::create_temp_dir("testing").await.unwrap();
Expand Down
86 changes: 86 additions & 0 deletions plans/completed/20260507-drop-m2-from-mqtt-resubscribe.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,86 @@
# Drop M2 from mqtt-resubscribe hotfix

## Scope

Single repo, single branch: read-write on `agent/`, branch `fix/mqtt-resubscribe-on-reconnect` (PR #60 against `release/v0.8`).

## Purpose / Big Picture

Drop M2 (`fix(mqtt): set clean_session=false ...`, sha `5cdd092`) from the v0.8.1 hotfix so the PR ships only M1 (re-subscribe on every successful ConnAck) and M3 (connection counter). M1 alone closes the silent-deafness bug for all reconnect paths; skipping M2 retains headroom against the user's EMQX Dedicated tier 1,000-session cap. Operators see the same fix delivered with no change to broker session-table pressure.

## Progress

- [ ] Rebase `fix/mqtt-resubscribe-on-reconnect` onto M1 to drop M2.
- [ ] Update `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md` to record the drop.

## Surprises & Discoveries

(Add entries as work proceeds.)

## Decision Log

- Decision: Drop M2 (`fix(mqtt): set clean_session=false ...`, sha `5cdd092`) from the v0.8.1 hotfix.
Rationale: User's EMQX Dedicated tier caps the session table at 1,000. `clean_session=false` makes sessions persist across disconnects, so session count would trend toward fleet size + churn instead of online device count. M1 alone closes the bug for all reconnect paths; M2 was belt-and-suspenders. Skipping M2 keeps session-cap headroom on the current tier. M2 can land separately if a tier upgrade makes sessions cheap.
Date/Author: 2026-05-07 / orchestrator on user request.

## Outcomes & Retrospective

(Summarize at completion.)

## Context and Orientation

The branch `fix/mqtt-resubscribe-on-reconnect` is already pushed and has PR #60 open against `release/v0.8`. It currently carries three M-commits — M1 `41e89c8` (re-subscribe on ConnAck), M2 `5cdd092` (clean_session=false, to drop), M3 `635efc5` (connection counter, keep) — plus three pre-implementation plan commits, `f5d5f41` ("plan: mark mqtt-resubscribe hotfix complete"), and `9cc8054` ("plan: draft drop-m2 plan for v0.8.1 hotfix" — this very plan), totalling 8 commits over `origin/release/v0.8`.

M2 touches `agent/src/mqtt/options.rs`, `agent/src/mqtt/client.rs`, and `agent/tests/mqtt/options.rs`. M3 touches `agent/src/workers/mqtt.rs` and `agent/tests/workers/mqtt.rs`. The file sets are disjoint so the rebase that drops M2 will replay M3 cleanly. No `clean_session` references exist outside M2's files, so dropping M2 cannot break callers.

The existing plan at `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md` documents all three milestones and must be updated to reflect the drop in its Plan of Work, Concrete Steps, Validation and Acceptance, and Decision Log sections.

## Plan of Work

1. Rebase to drop M2: `git rebase --onto 41e89c8 5cdd092 fix/mqtt-resubscribe-on-reconnect`. M3 replays cleanly given the disjoint file sets.
2. Run `./scripts/test.sh` to confirm tests pass without M2 and its test file.
3. Edit `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`: add the M2-drop Decision Log entry, change "three milestones" to "two milestones" wherever it appears, remove or strike through the M2 milestone block in Plan of Work and Concrete Steps, and remove M2-specific entries from Validation and Acceptance.
4. Commit the plan edits as `plan: drop M2 in light of EMQX 1k session cap` (signed).
5. Run `./scripts/preflight.sh` and confirm clean before push.

## Concrete Steps

### Milestone 1 — drop M2 via rebase

No new commit is added — the rebase replays every commit after M2 (M3 and any later plan commits) directly onto M1.

cd /home/ben/miru/workbench5/repos/agent
git branch --show-current # expect: fix/mqtt-resubscribe-on-reconnect
git log --oneline origin/release/v0.8..HEAD # baseline: 8 commits
git rebase --onto 41e89c8 5cdd092 fix/mqtt-resubscribe-on-reconnect
git log --oneline origin/release/v0.8..HEAD # expect: 7 commits, no 5cdd092
git diff --stat origin/release/v0.8...HEAD # expect: only workers/mqtt.rs, tests/workers/mqtt.rs, plans/...
./scripts/test.sh # expect: all tests pass

### Milestone 2 — update plan doc

# Edit plans/completed/20260507-mqtt-resubscribe-on-reconnect.md per Plan of Work step 3.
git add plans/completed/20260507-mqtt-resubscribe-on-reconnect.md
git commit -S -m "plan: drop M2 in light of EMQX 1k session cap"

### Final preflight

./scripts/preflight.sh # expect: Preflight clean

## Validation and Acceptance

1. After the rebase + plan-doc edit commit, `git log --oneline origin/release/v0.8..HEAD` shows 8 commits (the 7 left by the rebase plus the new plan-edit commit), none with sha `5cdd092` and none whose subject mentions `clean_session`. M1 and M3 subjects (`re-subscribe on every successful ConnAck`, `log mqtt reconnects with a connection counter`) are present.
2. `git diff --name-only origin/release/v0.8...HEAD` lists only `agent/src/workers/mqtt.rs`, `agent/tests/workers/mqtt.rs`, `plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md`, and `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`.
3. `./scripts/test.sh` exits zero.
4. `./scripts/preflight.sh` reports `Preflight clean`. **This is the gate** — preflight must report clean before changes are pushed.
5. The plan doc no longer references the M2 milestone in Plan of Work, Concrete Steps, or Validation; the Decision Log includes the drop entry.

## Idempotence and Recovery

The rebase can be undone by `git reset --hard origin/fix/mqtt-resubscribe-on-reconnect`, which restores the pre-rebase state with M2 included — as long as the rebased branch has not yet been force-pushed (after a push, recover the old tip from `git reflog` instead). Plan-doc edits can be reverted with `git restore plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`. If the rebase reports a conflict (it should not, given file disjointness), abort with `git rebase --abort` and report — that signals an unexpected coupling worth investigating before retrying.

## Non-goals

- Not modifying `fix/mqtt-resubscribe-on-reconnect-main` (PR #61). The same operation will be applied there as a follow-up.
- Not bumping the Cargo workspace version.
- Not opening a new PR — the existing PR #60 will be updated by the task orchestrator.
Loading