From 814289d8f1850e90b1290c953fa52ef74a274572 Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Tue, 17 Mar 2026 20:36:22 -0500 Subject: [PATCH 1/8] Serialize generic MeshCore command responses --- .../MeshCore/Session/MeshCoreSession.swift | 145 ++++++++--------- .../MeshCore/Session/RequestContext.swift | 47 ++++++ ...shCoreSessionCommandCorrelationTests.swift | 147 ++++++++++++++++++ 3 files changed, 269 insertions(+), 70 deletions(-) create mode 100644 MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 20fda9b0d..6e228c29e 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -84,6 +84,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { private let dispatcher = EventDispatcher() private let pendingRequests = PendingRequests() private let binaryRequestSerializer = BinaryRequestSerializer() + private let requestResponseSerializer = RequestResponseSerializer() // State private var contactManager = ContactManager() @@ -475,37 +476,11 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { matching predicate: @escaping @Sendable (MeshEvent) -> T?, timeout: TimeInterval? = nil ) async throws -> T { - let effectiveTimeout = timeout ?? configuration.defaultTimeout - - // Subscribe BEFORE sending to avoid race condition - let events = await dispatcher.subscribe() - - // Send after subscribing - try await transport.send(data) - - // Now wait for matching event - return try await withThrowingTaskGroup(of: T?.self) { group in - group.addTask { - for await event in events { - if Task.isCancelled { return nil } - if let result = predicate(event) { - return result - } - } - return nil - } - - group.addTask { [clock = self.clock] in - try await clock.sleep(for: .seconds(effectiveTimeout)) - return nil + try await sendAndMatch(data, timeout: timeout) { event in + if let result = predicate(event) { + return .success(result) } - - if let result = try await group.next() ?? nil { - group.cancelAll() - return result - } - group.cancelAll() - throw MeshCoreError.timeout + return .ignore } } @@ -521,43 +496,69 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { private func sendAndWaitWithError( _ data: Data, matching successPredicate: @escaping @Sendable (MeshEvent) -> T?, + errorMatcher: (@Sendable (MeshEvent) -> MeshCoreError?)? = nil, timeout: TimeInterval? = nil ) async throws -> T { - let effectiveTimeout = timeout ?? configuration.defaultTimeout - - // Subscribe BEFORE sending to avoid race condition - let events = await dispatcher.subscribe() + try await sendAndMatch(data, timeout: timeout) { event in + if let error = errorMatcher?(event) { + return .failure(error) + } + if let result = successPredicate(event) { + return .success(result) + } + return .ignore + } + } - // Send after subscribing - try await transport.send(data) + private enum ResponseDisposition { + case success(T) + case failure(MeshCoreError) + case ignore + } - // Now wait for matching event - return try await withThrowingTaskGroup(of: T?.self) { group in - group.addTask { - for await event in events { - if Task.isCancelled { return nil } - // Check for error response first - if case .error(let code) = event { - throw MeshCoreError.deviceError(code: code ?? 0) - } - if let result = successPredicate(event) { - return result + private func sendAndMatch( + _ data: Data, + timeout: TimeInterval? = nil, + matching matcher: @escaping @Sendable (MeshEvent) -> ResponseDisposition + ) async throws -> T { + try await requestResponseSerializer.withSerialization { [self] in + let effectiveTimeout = timeout ?? configuration.defaultTimeout + + // Subscribe BEFORE sending to avoid race condition + let events = await dispatcher.subscribe() + + // Send after subscribing + try await transport.send(data) + + return try await withThrowingTaskGroup(of: T?.self) { group in + group.addTask { + for await event in events { + if Task.isCancelled { return nil } + + switch matcher(event) { + case .success(let result): + return result + case .failure(let error): + throw error + case .ignore: + continue + } } + return nil } - return nil - } - group.addTask { [clock = self.clock] in - try await clock.sleep(for: .seconds(effectiveTimeout)) - return nil - } + group.addTask { [clock = self.clock] in + try await clock.sleep(for: .seconds(effectiveTimeout)) + return nil + } - if let result = try await group.next() ?? nil { + if let result = try await group.next() ?? nil { + group.cancelAll() + return result + } group.cancelAll() - return result + throw MeshCoreError.timeout } - group.cancelAll() - throw MeshCoreError.timeout } } @@ -825,11 +826,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter flood: If `true`, the advertisement is broadcast using flood routing. /// - Throws: ``MeshCoreError/timeout`` or ``MeshCoreError/deviceError(code:)`` on failure. public func sendAdvertisement(flood: Bool = false) async throws { - let data = PacketBuilder.sendAdvertisement(flood: flood) - let _: Bool = try await sendAndWaitWithError(data) { event in - if case .ok = event { return true } - return nil - } + try await sendSimpleCommand(PacketBuilder.sendAdvertisement(flood: flood)) } /// Requests status information from a remote node using the binary protocol. @@ -1192,10 +1189,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. public func setAutoAddConfig(_ config: AutoAddConfig) async throws { - try await sendAndWaitWithError(PacketBuilder.setAutoAddConfig(config)) { event in - if case .ok = event { return () } - return nil - } + try await sendSimpleCommand(PacketBuilder.setAutoAddConfig(config)) } /// Returns the current device configuration from selfInfo. @@ -2357,10 +2351,21 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Sends a command and waits for an "OK" response from the device. private func sendSimpleCommand(_ data: Data) async throws { - let _: Bool = try await sendAndWaitWithError(data) { event in - if case .ok = event { return true } - return nil - } + let _: Bool = try await sendAndWaitWithError( + data, + matching: { event in + if case .ok(let value) = event, value == nil { + return true + } + return nil + }, + errorMatcher: { event in + if case .error(let code) = event { + return MeshCoreError.deviceError(code: code ?? 0) + } + return nil + } + ) } /// The background loop for receiving data from the transport. diff --git a/MeshCore/Sources/MeshCore/Session/RequestContext.swift b/MeshCore/Sources/MeshCore/Session/RequestContext.swift index e2a753c9d..e0f9253de 100644 --- a/MeshCore/Sources/MeshCore/Session/RequestContext.swift +++ b/MeshCore/Sources/MeshCore/Session/RequestContext.swift @@ -244,3 +244,50 @@ public actor BinaryRequestSerializer { } } } + +/// Serializes broad command-response operations that rely on event matching. +/// +/// Many MeshCore commands wait for generic events such as `.ok`, `.error`, or a +/// singleton typed response. Serializing those request/response exchanges avoids +/// cross-command event miscorrelation when multiple callers issue commands at once. +public actor RequestResponseSerializer { + private var isRequestInFlight = false + private var waiters: [CheckedContinuation] = [] + + /// Acquires the serializer, waiting if another request/response exchange is active. + public func acquire() async { + if !isRequestInFlight { + isRequestInFlight = true + return + } + + await withCheckedContinuation { continuation in + waiters.append(continuation) + } + } + + /// Releases the serializer to the next waiting request. + public func release() { + if let next = waiters.first { + waiters.removeFirst() + next.resume() + } else { + isRequestInFlight = false + } + } + + /// Executes a request/response operation while holding the serializer. + public func withSerialization( + _ operation: @Sendable () async throws -> T + ) async throws -> T { + await acquire() + do { + let result = try await operation() + release() + return result + } catch { + release() + throw error + } + } +} diff --git a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift new file mode 100644 index 000000000..f97a42e80 --- /dev/null +++ b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift @@ -0,0 +1,147 @@ +import Foundation +import Testing +@testable import MeshCore + +@Suite("MeshCoreSession command correlation") +struct MeshCoreSessionCommandCorrelationTests { + @Test("simple commands serialize concurrent OK/ERROR waits") + func simpleCommandsSerializeConcurrentOKWaits() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let first = Task { + try await session.factoryReset() + } + let second = Task { + try await session.sendAdvertisement(flood: true) + } + + try await waitUntil("first command should be sent") { + await transport.sentData.count == 2 + } + + try? await Task.sleep(for: .milliseconds(50)) + #expect(await transport.sentData.count == 2) + + await transport.simulateOK() + + try await waitUntil("second command should wait for the first command to complete") { + await transport.sentData.count == 3 + } + + await transport.simulateOK() + + try await first.value + try await second.value + await session.stop() + } + + @Test("simple commands ignore OK responses with payloads") + func simpleCommandsIgnoreOKResponsesWithPayloads() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let resetTask = Task { + try await session.factoryReset() + } + + try await waitUntil("factoryReset should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateOK(value: 7) + + let error = await #expect(throws: MeshCoreError.self) { + try await resetTask.value + } + guard case .timeout? = error else { + Issue.record("Expected timeout after unrelated OK payload, got \(String(describing: error))") + await session.stop() + return + } + + await session.stop() + } + + @Test("simple commands still fail on device errors") + func simpleCommandsStillFailOnDeviceErrors() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let commandTask = Task { + try await session.setAutoAddConfig(AutoAddConfig(bitmask: 0x1E, maxHops: 2)) + } + + try await waitUntil("setAutoAddConfig should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 42) + + let error = await #expect(throws: MeshCoreError.self) { + try await commandTask.value + } + guard case .deviceError(let code)? = error else { + Issue.record("Expected deviceError, got \(String(describing: error))") + await session.stop() + return + } + #expect(code == 42) + + await session.stop() + } +} + +private func startSession( + _ session: MeshCoreSession, + transport: MockTransport +) async throws { + let startTask = Task { + try await session.start() + } + + try await waitUntil("transport should send appStart before session starts") { + await transport.sentData.count == 1 + } + + await transport.simulateReceive(makeSelfInfoPacket()) + try await startTask.value +} + +private func makeSelfInfoPacket() -> Data { + var payload = Data() + payload.append(1) + payload.append(UInt8(bitPattern: 22)) + payload.append(UInt8(bitPattern: 22)) + payload.append(Data(repeating: 0x01, count: 32)) + payload.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + payload.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + payload.append(0) + payload.append(0) + payload.append(0) + payload.append(0) + payload.append(contentsOf: withUnsafeBytes(of: UInt32(915_000).littleEndian) { Array($0) }) + payload.append(contentsOf: withUnsafeBytes(of: UInt32(125_000).littleEndian) { Array($0) }) + payload.append(7) + payload.append(5) + payload.append(contentsOf: "Test".utf8) + + var packet = Data([ResponseCode.selfInfo.rawValue]) + packet.append(payload) + return packet +} From 0b53b84ccc0958c15b85aee2dc86209197dde662 Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Tue, 17 Mar 2026 20:42:55 -0500 Subject: [PATCH 2/8] Tighten typed MeshCore response matching --- .../MeshCore/Session/MeshCoreSession.swift | 21 +-- ...shCoreSessionCommandCorrelationTests.swift | 143 ++++++++++++++++++ 2 files changed, 155 insertions(+), 9 deletions(-) diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 6e228c29e..8cba47a8b 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -573,7 +573,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. public func sendAppStart() async throws -> SelfInfo { let data = PacketBuilder.appStart(clientId: configuration.clientIdentifier) - return try await sendAndWaitWithError(data) { event in + return try await sendAndWait(data) { event in if case .selfInfo(let info) = event { return info } return nil } @@ -586,7 +586,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. public func queryDevice() async throws -> DeviceCapabilities { let data = PacketBuilder.deviceQuery() - return try await sendAndWaitWithError(data) { event in + return try await sendAndWait(data) { event in if case .deviceInfo(let info) = event { return info } return nil } @@ -598,8 +598,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. public func getBattery() async throws -> BatteryInfo { - let data = PacketBuilder.getBattery() - return try await sendAndWaitWithError(data) { event in + try await sendAndWait(PacketBuilder.getBattery()) { event in if case .battery(let info) = event { return info } return nil } @@ -1048,7 +1047,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. public func getRepeatFreq() async throws -> [FrequencyRange] { - try await sendAndWaitWithError(PacketBuilder.getRepeatFreq()) { event in + try await sendAndWait(PacketBuilder.getRepeatFreq()) { event in if case .allowedRepeatFreq(let ranges) = event { return ranges } return nil } @@ -1177,7 +1176,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. public func getAutoAddConfig() async throws -> AutoAddConfig { - try await sendAndWaitWithError(PacketBuilder.getAutoAddConfig()) { event in + try await sendAndWait(PacketBuilder.getAutoAddConfig()) { event in if case .autoAddConfig(let config) = event { return config } return nil } @@ -1239,8 +1238,12 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Returns: Device telemetry including battery, temperature, and sensor data. /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. public func getSelfTelemetry() async throws -> TelemetryResponse { - try await sendAndWait(PacketBuilder.getSelfTelemetry()) { event in - if case .telemetryResponse(let response) = event { return response } + let expectedPrefix = selfInfo.map { Data($0.publicKey.prefix(6)) } + return try await sendAndWait(PacketBuilder.getSelfTelemetry()) { event in + if case .telemetryResponse(let response) = event, + expectedPrefix == nil || response.publicKeyPrefix == expectedPrefix { + return response + } return nil } } @@ -1757,7 +1760,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. public func getChannel(index: UInt8) async throws -> ChannelInfo { try await sendAndWait(PacketBuilder.getChannel(index: index)) { event in - if case .channelInfo(let info) = event { return info } + if case .channelInfo(let info) = event, info.index == index { return info } return nil } } diff --git a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift index f97a42e80..edd825bd9 100644 --- a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift +++ b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift @@ -105,6 +105,123 @@ struct MeshCoreSessionCommandCorrelationTests { await session.stop() } + + @Test("session start ignores unrelated errors until selfInfo arrives") + func sessionStartIgnoresUnrelatedErrorsUntilSelfInfoArrives() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + let startTask = Task { + try await session.start() + } + + try await waitUntil("transport should send appStart before session starts") { + await transport.sentData.count == 1 + } + + await transport.simulateError(code: 99) + await transport.simulateReceive(makeSelfInfoPacket()) + + try await startTask.value + #expect(await session.currentSelfInfo?.name == "Test") + await session.stop() + } + + @Test("getBattery ignores unrelated errors while waiting for a battery response") + func getBatteryIgnoresUnrelatedErrorsWhileWaitingForBatteryResponse() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let batteryTask = Task { + try await session.getBattery() + } + + try await waitUntil("getBattery should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 10) + await transport.simulateReceive(makeBatteryPacket(level: 4018)) + + let battery = try await batteryTask.value + #expect(battery.level == 4018) + await session.stop() + } + + @Test("getSelfTelemetry ignores telemetry for other nodes") + func getSelfTelemetryIgnoresTelemetryForOtherNodes() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let telemetryTask = Task { + try await session.getSelfTelemetry() + } + + try await waitUntil("getSelfTelemetry should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateReceive( + makeTelemetryPacket( + publicKeyPrefix: Data([0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]), + lppPayload: Data([0x01, 0x67, 0x00, 0xFA]) + ) + ) + await transport.simulateReceive( + makeTelemetryPacket( + publicKeyPrefix: Data(repeating: 0x01, count: 6), + lppPayload: Data([0x01, 0x67, 0x00, 0xF0]) + ) + ) + + let response = try await telemetryTask.value + #expect(response.publicKeyPrefix == Data(repeating: 0x01, count: 6)) + await session.stop() + } + + @Test("getChannel ignores responses for other channel indexes") + func getChannelIgnoresResponsesForOtherChannelIndexes() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let channelTask = Task { + try await session.getChannel(index: 3) + } + + try await waitUntil("getChannel should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateReceive( + makeChannelInfoPacket(index: 9, name: "Wrong", secret: Data(repeating: 0xAA, count: 16)) + ) + await transport.simulateReceive( + makeChannelInfoPacket(index: 3, name: "Right", secret: Data(repeating: 0xBB, count: 16)) + ) + + let channel = try await channelTask.value + #expect(channel.index == 3) + #expect(channel.name == "Right") + await session.stop() + } } private func startSession( @@ -145,3 +262,29 @@ private func makeSelfInfoPacket() -> Data { packet.append(payload) return packet } + +private func makeBatteryPacket(level: UInt16) -> Data { + var packet = Data([ResponseCode.battery.rawValue]) + packet.append(contentsOf: withUnsafeBytes(of: level.littleEndian) { Array($0) }) + return packet +} + +private func makeTelemetryPacket(publicKeyPrefix: Data, lppPayload: Data) -> Data { + var packet = Data([ResponseCode.telemetryResponse.rawValue]) + packet.append(0x00) + packet.append(publicKeyPrefix) + packet.append(lppPayload) + return packet +} + +private func makeChannelInfoPacket(index: UInt8, name: String, secret: Data) -> Data { + var packet = Data([ResponseCode.channelInfo.rawValue, index]) + let nameBytes = Array(name.utf8.prefix(31)) + packet.append(contentsOf: nameBytes) + packet.append(0) + if nameBytes.count < 31 { + packet.append(Data(repeating: 0, count: 31 - nameBytes.count)) + } + packet.append(secret) + return packet +} From 3117ac8aa95c5743fc4817ed94f509ad0e975f51 Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Tue, 17 Mar 2026 20:48:52 -0500 Subject: [PATCH 3/8] Correlate routed MeshCore binary responses by node --- .../MeshCore/Session/MeshCoreSession.swift | 17 +--- ...shCoreSessionCommandCorrelationTests.swift | 90 +++++++++++++++++++ 2 files changed, 92 insertions(+), 15 deletions(-) diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 8cba47a8b..03f028034 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -877,9 +877,6 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() - case .error(let code): - throw MeshCoreError.deviceError(code: code ?? 0) - case .binaryResponse(let tag, let responseData): // Match by expectedAck (4-byte tag from firmware) guard let expected = expectedAck, tag == expected else { continue } @@ -896,6 +893,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { case .statusResponse(let response): // Handle already-routed response (if routing happens elsewhere) + guard response.publicKeyPrefix == publicKeyPrefix else { continue } let elapsed = ContinuousClock.now - startTime logger.info("Status request to \(prefixHex): routed response received in \(elapsed)") return response @@ -1844,9 +1842,6 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() - case .error(let code): - throw MeshCoreError.deviceError(code: code ?? 0) - case .binaryResponse(let tag, let responseData): // Match by expectedAck (4-byte tag from firmware) guard let expected = expectedAck, tag == expected else { continue } @@ -1861,6 +1856,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { case .telemetryResponse(let response): // Handle already-routed response (if routing happens elsewhere) + guard response.publicKeyPrefix == publicKeyPrefix else { continue } let elapsed = ContinuousClock.now - startTime logger.info("Telemetry request to \(prefixHex): routed response received in \(elapsed)") return response @@ -1958,9 +1954,6 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() - case .error(let code): - throw MeshCoreError.deviceError(code: code ?? 0) - case .binaryResponse(let tag, let responseData): guard let expected = expectedAck, tag == expected else { continue } let entries = MMAParser.parse(responseData) @@ -2034,9 +2027,6 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() - case .error(let code): - throw MeshCoreError.deviceError(code: code ?? 0) - case .binaryResponse(let tag, let responseData): guard let expected = expectedAck, tag == expected else { continue } let entries = ACLParser.parse(responseData) @@ -2142,9 +2132,6 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() - case .error(let code): - throw MeshCoreError.deviceError(code: code ?? 0) - case .binaryResponse(let tag, let responseData): guard let expected = expectedAck, tag == expected else { continue } return NeighboursParser.parse( diff --git a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift index edd825bd9..ec655557f 100644 --- a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift +++ b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift @@ -222,6 +222,73 @@ struct MeshCoreSessionCommandCorrelationTests { #expect(channel.name == "Right") await session.stop() } + + @Test("requestStatus ignores unrelated errors and wrong-node status responses") + func requestStatusIgnoresUnrelatedErrorsAndWrongNodeResponses() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let target = Data(repeating: 0x31, count: 32) + let statusTask = Task { + try await session.requestStatus(from: target) + } + + try await waitUntil("requestStatus should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 10) + await transport.simulateReceive(makeStatusResponsePacket(publicKeyPrefix: Data(repeating: 0x99, count: 6), battery: 3900)) + await transport.simulateReceive(makeStatusResponsePacket(publicKeyPrefix: Data(repeating: 0x31, count: 6), battery: 4010)) + + let response = try await statusTask.value + #expect(response.publicKeyPrefix == Data(repeating: 0x31, count: 6)) + #expect(response.battery == 4010) + await session.stop() + } + + @Test("requestTelemetry ignores unrelated errors and wrong-node telemetry responses") + func requestTelemetryIgnoresUnrelatedErrorsAndWrongNodeResponses() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let target = Data(repeating: 0x31, count: 32) + let telemetryTask = Task { + try await session.requestTelemetry(from: target) + } + + try await waitUntil("requestTelemetry should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 11) + await transport.simulateReceive( + makeTelemetryPacket( + publicKeyPrefix: Data(repeating: 0x88, count: 6), + lppPayload: Data([0x01, 0x67, 0x00, 0xFA]) + ) + ) + await transport.simulateReceive( + makeTelemetryPacket( + publicKeyPrefix: Data(repeating: 0x31, count: 6), + lppPayload: Data([0x01, 0x67, 0x00, 0xF0]) + ) + ) + + let response = try await telemetryTask.value + #expect(response.publicKeyPrefix == Data(repeating: 0x31, count: 6)) + await session.stop() + } } private func startSession( @@ -277,6 +344,29 @@ private func makeTelemetryPacket(publicKeyPrefix: Data, lppPayload: Data) -> Dat return packet } +private func makeStatusResponsePacket(publicKeyPrefix: Data, battery: UInt16) -> Data { + var packet = Data([ResponseCode.statusResponse.rawValue, 0x00]) + packet.append(publicKeyPrefix) + packet.append(contentsOf: withUnsafeBytes(of: battery.littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int16(-110).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int16(-85).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(100).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(50).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(25).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(3600).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(5).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(10).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(15).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(20).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(0).littleEndian) { Array($0) }) + return packet +} + private func makeChannelInfoPacket(index: UInt8, name: String, secret: Data) -> Data { var packet = Data([ResponseCode.channelInfo.rawValue, index]) let nameBytes = Array(name.utf8.prefix(31)) From 7446c7256fd8f9f447c13a054f47b21c0b1e2580 Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Tue, 17 Mar 2026 20:56:52 -0500 Subject: [PATCH 4/8] Correlate contact and private-key responses --- .../MeshCore/Session/MeshCoreSession.swift | 7 +- ...shCoreSessionCommandCorrelationTests.swift | 86 +++++++++++++++++++ 2 files changed, 91 insertions(+), 2 deletions(-) diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 03f028034..499043a47 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -690,7 +690,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { return try await sendAndWait(data) { event in switch event { case .contact(let contact): - return contact + return contact.publicKey == publicKey ? contact : nil case .error: // Contact not found returns error, treat as nil return nil as MeshContact? @@ -1293,9 +1293,12 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { let succeeded: Bool = try await sendAndWaitWithError( PacketBuilder.importPrivateKey(key) ) { event in - if case .ok = event { return true } + if case .ok(value: nil) = event { return true } if case .disabled = event { return false } return nil + } errorMatcher: { event in + guard case .error(let code?) = event else { return nil } + return MeshCoreError.deviceError(code: code) } if !succeeded { throw MeshCoreError.featureDisabled diff --git a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift index ec655557f..6dac5174b 100644 --- a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift +++ b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift @@ -223,6 +223,70 @@ struct MeshCoreSessionCommandCorrelationTests { await session.stop() } + @Test("getContact ignores responses for other public keys") + func getContactIgnoresResponsesForOtherPublicKeys() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let requestedKey = Data(repeating: 0x11, count: 32) + let contactTask = Task { + try await session.getContact(publicKey: requestedKey) + } + + try await waitUntil("getContact should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateReceive( + makeContactPacket(publicKey: Data(repeating: 0x22, count: 32), name: "Wrong") + ) + await transport.simulateReceive( + makeContactPacket(publicKey: requestedKey, name: "Right") + ) + + let contact = try #require(await contactTask.value) + #expect(contact.publicKey == requestedKey) + #expect(contact.advertisedName == "Right") + await session.stop() + } + + @Test("importPrivateKey ignores OK responses with payloads") + func importPrivateKeyIgnoresOKResponsesWithPayloads() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let importTask = Task { + try await session.importPrivateKey(Data(repeating: 0x33, count: 64)) + } + + try await waitUntil("importPrivateKey should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateOK(value: 7) + + let error = await #expect(throws: MeshCoreError.self) { + try await importTask.value + } + guard case .timeout? = error else { + Issue.record("Expected timeout after unrelated OK payload, got \(String(describing: error))") + await session.stop() + return + } + + await session.stop() + } + @Test("requestStatus ignores unrelated errors and wrong-node status responses") func requestStatusIgnoresUnrelatedErrorsAndWrongNodeResponses() async throws { let transport = MockTransport() @@ -378,3 +442,25 @@ private func makeChannelInfoPacket(index: UInt8, name: String, secret: Data) -> packet.append(secret) return packet } + +private func makeContactPacket(publicKey: Data, name: String) -> Data { + var packet = Data([ResponseCode.contact.rawValue]) + packet.append(publicKey) + packet.append(ContactType.chat.rawValue) + packet.append(ContactFlags().rawValue) + packet.append(0xFF) + packet.append(Data(repeating: 0, count: 64)) + + let nameBytes = Array(name.utf8.prefix(31)) + packet.append(contentsOf: nameBytes) + packet.append(0) + if nameBytes.count < 31 { + packet.append(Data(repeating: 0, count: 31 - nameBytes.count)) + } + + packet.append(contentsOf: withUnsafeBytes(of: UInt32(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(0).littleEndian) { Array($0) }) + return packet +} From 73991c5479fec47417ed5df753ad1d5d815bee21 Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Wed, 18 Mar 2026 18:08:49 -0500 Subject: [PATCH 5/8] Simplify request serializer release --- .../MeshCore/Session/RequestContext.swift | 20 ++++--------------- 1 file changed, 4 insertions(+), 16 deletions(-) diff --git a/MeshCore/Sources/MeshCore/Session/RequestContext.swift b/MeshCore/Sources/MeshCore/Session/RequestContext.swift index e0f9253de..60688a883 100644 --- a/MeshCore/Sources/MeshCore/Session/RequestContext.swift +++ b/MeshCore/Sources/MeshCore/Session/RequestContext.swift @@ -234,14 +234,8 @@ public actor BinaryRequestSerializer { _ operation: @Sendable () async throws -> T ) async throws -> T { await acquire() - do { - let result = try await operation() - release() - return result - } catch { - release() - throw error - } + defer { release() } + return try await operation() } } @@ -281,13 +275,7 @@ public actor RequestResponseSerializer { _ operation: @Sendable () async throws -> T ) async throws -> T { await acquire() - do { - let result = try await operation() - release() - return result - } catch { - release() - throw error - } + defer { release() } + return try await operation() } } From 1e81b396bfa7f8b07c83070ca774a6549558fd6e Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Wed, 18 Mar 2026 17:59:04 -0500 Subject: [PATCH 6/8] Align MeshCoreSession docs with response matching --- .../MeshCore/Session/MeshCoreSession.swift | 33 ++++++++++--------- 1 file changed, 17 insertions(+), 16 deletions(-) diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 499043a47..6aa4213df 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -464,6 +464,8 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Sends a command and waits for a matching response. /// /// This method avoids race conditions by subscribing to events before sending the command. + /// Events that do not satisfy the matcher, including unrelated `.error` events, are + /// ignored until a matching response arrives or the timeout expires. /// /// - Parameters: /// - data: The command data to send. @@ -489,9 +491,11 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameters: /// - data: Command data to send. /// - successPredicate: Predicate to match success events and extract result. + /// - errorMatcher: Optional matcher for request-specific error events. Errors that + /// do not match are ignored so unrelated commands cannot fail the active request. /// - timeout: Optional timeout override. /// - Returns: The extracted result on success. - /// - Throws: ``MeshCoreError/deviceError(code:)`` on error response, + /// - Throws: A matched ``MeshCoreError`` from `errorMatcher`, /// ``MeshCoreError/timeout`` on timeout. private func sendAndWaitWithError( _ data: Data, @@ -524,7 +528,8 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { try await requestResponseSerializer.withSerialization { [self] in let effectiveTimeout = timeout ?? configuration.defaultTimeout - // Subscribe BEFORE sending to avoid race condition + // Subscribe BEFORE sending to avoid race condition, then ignore all + // non-matching events until this request sees its own response. let events = await dispatcher.subscribe() // Send after subscribing @@ -569,8 +574,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// This is typically called automatically by ``start()``. /// /// - Returns: Information about the device itself. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit `selfInfo`. public func sendAppStart() async throws -> SelfInfo { let data = PacketBuilder.appStart(clientId: configuration.clientIdentifier) return try await sendAndWait(data) { event in @@ -582,8 +586,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Queries the device for its capabilities and system information. /// /// - Returns: Information about the device hardware, firmware, and supported features. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit `deviceInfo`. public func queryDevice() async throws -> DeviceCapabilities { let data = PacketBuilder.deviceQuery() return try await sendAndWait(data) { event in @@ -595,8 +598,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Retrieves the current battery status from the device. /// /// - Returns: Battery voltage and charge level information. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit battery info. public func getBattery() async throws -> BatteryInfo { try await sendAndWait(PacketBuilder.getBattery()) { event in if case .battery(let info) = event { return info } @@ -683,8 +685,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// /// - Parameter publicKey: The full 32-byte public key of the contact. /// - Returns: The contact if found, or `nil` if no contact exists with that key. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit a matching contact response. public func getContact(publicKey: Data) async throws -> MeshContact? { let data = PacketBuilder.getContactByKey(publicKey: publicKey) return try await sendAndWait(data) { event in @@ -1042,8 +1043,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Gets the allowed frequency ranges for client repeat mode (v9+ firmware). /// /// - Returns: The allowed frequency ranges for repeat mode. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit repeat-frequency data. public func getRepeatFreq() async throws -> [FrequencyRange] { try await sendAndWait(PacketBuilder.getRepeatFreq()) { event in if case .allowedRepeatFreq(let ranges) = event { return ranges } @@ -1171,8 +1171,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Gets the current auto-add configuration from the device. /// /// - Returns: The auto-add configuration (bitmask + max hops). - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit auto-add configuration. public func getAutoAddConfig() async throws -> AutoAddConfig { try await sendAndWait(PacketBuilder.getAutoAddConfig()) { event in if case .autoAddConfig(let config) = event { return config } @@ -1234,6 +1233,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Retrieves telemetry data from the device. /// /// - Returns: Device telemetry including battery, temperature, and sensor data. + /// When `selfInfo` is available, only telemetry for the current device is accepted. /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. public func getSelfTelemetry() async throws -> TelemetryResponse { let expectedPrefix = selfInfo.map { Data($0.publicKey.prefix(6)) } @@ -1288,7 +1288,8 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// /// - Parameter key: The 64-byte expanded private key to import. /// - Throws: ``MeshCoreError/featureDisabled`` if the device does not support key import, - /// ``MeshCoreError/timeout`` or ``MeshCoreError/deviceError(code:)`` on failure. + /// ``MeshCoreError/timeout`` if the device does not acknowledge the import, + /// or ``MeshCoreError/deviceError(code:)`` for a matched device error response. public func importPrivateKey(_ key: Data) async throws { let succeeded: Bool = try await sendAndWaitWithError( PacketBuilder.importPrivateKey(key) @@ -1758,7 +1759,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// /// - Parameter index: Channel index (0-255). /// - Returns: Channel information including name and secret. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit configuration for the requested channel. public func getChannel(index: UInt8) async throws -> ChannelInfo { try await sendAndWait(PacketBuilder.getChannel(index: index)) { event in if case .channelInfo(let info) = event, info.index == index { return info } From 7cdc73ea01be911ce4d41deaeb8b93558b2a88d6 Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Thu, 19 Mar 2026 12:01:03 -0500 Subject: [PATCH 7/8] Fail fast on binary MeshCore request errors --- .../MeshCore/Session/MeshCoreSession.swift | 25 +++++ ...shCoreSessionCommandCorrelationTests.swift | 100 ++++++++++++++---- 2 files changed, 103 insertions(+), 22 deletions(-) diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 6aa4213df..706f6377d 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -834,6 +834,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter publicKey: The full 32-byte public key of the remote node. /// - Returns: A status response containing battery, uptime, and other metrics. /// - Throws: ``MeshCoreError/timeout`` if no response within the timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. /// ``MeshCoreError/invalidResponse`` if an unexpected response is received. public func requestStatus(from publicKey: Data) async throws -> StatusResponse { // Serialize binary requests to prevent messageSent race conditions @@ -878,6 +879,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() + case .error(let code): + timeoutContinuation.finish() + throw MeshCoreError.deviceError(code: code ?? 0) + case .binaryResponse(let tag, let responseData): // Match by expectedAck (4-byte tag from firmware) guard let expected = expectedAck, tag == expected else { continue } @@ -1799,6 +1804,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter publicKey: The full 32-byte public key of the remote node. /// - Returns: Telemetry response containing sensor data and device status. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. /// ``MeshCoreError/invalidResponse`` if unexpected response received. public func requestTelemetry(from publicKey: Data) async throws -> TelemetryResponse { // Serialize binary requests to prevent messageSent race conditions @@ -1846,6 +1852,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() + case .error(let code): + timeoutContinuation.finish() + throw MeshCoreError.deviceError(code: code ?? 0) + case .binaryResponse(let tag, let responseData): // Match by expectedAck (4-byte tag from firmware) guard let expected = expectedAck, tag == expected else { continue } @@ -1918,6 +1928,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - end: End of the time range. /// - Returns: MMA response containing aggregated statistics. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. public func requestMMA(from publicKey: Data, start: Date, end: Date) async throws -> MMAResponse { try await binaryRequestSerializer.withSerialization { [self] in try await performMMARequest(from: publicKey, start: start, end: end) @@ -1958,6 +1969,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() + case .error(let code): + timeoutContinuation.finish() + throw MeshCoreError.deviceError(code: code ?? 0) + case .binaryResponse(let tag, let responseData): guard let expected = expectedAck, tag == expected else { continue } let entries = MMAParser.parse(responseData) @@ -1998,6 +2013,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter publicKey: The full 32-byte public key of the remote node. /// - Returns: ACL response containing authorized public keys. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. public func requestACL(from publicKey: Data) async throws -> ACLResponse { try await binaryRequestSerializer.withSerialization { [self] in try await performACLRequest(from: publicKey) @@ -2031,6 +2047,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() + case .error(let code): + timeoutContinuation.finish() + throw MeshCoreError.deviceError(code: code ?? 0) + case .binaryResponse(let tag, let responseData): guard let expected = expectedAck, tag == expected else { continue } let entries = ACLParser.parse(responseData) @@ -2076,6 +2096,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - pubkeyPrefixLength: Length of public key prefix to include (default 4). /// - Returns: Neighbors response containing list of adjacent nodes. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. public func requestNeighbours( from publicKey: Data, count: UInt8 = 255, @@ -2136,6 +2157,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.yield(timeout) timeoutContinuation.finish() + case .error(let code): + timeoutContinuation.finish() + throw MeshCoreError.deviceError(code: code ?? 0) + case .binaryResponse(let tag, let responseData): guard let expected = expectedAck, tag == expected else { continue } return NeighboursParser.parse( diff --git a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift index 6dac5174b..61aebe77d 100644 --- a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift +++ b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift @@ -287,8 +287,8 @@ struct MeshCoreSessionCommandCorrelationTests { await session.stop() } - @Test("requestStatus ignores unrelated errors and wrong-node status responses") - func requestStatusIgnoresUnrelatedErrorsAndWrongNodeResponses() async throws { + @Test("requestStatus fails fast on device error before messageSent") + func requestStatusFailsFastOnDeviceErrorBeforeMessageSent() async throws { let transport = MockTransport() let session = MeshCoreSession( transport: transport, @@ -307,17 +307,21 @@ struct MeshCoreSessionCommandCorrelationTests { } await transport.simulateError(code: 10) - await transport.simulateReceive(makeStatusResponsePacket(publicKeyPrefix: Data(repeating: 0x99, count: 6), battery: 3900)) - await transport.simulateReceive(makeStatusResponsePacket(publicKeyPrefix: Data(repeating: 0x31, count: 6), battery: 4010)) - let response = try await statusTask.value - #expect(response.publicKeyPrefix == Data(repeating: 0x31, count: 6)) - #expect(response.battery == 4010) + let error = await #expect(throws: MeshCoreError.self) { + try await statusTask.value + } + guard case .deviceError(let code)? = error else { + Issue.record("Expected deviceError for binary status request, got \(String(describing: error))") + await session.stop() + return + } + #expect(code == 10) await session.stop() } - @Test("requestTelemetry ignores unrelated errors and wrong-node telemetry responses") - func requestTelemetryIgnoresUnrelatedErrorsAndWrongNodeResponses() async throws { + @Test("requestTelemetry fails fast on device error before messageSent") + func requestTelemetryFailsFastOnDeviceErrorBeforeMessageSent() async throws { let transport = MockTransport() let session = MeshCoreSession( transport: transport, @@ -336,21 +340,73 @@ struct MeshCoreSessionCommandCorrelationTests { } await transport.simulateError(code: 11) - await transport.simulateReceive( - makeTelemetryPacket( - publicKeyPrefix: Data(repeating: 0x88, count: 6), - lppPayload: Data([0x01, 0x67, 0x00, 0xFA]) - ) - ) - await transport.simulateReceive( - makeTelemetryPacket( - publicKeyPrefix: Data(repeating: 0x31, count: 6), - lppPayload: Data([0x01, 0x67, 0x00, 0xF0]) - ) + + let error = await #expect(throws: MeshCoreError.self) { + try await telemetryTask.value + } + guard case .deviceError(let code)? = error else { + Issue.record("Expected deviceError for binary telemetry request, got \(String(describing: error))") + await session.stop() + return + } + #expect(code == 11) + await session.stop() + } + + @Test("binary request errors release the serializer for following requests") + func binaryRequestErrorsReleaseTheSerializer() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") ) - let response = try await telemetryTask.value - #expect(response.publicKeyPrefix == Data(repeating: 0x31, count: 6)) + try await startSession(session, transport: transport) + + let firstTarget = Data(repeating: 0x31, count: 32) + let secondTarget = Data(repeating: 0x42, count: 32) + + let statusTask = Task { + try await session.requestStatus(from: firstTarget) + } + let telemetryTask = Task { + try await session.requestTelemetry(from: secondTarget) + } + + try await waitUntil("first binary request should be sent") { + await transport.sentData.count == 2 + } + + try? await Task.sleep(for: .milliseconds(50)) + #expect(await transport.sentData.count == 2) + + await transport.simulateError(code: 12) + + let statusError = await #expect(throws: MeshCoreError.self) { + try await statusTask.value + } + guard case .deviceError(let firstCode)? = statusError else { + Issue.record("Expected first binary request to fail with deviceError, got \(String(describing: statusError))") + await session.stop() + return + } + #expect(firstCode == 12) + + try await waitUntil("second binary request should send after the first one fails") { + await transport.sentData.count == 3 + } + + await transport.simulateError(code: 13) + + let telemetryError = await #expect(throws: MeshCoreError.self) { + try await telemetryTask.value + } + guard case .deviceError(let secondCode)? = telemetryError else { + Issue.record("Expected second binary request to fail with deviceError, got \(String(describing: telemetryError))") + await session.stop() + return + } + #expect(secondCode == 13) await session.stop() } } From 170c28c9409fdfc79973f9e570bc20ab8740e392 Mon Sep 17 00:00:00 2001 From: Robert Ekl Date: Sat, 21 Mar 2026 11:42:44 -0500 Subject: [PATCH 8/8] Fix getContact response correlation --- .../Sources/MeshCore/Session/MeshCoreSession.swift | 11 +++++++---- 1 file changed, 7 insertions(+), 4 deletions(-) diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 706f6377d..906bc4e1a 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -688,15 +688,18 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit a matching contact response. public func getContact(publicKey: Data) async throws -> MeshContact? { let data = PacketBuilder.getContactByKey(publicKey: publicKey) - return try await sendAndWait(data) { event in + return try await sendAndMatch(data) { event in switch event { case .contact(let contact): - return contact.publicKey == publicKey ? contact : nil + if contact.publicKey == publicKey { + return .success(contact) + } + return .ignore case .error: // Contact not found returns error, treat as nil - return nil as MeshContact? + return .success(nil) default: - return nil + return .ignore } } }