diff --git a/MeshCore/Sources/MeshCore/Events/EventDispatcher.swift b/MeshCore/Sources/MeshCore/Events/EventDispatcher.swift index a6a15240c..f83beed38 100644 --- a/MeshCore/Sources/MeshCore/Events/EventDispatcher.swift +++ b/MeshCore/Sources/MeshCore/Events/EventDispatcher.swift @@ -58,6 +58,17 @@ public actor EventDispatcher { public func subscribe( filter: (@Sendable (MeshEvent) -> Bool)? ) -> AsyncStream { + subscribeTracked(filter: filter).stream + } + + /// Subscribes to events and returns the stream together with a handle that can + /// be finished explicitly by the caller. + /// + /// Explicit finishing is useful for timeout races, where a waiting task may + /// otherwise remain suspended on the stream after the caller has already moved on. + public func subscribeTracked( + filter: (@Sendable (MeshEvent) -> Bool)? = nil + ) -> (id: UUID, stream: AsyncStream) { let (stream, continuation) = AsyncStream.makeStream( of: MeshEvent.self, bufferingPolicy: .bufferingNewest(100) @@ -75,7 +86,7 @@ public actor EventDispatcher { } } - return stream + return (id, stream) } /// Dispatches an event to all subscribers, applying filters. @@ -104,6 +115,14 @@ public actor EventDispatcher { subscriptions.removeAll() } + /// Finishes and removes a specific subscription. + /// + /// Safe to call multiple times; unknown ids are ignored. + public func finishSubscription(id: UUID) { + guard let subscription = subscriptions.removeValue(forKey: id) else { return } + subscription.continuation.finish() + } + /// Removes a subscription from the dispatcher. /// /// - Parameter id: The unique identifier of the subscription to remove. diff --git a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift index 1523bc83d..929498bdb 100644 --- a/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift +++ b/MeshCore/Sources/MeshCore/Session/MeshCoreSession.swift @@ -84,6 +84,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { private let dispatcher = EventDispatcher() private let pendingRequests = PendingRequests() private let binaryRequestSerializer = BinaryRequestSerializer() + private let requestResponseSerializer = RequestResponseSerializer() // State private var contactManager = ContactManager() @@ -423,10 +424,11 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeout: TimeInterval? = nil ) async -> MeshEvent? { let effectiveTimeout = timeout ?? configuration.defaultTimeout + let (subscriptionID, events) = await dispatcher.subscribeTracked() return await withTaskGroup(of: MeshEvent?.self) { group in group.addTask { - for await event in await self.events() { + for await event in events { if Task.isCancelled { return nil } if predicate(event) { return event @@ -442,6 +444,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { let result = await group.next() ?? nil group.cancelAll() + await self.dispatcher.finishSubscription(id: subscriptionID) return result } } @@ -470,14 +473,12 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeout: TimeInterval? = nil ) async -> MeshEvent? { let effectiveTimeout = timeout ?? configuration.defaultTimeout + let (subscriptionID, stream) = await dispatcher.subscribeTracked(filter: filter.matches) return await withTaskGroup(of: MeshEvent?.self) { group in group.addTask { - // Use filtered subscription for efficiency - let stream = await self.dispatcher.subscribe(filter: filter.matches) for await event in stream { if Task.isCancelled { return nil } - // Event already passed filter, return immediately return event } return nil @@ -490,6 +491,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { let result = await group.next() ?? nil group.cancelAll() + await self.dispatcher.finishSubscription(id: subscriptionID) return result } } @@ -497,6 +499,8 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Sends a command and waits for a matching response. /// /// This method avoids race conditions by subscribing to events before sending the command. + /// Events that do not satisfy the matcher, including unrelated `.error` events, are + /// ignored until a matching response arrives or the timeout expires. /// /// - Parameters: /// - data: The command data to send. @@ -509,37 +513,11 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { matching predicate: @escaping @Sendable (MeshEvent) -> T?, timeout: TimeInterval? = nil ) async throws -> T { - let effectiveTimeout = timeout ?? configuration.defaultTimeout - - // Subscribe BEFORE sending to avoid race condition - let events = await dispatcher.subscribe() - - // Send after subscribing - try await transport.send(data) - - // Now wait for matching event - return try await withThrowingTaskGroup(of: T?.self) { group in - group.addTask { - for await event in events { - if Task.isCancelled { return nil } - if let result = predicate(event) { - return result - } - } - return nil + try await sendAndMatch(data, timeout: timeout) { event in + if let result = predicate(event) { + return .success(result) } - - group.addTask { [clock = self.clock] in - try await clock.sleep(for: .seconds(effectiveTimeout)) - return nil - } - - if let result = try await group.next() ?? nil { - group.cancelAll() - return result - } - group.cancelAll() - throw MeshCoreError.timeout + return .ignore } } @@ -548,50 +526,92 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameters: /// - data: Command data to send. /// - successPredicate: Predicate to match success events and extract result. + /// - errorMatcher: Optional matcher for request-specific error events. Errors that + /// do not match are ignored so unrelated commands cannot fail the active request. /// - timeout: Optional timeout override. /// - Returns: The extracted result on success. - /// - Throws: ``MeshCoreError/deviceError(code:)`` on error response, + /// - Throws: A matched ``MeshCoreError`` from `errorMatcher`, /// ``MeshCoreError/timeout`` on timeout. private func sendAndWaitWithError( _ data: Data, matching successPredicate: @escaping @Sendable (MeshEvent) -> T?, + errorMatcher: (@Sendable (MeshEvent) -> MeshCoreError?)? = nil, timeout: TimeInterval? = nil ) async throws -> T { - let effectiveTimeout = timeout ?? configuration.defaultTimeout + try await sendAndMatch(data, timeout: timeout) { event in + if let error = errorMatcher?(event) { + return .failure(error) + } + if let result = successPredicate(event) { + return .success(result) + } + return .ignore + } + } - // Subscribe BEFORE sending to avoid race condition - let events = await dispatcher.subscribe() + private enum ResponseDisposition { + case success(T) + case failure(MeshCoreError) + case ignore + } - // Send after subscribing - try await transport.send(data) + private func sendAndMatch( + _ data: Data, + timeout: TimeInterval? = nil, + matching matcher: @escaping @Sendable (MeshEvent) -> ResponseDisposition + ) async throws -> T { + try await requestResponseSerializer.withSerialization { [self] in + let effectiveTimeout = timeout ?? configuration.defaultTimeout - // Now wait for matching event - return try await withThrowingTaskGroup(of: T?.self) { group in - group.addTask { - for await event in events { - if Task.isCancelled { return nil } - // Check for error response first - if case .error(let code) = event { - throw MeshCoreError.deviceError(code: code ?? 0) - } - if let result = successPredicate(event) { - return result + // Subscribe BEFORE sending to avoid race condition, then ignore all + // non-matching events until this request sees its own response. + let (subscriptionID, events) = await dispatcher.subscribeTracked() + + do { + // Send after subscribing + try await transport.send(data) + + return try await withThrowingTaskGroup(of: T?.self) { group in + group.addTask { + for await event in events { + if Task.isCancelled { return nil } + + switch matcher(event) { + case .success(let result): + return result + case .failure(let error): + throw error + case .ignore: + continue + } + } + return nil } - } - return nil - } - group.addTask { [clock = self.clock] in - try await clock.sleep(for: .seconds(effectiveTimeout)) - return nil - } + group.addTask { [clock = self.clock] in + try await clock.sleep(for: .seconds(effectiveTimeout)) + return nil + } - if let result = try await group.next() ?? nil { - group.cancelAll() - return result + do { + if let result = try await group.next() ?? nil { + group.cancelAll() + await dispatcher.finishSubscription(id: subscriptionID) + return result + } + group.cancelAll() + await dispatcher.finishSubscription(id: subscriptionID) + throw MeshCoreError.timeout + } catch { + group.cancelAll() + await dispatcher.finishSubscription(id: subscriptionID) + throw error + } + } + } catch { + await dispatcher.finishSubscription(id: subscriptionID) + throw error } - group.cancelAll() - throw MeshCoreError.timeout } } @@ -602,11 +622,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// This is typically called automatically by ``start()``. /// /// - Returns: Information about the device itself. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit `selfInfo`. public func sendAppStart() async throws -> SelfInfo { let data = PacketBuilder.appStart(clientId: configuration.clientIdentifier) - return try await sendAndWaitWithError(data) { event in + return try await sendAndWait(data) { event in if case .selfInfo(let info) = event { return info } return nil } @@ -615,11 +634,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Queries the device for its capabilities and system information. /// /// - Returns: Information about the device hardware, firmware, and supported features. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit `deviceInfo`. public func queryDevice() async throws -> DeviceCapabilities { let data = PacketBuilder.deviceQuery() - return try await sendAndWaitWithError(data) { event in + return try await sendAndWait(data) { event in if case .deviceInfo(let info) = event { return info } return nil } @@ -628,11 +646,9 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Retrieves the current battery status from the device. /// /// - Returns: Battery voltage and charge level information. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit battery info. public func getBattery() async throws -> BatteryInfo { - let data = PacketBuilder.getBattery() - return try await sendAndWaitWithError(data) { event in + try await sendAndWait(PacketBuilder.getBattery()) { event in if case .battery(let info) = event { return info } return nil } @@ -717,19 +733,21 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// /// - Parameter publicKey: The full 32-byte public key of the contact. /// - Returns: The contact if found, or `nil` if no contact exists with that key. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit a matching contact response. public func getContact(publicKey: Data) async throws -> MeshContact? { let data = PacketBuilder.getContactByKey(publicKey: publicKey) - return try await sendAndWait(data) { event in + return try await sendAndMatch(data) { event in switch event { case .contact(let contact): - return contact + if contact.publicKey == publicKey { + return .success(contact) + } + return .ignore case .error: // Contact not found returns error, treat as nil - return nil as MeshContact? + return .success(nil) default: - return nil + return .ignore } } } @@ -859,11 +877,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter flood: If `true`, the advertisement is broadcast using flood routing. /// - Throws: ``MeshCoreError/timeout`` or ``MeshCoreError/deviceError(code:)`` on failure. public func sendAdvertisement(flood: Bool = false) async throws { - let data = PacketBuilder.sendAdvertisement(flood: flood) - let _: Bool = try await sendAndWaitWithError(data) { event in - if case .ok = event { return true } - return nil - } + try await sendSimpleCommand(PacketBuilder.sendAdvertisement(flood: flood)) } /// Requests status information from a remote node using the binary protocol. @@ -871,6 +885,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter publicKey: The full 32-byte public key of the remote node. /// - Returns: A status response containing battery, uptime, and other metrics. /// - Throws: ``MeshCoreError/timeout`` if no response within the timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. /// ``MeshCoreError/invalidResponse`` if an unexpected response is received. public func requestStatus(from publicKey: Data) async throws -> StatusResponse { // Serialize binary requests to prevent messageSent race conditions @@ -916,6 +931,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.finish() case .error(let code): + timeoutContinuation.finish() throw MeshCoreError.deviceError(code: code ?? 0) case .binaryResponse(let tag, let responseData): @@ -934,6 +950,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { case .statusResponse(let response): // Handle already-routed response (if routing happens elsewhere) + guard response.publicKeyPrefix == publicKeyPrefix else { continue } let elapsed = ContinuousClock.now - startTime logger.info("Status request to \(prefixHex): routed response received in \(elapsed)") return response @@ -1082,10 +1099,9 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Gets the allowed frequency ranges for client repeat mode (v9+ firmware). /// /// - Returns: The allowed frequency ranges for repeat mode. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit repeat-frequency data. public func getRepeatFreq() async throws -> [FrequencyRange] { - try await sendAndWaitWithError(PacketBuilder.getRepeatFreq()) { event in + try await sendAndWait(PacketBuilder.getRepeatFreq()) { event in if case .allowedRepeatFreq(let ranges) = event { return ranges } return nil } @@ -1211,10 +1227,9 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Gets the current auto-add configuration from the device. /// /// - Returns: The auto-add configuration (bitmask + max hops). - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. - /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit auto-add configuration. public func getAutoAddConfig() async throws -> AutoAddConfig { - try await sendAndWaitWithError(PacketBuilder.getAutoAddConfig()) { event in + try await sendAndWait(PacketBuilder.getAutoAddConfig()) { event in if case .autoAddConfig(let config) = event { return config } return nil } @@ -1226,10 +1241,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. /// ``MeshCoreError/deviceError(code:)`` if the device returns an error. public func setAutoAddConfig(_ config: AutoAddConfig) async throws { - try await sendAndWaitWithError(PacketBuilder.setAutoAddConfig(config)) { event in - if case .ok = event { return () } - return nil - } + try await sendSimpleCommand(PacketBuilder.setAutoAddConfig(config)) } /// Returns the current device configuration from selfInfo. @@ -1277,10 +1289,15 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Retrieves telemetry data from the device. /// /// - Returns: Device telemetry including battery, temperature, and sensor data. + /// When `selfInfo` is available, only telemetry for the current device is accepted. /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. public func getSelfTelemetry() async throws -> TelemetryResponse { - try await sendAndWait(PacketBuilder.getSelfTelemetry()) { event in - if case .telemetryResponse(let response) = event { return response } + let expectedPrefix = selfInfo.map { Data($0.publicKey.prefix(6)) } + return try await sendAndWait(PacketBuilder.getSelfTelemetry()) { event in + if case .telemetryResponse(let response) = event, + expectedPrefix == nil || response.publicKeyPrefix == expectedPrefix { + return response + } return nil } } @@ -1327,14 +1344,18 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// /// - Parameter key: The 64-byte expanded private key to import. /// - Throws: ``MeshCoreError/featureDisabled`` if the device does not support key import, - /// ``MeshCoreError/timeout`` or ``MeshCoreError/deviceError(code:)`` on failure. + /// ``MeshCoreError/timeout`` if the device does not acknowledge the import, + /// or ``MeshCoreError/deviceError(code:)`` for a matched device error response. public func importPrivateKey(_ key: Data) async throws { let succeeded: Bool = try await sendAndWaitWithError( PacketBuilder.importPrivateKey(key) ) { event in - if case .ok = event { return true } + if case .ok(value: nil) = event { return true } if case .disabled = event { return false } return nil + } errorMatcher: { event in + guard case .error(let code?) = event else { return nil } + return MeshCoreError.deviceError(code: code) } if !succeeded { throw MeshCoreError.featureDisabled @@ -1828,10 +1849,10 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// /// - Parameter index: Channel index (0-255). /// - Returns: Channel information including name and secret. - /// - Throws: ``MeshCoreError/timeout`` if the device doesn't respond. + /// - Throws: ``MeshCoreError/timeout`` if the device doesn't emit configuration for the requested channel. public func getChannel(index: UInt8) async throws -> ChannelInfo { try await sendAndWait(PacketBuilder.getChannel(index: index)) { event in - if case .channelInfo(let info) = event { return info } + if case .channelInfo(let info) = event, info.index == index { return info } return nil } } @@ -1868,6 +1889,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter publicKey: The full 32-byte public key of the remote node. /// - Returns: Telemetry response containing sensor data and device status. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. /// ``MeshCoreError/invalidResponse`` if unexpected response received. public func requestTelemetry(from publicKey: Data) async throws -> TelemetryResponse { // Serialize binary requests to prevent messageSent race conditions @@ -1916,6 +1938,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.finish() case .error(let code): + timeoutContinuation.finish() throw MeshCoreError.deviceError(code: code ?? 0) case .binaryResponse(let tag, let responseData): @@ -1932,6 +1955,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { case .telemetryResponse(let response): // Handle already-routed response (if routing happens elsewhere) + guard response.publicKeyPrefix == publicKeyPrefix else { continue } let elapsed = ContinuousClock.now - startTime logger.info("Telemetry request to \(prefixHex): routed response received in \(elapsed)") return response @@ -1989,6 +2013,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - end: End of the time range. /// - Returns: MMA response containing aggregated statistics. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. public func requestMMA(from publicKey: Data, start: Date, end: Date) async throws -> MMAResponse { try await binaryRequestSerializer.withSerialization { [self] in try await performMMARequest(from: publicKey, start: start, end: end) @@ -2030,6 +2055,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.finish() case .error(let code): + timeoutContinuation.finish() throw MeshCoreError.deviceError(code: code ?? 0) case .binaryResponse(let tag, let responseData): @@ -2072,6 +2098,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - Parameter publicKey: The full 32-byte public key of the remote node. /// - Returns: ACL response containing authorized public keys. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. public func requestACL(from publicKey: Data) async throws -> ACLResponse { try await binaryRequestSerializer.withSerialization { [self] in try await performACLRequest(from: publicKey) @@ -2106,6 +2133,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.finish() case .error(let code): + timeoutContinuation.finish() throw MeshCoreError.deviceError(code: code ?? 0) case .binaryResponse(let tag, let responseData): @@ -2153,6 +2181,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// - pubkeyPrefixLength: Length of public key prefix to include (default 4). /// - Returns: Neighbors response containing list of adjacent nodes. /// - Throws: ``MeshCoreError/timeout`` if no response within timeout period. + /// ``MeshCoreError/deviceError(code:)`` if the device rejects the request. public func requestNeighbours( from publicKey: Data, count: UInt8 = 255, @@ -2214,6 +2243,7 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { timeoutContinuation.finish() case .error(let code): + timeoutContinuation.finish() throw MeshCoreError.deviceError(code: code ?? 0) case .binaryResponse(let tag, let responseData): @@ -2425,10 +2455,21 @@ public actor MeshCoreSession: MeshCoreSessionProtocol { /// Sends a command and waits for an "OK" response from the device. private func sendSimpleCommand(_ data: Data) async throws { - let _: Bool = try await sendAndWaitWithError(data) { event in - if case .ok = event { return true } - return nil - } + let _: Bool = try await sendAndWaitWithError( + data, + matching: { event in + if case .ok(let value) = event, value == nil { + return true + } + return nil + }, + errorMatcher: { event in + if case .error(let code) = event { + return MeshCoreError.deviceError(code: code ?? 0) + } + return nil + } + ) } /// The background loop for receiving data from the transport. diff --git a/MeshCore/Sources/MeshCore/Session/RequestContext.swift b/MeshCore/Sources/MeshCore/Session/RequestContext.swift index e2a753c9d..60688a883 100644 --- a/MeshCore/Sources/MeshCore/Session/RequestContext.swift +++ b/MeshCore/Sources/MeshCore/Session/RequestContext.swift @@ -234,13 +234,48 @@ public actor BinaryRequestSerializer { _ operation: @Sendable () async throws -> T ) async throws -> T { await acquire() - do { - let result = try await operation() - release() - return result - } catch { - release() - throw error + defer { release() } + return try await operation() + } +} + +/// Serializes broad command-response operations that rely on event matching. +/// +/// Many MeshCore commands wait for generic events such as `.ok`, `.error`, or a +/// singleton typed response. Serializing those request/response exchanges avoids +/// cross-command event miscorrelation when multiple callers issue commands at once. +public actor RequestResponseSerializer { + private var isRequestInFlight = false + private var waiters: [CheckedContinuation] = [] + + /// Acquires the serializer, waiting if another request/response exchange is active. + public func acquire() async { + if !isRequestInFlight { + isRequestInFlight = true + return + } + + await withCheckedContinuation { continuation in + waiters.append(continuation) } } + + /// Releases the serializer to the next waiting request. + public func release() { + if let next = waiters.first { + waiters.removeFirst() + next.resume() + } else { + isRequestInFlight = false + } + } + + /// Executes a request/response operation while holding the serializer. + public func withSerialization( + _ operation: @Sendable () async throws -> T + ) async throws -> T { + await acquire() + defer { release() } + return try await operation() + } } diff --git a/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift new file mode 100644 index 000000000..61aebe77d --- /dev/null +++ b/MeshCore/Tests/MeshCoreTests/Session/MeshCoreSessionCommandCorrelationTests.swift @@ -0,0 +1,522 @@ +import Foundation +import Testing +@testable import MeshCore + +@Suite("MeshCoreSession command correlation") +struct MeshCoreSessionCommandCorrelationTests { + @Test("simple commands serialize concurrent OK/ERROR waits") + func simpleCommandsSerializeConcurrentOKWaits() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let first = Task { + try await session.factoryReset() + } + let second = Task { + try await session.sendAdvertisement(flood: true) + } + + try await waitUntil("first command should be sent") { + await transport.sentData.count == 2 + } + + try? await Task.sleep(for: .milliseconds(50)) + #expect(await transport.sentData.count == 2) + + await transport.simulateOK() + + try await waitUntil("second command should wait for the first command to complete") { + await transport.sentData.count == 3 + } + + await transport.simulateOK() + + try await first.value + try await second.value + await session.stop() + } + + @Test("simple commands ignore OK responses with payloads") + func simpleCommandsIgnoreOKResponsesWithPayloads() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let resetTask = Task { + try await session.factoryReset() + } + + try await waitUntil("factoryReset should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateOK(value: 7) + + let error = await #expect(throws: MeshCoreError.self) { + try await resetTask.value + } + guard case .timeout? = error else { + Issue.record("Expected timeout after unrelated OK payload, got \(String(describing: error))") + await session.stop() + return + } + + await session.stop() + } + + @Test("simple commands still fail on device errors") + func simpleCommandsStillFailOnDeviceErrors() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let commandTask = Task { + try await session.setAutoAddConfig(AutoAddConfig(bitmask: 0x1E, maxHops: 2)) + } + + try await waitUntil("setAutoAddConfig should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 42) + + let error = await #expect(throws: MeshCoreError.self) { + try await commandTask.value + } + guard case .deviceError(let code)? = error else { + Issue.record("Expected deviceError, got \(String(describing: error))") + await session.stop() + return + } + #expect(code == 42) + + await session.stop() + } + + @Test("session start ignores unrelated errors until selfInfo arrives") + func sessionStartIgnoresUnrelatedErrorsUntilSelfInfoArrives() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + let startTask = Task { + try await session.start() + } + + try await waitUntil("transport should send appStart before session starts") { + await transport.sentData.count == 1 + } + + await transport.simulateError(code: 99) + await transport.simulateReceive(makeSelfInfoPacket()) + + try await startTask.value + #expect(await session.currentSelfInfo?.name == "Test") + await session.stop() + } + + @Test("getBattery ignores unrelated errors while waiting for a battery response") + func getBatteryIgnoresUnrelatedErrorsWhileWaitingForBatteryResponse() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let batteryTask = Task { + try await session.getBattery() + } + + try await waitUntil("getBattery should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 10) + await transport.simulateReceive(makeBatteryPacket(level: 4018)) + + let battery = try await batteryTask.value + #expect(battery.level == 4018) + await session.stop() + } + + @Test("getSelfTelemetry ignores telemetry for other nodes") + func getSelfTelemetryIgnoresTelemetryForOtherNodes() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let telemetryTask = Task { + try await session.getSelfTelemetry() + } + + try await waitUntil("getSelfTelemetry should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateReceive( + makeTelemetryPacket( + publicKeyPrefix: Data([0xAA, 0xBB, 0xCC, 0xDD, 0xEE, 0xFF]), + lppPayload: Data([0x01, 0x67, 0x00, 0xFA]) + ) + ) + await transport.simulateReceive( + makeTelemetryPacket( + publicKeyPrefix: Data(repeating: 0x01, count: 6), + lppPayload: Data([0x01, 0x67, 0x00, 0xF0]) + ) + ) + + let response = try await telemetryTask.value + #expect(response.publicKeyPrefix == Data(repeating: 0x01, count: 6)) + await session.stop() + } + + @Test("getChannel ignores responses for other channel indexes") + func getChannelIgnoresResponsesForOtherChannelIndexes() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let channelTask = Task { + try await session.getChannel(index: 3) + } + + try await waitUntil("getChannel should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateReceive( + makeChannelInfoPacket(index: 9, name: "Wrong", secret: Data(repeating: 0xAA, count: 16)) + ) + await transport.simulateReceive( + makeChannelInfoPacket(index: 3, name: "Right", secret: Data(repeating: 0xBB, count: 16)) + ) + + let channel = try await channelTask.value + #expect(channel.index == 3) + #expect(channel.name == "Right") + await session.stop() + } + + @Test("getContact ignores responses for other public keys") + func getContactIgnoresResponsesForOtherPublicKeys() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let requestedKey = Data(repeating: 0x11, count: 32) + let contactTask = Task { + try await session.getContact(publicKey: requestedKey) + } + + try await waitUntil("getContact should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateReceive( + makeContactPacket(publicKey: Data(repeating: 0x22, count: 32), name: "Wrong") + ) + await transport.simulateReceive( + makeContactPacket(publicKey: requestedKey, name: "Right") + ) + + let contact = try #require(await contactTask.value) + #expect(contact.publicKey == requestedKey) + #expect(contact.advertisedName == "Right") + await session.stop() + } + + @Test("importPrivateKey ignores OK responses with payloads") + func importPrivateKeyIgnoresOKResponsesWithPayloads() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let importTask = Task { + try await session.importPrivateKey(Data(repeating: 0x33, count: 64)) + } + + try await waitUntil("importPrivateKey should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateOK(value: 7) + + let error = await #expect(throws: MeshCoreError.self) { + try await importTask.value + } + guard case .timeout? = error else { + Issue.record("Expected timeout after unrelated OK payload, got \(String(describing: error))") + await session.stop() + return + } + + await session.stop() + } + + @Test("requestStatus fails fast on device error before messageSent") + func requestStatusFailsFastOnDeviceErrorBeforeMessageSent() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let target = Data(repeating: 0x31, count: 32) + let statusTask = Task { + try await session.requestStatus(from: target) + } + + try await waitUntil("requestStatus should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 10) + + let error = await #expect(throws: MeshCoreError.self) { + try await statusTask.value + } + guard case .deviceError(let code)? = error else { + Issue.record("Expected deviceError for binary status request, got \(String(describing: error))") + await session.stop() + return + } + #expect(code == 10) + await session.stop() + } + + @Test("requestTelemetry fails fast on device error before messageSent") + func requestTelemetryFailsFastOnDeviceErrorBeforeMessageSent() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let target = Data(repeating: 0x31, count: 32) + let telemetryTask = Task { + try await session.requestTelemetry(from: target) + } + + try await waitUntil("requestTelemetry should be sent") { + await transport.sentData.count == 2 + } + + await transport.simulateError(code: 11) + + let error = await #expect(throws: MeshCoreError.self) { + try await telemetryTask.value + } + guard case .deviceError(let code)? = error else { + Issue.record("Expected deviceError for binary telemetry request, got \(String(describing: error))") + await session.stop() + return + } + #expect(code == 11) + await session.stop() + } + + @Test("binary request errors release the serializer for following requests") + func binaryRequestErrorsReleaseTheSerializer() async throws { + let transport = MockTransport() + let session = MeshCoreSession( + transport: transport, + configuration: SessionConfiguration(defaultTimeout: 0.2, clientIdentifier: "MCTst") + ) + + try await startSession(session, transport: transport) + + let firstTarget = Data(repeating: 0x31, count: 32) + let secondTarget = Data(repeating: 0x42, count: 32) + + let statusTask = Task { + try await session.requestStatus(from: firstTarget) + } + let telemetryTask = Task { + try await session.requestTelemetry(from: secondTarget) + } + + try await waitUntil("first binary request should be sent") { + await transport.sentData.count == 2 + } + + try? await Task.sleep(for: .milliseconds(50)) + #expect(await transport.sentData.count == 2) + + await transport.simulateError(code: 12) + + let statusError = await #expect(throws: MeshCoreError.self) { + try await statusTask.value + } + guard case .deviceError(let firstCode)? = statusError else { + Issue.record("Expected first binary request to fail with deviceError, got \(String(describing: statusError))") + await session.stop() + return + } + #expect(firstCode == 12) + + try await waitUntil("second binary request should send after the first one fails") { + await transport.sentData.count == 3 + } + + await transport.simulateError(code: 13) + + let telemetryError = await #expect(throws: MeshCoreError.self) { + try await telemetryTask.value + } + guard case .deviceError(let secondCode)? = telemetryError else { + Issue.record("Expected second binary request to fail with deviceError, got \(String(describing: telemetryError))") + await session.stop() + return + } + #expect(secondCode == 13) + await session.stop() + } +} + +private func startSession( + _ session: MeshCoreSession, + transport: MockTransport +) async throws { + let startTask = Task { + try await session.start() + } + + try await waitUntil("transport should send appStart before session starts") { + await transport.sentData.count == 1 + } + + await transport.simulateReceive(makeSelfInfoPacket()) + try await startTask.value +} + +private func makeSelfInfoPacket() -> Data { + var payload = Data() + payload.append(1) + payload.append(UInt8(bitPattern: 22)) + payload.append(UInt8(bitPattern: 22)) + payload.append(Data(repeating: 0x01, count: 32)) + payload.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + payload.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + payload.append(0) + payload.append(0) + payload.append(0) + payload.append(0) + payload.append(contentsOf: withUnsafeBytes(of: UInt32(915_000).littleEndian) { Array($0) }) + payload.append(contentsOf: withUnsafeBytes(of: UInt32(125_000).littleEndian) { Array($0) }) + payload.append(7) + payload.append(5) + payload.append(contentsOf: "Test".utf8) + + var packet = Data([ResponseCode.selfInfo.rawValue]) + packet.append(payload) + return packet +} + +private func makeBatteryPacket(level: UInt16) -> Data { + var packet = Data([ResponseCode.battery.rawValue]) + packet.append(contentsOf: withUnsafeBytes(of: level.littleEndian) { Array($0) }) + return packet +} + +private func makeTelemetryPacket(publicKeyPrefix: Data, lppPayload: Data) -> Data { + var packet = Data([ResponseCode.telemetryResponse.rawValue]) + packet.append(0x00) + packet.append(publicKeyPrefix) + packet.append(lppPayload) + return packet +} + +private func makeStatusResponsePacket(publicKeyPrefix: Data, battery: UInt16) -> Data { + var packet = Data([ResponseCode.statusResponse.rawValue, 0x00]) + packet.append(publicKeyPrefix) + packet.append(contentsOf: withUnsafeBytes(of: battery.littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int16(-110).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int16(-85).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(100).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(50).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(25).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(3600).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(5).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(10).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(15).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(20).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt16(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(0).littleEndian) { Array($0) }) + return packet +} + +private func makeChannelInfoPacket(index: UInt8, name: String, secret: Data) -> Data { + var packet = Data([ResponseCode.channelInfo.rawValue, index]) + let nameBytes = Array(name.utf8.prefix(31)) + packet.append(contentsOf: nameBytes) + packet.append(0) + if nameBytes.count < 31 { + packet.append(Data(repeating: 0, count: 31 - nameBytes.count)) + } + packet.append(secret) + return packet +} + +private func makeContactPacket(publicKey: Data, name: String) -> Data { + var packet = Data([ResponseCode.contact.rawValue]) + packet.append(publicKey) + packet.append(ContactType.chat.rawValue) + packet.append(ContactFlags().rawValue) + packet.append(0xFF) + packet.append(Data(repeating: 0, count: 64)) + + let nameBytes = Array(name.utf8.prefix(31)) + packet.append(contentsOf: nameBytes) + packet.append(0) + if nameBytes.count < 31 { + packet.append(Data(repeating: 0, count: 31 - nameBytes.count)) + } + + packet.append(contentsOf: withUnsafeBytes(of: UInt32(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: Int32(0).littleEndian) { Array($0) }) + packet.append(contentsOf: withUnsafeBytes(of: UInt32(0).littleEndian) { Array($0) }) + return packet +}