From bcab3072e386d7dcc38becbfe6cafa0aebf57851 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 15:58:12 -0700 Subject: [PATCH 01/13] plan: draft v0.8.1 hotfix for mqtt resubscribe on reconnect Adds the ExecPlan for fixing the silent MQTT-deafness bug observed on a v0.7.0 device: agent loses topic subscriptions after rumqttc auto-reconnect because subscribes only happen in init_client and clean_session defaults to true. Plan covers re-subscribe on every successful ConnAck, setting clean_session=false, and adding a connection counter to make reconnects visible in INFO logs. --- .../20260507-mqtt-resubscribe-on-reconnect.md | 437 ++++++++++++++++++ 1 file changed, 437 insertions(+) create mode 100644 plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md diff --git a/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md b/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md new file mode 100644 index 00000000..3a8faa9f --- /dev/null +++ b/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md @@ -0,0 +1,437 @@ +# MQTT re-subscribe on reconnect (v0.8.1 hotfix) + +## Scope + +Single repository: `agent/` only. Target branch is `release/v0.8`; the working branch is `fix/mqtt-resubscribe-on-reconnect`, branched from `release/v0.8`. All edits live under the `agent/` crate (Rust workspace member at `/home/ben/miru/workbench5/repos/agent/agent/`). No other repos are read or written. + +Cherry-picking this fix to `main` is out of scope — that is a separate follow-up PR. + +## Purpose / Big Picture + +A device running the agent loses MQTT command reception after a transient broker-side disconnect, and stays in that broken state until the agent process is restarted. Field log evidence shows v0.7.0 devices stuck like this for 6+ days. The bug exists in v0.6.0, v0.7.0, and current `main`. + +Root cause: the agent subscribes to its two MQTT command topics exactly once, inside `init_client`, immediately after the first connect. 
The underlying `rumqttc` client auto-reconnects on network drops, but it does **not** replay subscribes on the new TCP session. Because the agent uses MQTT's default `clean_session = true`, the broker also discards the device's subscription state on every disconnect. Net effect: after the first reconnect, the agent's TCP session is healthy and the broker is happy, but no command topics are subscribed on either side, so command messages silently disappear. + +After this change a user gains: + +1. **Sync and ping commands continue to be received after a transient broker disconnect.** The agent re-subscribes on every successful `ConnAck` and also asks the broker to remember its subscriptions (`clean_session = false`). Either fix alone would close the bug; both together are belt-and-suspenders so a future regression in one does not silently re-introduce the problem. +2. **Reconnects are visible in INFO logs.** A connection counter is included in the "Established connection to mqtt broker" log line, so future incidents can be diagnosed from logs alone (e.g. "connection #1" then "connection #2" half a day later, before commands stopped flowing). + +User-visible behavior to verify: trigger a sync from the backend, kill the broker connection (e.g. via `tc`/iptables drop, or a broker-side restart), wait for the agent to reconnect, trigger another sync — the second sync command must reach the device. + +## Progress + +Add entries as work proceeds. + +- [ ] Milestone 1: re-subscribe on every ConnAck Success +- [ ] Milestone 2: set `clean_session = false` +- [ ] Milestone 3: connection counter in reconnect log + +## Surprises & Discoveries + +Add entries as work proceeds. + +## Decision Log + +Add entries as work proceeds. + +## Outcomes & Retrospective + +Add entries as work proceeds. + +## Context and Orientation + +This repo is the Miru device agent: a long-running Rust binary that talks to a backend over HTTPS and to an MQTT broker for command delivery. 
The Cargo workspace member at `agent/` contains the binary and its modules. + +Relevant files (all paths relative to repo root, which is `/home/ben/miru/workbench5/repos/agent`): + +- `agent/src/workers/mqtt.rs` — the MQTT worker. Owns the event loop, dispatches incoming events, and handles errors. The bug lives here (no re-subscribe on `ConnAck`). +- `agent/src/mqtt/client.rs` — wrapper around `rumqttc::AsyncClient`. `Client::new` builds the `MqttOptions`. `clean_session` is never set today, so it defaults to `true`. +- `agent/src/mqtt/options.rs` — our `Options` struct. Currently has no `clean_session` field. +- `agent/src/mqtt/device.rs` — helpers `subscribe_sync` and `subscribe_ping`. Used by `init_client` and (after this change) by the ConnAck arm in `handle_event`. +- `agent/src/mqtt/topics.rs` — topic string formatters. `device_sync` is `cmd/devices/{id}/sync` (no version prefix, intentional — see comment in file). `device_ping` is `v1/cmd/devices/{id}/ping`. +- `agent/tests/workers/mqtt.rs` — integration tests for `handle_event`, `handle_error`, `handle_syncer_event`. Pattern for new ConnAck tests is the existing `successful_connack_event` test. +- `agent/tests/mqtt/mock.rs` — `MockClient` records every subscribe call. `num_subscribe_calls_to(topic)` is the assertion helper. `subscribe_fn` is overridable to return errors. +- `agent/tests/mqtt/client.rs` — `Client::new` integration tests against a real `rumqttd` broker via `mock::run_broker`. +- `scripts/test.sh` — canonical test runner: `RUST_LOG=off cargo test --features test --package miru-agent`. Always use this. +- `AGENTS.md` — repo conventions, including the strict 3-block import order (`std`, `crate`, external) used in every source file. + +Key code anchors as of this checkout: + +- `handle_event` in `agent/src/workers/mqtt.rs` lines 214–259. The `Incoming::ConnAck` arm is lines 225–231. +- `init_client` in the same file lines 158–189. The pattern to mirror for re-subscribing is at lines 181 and 184. 
+- `State` struct lines 298–302; constructed at lines 107–111 of `run_impl`. +- `handle_error` lines 304–358. Only re-creates the client on auth errors. Network errors are absorbed (rumqttc auto-reconnects internally) and the bug surfaces precisely because that internal auto-reconnect bypasses our subscribe code. +- `Client::new` in `agent/src/mqtt/client.rs` lines 44–73. Build site for `MqttOptions`. +- `Options` in `agent/src/mqtt/options.rs` lines 61–112. Builder-style. + +Background a beginner needs: + +- **MQTT subscribe is a session-state operation.** In MQTT 3.1.1, `clean_session = true` (rumqttc's default) tells the broker to discard everything when the client disconnects, including the subscriptions the client made. `clean_session = false` plus a stable client ID tells the broker to remember subscriptions across disconnects, so messages can be queued for the device while it's offline and delivered on reconnect. +- **rumqttc's auto-reconnect.** `rumqttc::AsyncClient` will internally re-establish the TCP connection if the broker drops it, and you observe each reconnect as a fresh `Event::Incoming(Incoming::ConnAck(..))` from `eventloop.poll()`. It does **not** re-issue subscribe packets for you. You must do that yourself in response to the `ConnAck`. +- **`session_present`.** The `ConnAck` packet carries a `session_present: bool` flag. With `clean_session = false`, on the first connect it is `false`; on a subsequent reconnect where the broker still has session state, it is `true`. We do not branch on this flag — re-subscribing unconditionally on every `ConnAck::Success` is correct and idempotent (the broker will simply replace the existing subscription with itself). + +## Plan of Work + +Three milestones. Each ends in one signed git commit. The commits should be reviewable in isolation. 
+ +**Milestone 1 — defensive re-subscribe.** In `handle_event`, after the existing log line and storage patch in the `ConnAck::Success` path, call `mqtt::device::subscribe_sync` and `mqtt::device::subscribe_ping` exactly the way `init_client` does. Same error strings. Subscribe failures are logged but do not change the returned `err_streak` (no need to drive the cooldown; the next reconnect will retry). This alone fixes the bug for the rumqttc auto-reconnect path even on brokers that do not honor `clean_session = false`. + +**Milestone 2 — durable broker session.** In `mqtt::Options`, add a `clean_session: bool` field defaulting to `false`. In `Client::new`, propagate it to `MqttOptions::set_clean_session`. With a stable, non-empty `client_id` (which we always have — it is the device id) this asks the broker to keep our subscriptions and queued messages across disconnects, providing a second line of defense and reducing message loss during the gap. + + rumqttc's `set_clean_session` panics if the client_id is empty *and* clean_session is false. In production `client_id` is always the device id (non-empty); in `Options::default()` the fallback `client_id` is `"miru-agent"` (also non-empty). No call site is at risk. + +**Milestone 3 — observable reconnects.** Add a `connect_count: u32` field to `State`, initialized to `0`. Change `handle_event`'s signature to accept `connect_count: &mut u32`, and in the `ConnAck::Success` path increment it via `saturating_add(1)` before logging. Update the log line to `Established connection to mqtt broker (connection #{n})`. Update `run_impl` to pass `&mut state.connect_count`. Mechanically update existing handle_event tests to thread an `&mut counter` argument. + + Counting on every Success ConnAck (not just reconnects) is intentional — connection #1 is the initial connect; counts ≥2 are reconnects. That mapping makes log forensics obvious. 
+ +### Why these three specifically and in this order + +- Milestone 1 fixes the user-visible bug. If milestones 2 and 3 are deferred for any reason, the hotfix is still complete. +- Milestone 2 hardens the fix and reduces message loss during the gap, but only matters if the broker honors `clean_session=false` — Milestone 1 must therefore not depend on it. +- Milestone 3 is observability. It is dependent on neither 1 nor 2 and is small, but it carries a signature change to `handle_event` that touches every existing test in `agent/tests/workers/mqtt.rs`. Doing it last keeps those test edits out of the milestones whose own tests are easiest to read. + +### Test strategy per milestone + +- **Milestone 1 tests** — three new tests in the `handle_connection_events` mod inside `agent/tests/workers/mqtt.rs`: + - `connack_success_resubscribes_to_sync_and_ping`: send a `ConnAck::Success` through `handle_event` with default `MockClient`. Assert `mock.num_subscribe_calls_to("cmd/devices/device_id/sync") == 1` and `mock.num_subscribe_calls_to("v1/cmd/devices/device_id/ping") == 1`. + - `connack_non_success_does_not_subscribe`: send a `ConnAck` with `code = ConnectReturnCode::BadUserNamePassword`. Assert no subscribe calls were made on the mock. + - `connack_subscribe_error_does_not_change_err_streak`: configure `subscribe_fn` to return an `Err` wrapping `MQTTError::MockErr(MockErr { is_authentication_error: false, is_network_conn_err: false })`. Assert `handle_event` returns `0` (the same `err_streak` it would have returned without the failure) and does not panic. + +- **Milestone 2 tests** — one new unit test in `agent/tests/mqtt/options.rs` (or, equivalently, `client.rs`): + - `default_options_have_persistent_session`: build `Options::new(Credentials::default())` and `Options::default()`. Assert `options.clean_session == false`. + + This is a flag-level assertion rather than a broker-level behavioral assertion. 
The broker-side check (start a `rumqttd` instance, connect with `clean_session = false`, disconnect, reconnect, assert `session_present == true`) is possible via the existing `mock::run_broker` harness in `agent/tests/mqtt/mock.rs`, but: (a) `rumqttd` 0.19's session persistence behavior is not contractually guaranteed across versions; (b) the hotfix's correctness rests on a single-line `set_clean_session(false)` call in `Client::new`, which is trivially audited by reading the diff. A flag-level test catches the only realistic regression (someone deletes the line). End-to-end reconnect testing is explicitly listed as a non-goal below. + +- **Milestone 3 tests** — modify the existing `successful_connack_event` test to also assert that the counter has been incremented (`assert_eq!(connect_count, 1)` after one Success ConnAck; build a second event and verify the counter advances to `2`). All other existing `handle_event` tests need only the mechanical signature update. + +### Non-negotiable: import order + +Every Rust source file in this repo follows a strict 3-block import order: `std` first, then `crate::*` internal imports, then external crates, each block separated by a blank line and an inline comment. New tests and edits must preserve this. See existing files in `agent/src/workers/mqtt.rs` and `agent/tests/workers/mqtt.rs` lines 1–21 for canonical examples. + +## Concrete Steps + +All commands assume the working directory is the repo root: `/home/ben/miru/workbench5/repos/agent`. Confirm with `pwd` before starting. The branch `fix/mqtt-resubscribe-on-reconnect` should already be checked out; verify with `git branch --show-current` before each commit. 
+ +### Setup + + cd /home/ben/miru/workbench5/repos/agent + git branch --show-current # expect: fix/mqtt-resubscribe-on-reconnect + git status # expect: clean working tree + ./scripts/test.sh # baseline: must pass before starting + +### Milestone 1 — re-subscribe on every successful ConnAck + +Edit `agent/src/workers/mqtt.rs`. In `handle_event`, replace the body of the `Event::Incoming(Incoming::ConnAck(connack))` arm so it reads: + + Event::Incoming(Incoming::ConnAck(connack)) => { + if connack.code != ConnectReturnCode::Success { + return err_streak; + } + info!("Established connection to mqtt broker"); + let _ = device_stor.patch(device::Updates::connected()).await; + + // Re-subscribe on every successful (re)connect. rumqttc auto-reconnects + // internally without replaying subscribes, and the broker may have + // discarded session state, so we must restate our subscriptions here. + if let Err(e) = mqtt::device::subscribe_sync(mqtt_client, device_id).await { + error!("error subscribing to device synchronization updates: {e:?}"); + }; + if let Err(e) = mqtt::device::subscribe_ping(mqtt_client, device_id).await { + error!("error subscribing to device ping updates: {e:?}"); + }; + } + +(The error strings must match `init_client` verbatim — see `agent/src/workers/mqtt.rs` lines 181–186 for the source of truth.) + +Edit `agent/tests/workers/mqtt.rs`. 
Inside `pub mod handle_connection_events`, add three tests after `successful_connack_event`: + + #[tokio::test] + async fn connack_success_resubscribes_to_sync_and_ping() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + + let (device_file, _) = + storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + })); + let mqtt_client = MockClient::default(); + let syncer = MockSyncer::default(); + let _ = handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_sync("device_id")), + 1 + ); + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_ping("device_id")), + 1 + ); + } + + #[tokio::test] + async fn connack_non_success_does_not_subscribe() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + + let (device_file, _) = + storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::BadUserNamePassword, + session_present: false, + })); + let mqtt_client = MockClient::default(); + let syncer = MockSyncer::default(); + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + assert_eq!(err_streak, 0); + + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_sync("device_id")), + 0 + ); + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_ping("device_id")), + 0 + ); + } + + #[tokio::test] + async fn connack_subscribe_error_does_not_change_err_streak() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + + let (device_file, _) = + 
storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + })); + let mut mqtt_client = MockClient::default(); + mqtt_client.subscribe_fn = Box::new(|| { + Err(Box::new(MQTTError::MockErr(MockErr { + is_authentication_error: false, + is_network_conn_err: false, + }))) + }); + let syncer = MockSyncer::default(); + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + assert_eq!(err_streak, 0); + } + +Run the tests: + + ./scripts/test.sh + +All three new tests must pass. The two pre-existing `connack` tests must continue to pass. + +Verify clean_session is not yet set (it's set in milestone 2): + + grep -n "set_clean_session\|clean_session" agent/src/mqtt/client.rs agent/src/mqtt/options.rs # expect: no matches + +Commit: + + git add -A + git status # review the diff list + git diff --cached --stat # sanity check + git commit -S -m "fix(mqtt): re-subscribe on every successful ConnAck" + +### Milestone 2 — clean_session = false + +Edit `agent/src/mqtt/options.rs`. Add a `clean_session: bool` field on `Options`: + + #[derive(Debug, Clone)] + pub struct Options { + pub connect_address: ConnectAddress, + pub credentials: Credentials, + pub client_id: String, + pub keep_alive: Duration, + pub timeouts: Timeouts, + pub capacity: usize, + pub clean_session: bool, + } + +In the `impl Options { pub fn new(...) }` constructor, set `clean_session: false`. (Do not add a `with_clean_session` builder unless needed by a test — keep the diff minimal.) + +Edit `agent/src/mqtt/client.rs`. In `Client::new`, after `set_credentials` and before the `match options.connect_address.protocol` block, add: + + mqtt_options.set_clean_session(options.clean_session); + +Add one new unit test in `agent/tests/mqtt/options.rs` (the file already exists). 
Use the existing imports as a guide; if `Credentials` and `Options` aren't already imported there, add them in the proper import block: + + #[test] + fn default_options_have_persistent_session() { + let options = Options::new(Credentials { + username: "u".to_string(), + password: "p".to_string(), + }); + assert!(!options.clean_session, "clean_session must default to false so the broker preserves subscriptions across reconnects"); + + let default_options = Options::default(); + assert!(!default_options.clean_session); + } + +Run the tests: + + ./scripts/test.sh + +The new test must pass. All previously passing tests must still pass — in particular `test_mqtt_client` (which connects to a real `rumqttd`) must still succeed; with a non-empty `client_id` (`test_user`), `set_clean_session(false)` is fine. + +Commit: + + git add -A + git diff --cached --stat + git commit -S -m "fix(mqtt): set clean_session=false to preserve subscriptions across reconnects" + +### Milestone 3 — observable reconnects + +Edit `agent/src/workers/mqtt.rs`: + +1. Add a field to `State`: + + pub struct State { + pub client: mqtt::Client, + pub eventloop: EventLoop, + pub err_streak: ErrStreak, + pub connect_count: u32, + } + +2. Initialize it in `run_impl` where `State` is constructed (around line 107): + + let mut state = State { + client: mqtt_client, + eventloop, + err_streak: 0, + connect_count: 0, + }; + +3. Change `handle_event`'s signature to take a counter: + + pub async fn handle_event( + event: &Event, + mqtt_client: &ClientT, + syncer: &SyncerT, + device_id: &str, + device_stor: &storage::Device, + connect_count: &mut u32, + ) -> ErrStreak { + +4. 
Update the `ConnAck::Success` path to increment and log: + + if connack.code != ConnectReturnCode::Success { + return err_streak; + } + *connect_count = connect_count.saturating_add(1); + info!("Established connection to mqtt broker (connection #{count})", count = *connect_count); + let _ = device_stor.patch(device::Updates::connected()).await; + // ... existing re-subscribe block unchanged ... + +5. Update the `run_impl` call site to pass `&mut state.connect_count`: + + state.err_streak = handle_event( + &mqtt_event, + &state.client, + syncer, + &device.id, + device_stor, + &mut state.connect_count, + ).await; + +Edit `agent/tests/workers/mqtt.rs`. Every existing call site of `handle_event` needs an extra `&mut counter` argument. The mechanical pattern: declare `let mut connect_count: u32 = 0;` before the call, then pass `&mut connect_count`. Touch every test in `handle_connection_events`, `handle_sync_events`, `handle_ping_events`, and the three new milestone-1 tests. + +Strengthen `successful_connack_event` (or add a new test `connack_success_increments_connect_count`): + + #[tokio::test] + async fn connack_success_increments_connect_count() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + let (device_file, _) = + storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + })); + let mqtt_client = MockClient::default(); + let syncer = MockSyncer::default(); + let mut connect_count: u32 = 0; + + let _ = handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file, &mut connect_count).await; + assert_eq!(connect_count, 1); + + let _ = handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file, &mut connect_count).await; + assert_eq!(connect_count, 2); + } + +Run the tests: + + ./scripts/test.sh + +All tests must pass. 
+ +Manual sanity check (optional but recommended): start the binary against a local broker and observe the log line at INFO level: + + Established connection to mqtt broker (connection #1) + +Commit: + + git add -A + git diff --cached --stat + git commit -S -m "feat(mqtt): log mqtt reconnects with a connection counter" + +### Final preflight + +Run preflight (the `preflight` skill) and confirm it reports `clean` before pushing or opening a PR. Do not push if preflight is not clean — fix findings first. + +## Validation and Acceptance + +**Acceptance is behavioral, not just "tests pass":** + +1. Without milestone 1's edit, `connack_success_resubscribes_to_sync_and_ping` and `connack_subscribe_error_does_not_change_err_streak` fail. After milestone 1, both pass. Verify by checking out the file before edit, running `./scripts/test.sh`, observing the failures, then reverting. +2. Without milestone 2's edit, `default_options_have_persistent_session` fails. After milestone 2, it passes. +3. Without milestone 3's edit, `connack_success_increments_connect_count` does not compile (no counter argument exists). After milestone 3, it passes and prints `connection #1` then `connection #2` on two successive Success ConnAcks. +4. `./scripts/test.sh` exits zero with all new tests included. +5. Preflight reports `clean`. +6. Manual reconnect smoke test (optional but strongly recommended before tagging v0.8.1): run the agent against a local `rumqttd` (or staging broker), confirm "connection #1" log, kill the broker connection, confirm a "connection #2" log appears with no agent restart, then trigger a sync command from the backend and confirm the agent processes it. + +The full integration check — disconnect-and-reconnect over a real broker — is **not** automated as part of this PR. The harness exists (`mock::run_broker`) and a follow-up plan can add it; doing so here would inflate the diff beyond the hotfix's scope. 
+ +## Idempotence and Recovery + +Each milestone is a single commit and each test run is independent. Re-running `./scripts/test.sh` is safe and idempotent. + +If a milestone's tests fail after the edit, the recovery path is: + + git diff # inspect uncommitted changes + git restore <path> # discard changes to one file + # or, after a bad commit: + git reset --soft HEAD~1 # uncommit but keep changes staged + +The signature change in milestone 3 is the only edit that breaks compilation across many files — fix all `handle_event` call sites in `agent/tests/workers/mqtt.rs` together (Cargo's compile error list will name every site). There is no risky migration here: no on-disk state changes, no API contract changes, no schema changes. The only behavioral change visible to the broker is `clean_session = false`, which is renegotiated automatically on the next connect. + +If a hotfix needs to be reverted in the field, `git revert` of any of the three commits independently is safe; reverting milestone 1 while keeping milestone 2 leaves the original bug only partially mitigated (broker-side session might survive but the agent still doesn't replay subscribes if the broker forgets), so revert in reverse order if reverting more than one. + +## Non-goals + +Stated explicitly so the implementer does not gold-plate: + +- **Not** bumping the Cargo workspace version. The release engineer owns versioning and the v0.8.1 tag. +- **Not** shortening the 12 h poller interval in `agent/src/workers/poller.rs`. That is a separate config decision tracked elsewhere. +- **Not** refactoring `init_client` to remove its now-redundant subscribe calls. They become duplicates of the ConnAck-arm subscribes after milestone 1, but removing them changes the no-broker startup path's logging and is out of hotfix scope. +- **Not** adding rumqttd-based end-to-end reconnect tests. The harness exists in `agent/tests/mqtt/mock.rs` (`run_broker`); use it in a follow-up if desired. +- **Not** cherry-picking to `main`. 
That is a separate PR and is out of this plan's scope. +- **Not** branching on `ConnAck.session_present`. Re-subscribing unconditionally on every Success ConnAck is correct and idempotent, and avoids a class of bugs where the broker reports `session_present = true` but has actually forgotten the subscriptions. From 5e06a551ff38e2808123c6a97fb5f55a45685a60 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 16:09:35 -0700 Subject: [PATCH 02/13] plan: refine v0.8.1 mqtt-resubscribe hotfix plan Tightens redundancy and fixes one defect in the validation section. Two review iterations: 6 conciseness fixes (cherry-pick exclusion stated once, trim duplicated rumqttc rationale, collapse milestone ordering rationale, defer to AGENTS.md for import-order rule, specify mod placement for the options test, replace tautological validation items with behavioral observables) plus one correctness fix (validation item 1 referenced an init_client + handle_event flow that's not testable because init_client is private and constructs a real mqtt::Client; rewritten to match what the M1 tests actually assert and call out the second-connect behavior as a code-audit observable). --- .../20260507-mqtt-resubscribe-on-reconnect.md | 20 +++++++------------ 1 file changed, 7 insertions(+), 13 deletions(-) diff --git a/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md b/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md index 3a8faa9f..dc241a27 100644 --- a/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md +++ b/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md @@ -4,8 +4,6 @@ Single repository: `agent/` only. Target branch is `release/v0.8`; the working branch is `fix/mqtt-resubscribe-on-reconnect`, branched from `release/v0.8`. All edits live under the `agent/` crate (Rust workspace member at `/home/ben/miru/workbench5/repos/agent/agent/`). No other repos are read or written. 
-Cherry-picking this fix to `main` is out of scope — that is a separate follow-up PR. - ## Purpose / Big Picture A device running the agent loses MQTT command reception after a transient broker-side disconnect, and stays in that broken state until the agent process is restarted. Field log evidence shows v0.7.0 devices stuck like this for 6+ days. The bug exists in v0.6.0, v0.7.0, and current `main`. @@ -75,7 +73,7 @@ Background a beginner needs: Three milestones. Each ends in one signed git commit. The commits should be reviewable in isolation. -**Milestone 1 — defensive re-subscribe.** In `handle_event`, after the existing log line and storage patch in the `ConnAck::Success` path, call `mqtt::device::subscribe_sync` and `mqtt::device::subscribe_ping` exactly the way `init_client` does. Same error strings. Subscribe failures are logged but do not change the returned `err_streak` (no need to drive the cooldown; the next reconnect will retry). This alone fixes the bug for the rumqttc auto-reconnect path even on brokers that do not honor `clean_session = false`. +**Milestone 1 — defensive re-subscribe.** In `handle_event`, after the existing log line and storage patch in the `ConnAck::Success` path, call `mqtt::device::subscribe_sync` and `mqtt::device::subscribe_ping` exactly the way `init_client` does. Same error strings. Subscribe failures are logged but do not change the returned `err_streak` (no need to drive the cooldown; the next reconnect will retry). This alone fixes the bug per the root-cause analysis in Purpose, even on brokers that do not honor `clean_session = false`. **Milestone 2 — durable broker session.** In `mqtt::Options`, add a `clean_session: bool` field defaulting to `false`. In `Client::new`, propagate it to `MqttOptions::set_clean_session`. 
With a stable, non-empty `client_id` (which we always have — it is the device id) this asks the broker to keep our subscriptions and queued messages across disconnects, providing a second line of defense and reducing message loss during the gap. @@ -85,11 +83,7 @@ Three milestones. Each ends in one signed git commit. The commits should be revi Counting on every Success ConnAck (not just reconnects) is intentional — connection #1 is the initial connect; counts ≥2 are reconnects. That mapping makes log forensics obvious. -### Why these three specifically and in this order - -- Milestone 1 fixes the user-visible bug. If milestones 2 and 3 are deferred for any reason, the hotfix is still complete. -- Milestone 2 hardens the fix and reduces message loss during the gap, but only matters if the broker honors `clean_session=false` — Milestone 1 must therefore not depend on it. -- Milestone 3 is observability. It is dependent on neither 1 nor 2 and is small, but it carries a signature change to `handle_event` that touches every existing test in `agent/tests/workers/mqtt.rs`. Doing it last keeps those test edits out of the milestones whose own tests are easiest to read. +Order rationale: M3 last because its `handle_event` signature change forces edits to every existing test in `agent/tests/workers/mqtt.rs`; doing it last keeps M1 and M2 diffs reviewable. ### Test strategy per milestone @@ -107,7 +101,7 @@ Three milestones. Each ends in one signed git commit. The commits should be revi ### Non-negotiable: import order -Every Rust source file in this repo follows a strict 3-block import order: `std` first, then `crate::*` internal imports, then external crates, each block separated by a blank line and an inline comment. New tests and edits must preserve this. See existing files in `agent/src/workers/mqtt.rs` and `agent/tests/workers/mqtt.rs` lines 1–21 for canonical examples. +Preserve the 3-block import order described in `AGENTS.md` when adding new tests. 
See `agent/tests/workers/mqtt.rs` lines 1–21 for a canonical example. ## Concrete Steps @@ -269,7 +263,7 @@ Edit `agent/src/mqtt/client.rs`. In `Client::new`, after `set_credentials` and b mqtt_options.set_clean_session(options.clean_session); -Add one new unit test in `agent/tests/mqtt/options.rs` (the file already exists). Use the existing imports as a guide; if `Credentials` and `Options` aren't already imported there, add them in the proper import block: +Add one new unit test in `agent/tests/mqtt/options.rs` (the file already exists). Place the new test inside `mod opts` (alongside `new_defaults` / `default`), so it inherits the existing `use super::*;` imports. If `Credentials` and `Options` aren't already imported in that mod's scope, add them in the proper import block: #[test] fn default_options_have_persistent_session() { @@ -401,9 +395,9 @@ Run preflight (the `preflight` skill) and confirm it reports `clean` before push **Acceptance is behavioral, not just "tests pass":** -1. Without milestone 1's edit, `connack_success_resubscribes_to_sync_and_ping` and `connack_subscribe_error_does_not_change_err_streak` fail. After milestone 1, both pass. Verify by checking out the file before edit, running `./scripts/test.sh`, observing the failures, then reverting. -2. Without milestone 2's edit, `default_options_have_persistent_session` fails. After milestone 2, it passes. -3. Without milestone 3's edit, `connack_success_increments_connect_count` does not compile (no counter argument exists). After milestone 3, it passes and prints `connection #1` then `connection #2` on two successive Success ConnAcks. +1. After milestone 1, the `Incoming::ConnAck(Success)` arm of `handle_event` invokes `subscribe_sync` and `subscribe_ping` exactly once per call — observable as `MockClient::num_subscribe_calls_to("cmd/devices/device_id/sync") == 1` and `... 
("v1/cmd/devices/device_id/ping") == 1` after a single Success ConnAck (test `connack_success_resubscribes_to_sync_and_ping`). Non-Success ConnAcks produce zero subscribe calls (test `connack_non_success_does_not_subscribe`). A subscribe failure on the ConnAck arm does not change `err_streak` (test `connack_subscribe_error_does_not_change_err_streak`). Code audit: `init_client`'s startup subscribes at `agent/src/workers/mqtt.rs:181,184` remain in place, so a running device performs two subscribes per topic on the second connect (one at startup via `init_client`, one per ConnAck via the new arm). +2. After milestone 2, both `Options::new(...)` and `Options::default()` produce a struct with `clean_session == false`, and `Client::new` propagates that to `MqttOptions::set_clean_session(false)` (audit by reading the diff in `agent/src/mqtt/client.rs`). +3. After milestone 3, on two successive `ConnAck::Success` events through `handle_event`, the threaded `connect_count` advances `0 → 1 → 2` and the INFO log line reads `Established connection to mqtt broker (connection #1)` then `... (connection #2)`. 4. `./scripts/test.sh` exits zero with all new tests included. 5. Preflight reports `clean`. 6. Manual reconnect smoke test (optional but strongly recommended before tagging v0.8.1): run the agent against a local `rumqttd` (or staging broker), confirm "connection #1" log, kill the broker connection, confirm a "connection #2" log appears with no agent restart, then trigger a sync command from the backend and confirm the agent processes it. 
From 529d776f1d69ba57962b1ac140da0bb49123bcc3 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 16:09:49 -0700 Subject: [PATCH 03/13] plan: promote mqtt-resubscribe plan to active for implementation --- .../{backlog => active}/20260507-mqtt-resubscribe-on-reconnect.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plans/{backlog => active}/20260507-mqtt-resubscribe-on-reconnect.md (100%) diff --git a/plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md b/plans/active/20260507-mqtt-resubscribe-on-reconnect.md similarity index 100% rename from plans/backlog/20260507-mqtt-resubscribe-on-reconnect.md rename to plans/active/20260507-mqtt-resubscribe-on-reconnect.md From 41e89c8dcb02ca8ca5e31c598d618d8b17fd301f Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 16:15:20 -0700 Subject: [PATCH 04/13] fix(mqtt): re-subscribe on every successful ConnAck --- agent/src/workers/mqtt.rs | 10 +++++ agent/tests/workers/mqtt.rs | 87 +++++++++++++++++++++++++++++++++++++ 2 files changed, 97 insertions(+) diff --git a/agent/src/workers/mqtt.rs b/agent/src/workers/mqtt.rs index 26917f70..6e7da006 100644 --- a/agent/src/workers/mqtt.rs +++ b/agent/src/workers/mqtt.rs @@ -228,6 +228,16 @@ pub async fn handle_event( } info!("Established connection to mqtt broker"); let _ = device_stor.patch(device::Updates::connected()).await; + + // Re-subscribe on every successful (re)connect. rumqttc auto-reconnects + // internally without replaying subscribes, and the broker may have + // discarded session state, so we must restate our subscriptions here. 
+ if let Err(e) = mqtt::device::subscribe_sync(mqtt_client, device_id).await { + error!("error subscribing to device synchronization updates: {e:?}"); + }; + if let Err(e) = mqtt::device::subscribe_ping(mqtt_client, device_id).await { + error!("error subscribing to device ping updates: {e:?}"); + }; } // update the device connection status on successful disconnections Event::Incoming(Incoming::Disconnect) => { diff --git a/agent/tests/workers/mqtt.rs b/agent/tests/workers/mqtt.rs index 49a0c4b3..6a36f814 100644 --- a/agent/tests/workers/mqtt.rs +++ b/agent/tests/workers/mqtt.rs @@ -113,6 +113,93 @@ pub mod handle_connection_events { assert!(device.last_connected_at <= Utc::now()); } + #[tokio::test] + async fn connack_success_resubscribes_to_sync_and_ping() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + + let (device_file, _) = + storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + })); + let mqtt_client = MockClient::default(); + let syncer = MockSyncer::default(); + let _ = handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_sync("device_id")), + 1 + ); + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_ping("device_id")), + 1 + ); + } + + #[tokio::test] + async fn connack_non_success_does_not_subscribe() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + + let (device_file, _) = + storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::BadUserNamePassword, + session_present: false, + })); + let mqtt_client = MockClient::default(); + let syncer = 
MockSyncer::default(); + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + assert_eq!(err_streak, 0); + + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_sync("device_id")), + 0 + ); + assert_eq!( + mqtt_client.num_subscribe_calls_to(&topics::device_ping("device_id")), + 0 + ); + } + + #[tokio::test] + async fn connack_subscribe_error_does_not_change_err_streak() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + + let (device_file, _) = + storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + })); + let mqtt_client = MockClient { + subscribe_fn: Box::new(|| { + Err(Box::new(MQTTError::MockErr(MockErr { + is_authentication_error: false, + is_network_conn_err: false, + }))) + }), + ..Default::default() + }; + let syncer = MockSyncer::default(); + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + assert_eq!(err_streak, 0); + } + #[tokio::test] async fn disconnect_event() { let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); From d331ddea9a9ed0530786073f84d921c53b7dd9e2 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 16:19:47 -0700 Subject: [PATCH 05/13] feat(mqtt): log mqtt reconnects with a connection counter --- agent/src/workers/mqtt.rs | 10 +- agent/tests/workers/mqtt.rs | 186 +++++++++++++++++++++++++++++++----- 2 files changed, 172 insertions(+), 24 deletions(-) diff --git a/agent/src/workers/mqtt.rs b/agent/src/workers/mqtt.rs index 6e7da006..3795e993 100644 --- a/agent/src/workers/mqtt.rs +++ b/agent/src/workers/mqtt.rs @@ -108,6 +108,7 @@ pub async fn run_impl { @@ -217,6 +219,7 @@ pub async fn handle_event( syncer: &SyncerT, device_id: &str, device_stor: &storage::Device, + connect_count: 
&mut u32, ) -> ErrStreak { let err_streak = 0; @@ -226,7 +229,11 @@ pub async fn handle_event( if connack.code != ConnectReturnCode::Success { return err_streak; } - info!("Established connection to mqtt broker"); + *connect_count = connect_count.saturating_add(1); + info!( + "Established connection to mqtt broker (connection #{count})", + count = *connect_count + ); let _ = device_stor.patch(device::Updates::connected()).await; // Re-subscribe on every successful (re)connect. rumqttc auto-reconnects @@ -309,6 +316,7 @@ pub struct State { pub client: mqtt::Client, pub eventloop: EventLoop, pub err_streak: ErrStreak, + pub connect_count: u32, } pub async fn handle_error( diff --git a/agent/tests/workers/mqtt.rs b/agent/tests/workers/mqtt.rs index 6a36f814..3568070f 100644 --- a/agent/tests/workers/mqtt.rs +++ b/agent/tests/workers/mqtt.rs @@ -72,8 +72,16 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 0); @@ -102,9 +110,17 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); + let mut connect_count: u32 = 0; let before_event = Utc::now(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); let device = device_file.read().await.unwrap(); @@ -129,7 +145,16 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let _ = 
handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + let mut connect_count: u32 = 0; + let _ = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut connect_count, + ) + .await; assert_eq!( mqtt_client.num_subscribe_calls_to(&topics::device_sync("device_id")), @@ -157,8 +182,16 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!( @@ -195,11 +228,59 @@ pub mod handle_connection_events { ..Default::default() }; let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); } + #[tokio::test] + async fn connack_success_increments_connect_count() { + let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); + let layout = Layout::new(dir); + let (device_file, _) = + storage::Device::spawn_with_default(64, layout.device(), Device::default()) + .await + .unwrap(); + + let event = Event::Incoming(Incoming::ConnAck(ConnAck { + code: ConnectReturnCode::Success, + session_present: false, + })); + let mqtt_client = MockClient::default(); + let syncer = MockSyncer::default(); + let mut connect_count: u32 = 0; + + let _ = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut connect_count, + ) + .await; + assert_eq!(connect_count, 1); + + let _ = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut 
connect_count, + ) + .await; + assert_eq!(connect_count, 2); + } + #[tokio::test] async fn disconnect_event() { let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); @@ -220,9 +301,17 @@ pub mod handle_connection_events { let event = Event::Incoming(Incoming::Disconnect); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); + let mut connect_count: u32 = 0; let before_event = Utc::now(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + "device_id", + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); let device = device_file.read().await.unwrap(); @@ -253,8 +342,16 @@ pub mod handle_sync_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + &device.id, + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 1); @@ -280,8 +377,16 @@ pub mod handle_sync_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + &device.id, + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 0); @@ -307,8 +412,16 @@ pub mod handle_sync_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + 
&syncer, + &device.id, + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 1); @@ -339,8 +452,16 @@ pub mod handle_sync_events { is_network_conn_err: false, })) }); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + &device.id, + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 1); @@ -368,8 +489,16 @@ pub mod handle_ping_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + &device.id, + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!( @@ -401,8 +530,16 @@ pub mod handle_ping_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let err_streak = - handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; + let mut connect_count: u32 = 0; + let err_streak = handle_event( + &event, + &mqtt_client, + &syncer, + &device.id, + &device_file, + &mut connect_count, + ) + .await; assert_eq!(err_streak, 0); assert_eq!( @@ -450,6 +587,7 @@ pub mod handle_mqtt_error { client, eventloop, err_streak: 2, + connect_count: 0, }; let state = handle_error( state, @@ -509,6 +647,7 @@ pub mod handle_mqtt_error { client, eventloop, err_streak: 5, + connect_count: 0, }; let state = handle_error( state, @@ -571,6 +710,7 @@ pub mod handle_mqtt_error { client, eventloop, err_streak: 1, + connect_count: 0, }; let state = handle_error( state, From 031f183ed947914a14288a310d4434c1f3b70d6d Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 17:03:01 -0700 Subject: [PATCH 
06/13] plan: mark mqtt-resubscribe hotfix complete --- .../20260507-mqtt-resubscribe-on-reconnect.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plans/{active => completed}/20260507-mqtt-resubscribe-on-reconnect.md (100%) diff --git a/plans/active/20260507-mqtt-resubscribe-on-reconnect.md b/plans/completed/20260507-mqtt-resubscribe-on-reconnect.md similarity index 100% rename from plans/active/20260507-mqtt-resubscribe-on-reconnect.md rename to plans/completed/20260507-mqtt-resubscribe-on-reconnect.md From 3285978ed2912eb81c0c5add53ac2eaa9a3c8585 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 18:34:32 -0700 Subject: [PATCH 07/13] plan: draft drop-m2 plan for v0.8.1 hotfix --- .../20260507-drop-m2-from-mqtt-resubscribe.md | 86 +++++++++++++++++++ 1 file changed, 86 insertions(+) create mode 100644 plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md diff --git a/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md b/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md new file mode 100644 index 00000000..01773167 --- /dev/null +++ b/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md @@ -0,0 +1,86 @@ +# Drop M2 from mqtt-resubscribe hotfix + +## Scope + +Single repo, single branch: read-write on `agent/`, branch `fix/mqtt-resubscribe-on-reconnect` (PR #60 against `release/v0.8`). + +## Purpose / Big Picture + +Drop M2 (`fix(mqtt): set clean_session=false ...`, sha `5cdd092`) from the v0.8.1 hotfix so the PR ships only M1 (re-subscribe on every successful ConnAck) and M3 (connection counter). M1 alone closes the silent-deafness bug for all reconnect paths; skipping M2 retains headroom against the user's EMQX Dedicated tier 1,000-session cap. Operators see the same fix delivered with no change to broker session-table pressure. + +## Progress + +- [ ] Rebase `fix/mqtt-resubscribe-on-reconnect` onto M1 to drop M2. +- [ ] Update `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md` to record the drop. 
+ +## Surprises & Discoveries + +(Add entries as work proceeds.) + +## Decision Log + +- Decision: Drop M2 (`fix(mqtt): set clean_session=false ...`, sha `5cdd092`) from the v0.8.1 hotfix. + Rationale: User's EMQX Dedicated tier caps the session table at 1,000. `clean_session=false` makes sessions persist across disconnects, so session count would trend toward fleet size + churn instead of online device count. M1 alone closes the bug for all reconnect paths; M2 was belt-and-suspenders. Skipping M2 keeps session-cap headroom on the current tier. M2 can land separately if a tier upgrade makes sessions cheap. + Date/Author: 2026-05-07 / orchestrator on user request. + +## Outcomes & Retrospective + +(Summarize at completion.) + +## Context and Orientation + +The branch `fix/mqtt-resubscribe-on-reconnect` is already pushed and has PR #60 open against `release/v0.8`. It currently carries three M-commits — M1 `41e89c8` (re-subscribe on ConnAck), M2 `5cdd092` (clean_session=false, to drop), M3 `635efc5` (connection counter, keep) — plus three plan-related commits and `f5d5f41` ("plan: mark mqtt-resubscribe hotfix complete"), totalling 7 commits over `origin/release/v0.8`. + +M2 touches `agent/src/mqtt/options.rs`, `agent/src/mqtt/client.rs`, and `agent/tests/mqtt/options.rs`. M3 touches `agent/src/workers/mqtt.rs` and `agent/tests/workers/mqtt.rs`. The file sets are disjoint so the rebase that drops M2 will replay M3 cleanly. No `clean_session` references exist outside M2's files, so dropping M2 cannot break callers. + +The existing plan at `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md` documents all three milestones and must be updated to reflect the drop in its Plan of Work, Concrete Steps, Validation and Acceptance, and Decision Log sections. + +## Plan of Work + +1. Rebase to drop M2: `git rebase --onto 41e89c8 5cdd092 fix/mqtt-resubscribe-on-reconnect`. M3 replays cleanly given the disjoint file sets. +2. 
Run `./scripts/test.sh` to confirm tests pass without M2 and the `default_options_have_persistent_session` test it added to `agent/tests/mqtt/options.rs`. +3. Edit `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`: add the M2-drop Decision Log entry, change "three milestones" to "two milestones" wherever it appears, remove or strike through the M2 milestone block in Plan of Work and Concrete Steps, and remove M2-specific entries from Validation and Acceptance. +4. Commit the plan edits as `plan: drop M2 in light of EMQX 1k session cap` (signed). +5. Run `./scripts/preflight.sh` and confirm clean before push. + +## Concrete Steps + +### Milestone 1 — drop M2 via rebase + +No new commit added — the rebase rewrites M3 onto M1 directly. + + cd /home/ben/miru/workbench5/repos/agent + git branch --show-current # expect: fix/mqtt-resubscribe-on-reconnect + git log --oneline origin/release/v0.8..HEAD # baseline: 7 commits + git rebase --onto 41e89c8 5cdd092 fix/mqtt-resubscribe-on-reconnect + git log --oneline origin/release/v0.8..HEAD # expect: 6 commits, no 5cdd092 + git diff --stat origin/release/v0.8...HEAD # expect: only workers/mqtt.rs, tests/workers/mqtt.rs, plans/... + ./scripts/test.sh # expect: all tests pass + +### Milestone 2 — update plan doc + + # Edit plans/completed/20260507-mqtt-resubscribe-on-reconnect.md per Plan of Work step 3. + git add plans/completed/20260507-mqtt-resubscribe-on-reconnect.md + git commit -S -m "plan: drop M2 in light of EMQX 1k session cap" + +### Final preflight + + ./scripts/preflight.sh # expect: Preflight clean + +## Validation and Acceptance + +1. `git log --oneline origin/release/v0.8..HEAD` shows 6 commits, none with sha `5cdd092` and none whose subject mentions `clean_session`. M1 and M3 subjects (`re-subscribe on every successful ConnAck`, `log mqtt reconnects with a connection counter`) are present. +2. `git diff --name-only origin/release/v0.8...HEAD` lists only `agent/src/workers/mqtt.rs`, `agent/tests/workers/mqtt.rs`, and `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`. +3.
`./scripts/test.sh` exits zero. +4. `./scripts/preflight.sh` reports `Preflight clean`. **This is the gate** — preflight must report clean before changes are pushed. +5. The plan doc no longer references the M2 milestone in Plan of Work, Concrete Steps, or Validation; the Decision Log includes the drop entry. + +## Idempotence and Recovery + +The rebase can be undone by `git reset --hard origin/fix/mqtt-resubscribe-on-reconnect`, which restores the pre-rebase state with M2 included. Plan-doc edits can be reverted with `git restore plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`. If the rebase reports a conflict (it should not, given file disjointness), abort with `git rebase --abort` and report — that signals an unexpected coupling worth investigating before retrying. + +## Non-goals + +- Not modifying `fix/mqtt-resubscribe-on-reconnect-main` (PR #61). The same operation will be applied there as a follow-up. +- Not bumping the Cargo workspace version. +- Not opening a new PR — the existing PR #60 will be updated by the task orchestrator. From 9b00541f4f8cfecab5288b930961dbe6aff1bb9e Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 18:36:33 -0700 Subject: [PATCH 08/13] plan: refine drop-m2 plan with accurate file/commit counts --- .../backlog/20260507-drop-m2-from-mqtt-resubscribe.md | 10 +++++----- 1 file changed, 5 insertions(+), 5 deletions(-) diff --git a/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md b/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md index 01773167..438ea05d 100644 --- a/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md +++ b/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md @@ -29,7 +29,7 @@ Drop M2 (`fix(mqtt): set clean_session=false ...`, sha `5cdd092`) from the v0.8. ## Context and Orientation -The branch `fix/mqtt-resubscribe-on-reconnect` is already pushed and has PR #60 open against `release/v0.8`. 
It currently carries three M-commits — M1 `41e89c8` (re-subscribe on ConnAck), M2 `5cdd092` (clean_session=false, to drop), M3 `635efc5` (connection counter, keep) — plus three plan-related commits and `f5d5f41` ("plan: mark mqtt-resubscribe hotfix complete"), totalling 7 commits over `origin/release/v0.8`. +The branch `fix/mqtt-resubscribe-on-reconnect` is already pushed and has PR #60 open against `release/v0.8`. It currently carries three M-commits — M1 `41e89c8` (re-subscribe on ConnAck), M2 `5cdd092` (clean_session=false, to drop), M3 `635efc5` (connection counter, keep) — plus three pre-implementation plan commits, `f5d5f41` ("plan: mark mqtt-resubscribe hotfix complete"), and `9cc8054` ("plan: draft drop-m2 plan for v0.8.1 hotfix" — this very plan), totalling 8 commits over `origin/release/v0.8`. M2 touches `agent/src/mqtt/options.rs`, `agent/src/mqtt/client.rs`, and `agent/tests/mqtt/options.rs`. M3 touches `agent/src/workers/mqtt.rs` and `agent/tests/workers/mqtt.rs`. The file sets are disjoint so the rebase that drops M2 will replay M3 cleanly. No `clean_session` references exist outside M2's files, so dropping M2 cannot break callers. @@ -51,9 +51,9 @@ No new commit added — the rebase rewrites M3 onto M1 directly. cd /home/ben/miru/workbench5/repos/agent git branch --show-current # expect: fix/mqtt-resubscribe-on-reconnect - git log --oneline origin/release/v0.8..HEAD # baseline: 7 commits + git log --oneline origin/release/v0.8..HEAD # baseline: 8 commits git rebase --onto 41e89c8 5cdd092 fix/mqtt-resubscribe-on-reconnect - git log --oneline origin/release/v0.8..HEAD # expect: 6 commits, no 5cdd092 + git log --oneline origin/release/v0.8..HEAD # expect: 7 commits, no 5cdd092 git diff --stat origin/release/v0.8...HEAD # expect: only workers/mqtt.rs, tests/workers/mqtt.rs, plans/... ./scripts/test.sh # expect: all tests pass @@ -69,8 +69,8 @@ No new commit added — the rebase rewrites M3 onto M1 directly. ## Validation and Acceptance -1. 
`git log --oneline origin/release/v0.8..HEAD` shows 6 commits, none with sha `5cdd092` and none whose subject mentions `clean_session`. M1 and M3 subjects (`re-subscribe on every successful ConnAck`, `log mqtt reconnects with a connection counter`) are present. -2. `git diff --name-only origin/release/v0.8...HEAD` lists only `agent/src/workers/mqtt.rs`, `agent/tests/workers/mqtt.rs`, and `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`. +1. After the rebase + plan-doc edit commit, `git log --oneline origin/release/v0.8..HEAD` shows 8 commits, none with sha `5cdd092` and none whose subject mentions `clean_session`. M1 and M3 subjects (`re-subscribe on every successful ConnAck`, `log mqtt reconnects with a connection counter`) are present. +2. `git diff --name-only origin/release/v0.8...HEAD` lists only `agent/src/workers/mqtt.rs`, `agent/tests/workers/mqtt.rs`, `plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md`, and `plans/completed/20260507-mqtt-resubscribe-on-reconnect.md`. 3. `./scripts/test.sh` exits zero. 4. `./scripts/preflight.sh` reports `Preflight clean`. **This is the gate** — preflight must report clean before changes are pushed. 5. The plan doc no longer references the M2 milestone in Plan of Work, Concrete Steps, or Validation; the Decision Log includes the drop entry. 
From 69c5ea35bc7f365e7e8a6dd1a7353f675e49ffe7 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 18:36:41 -0700 Subject: [PATCH 09/13] plan: promote drop-m2 plan to active --- .../{backlog => active}/20260507-drop-m2-from-mqtt-resubscribe.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plans/{backlog => active}/20260507-drop-m2-from-mqtt-resubscribe.md (100%) diff --git a/plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md b/plans/active/20260507-drop-m2-from-mqtt-resubscribe.md similarity index 100% rename from plans/backlog/20260507-drop-m2-from-mqtt-resubscribe.md rename to plans/active/20260507-drop-m2-from-mqtt-resubscribe.md From 60283d92f1e5cf5a1bedad0647358539c8c1864d Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 18:42:09 -0700 Subject: [PATCH 10/13] plan: drop M2 in light of EMQX 1k session cap --- .../20260507-mqtt-resubscribe-on-reconnect.md | 88 ++++--------------- 1 file changed, 15 insertions(+), 73 deletions(-) diff --git a/plans/completed/20260507-mqtt-resubscribe-on-reconnect.md b/plans/completed/20260507-mqtt-resubscribe-on-reconnect.md index dc241a27..5df61835 100644 --- a/plans/completed/20260507-mqtt-resubscribe-on-reconnect.md +++ b/plans/completed/20260507-mqtt-resubscribe-on-reconnect.md @@ -12,17 +12,18 @@ Root cause: the agent subscribes to its two MQTT command topics exactly once, in After this change a user gains: -1. **Sync and ping commands continue to be received after a transient broker disconnect.** The agent re-subscribes on every successful `ConnAck` and also asks the broker to remember its subscriptions (`clean_session = false`). Either fix alone would close the bug; both together are belt-and-suspenders so a future regression in one does not silently re-introduce the problem. +1. 
**Sync and ping commands continue to be received after a transient broker disconnect.** The agent re-subscribes on every successful `ConnAck`, which closes the bug for all reconnect paths regardless of broker session policy. 2. **Reconnects are visible in INFO logs.** A connection counter is included in the "Established connection to mqtt broker" log line, so future incidents can be diagnosed from logs alone (e.g. "connection #1" then "connection #2" half a day later, before commands stopped flowing). User-visible behavior to verify: trigger a sync from the backend, kill the broker connection (e.g. via `tc`/iptables drop, or a broker-side restart), wait for the agent to reconnect, trigger another sync — the second sync command must reach the device. +Note: a third change originally planned for this hotfix — setting `clean_session=false` on the MQTT options so the broker preserves subscriptions across disconnects — was deferred. See the Decision Log for rationale. + ## Progress Add entries as work proceeds. - [ ] Milestone 1: re-subscribe on every ConnAck Success -- [ ] Milestone 2: set `clean_session = false` - [ ] Milestone 3: connection counter in reconnect log ## Surprises & Discoveries @@ -31,7 +32,9 @@ Add entries as work proceeds. ## Decision Log -Add entries as work proceeds. +- Decision: M2 (`fix(mqtt): set clean_session=false ...`, sha `5cdd092`) was dropped before merge. + Rationale: User's EMQX broker is on the Dedicated tier with a 1,000-session cap. With `clean_session=false`, sessions persist across disconnects — session count would trend toward fleet size + churn, eating into the cap headroom. M1 (re-subscribe on every successful ConnAck) alone closes the silent-deafness bug for all reconnect paths; M2 was belt-and-suspenders. M2 can land separately if a tier upgrade makes sessions cheap. + Date/Author: 2026-05-07 / orchestrator on user request. 
## Outcomes & Retrospective @@ -71,19 +74,15 @@ Background a beginner needs: ## Plan of Work -Three milestones. Each ends in one signed git commit. The commits should be reviewable in isolation. - -**Milestone 1 — defensive re-subscribe.** In `handle_event`, after the existing log line and storage patch in the `ConnAck::Success` path, call `mqtt::device::subscribe_sync` and `mqtt::device::subscribe_ping` exactly the way `init_client` does. Same error strings. Subscribe failures are logged but do not change the returned `err_streak` (no need to drive the cooldown; the next reconnect will retry). This alone fixes the bug per the root-cause analysis in Purpose, even on brokers that do not honor `clean_session = false`. - -**Milestone 2 — durable broker session.** In `mqtt::Options`, add a `clean_session: bool` field defaulting to `false`. In `Client::new`, propagate it to `MqttOptions::set_clean_session`. With a stable, non-empty `client_id` (which we always have — it is the device id) this asks the broker to keep our subscriptions and queued messages across disconnects, providing a second line of defense and reducing message loss during the gap. +Two milestones. Each ends in one signed git commit. The commits should be reviewable in isolation. - rumqttc's `set_clean_session` panics if the client_id is empty *and* clean_session is false. In production `client_id` is always the device id (non-empty); in `Options::default()` the fallback `client_id` is `"miru-agent"` (also non-empty). No call site is at risk. +**Milestone 1 — defensive re-subscribe.** In `handle_event`, after the existing log line and storage patch in the `ConnAck::Success` path, call `mqtt::device::subscribe_sync` and `mqtt::device::subscribe_ping` exactly the way `init_client` does. Same error strings. Subscribe failures are logged but do not change the returned `err_streak` (no need to drive the cooldown; the next reconnect will retry). 
This alone fixes the bug per the root-cause analysis in Purpose, on every reconnect path. **Milestone 3 — observable reconnects.** Add a `connect_count: u32` field to `State`, initialized to `0`. Change `handle_event`'s signature to accept `connect_count: &mut u32`, and in the `ConnAck::Success` path increment it via `saturating_add(1)` before logging. Update the log line to `Established connection to mqtt broker (connection #{n})`. Update `run_impl` to pass `&mut state.connect_count`. Mechanically update existing handle_event tests to thread an `&mut counter` argument. Counting on every Success ConnAck (not just reconnects) is intentional — connection #1 is the initial connect; counts ≥2 are reconnects. That mapping makes log forensics obvious. -Order rationale: M3 last because its `handle_event` signature change forces edits to every existing test in `agent/tests/workers/mqtt.rs`; doing it last keeps M1 and M2 diffs reviewable. +Order rationale: M3 last because its `handle_event` signature change forces edits to every existing test in `agent/tests/workers/mqtt.rs`; doing it last keeps the M1 diff reviewable. (The "M3" label is preserved as-is — not renumbered — so the M3 commit subject stays traceable to the original plan. See the Decision Log for context on the gap in the milestone numbering.) ### Test strategy per milestone @@ -92,11 +91,6 @@ Order rationale: M3 last because its `handle_event` signature change forces edit - `connack_non_success_does_not_subscribe`: send a `ConnAck` with `code = ConnectReturnCode::BadUserNamePassword`. Assert no subscribe calls were made on the mock. - `connack_subscribe_error_does_not_change_err_streak`: configure `subscribe_fn` to return `MQTTError::MockErr { is_authentication_error: false, is_network_conn_err: false }`. Assert `handle_event` returns `0` (the same `err_streak` it would have returned without the failure) and does not panic. 
-- **Milestone 2 tests** — one new unit test in `agent/tests/mqtt/options.rs` (or, equivalently, `client.rs`): - - `default_options_have_persistent_session`: build `Options::new(Credentials::default())` and `Options::default()`. Assert `options.clean_session == false`. - - This is a flag-level assertion rather than a broker-level behavioral assertion. The broker-side check (start a `rumqttd` instance, connect with `clean_session = false`, disconnect, reconnect, assert `session_present == true`) is possible via the existing `mock::run_broker` harness in `agent/tests/mqtt/mock.rs`, but: (a) `rumqttd` 0.19's session persistence behavior is not contractually guaranteed across versions; (b) the hotfix's correctness rests on a single-line `set_clean_session(false)` call in `Client::new`, which is trivially audited by reading the diff. A flag-level test catches the only realistic regression (someone deletes the line). End-to-end reconnect testing is explicitly listed as a non-goal below. - - **Milestone 3 tests** — modify the existing `successful_connack_event` test to also assert that the counter has been incremented (`assert_eq!(connect_count, 1)` after one Success ConnAck; build a second event and verify the counter advances to `2`). All other existing `handle_event` tests need only the mechanical signature update. ### Non-negotiable: import order @@ -231,10 +225,6 @@ Run the tests: All three new tests must pass. The two pre-existing `connack` tests must continue to pass. -Verify clean_session is not yet set (it's set in milestone 2): - - grep -n "set_clean_session\|clean_session" agent/src/mqtt/client.rs agent/src/mqtt/options.rs # expect: no matches - Commit: git add -A @@ -242,53 +232,6 @@ Commit: git diff --cached --stat # sanity check git commit -S -m "fix(mqtt): re-subscribe on every successful ConnAck" -### Milestone 2 — clean_session = false - -Edit `agent/src/mqtt/options.rs`. 
Add a `clean_session: bool` field on `Options`: - - #[derive(Debug, Clone)] - pub struct Options { - pub connect_address: ConnectAddress, - pub credentials: Credentials, - pub client_id: String, - pub keep_alive: Duration, - pub timeouts: Timeouts, - pub capacity: usize, - pub clean_session: bool, - } - -In the `impl Options { pub fn new(...) }` constructor, set `clean_session: false`. (Do not add a `with_clean_session` builder unless needed by a test — keep the diff minimal.) - -Edit `agent/src/mqtt/client.rs`. In `Client::new`, after `set_credentials` and before the `match options.connect_address.protocol` block, add: - - mqtt_options.set_clean_session(options.clean_session); - -Add one new unit test in `agent/tests/mqtt/options.rs` (the file already exists). Place the new test inside `mod opts` (alongside `new_defaults` / `default`), so it inherits the existing `use super::*;` imports. If `Credentials` and `Options` aren't already imported in that mod's scope, add them in the proper import block: - - #[test] - fn default_options_have_persistent_session() { - let options = Options::new(Credentials { - username: "u".to_string(), - password: "p".to_string(), - }); - assert!(!options.clean_session, "clean_session must default to false so the broker preserves subscriptions across reconnects"); - - let default_options = Options::default(); - assert!(!default_options.clean_session); - } - -Run the tests: - - ./scripts/test.sh - -The new test must pass. All previously passing tests must still pass — in particular `test_mqtt_client` (which connects to a real `rumqttd`) must still succeed; with a non-empty `client_id` (`test_user`), `set_clean_session(false)` is fine. 
- -Commit: - - git add -A - git diff --cached --stat - git commit -S -m "fix(mqtt): set clean_session=false to preserve subscriptions across reconnects" - ### Milestone 3 — observable reconnects Edit `agent/src/workers/mqtt.rs`: @@ -396,11 +339,10 @@ Run preflight (the `preflight` skill) and confirm it reports `clean` before push **Acceptance is behavioral, not just "tests pass":** 1. After milestone 1, the `Incoming::ConnAck(Success)` arm of `handle_event` invokes `subscribe_sync` and `subscribe_ping` exactly once per call — observable as `MockClient::num_subscribe_calls_to("cmd/devices/device_id/sync") == 1` and `... ("v1/cmd/devices/device_id/ping") == 1` after a single Success ConnAck (test `connack_success_resubscribes_to_sync_and_ping`). Non-Success ConnAcks produce zero subscribe calls (test `connack_non_success_does_not_subscribe`). A subscribe failure on the ConnAck arm does not change `err_streak` (test `connack_subscribe_error_does_not_change_err_streak`). Code audit: `init_client`'s startup subscribes at `agent/src/workers/mqtt.rs:181,184` remain in place, so a running device performs two subscribes per topic on the second connect (one at startup via `init_client`, one per ConnAck via the new arm). -2. After milestone 2, both `Options::new(...)` and `Options::default()` produce a struct with `clean_session == false`, and `Client::new` propagates that to `MqttOptions::set_clean_session(false)` (audit by reading the diff in `agent/src/mqtt/client.rs`). -3. After milestone 3, on two successive `ConnAck::Success` events through `handle_event`, the threaded `connect_count` advances `0 → 1 → 2` and the INFO log line reads `Established connection to mqtt broker (connection #1)` then `... (connection #2)`. -4. `./scripts/test.sh` exits zero with all new tests included. -5. Preflight reports `clean`. -6. 
Manual reconnect smoke test (optional but strongly recommended before tagging v0.8.1): run the agent against a local `rumqttd` (or staging broker), confirm "connection #1" log, kill the broker connection, confirm a "connection #2" log appears with no agent restart, then trigger a sync command from the backend and confirm the agent processes it. +2. After milestone 3, on two successive `ConnAck::Success` events through `handle_event`, the threaded `connect_count` advances `0 → 1 → 2` and the INFO log line reads `Established connection to mqtt broker (connection #1)` then `... (connection #2)`. +3. `./scripts/test.sh` exits zero with all new tests included. +4. Preflight reports `clean`. +5. Manual reconnect smoke test (optional but strongly recommended before tagging v0.8.1): run the agent against a local `rumqttd` (or staging broker), confirm "connection #1" log, kill the broker connection, confirm a "connection #2" log appears with no agent restart, then trigger a sync command from the backend and confirm the agent processes it. The full integration check — disconnect-and-reconnect over a real broker — is **not** automated as part of this PR. The harness exists (`mock::run_broker`) and a follow-up plan can add it; doing so here would inflate the diff beyond the hotfix's scope. @@ -415,9 +357,9 @@ If a milestone's tests fail after the edit, the recovery path is: # or, after a bad commit: git reset --soft HEAD~1 # uncommit but keep changes staged -The signature change in milestone 3 is the only edit that breaks compilation across many files — fix all `handle_event` call sites in `agent/tests/workers/mqtt.rs` together (Cargo's compile error list will name every site). There is no risky migration here: no on-disk state changes, no API contract changes, no schema changes. The only behavioral change visible to the broker is `clean_session = false`, which is renegotiated automatically on the next connect. 
+The signature change in milestone 3 is the only edit that breaks compilation across many files — fix all `handle_event` call sites in `agent/tests/workers/mqtt.rs` together (Cargo's compile error list will name every site). There is no risky migration here: no on-disk state changes, no API contract changes, no schema changes. Session handling is unchanged — the agent continues to use rumqttc's default `clean_session = true` — so the only broker-visible difference is the extra SUBSCRIBE requests the agent issues after each successful ConnAck (milestone 1). -If a hotfix needs to be reverted in the field, `git revert` of any of the three commits independently is safe; reverting milestone 1 while keeping milestone 2 leaves the original bug only partially mitigated (broker-side session might survive but the agent still doesn't replay subscribes if the broker forgets), so revert in reverse order if reverting more than one. +If the hotfix needs to be reverted in the field, `git revert` of either of the two commits independently is safe. Reverting M1 while keeping M3, however, restores the original bug and leaves only the new connection counter in place, so prefer reverting both if reverting at all. 
## Non-goals From dbe55b6c142e99836c5b481f0dc36449a04c8181 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 18:45:44 -0700 Subject: [PATCH 11/13] plan: mark drop-m2 hotfix complete --- .../20260507-drop-m2-from-mqtt-resubscribe.md | 0 1 file changed, 0 insertions(+), 0 deletions(-) rename plans/{active => completed}/20260507-drop-m2-from-mqtt-resubscribe.md (100%) diff --git a/plans/active/20260507-drop-m2-from-mqtt-resubscribe.md b/plans/completed/20260507-drop-m2-from-mqtt-resubscribe.md similarity index 100% rename from plans/active/20260507-drop-m2-from-mqtt-resubscribe.md rename to plans/completed/20260507-drop-m2-from-mqtt-resubscribe.md From 4409e4fdadc8601c39a8c50c36db5b1f49df99a8 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 20:03:37 -0700 Subject: [PATCH 12/13] refactor(mqtt): extract subscribe_all helper Both init_client and the ConnAck Success arm of handle_event were issuing subscribe_sync followed by subscribe_ping with identical error handling. Bundle the pair into mqtt::device::subscribe_all so the worker calls it in one line. Adds three unit tests covering the helper's happy path, short-circuit-on-sync-error, and error-propagation-on-ping-error behaviors. 
--- agent/src/mqtt/device.rs | 6 ++++ agent/src/workers/mqtt.rs | 14 +++----- agent/tests/mqtt/device.rs | 71 ++++++++++++++++++++++++++++++++++++++ 3 files changed, 81 insertions(+), 10 deletions(-) diff --git a/agent/src/mqtt/device.rs b/agent/src/mqtt/device.rs index a7bae6db..dd8f0b60 100644 --- a/agent/src/mqtt/device.rs +++ b/agent/src/mqtt/device.rs @@ -14,6 +14,12 @@ pub type SyncDevice = backend_api::models::SyncDevice; pub type Ping = backend_api::models::Ping; pub type Pong = backend_api::models::Pong; +pub async fn subscribe_all(client: &impl ClientI, device_id: &str) -> Result<(), MQTTError> { + subscribe_sync(client, device_id).await?; + subscribe_ping(client, device_id).await?; + Ok(()) +} + pub async fn subscribe_sync(client: &impl ClientI, device_id: &str) -> Result<(), MQTTError> { let topic = device_sync(device_id); client.subscribe(&topic, QoS::AtLeastOnce).await diff --git a/agent/src/workers/mqtt.rs b/agent/src/workers/mqtt.rs index 3795e993..8541af3d 100644 --- a/agent/src/workers/mqtt.rs +++ b/agent/src/workers/mqtt.rs @@ -180,11 +180,8 @@ async fn init_client( let (mqtt_client, eventloop) = mqtt::Client::new(&options).await; // subscribe to device synchronization updates - if let Err(e) = mqtt::device::subscribe_sync(&mqtt_client, device_id).await { - error!("error subscribing to device synchronization updates: {e:?}"); - }; - if let Err(e) = mqtt::device::subscribe_ping(&mqtt_client, device_id).await { - error!("error subscribing to device ping updates: {e:?}"); + if let Err(e) = mqtt::device::subscribe_all(&mqtt_client, device_id).await { + error!("error subscribing to device topic: {e:?}"); }; (mqtt_client, eventloop) @@ -239,11 +236,8 @@ pub async fn handle_event( // Re-subscribe on every successful (re)connect. rumqttc auto-reconnects // internally without replaying subscribes, and the broker may have // discarded session state, so we must restate our subscriptions here. 
- if let Err(e) = mqtt::device::subscribe_sync(mqtt_client, device_id).await { - error!("error subscribing to device synchronization updates: {e:?}"); - }; - if let Err(e) = mqtt::device::subscribe_ping(mqtt_client, device_id).await { - error!("error subscribing to device ping updates: {e:?}"); + if let Err(e) = mqtt::device::subscribe_all(mqtt_client, device_id).await { + error!("error subscribing to device topic: {e:?}"); }; } // update the device connection status on successful disconnections diff --git a/agent/tests/mqtt/device.rs b/agent/tests/mqtt/device.rs index 129d3c29..edb35b82 100644 --- a/agent/tests/mqtt/device.rs +++ b/agent/tests/mqtt/device.rs @@ -16,6 +16,77 @@ fn mock_error() -> MQTTError { }) } +mod subscribe_all { + use std::sync::atomic::{AtomicUsize, Ordering}; + use std::sync::Arc; + + use super::*; + + #[tokio::test] + async fn happy_path() { + let client = MockClient::default(); + device::subscribe_all(&client, "dvc_123").await.unwrap(); + + let calls = client.get_calls(); + assert_eq!(calls.len(), 2); + assert!(matches!( + &calls[0], + MockCall::Subscribe { topic, qos } + if topic == "cmd/devices/dvc_123/sync" && *qos == QoS::AtLeastOnce + )); + assert!(matches!( + &calls[1], + MockCall::Subscribe { topic, qos } + if topic == "v1/cmd/devices/dvc_123/ping" && *qos == QoS::AtLeastOnce + )); + } + + #[tokio::test] + async fn subscribe_sync_error_short_circuits() { + let client = MockClient { + subscribe_fn: Box::new(|| Err(Box::new(mock_error()))), + ..Default::default() + }; + let result = device::subscribe_all(&client, "dvc_123").await; + assert!(result.is_err()); + + // The `?` after subscribe_sync must short-circuit before subscribe_ping, + // so only the sync subscribe was attempted. + let calls = client.get_calls(); + assert_eq!(calls.len(), 1); + assert!(matches!( + &calls[0], + MockCall::Subscribe { topic, .. 
} if topic == "cmd/devices/dvc_123/sync" + )); + } + + #[tokio::test] + async fn subscribe_ping_error_propagates() { + let counter = Arc::new(AtomicUsize::new(0)); + let counter_for_fn = counter.clone(); + let client = MockClient { + subscribe_fn: Box::new(move || { + if counter_for_fn.fetch_add(1, Ordering::SeqCst) == 0 { + Ok(()) + } else { + Err(Box::new(mock_error())) + } + }), + ..Default::default() + }; + let result = device::subscribe_all(&client, "dvc_123").await; + assert!(result.is_err()); + + // sync succeeded, ping was attempted and failed — both calls recorded. + let calls = client.get_calls(); + assert_eq!(calls.len(), 2); + assert!(matches!( + &calls[1], + MockCall::Subscribe { topic, .. } if topic == "v1/cmd/devices/dvc_123/ping" + )); + } +} + mod subscribe_sync { use super::*; From 439e0b9e2fa419509efd962c1746713e44b3b959 Mon Sep 17 00:00:00 2001 From: Benjamin Smidt Date: Thu, 7 May 2026 20:04:06 -0700 Subject: [PATCH 13/13] refactor(mqtt): drop connect_count from State and handle_event The connection counter added in 'feat(mqtt): log mqtt reconnects with a connection counter' threaded a &mut u32 through handle_event purely so the ConnAck Success log could include 'connection #{n}'. The signature churn outweighs the diagnostic value: a future incident-responder can just grep -c the log line, and EMQX-side connection-event logging gives the same signal at the broker. Remove the field, parameter, and counter plumbing; revert the log line to its original form. 
--- agent/src/workers/mqtt.rs | 10 +- agent/tests/workers/mqtt.rs | 186 +++++------------------------------- 2 files changed, 24 insertions(+), 172 deletions(-) diff --git a/agent/src/workers/mqtt.rs b/agent/src/workers/mqtt.rs index 8541af3d..c22b84d0 100644 --- a/agent/src/workers/mqtt.rs +++ b/agent/src/workers/mqtt.rs @@ -108,7 +108,6 @@ pub async fn run_impl { @@ -216,7 +214,6 @@ pub async fn handle_event( syncer: &SyncerT, device_id: &str, device_stor: &storage::Device, - connect_count: &mut u32, ) -> ErrStreak { let err_streak = 0; @@ -226,11 +223,7 @@ pub async fn handle_event( if connack.code != ConnectReturnCode::Success { return err_streak; } - *connect_count = connect_count.saturating_add(1); - info!( - "Established connection to mqtt broker (connection #{count})", - count = *connect_count - ); + info!("Established connection to mqtt broker"); let _ = device_stor.patch(device::Updates::connected()).await; // Re-subscribe on every successful (re)connect. rumqttc auto-reconnects @@ -310,7 +303,6 @@ pub struct State { pub client: mqtt::Client, pub eventloop: EventLoop, pub err_streak: ErrStreak, - pub connect_count: u32, } pub async fn handle_error( diff --git a/agent/tests/workers/mqtt.rs b/agent/tests/workers/mqtt.rs index 3568070f..6a36f814 100644 --- a/agent/tests/workers/mqtt.rs +++ b/agent/tests/workers/mqtt.rs @@ -72,16 +72,8 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 0); @@ -110,17 +102,9 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; let 
before_event = Utc::now(); - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; assert_eq!(err_streak, 0); let device = device_file.read().await.unwrap(); @@ -145,16 +129,7 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let _ = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; + let _ = handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; assert_eq!( mqtt_client.num_subscribe_calls_to(&topics::device_sync("device_id")), @@ -182,16 +157,8 @@ pub mod handle_connection_events { })); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; assert_eq!(err_streak, 0); assert_eq!( @@ -228,59 +195,11 @@ pub mod handle_connection_events { ..Default::default() }; let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; assert_eq!(err_streak, 0); } - #[tokio::test] - async fn connack_success_increments_connect_count() { - let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); - let layout = Layout::new(dir); - let (device_file, _) = - storage::Device::spawn_with_default(64, layout.device(), Device::default()) - .await - .unwrap(); - - let event = 
Event::Incoming(Incoming::ConnAck(ConnAck { - code: ConnectReturnCode::Success, - session_present: false, - })); - let mqtt_client = MockClient::default(); - let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - - let _ = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; - assert_eq!(connect_count, 1); - - let _ = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; - assert_eq!(connect_count, 2); - } - #[tokio::test] async fn disconnect_event() { let dir = filesys::Dir::create_temp_dir("testing").await.unwrap(); @@ -301,17 +220,9 @@ pub mod handle_connection_events { let event = Event::Incoming(Incoming::Disconnect); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; let before_event = Utc::now(); - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - "device_id", - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, "device_id", &device_file).await; assert_eq!(err_streak, 0); let device = device_file.read().await.unwrap(); @@ -342,16 +253,8 @@ pub mod handle_sync_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - &device.id, - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 1); @@ -377,16 +280,8 @@ pub mod handle_sync_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - &device.id, - &device_file, - &mut connect_count, - ) - .await; 
+ let err_streak = + handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 0); @@ -412,16 +307,8 @@ pub mod handle_sync_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - &device.id, - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 1); @@ -452,16 +339,8 @@ pub mod handle_sync_events { is_network_conn_err: false, })) }); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - &device.id, - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; assert_eq!(err_streak, 0); assert_eq!(syncer.num_sync_calls(), 1); @@ -489,16 +368,8 @@ pub mod handle_ping_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - &device.id, - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; assert_eq!(err_streak, 0); assert_eq!( @@ -530,16 +401,8 @@ pub mod handle_ping_events { ))); let mqtt_client = MockClient::default(); let syncer = MockSyncer::default(); - let mut connect_count: u32 = 0; - let err_streak = handle_event( - &event, - &mqtt_client, - &syncer, - &device.id, - &device_file, - &mut connect_count, - ) - .await; + let err_streak = + handle_event(&event, &mqtt_client, &syncer, &device.id, &device_file).await; assert_eq!(err_streak, 0); assert_eq!( @@ -587,7 +450,6 @@ pub mod handle_mqtt_error { client, eventloop, 
err_streak: 2, - connect_count: 0, }; let state = handle_error( state, @@ -647,7 +509,6 @@ pub mod handle_mqtt_error { client, eventloop, err_streak: 5, - connect_count: 0, }; let state = handle_error( state, @@ -710,7 +571,6 @@ pub mod handle_mqtt_error { client, eventloop, err_streak: 1, - connect_count: 0, }; let state = handle_error( state,