From 860c2041da1ecf718959ef9ce6984cb221e564ad Mon Sep 17 00:00:00 2001 From: Yurii Romanchenko Date: Wed, 22 Sep 2021 22:32:07 -0700 Subject: [PATCH] Fix doFinally rx extension --- .../core/RxPeripheralManager.swift | 13 +++-- .../RxPeripheralManagerTests.swift | 50 +++++++++++++++++++ 2 files changed, 60 insertions(+), 3 deletions(-) diff --git a/Sources/RxCBCentral/core/RxPeripheralManager.swift b/Sources/RxCBCentral/core/RxPeripheralManager.swift index 7efbbf1..c2d4a1c 100644 --- a/Sources/RxCBCentral/core/RxPeripheralManager.swift +++ b/Sources/RxCBCentral/core/RxPeripheralManager.swift @@ -108,8 +108,15 @@ fileprivate struct GattQueue { extension ObservableType { func doFinally(_ finally: @escaping () -> ()) -> Observable { - return `do`(onError: { _ in finally() }, - onCompleted: finally, - onDispose: finally) + var didEmit = false + let invoke = { + guard !didEmit else { return } + didEmit = true + finally() + } + + return `do`(onError: { _ in invoke() }, + onCompleted: invoke, + onDispose: invoke) } } diff --git a/Tests/RxCBCentralTests/RxPeripheralManagerTests.swift b/Tests/RxCBCentralTests/RxPeripheralManagerTests.swift index 92e6c23..5cd283d 100644 --- a/Tests/RxCBCentralTests/RxPeripheralManagerTests.swift +++ b/Tests/RxCBCentralTests/RxPeripheralManagerTests.swift @@ -129,4 +129,54 @@ private final class RxPeripheralManagerTests: XCTestCase { XCTAssertEqual(dataObserver.events, expectation) } + + func test_queue_enqueueSeveralOperations_executesInOrder() { + // Create 3 payloads which will be sent simultaneously to peripheral + let payload1 = "Packet 1 containing dummy data" + let payload2 = "Packet 2 containing slightly more dummy data than first packet" + let payload3 = "P3" + + guard let p1Data = payload1.data(using: .utf8), + let p2Data = payload2.data(using: .utf8), + let p3Data = payload3.data(using: .utf8) else { + XCTFail() + return + } + + // Payloads are meant to be sent serially one after each other + let expectedBlob = p1Data + p2Data + p3Data + // Payload received on 'receiving' side + var receivedBlob = Data() + + // Simulate peripheral with small write length so packets will be fragmented + rxPeripheral.maxWriteLength = 5 + rxPeripheral.writeHandler = { _, _, data -> Completable in + // Simulate delay as it takes time to connect/discover, return 'successful' transfer. + return Observable.empty() + .delay( + RxTimeInterval.milliseconds(100), + scheduler: self.testScheduler + ) + .do(onCompleted: { receivedBlob.append(data) }) + .asCompletable() + } + + // Enqueue write operations + for payload in [p1Data, p2Data, p3Data] { + peripheralManager.queue( + operation: Write( + service: CBUUID(), + characteristic: CBUUID(), + data: payload + ) + ) + .subscribe({ _ in }) + .disposed(by: disposeBag) + } + + // Start the show + testScheduler.advanceTo(testScheduler.clock + 100) + + XCTAssertEqual(expectedBlob, receivedBlob) + } }