From 7fd6232e0524fa127ee731d2dcbb128922ae50cb Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 26 May 2026 18:18:17 +0000 Subject: [PATCH 1/9] fix: recover from corrupt persistence files instead of crashing hydration MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit FileDataStoreConfig and the resolver config caught only PathNotFoundException when hydrating. A corrupt JSON file or a file that fails to decrypt threw a FormatException that propagated out of the worker isolate's hydrate handler — crashing the worker and failing hydration of the entire store, so one unreadable partition bricked startup for all data. Catch the decode/decrypt failure, move the unreadable file aside to .corrupt (preserved for inspection, ignored by the data store file listing), and recover with an empty store for that partition. Other partitions hydrate normally and the next persist writes a fresh file. This favours startup resilience for a local cache; the failure is surfaced via the logger. Adds fault-injection tests that hydrate alongside a corrupt data store file and a corrupt resolver file, asserting hydration recovers (valid data still loads, the bad file is quarantined). Confirmed both tests fail against the unfixed code (the worker isolate crashes). --- .../file_data_store_config.dart | 28 ++++++ test/native/file_persistor_test.dart | 94 +++++++++++++++++++ 2 files changed, 122 insertions(+) diff --git a/lib/persistor/file_persistor/file_data_store_config.dart b/lib/persistor/file_persistor/file_data_store_config.dart index 7a5a2ee..0fa48b4 100644 --- a/lib/persistor/file_persistor/file_data_store_config.dart +++ b/lib/persistor/file_persistor/file_data_store_config.dart @@ -4,6 +4,28 @@ import 'package:loon/loon.dart'; import 'package:loon/persistor/data_store.dart'; import 'package:loon/persistor/data_store_resolver.dart'; +/// Moves a file that failed to hydrate (corrupt JSON, failed decryption) aside +/// to `.corrupt` and recovers by returning null (an empty store) so that +/// one unreadable file cannot fail hydration of the entire store. The data is +/// preserved for inspection, the `.corrupt` suffix is ignored by the data store +/// file listing, and the next persist for the partition writes a fresh file. +Future _recoverCorruptFile( + File file, + Logger logger, + Object error, +) async { + logger.log('Failed to hydrate ${file.path}, quarantining as corrupt: $error'); + try { + final quarantine = File('${file.path}.corrupt'); + if (await quarantine.exists()) { + await quarantine.delete(); + } + await file.rename(quarantine.path); + } catch (e) { + logger.log('Failed to quarantine ${file.path}: $e'); + } +} + class FileDataStoreConfig extends DataStoreConfig { FileDataStoreConfig( super.name, { @@ -29,6 +51,9 @@ class FileDataStoreConfig extends DataStoreConfig { return store; } on PathNotFoundException { return null; + } catch (error) { + await _recoverCorruptFile(file, logger, error); + return null; } }, persist: (store) async { @@ -59,6 +84,9 @@ class FileDataStoreResolverConfig extends DataStoreResolverConfig { jsonDecode(await file.readAsString())); } on PathNotFoundException { return null; + } catch (error) { + await _recoverCorruptFile(file, logger, error); + return null; } }, persist: (store) => file.writeAsString(jsonEncode(store.inspect())), diff --git a/test/native/file_persistor_test.dart b/test/native/file_persistor_test.dart index fdfbb17..c1b7d36 100644 --- a/test/native/file_persistor_test.dart +++ b/test/native/file_persistor_test.dart @@ -1,6 +1,9 @@ import 'dart:convert'; import 'dart:io'; +import 'package:encrypt/encrypt.dart'; import 'package:flutter_test/flutter_test.dart'; +import 'package:loon/loon.dart'; +import 'package:loon/persistor/data_store_encrypter.dart'; import 'package:loon/persistor/file_persistor/file_persistor.dart'; // ignore: depend_on_referenced_packages @@ -53,4 +56,95 @@ void main() { factory: FilePersistor.new, ); }); + + group('FilePersistor fault recovery', () { + final loonDir = Directory('${testDirectory.path}/loon'); + + // FlutterSecureStorage does not work in tests, so supply a fixed encrypter. + final encrypter = DataStoreEncrypter( + Encrypter(AES(Key.fromSecureRandom(32), mode: AESMode.cbc)), + ); + + FilePersistor newPersistor() => FilePersistor( + persistenceThrottle: const Duration(milliseconds: 1), + encrypter: encrypter, + ); + + setUp(() { + // Start each scenario from an empty persistence directory. + if (loonDir.existsSync()) { + for (final entity in loonDir.listSync()) { + entity.deleteSync(recursive: true); + } + } + Loon.unsubscribe(); + }); + + tearDown(() async { + await Loon.clearAll(); + Loon.unsubscribe(); + }); + + test('A corrupt data store file does not fail hydration of the rest', + () async { + // A valid default store plus an unreadable sibling store on disk. + await File('${loonDir.path}/__store__.json').writeAsString( + jsonEncode({ + "": { + "users": { + "__values": {"1": {"name": "User 1"}} + } + } + }), + ); + await File('${loonDir.path}/corrupt.json') + .writeAsString('{ not valid json'); + + Loon.configure(persistor: newPersistor()); + + // Hydration must not throw despite the corrupt file. + await Loon.hydrate(); + + // The valid store's data is still hydrated. + expect( + Loon.collection('users').doc('1').get()?.data, + {"name": "User 1"}, + ); + + // The corrupt file was quarantined rather than loaded. + expect(await File('${loonDir.path}/corrupt.json').exists(), false); + expect( + await File('${loonDir.path}/corrupt.json.corrupt').exists(), + true, + ); + }); + + test('A corrupt resolver file does not fail hydration', () async { + await File('${loonDir.path}/__store__.json').writeAsString( + jsonEncode({ + "": { + "users": { + "__values": {"1": {"name": "User 1"}} + } + } + }), + ); + await File('${loonDir.path}/__resolver__.json') + .writeAsString('}{ broken'); + + Loon.configure(persistor: newPersistor()); + + await Loon.hydrate(); + + // Full hydration does not depend on the resolver, so the data still loads. + expect( + Loon.collection('users').doc('1').get()?.data, + {"name": "User 1"}, + ); + expect( + await File('${loonDir.path}/__resolver__.json.corrupt').exists(), + true, + ); + }); + }); } From d67f5d66b30e3d68a77b48a7f8cc4e29f1afc77d Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 26 May 2026 18:20:58 +0000 Subject: [PATCH 2/9] ci: retrigger (pre-existing test/core timing flake, unrelated to persistence recovery) From af5539ab34d815699f3b2e7fa49b6d7f3e8f5305 Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 09:51:31 -0400 Subject: [PATCH 3/9] fix: narrow file hydration recovery --- .../file_data_store_config.dart | 87 ++++++++++++++++--- test/native/file_persistor_test.dart | 51 +++++++++-- 2 files changed, 122 insertions(+), 16 deletions(-) diff --git a/lib/persistor/file_persistor/file_data_store_config.dart b/lib/persistor/file_persistor/file_data_store_config.dart index 0fa48b4..68cbbdf 100644 --- a/lib/persistor/file_persistor/file_data_store_config.dart +++ b/lib/persistor/file_persistor/file_data_store_config.dart @@ -9,23 +9,62 @@ import 'package:loon/persistor/data_store_resolver.dart'; /// one unreadable file cannot fail hydration of the entire store. The data is /// preserved for inspection, the `.corrupt` suffix is ignored by the data store /// file listing, and the next persist for the partition writes a fresh file. +void _logHydrationFailure( + File file, + Logger logger, + Object error, + StackTrace stackTrace, +) { + logger.log( + 'Failed to hydrate ${file.path}, recovering without quarantine: ' + '$error\n$stackTrace', + ); +} + Future _recoverCorruptFile( File file, Logger logger, Object error, + StackTrace stackTrace, ) async { - logger.log('Failed to hydrate ${file.path}, quarantining as corrupt: $error'); + logger.log( + 'Failed to hydrate ${file.path}, quarantining as corrupt: ' + '$error\n$stackTrace', + ); + try { final quarantine = File('${file.path}.corrupt'); if (await quarantine.exists()) { await quarantine.delete(); } await file.rename(quarantine.path); - } catch (e) { - logger.log('Failed to quarantine ${file.path}: $e'); + } catch (error, stackTrace) { + logger.log('Failed to quarantine ${file.path}: $error\n$stackTrace'); } } +Json _decodeJsonObject(String value, String description) { + final decoded = jsonDecode(value); + if (decoded is! Json) { + throw FormatException('Expected $description to be a JSON object.'); + } + + return decoded; +} + +Json _readNestedJsonObject(Object? value, String description) { + if (value is! Json) { + throw FormatException('Expected $description to be a JSON object.'); + } + + return value; +} + +bool _isInvalidCipherTextException(Object error) { + // encrypt wraps PointyCastle without exposing its exception types. + return error.runtimeType.toString() == 'InvalidCipherTextException'; +} + class FileDataStoreConfig extends DataStoreConfig { FileDataStoreConfig( super.name, { @@ -37,22 +76,43 @@ class FileDataStoreConfig extends DataStoreConfig { hydrate: () async { try { final value = await file.readAsString(); - final json = jsonDecode( + final json = _decodeJsonObject( encrypted ? encrypter.decrypt(value) : value, + 'persisted data store', ); final store = ValueStore(); for (final entry in json.entries) { final resolverPath = entry.key; - final valueStore = ValueStore.fromJson(entry.value); + final valueStore = ValueStore.fromJson( + _readNestedJsonObject( + entry.value, + 'persisted data store entry "$resolverPath"', + ), + ); store.write(resolverPath, valueStore); } return store; } on PathNotFoundException { return null; - } catch (error) { - await _recoverCorruptFile(file, logger, error); + } on FileSystemException catch (error, stackTrace) { + _logHydrationFailure(file, logger, error, stackTrace); + return null; + } on FormatException catch (error, stackTrace) { + await _recoverCorruptFile(file, logger, error, stackTrace); + return null; + } on ArgumentError catch (error, stackTrace) { + if (!encrypted) { + rethrow; + } + await _recoverCorruptFile(file, logger, error, stackTrace); + return null; + } on Exception catch (error, stackTrace) { + if (!encrypted || !_isInvalidCipherTextException(error)) { + rethrow; + } + await _recoverCorruptFile(file, logger, error, stackTrace); return null; } }, @@ -81,11 +141,18 @@ class FileDataStoreResolverConfig extends DataStoreResolverConfig { hydrate: () async { try { return ValueRefStore( - jsonDecode(await file.readAsString())); + _decodeJsonObject( + await file.readAsString(), + 'persisted resolver', + ), + ); } on PathNotFoundException { return null; - } catch (error) { - await _recoverCorruptFile(file, logger, error); + } on FileSystemException catch (error, stackTrace) { + _logHydrationFailure(file, logger, error, stackTrace); + return null; + } on FormatException catch (error, stackTrace) { + await _recoverCorruptFile(file, logger, error, stackTrace); return null; } }, diff --git a/test/native/file_persistor_test.dart b/test/native/file_persistor_test.dart index c1b7d36..e21ccfa 100644 --- a/test/native/file_persistor_test.dart +++ b/test/native/file_persistor_test.dart @@ -17,12 +17,12 @@ late Directory testDirectory; class MockPathProvider extends Fake with MockPlatformInterfaceMixin implements PathProviderPlatform { - getApplicationDocumentsDirectory() { + Directory getApplicationDocumentsDirectory() { return testDirectory; } @override - getApplicationDocumentsPath() async { + Future getApplicationDocumentsPath() async { return testDirectory.path; } } @@ -60,9 +60,14 @@ void main() { group('FilePersistor fault recovery', () { final loonDir = Directory('${testDirectory.path}/loon'); - // FlutterSecureStorage does not work in tests, so supply a fixed encrypter. + // FlutterSecureStorage does not work in tests, so use a deterministic in-process key. final encrypter = DataStoreEncrypter( - Encrypter(AES(Key.fromSecureRandom(32), mode: AESMode.cbc)), + Encrypter( + AES( + Key.fromUtf8('0123456789abcdef0123456789abcdef'), + mode: AESMode.cbc, + ), + ), ); FilePersistor newPersistor() => FilePersistor( @@ -92,7 +97,9 @@ void main() { jsonEncode({ "": { "users": { - "__values": {"1": {"name": "User 1"}} + "__values": { + "1": {"name": "User 1"} + } } } }), @@ -119,12 +126,44 @@ void main() { ); }); + test('A corrupt encrypted data store file does not fail hydration', + () async { + await File('${loonDir.path}/__store__.json').writeAsString( + jsonEncode({ + "": { + "users": { + "__values": { + "1": {"name": "User 1"} + } + } + } + }), + ); + await File('${loonDir.path}/__store__.encrypted.json') + .writeAsString('not encrypted'); + + Loon.configure(persistor: newPersistor()); + + await Loon.hydrate(); + + expect( + Loon.collection('users').doc('1').get()?.data, + {"name": "User 1"}, + ); + expect( + await File('${loonDir.path}/__store__.encrypted.json.corrupt').exists(), + true, + ); + }); + test('A corrupt resolver file does not fail hydration', () async { await File('${loonDir.path}/__store__.json').writeAsString( jsonEncode({ "": { "users": { - "__values": {"1": {"name": "User 1"}} + "__values": { + "1": {"name": "User 1"} + } } } }), From bf5efe20ca382a2cc89d364160b2a8210ce1682d Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 09:59:09 -0400 Subject: [PATCH 4/9] fix: catch cipher failures explicitly --- example/pubspec.lock | 4 ++-- .../file_persistor/file_data_store_config.dart | 15 ++++----------- pubspec.yaml | 1 + 3 files changed, 7 insertions(+), 13 deletions(-) diff --git a/example/pubspec.lock b/example/pubspec.lock index 7d7c442..cae909c 100644 --- a/example/pubspec.lock +++ b/example/pubspec.lock @@ -339,10 +339,10 @@ packages: dependency: transitive description: name: pointycastle - sha256: "7c1e5f0d23c9016c5bbd8b1473d0d3fb3fc851b876046039509e18e0c7485f2c" + sha256: "4be0097fcf3fd3e8449e53730c631200ebc7b88016acecab2b0da2f0149222fe" url: "https://pub.dev" source: hosted - version: "3.7.3" + version: "3.9.1" process: dependency: transitive description: diff --git a/lib/persistor/file_persistor/file_data_store_config.dart b/lib/persistor/file_persistor/file_data_store_config.dart index 64f575f..9bb632b 100644 --- a/lib/persistor/file_persistor/file_data_store_config.dart +++ b/lib/persistor/file_persistor/file_data_store_config.dart @@ -3,6 +3,7 @@ import 'dart:io'; import 'package:loon/loon.dart'; import 'package:loon/persistor/data_store.dart'; import 'package:loon/persistor/data_store_resolver.dart'; +import 'package:pointycastle/api.dart' show InvalidCipherTextException; void _logHydrationFailure( File file, @@ -60,11 +61,6 @@ Json _readNestedJsonObject(Object? value, String description) { return value; } -bool _isInvalidCipherTextException(Object error) { - // encrypt wraps PointyCastle without exposing its exception types. - return error.runtimeType.toString() == 'InvalidCipherTextException'; -} - /// Writes [contents] to [file] atomically: the data is written to a sibling /// temporary file, flushed to disk, then renamed over the target. A rename on /// the same filesystem is atomic, so an interrupted write (crash, OOM kill, @@ -141,14 +137,11 @@ class FileDataStoreConfig extends DataStoreConfig { } on FormatException catch (error, stackTrace) { await _recoverCorruptFile(file, logger, error, stackTrace); return null; - } on ArgumentError catch (error, stackTrace) { - if (!encrypted) { - rethrow; - } + } on InvalidCipherTextException catch (error, stackTrace) { await _recoverCorruptFile(file, logger, error, stackTrace); return null; - } on Exception catch (error, stackTrace) { - if (!encrypted || !_isInvalidCipherTextException(error)) { + } on ArgumentError catch (error, stackTrace) { + if (!encrypted) { rethrow; } await _recoverCorruptFile(file, logger, error, stackTrace); diff --git a/pubspec.yaml b/pubspec.yaml index 49761fc..5ac6285 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -14,6 +14,7 @@ dependencies: flutter_secure_storage: ^10.0.0 path: ^1.8.3 path_provider: ^2.1.5 + pointycastle: ^3.9.1 web: ^1.1.0 sqflite: ^2.4.1 sqflite_common_ffi: ^2.3.4 From db1c574ac251c12642ca7d5f9a6909c8f3ae2a76 Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 10:47:40 -0400 Subject: [PATCH 5/9] test: use fake async for timing-sensitive tests --- test/core/broadcast_timing_test.dart | 30 +- test/core/loon_test.dart | 2284 +++++++++-------- .../observable_query_equivalence_test.dart | 84 +- test/core/persistor/lock_test.dart | 166 +- test/core/persistor/persist_manager_test.dart | 299 ++- test/utils.dart | 14 +- 6 files changed, 1530 insertions(+), 1347 deletions(-) diff --git a/test/core/broadcast_timing_test.dart b/test/core/broadcast_timing_test.dart index 9b3e6b6..4e4df25 100644 --- a/test/core/broadcast_timing_test.dart +++ b/test/core/broadcast_timing_test.dart @@ -2,6 +2,8 @@ import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; +import '../utils.dart'; + /// Deterministic tests for broadcast batching, coalescing, and ordering. /// /// Loon schedules a broadcast on a zero-duration timer so that all writes in a @@ -16,12 +18,6 @@ import 'package:loon/loon.dart'; /// (A dedicated file runs in its own test isolate, so the global store starts /// clean and these virtual-time tests are isolated from the rest of the suite.) -/// Advances virtual time past the broadcast's zero-duration timer and drains -/// the microtasks that deliver stream events. -void _flush(FakeAsync async) { - async.elapse(const Duration(milliseconds: 1)); -} - void _reset(FakeAsync async) { Loon.unsubscribe(); Loon.clearAll(broadcast: false); @@ -39,12 +35,12 @@ void main() { final sub = col .stream() .listen((snaps) => emissions.add([for (final s in snaps) s.data])); - _flush(async); // initial emission + flushBroadcasts(async); // initial emission col.doc('1').create(1); col.doc('2').create(2); col.doc('3').create(3); - _flush(async); + flushBroadcasts(async); // One emission for the initial value and exactly one for the batch. expect(emissions.length, 2); @@ -62,11 +58,11 @@ void main() { final emissions = []; final sub = doc.stream().listen((snap) => emissions.add(snap?.data)); - _flush(async); // initial null + flushBroadcasts(async); // initial null doc.create(1); doc.update(2); - _flush(async); + flushBroadcasts(async); // The create and update collapse into a single emission of the final value. expect(emissions, [null, 2]); @@ -83,14 +79,14 @@ void main() { final emissions = []; final sub = doc.stream().listen((snap) => emissions.add(snap?.data)); - _flush(async); // initial null + flushBroadcasts(async); // initial null doc.create(1); - _flush(async); + flushBroadcasts(async); doc.update(2); - _flush(async); + flushBroadcasts(async); doc.update(3); - _flush(async); + flushBroadcasts(async); expect(emissions, [null, 1, 2, 3]); @@ -106,12 +102,12 @@ void main() { final emissions = []; final sub = doc.stream().listen((snap) => emissions.add(snap?.data)); - _flush(async); // initial null + flushBroadcasts(async); // initial null doc.create(1); - _flush(async); + flushBroadcasts(async); doc.update(1); // same value - _flush(async); + flushBroadcasts(async); // No emission for the no-op update. expect(emissions, [null, 1]); diff --git a/test/core/loon_test.dart b/test/core/loon_test.dart index 7fe885c..ff11ca0 100644 --- a/test/core/loon_test.dart +++ b/test/core/loon_test.dart @@ -1,3 +1,4 @@ +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; @@ -15,8 +16,9 @@ void main() { }); tearDown(() async { + await Future.delayed(Duration.zero); Loon.unsubscribe(); - await Loon.clearAll(); + await Loon.clearAll(broadcast: false); }); group( @@ -408,34 +410,39 @@ void main() { ); }); - test('Skips broadcasting unchanged data by default', () async { - final userDoc = Loon.collection('users').doc('1'); - final user = TestUserModel('User 1'); - final updatedUser = TestUserModel('User 1 updated'); + test('Skips broadcasting unchanged data by default', () { + fakeAsync((async) { + final userDoc = Loon.collection('users').doc('1'); + final user = TestUserModel('User 1'); + final updatedUser = TestUserModel('User 1 updated'); - userDoc.create(user); + userDoc.create(user); - await asyncEvent(); + flushBroadcasts(async); - final stream = userDoc.stream(); + final events = []; + final sub = userDoc.stream().listen(events.add); + flushBroadcasts(async); - userDoc.update(user); + userDoc.update(user); - await asyncEvent(); + flushBroadcasts(async); - userDoc.update(updatedUser); + userDoc.update(updatedUser); - await asyncEvent(); + flushBroadcasts(async); - expectLater( - stream, - emitsInOrder( + expect( + events, [ DocumentSnapshot(doc: userDoc, data: user), DocumentSnapshot(doc: userDoc, data: updatedUser), ], - ), - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }); }, ); @@ -658,28 +665,33 @@ void main() { () { test( 'rebroadcasts the document to its stream listeners', - () async { - final usersCollection = Loon.collection('users'); - final userDoc = usersCollection.doc('1'); - - final stream = userDoc.stream(); + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final userDoc = usersCollection.doc('1'); - userDoc.create('Test'); + final events = []; + final sub = userDoc.stream().listen(events.add); + flushBroadcasts(async); - await asyncEvent(); + userDoc.create('Test'); + flushBroadcasts(async); - userDoc.rebroadcast(); + userDoc.rebroadcast(); + flushBroadcasts(async); - expectLater( - stream, - emitsInOrder( + expect( + events, [ null, DocumentSnapshot(doc: userDoc, data: 'Test'), DocumentSnapshot(doc: userDoc, data: 'Test'), ], - ), - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -690,54 +702,59 @@ void main() { () { test( "Rebuilds the document's dependencies", - () async { - bool hasDependencies = false; - - final usersCollection = Loon.collection( - 'users', - dependenciesBuilder: (snap) { - if (hasDependencies) { - return { - Loon.collection('friends').doc(snap.id), - }; - } - - return {}; - }, - ); - - final userDoc = usersCollection.doc('1'); - userDoc.create('User 1'); + () { + fakeAsync((async) { + bool hasDependencies = false; + + final usersCollection = Loon.collection( + 'users', + dependenciesBuilder: (snap) { + if (hasDependencies) { + return { + Loon.collection('friends').doc(snap.id), + }; + } - await asyncEvent(); + return {}; + }, + ); - final stream = userDoc.stream(); + final userDoc = usersCollection.doc('1'); + userDoc.create('User 1'); + flushBroadcasts(async); - final friendDoc = Loon.collection('friends').doc(userDoc.id); - // This event is ignored since the user does not yet depend on the friend document. - friendDoc.create('Friend 1'); + final events = ?>[]; + final sub = userDoc.stream().listen(events.add); + flushBroadcasts(async); - userDoc.update('User 1 updated'); + final friendDoc = + Loon.collection('friends').doc(userDoc.id); + // This event is ignored since the user does not yet depend on the friend document. + friendDoc.create('Friend 1'); - await asyncEvent(); + userDoc.update('User 1 updated'); + flushBroadcasts(async); - hasDependencies = true; + hasDependencies = true; - userDoc.rebuildDependencies(); + userDoc.rebuildDependencies(); - // This event causes a rebroadcast of the user, as they now depend on the friend document. - friendDoc.update('Friend 1 updated'); + // This event causes a rebroadcast of the user, as they now depend on the friend document. + friendDoc.update('Friend 1 updated'); + flushBroadcasts(async); - expect( - stream, - emitsInOrder( + expect( + events, [ DocumentSnapshot(doc: userDoc, data: 'User 1'), DocumentSnapshot(doc: userDoc, data: 'User 1 updated'), DocumentSnapshot(doc: userDoc, data: 'User 1 updated'), ], - ), - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -751,31 +768,35 @@ void main() { group( 'stream', () { - test('Returns a stream of document snapshots', () async { - final user = TestUserModel('User 1'); - final userUpdated = TestUserModel('User 1 updated'); - final userDoc = TestUserModel.store.doc('1'); - final stream = userDoc.stream(); + test('Returns a stream of document snapshots', () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final userUpdated = TestUserModel('User 1 updated'); + final userDoc = TestUserModel.store.doc('1'); + final events = ?>[]; + final sub = userDoc.stream().listen(events.add); - await asyncEvent(); - userDoc.create(user); - await asyncEvent(); - userDoc.update(userUpdated); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(user); + flushBroadcasts(async); + userDoc.update(userUpdated); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); - final events = await stream.take(4).toList(); + expect( + events, + [ + null, + DocumentSnapshot(doc: userDoc, data: user), + DocumentSnapshot(doc: userDoc, data: userUpdated), + null, + ], + ); - expect( - events, - [ - null, - DocumentSnapshot(doc: userDoc, data: user), - DocumentSnapshot(doc: userDoc, data: userUpdated), - null, - ], - ); + sub.cancel(); + async.flushMicrotasks(); + }); }); }, ); @@ -783,149 +804,158 @@ void main() { group( 'streamChanges', () { - test('Returns a stream of document change snapshots', () async { - final userDoc = TestUserModel.store.doc('1'); - final userData = TestUserModel('User 1'); - final userDataUpdated = TestUserModel('User 1 updated'); - final userObs = userDoc.observe(); - final future = userObs.streamChanges().take(2).toList(); + test('Returns a stream of document change snapshots', () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userData = TestUserModel('User 1'); + final userDataUpdated = TestUserModel('User 1 updated'); + final userObs = userDoc.observe(); + final events = >[]; + final sub = userObs.streamChanges().listen(events.add); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); - userDoc.update(userDataUpdated); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + userDoc.update(userDataUpdated); + flushBroadcasts(async); - final events = await future; + expect( + events, + [ + DocumentChangeSnapshot( + doc: userObs, + data: userData, + prevData: null, + event: BroadcastEvents.added, + ), + DocumentChangeSnapshot( + doc: userObs, + data: userDataUpdated, + prevData: userData, + event: BroadcastEvents.modified, + ), + ], + ); - expect( - events, - [ - DocumentChangeSnapshot( - doc: userObs, - data: userData, - prevData: null, - event: BroadcastEvents.added, - ), - DocumentChangeSnapshot( - doc: userObs, - data: userDataUpdated, - prevData: userData, - event: BroadcastEvents.modified, - ), - ], - ); + sub.cancel(); + async.flushMicrotasks(); + }); }); }, ); test( "Maintains its dependency cache correctly", - () async { - final usersCollection = Loon.collection('users'); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - if (snap.data['userId'] != null) { - return { - usersCollection.doc(snap.data['userId'].toString()), - }; - } - return null; - }, - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + if (snap.data['userId'] != null) { + return { + usersCollection.doc(snap.data['userId'].toString()), + }; + } + return null; + }, + ); - final postDoc = postsCollection.doc('1'); - final postData = {"id": 1, "text": "Post 1", "userId": 1}; - final postData2 = {"id": 1, "text": "Post 1 updated"}; - final postData3 = { - "id": 1, - "text": "Post 1 updated", - "userId": 3 - }; - final postObs = postDoc.observe(); + final postDoc = postsCollection.doc('1'); + final postData = {"id": 1, "text": "Post 1", "userId": 1}; + final postData2 = {"id": 1, "text": "Post 1 updated"}; + final postData3 = { + "id": 1, + "text": "Post 1 updated", + "userId": 3 + }; + final postObs = postDoc.observe(); - postDoc.create(postData); - await asyncEvent(); + postDoc.create(postData); + flushBroadcasts(async); - // After creating the post document, it should have added its user dependency into the observable's - // dep tree. - expect(postObs.inspect(), { - "deps": { - "__ref": 1, - "users": { + // After creating the post document, it should have added its user dependency into the observable's + // dep tree. + expect(postObs.inspect(), { + "deps": { "__ref": 1, - "1": 1, + "users": { + "__ref": 1, + "1": 1, + }, }, - }, - }); + }); - postDoc.update(postData2); - await asyncEvent(); + postDoc.update(postData2); + flushBroadcasts(async); - // After updating the document, it should have removed the user dependency from the observable's - // dep tree. - expect(postObs.inspect(), { - "deps": {}, - }); + // After updating the document, it should have removed the user dependency from the observable's + // dep tree. + expect(postObs.inspect(), { + "deps": {}, + }); - postDoc.update(postData3); - await asyncEvent(); + postDoc.update(postData3); + flushBroadcasts(async); - // The observable should have been updated to have a user dependency again. - expect(postObs.inspect(), { - "deps": { - "__ref": 1, - "users": { + // The observable should have been updated to have a user dependency again. + expect(postObs.inspect(), { + "deps": { "__ref": 1, - "3": 1, + "users": { + "__ref": 1, + "3": 1, + }, }, - }, - }); + }); - postsCollection.delete(); - await asyncEvent(); + postsCollection.delete(); + flushBroadcasts(async); - // After deleting the posts collection, observable should have cleared its dependencies. - expect(postObs.inspect(), {"deps": {}}); + // After deleting the posts collection, observable should have cleared its dependencies. + expect(postObs.inspect(), {"deps": {}}); + }); }, ); test( 'Invalidates its cached value when the document is updated', - () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); - final userDoc = usersCollection.doc('1'); - final userData = TestUserModel('User 1'); - final userDataUpdated = TestUserModel('User 1 updated'); + final userDoc = usersCollection.doc('1'); + final userData = TestUserModel('User 1'); + final userDataUpdated = TestUserModel('User 1 updated'); - userDoc.create(userData); - final userObs = userDoc.observe(); + userDoc.create(userData); + final userObs = userDoc.observe(); - await asyncEvent(); + flushBroadcasts(async); - DocumentSnapshot? snap = userObs.get(); + DocumentSnapshot? snap = userObs.get(); - // The observable's value is cached and is not recomputed unless invalidated by - // a change to its document. - expect(identical(snap, userObs.get()), true); - expect(snap, DocumentSnapshot(doc: userDoc, data: userData)); + // The observable's value is cached and is not recomputed unless invalidated by + // a change to its document. + expect(identical(snap, userObs.get()), true); + expect(snap, DocumentSnapshot(doc: userDoc, data: userData)); - userDoc.update(userDataUpdated); - snap = userObs.get(); + userDoc.update(userDataUpdated); + snap = userObs.get(); - // The updated document has not been broadcast yet, however the observable's cached - // value has been invalidated by the update to its documents and should return - // an up-to-date recalculated value. - expect( - snap, - DocumentSnapshot(doc: userDoc, data: userDataUpdated), - ); + // The updated document has not been broadcast yet, however the observable's cached + // value has been invalidated by the update to its documents and should return + // an up-to-date recalculated value. + expect( + snap, + DocumentSnapshot(doc: userDoc, data: userDataUpdated), + ); + flushBroadcasts(async); + }); }, ); }, @@ -1024,108 +1054,106 @@ void main() { test( 'Broadcasts the delete to observers of the collection and subcollections', - () async { - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - - final userData = TestUserModel('User 1'); - final userData2 = TestUserModel('User 2'); - - final friendDoc = userDoc - .subcollection( - 'friends', - fromJson: TestUserModel.fromJson, - toJson: (snap) => snap.toJson(), - ) - .doc('2'); - - userDoc.create(userData); - friendDoc.create(userData2); - userDoc2.create(userData2); - - final userDocStream = userDoc.stream(); - final userCollectionStream = TestUserModel.store.stream(); - final friendDocStream = friendDoc.stream(); - final friendCollectionStream = - userDoc.subcollection('friends').stream(); - - TestUserModel.store.delete(); - - final userDocData = await userDocStream.take(2).toList(); - final userCollectionData = - await userCollectionStream.take(2).toList(); - final friendDocData = await friendDocStream.take(2).toList(); - final friendCollectionData = - await friendCollectionStream.take(2).toList(); - - expect( - userDocData, - [ - DocumentSnapshot( - doc: userDoc, - data: userData, - ), - null, - ], - ); - - expect( - userCollectionData, - [ + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); + + final userData = TestUserModel('User 1'); + final userData2 = TestUserModel('User 2'); + + final friendDoc = userDoc + .subcollection( + 'friends', + fromJson: TestUserModel.fromJson, + toJson: (snap) => snap.toJson(), + ) + .doc('2'); + + userDoc.create(userData); + friendDoc.create(userData2); + userDoc2.create(userData2); + flushBroadcasts(async); + + final userDocData = ?>[]; + final userCollectionData = + >>[]; + final friendDocData = ?>[]; + final friendCollectionData = + >>[]; + final userDocSub = userDoc.stream().listen(userDocData.add); + final userCollectionSub = TestUserModel.store + .stream() + .listen(userCollectionData.add); + final friendDocSub = + friendDoc.stream().listen(friendDocData.add); + final friendCollectionSub = userDoc + .subcollection('friends') + .stream() + .listen(friendCollectionData.add); + flushBroadcasts(async); + + TestUserModel.store.delete(); + flushBroadcasts(async); + + expect( + userDocData, [ DocumentSnapshot( doc: userDoc, data: userData, ), - DocumentSnapshot( - doc: userDoc2, - data: userData2, - ), + null, ], - [], - ], - ); - - expect( - friendDocData, - [ - DocumentSnapshot( - doc: friendDoc, - data: userData2, - ), - null, - ], - ); + ); - expect( - userCollectionData, - [ + expect( + userCollectionData, [ - DocumentSnapshot( - doc: userDoc, - data: userData, - ), - DocumentSnapshot( - doc: userDoc2, - data: userData2, - ), + [ + DocumentSnapshot( + doc: userDoc, + data: userData, + ), + DocumentSnapshot( + doc: userDoc2, + data: userData2, + ), + ], + [], ], - [], - ], - ); + ); - expect( - friendCollectionData, - [ + expect( + friendDocData, [ DocumentSnapshot( doc: friendDoc, data: userData2, ), + null, ], - [], - ], - ); + ); + + expect( + friendCollectionData, + [ + [ + DocumentSnapshot( + doc: friendDoc, + data: userData2, + ), + ], + [], + ], + ); + + userDocSub.cancel(); + userCollectionSub.cancel(); + friendDocSub.cancel(); + friendCollectionSub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -1198,81 +1226,90 @@ void main() { test( "Broadcasts the expected change events", - () async { - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - final userData = TestUserModel('User 1'); - final userData2 = TestUserModel('User 2'); + final userData = TestUserModel('User 1'); + final userData2 = TestUserModel('User 2'); - final querySnaps = - TestUserModel.store.streamChanges().take(2).toList(); + final querySnaps = + >>[]; + final sub = TestUserModel.store + .streamChanges() + .listen(querySnaps.add); - userDoc.create(userData); - userDoc2.create(userData2); + userDoc.create(userData); + userDoc2.create(userData2); - await asyncEvent(); + flushBroadcasts(async); - final updatedUser2 = TestUserModel('User 2 updated'); - final userDoc3 = TestUserModel.store.doc('3'); - final userData3 = TestUserModel('User 3'); + final updatedUser2 = TestUserModel('User 2 updated'); + final userDoc3 = TestUserModel.store.doc('3'); + final userData3 = TestUserModel('User 3'); - TestUserModel.store.replace([ - DocumentSnapshot( - doc: userDoc2, - data: updatedUser2, - ), - DocumentSnapshot( - doc: userDoc3, - data: userData3, - ), - ]); + TestUserModel.store.replace([ + DocumentSnapshot( + doc: userDoc2, + data: updatedUser2, + ), + DocumentSnapshot( + doc: userDoc3, + data: userData3, + ), + ]); + flushBroadcasts(async); - expect( - await querySnaps, - [ + expect( + querySnaps, [ - DocumentChangeSnapshot( - doc: userDoc, - data: userData, - event: BroadcastEvents.added, - prevData: null, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: userData2, - event: BroadcastEvents.added, - prevData: null, - ), + [ + DocumentChangeSnapshot( + doc: userDoc, + data: userData, + event: BroadcastEvents.added, + prevData: null, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: userData2, + event: BroadcastEvents.added, + prevData: null, + ), + ], + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + event: BroadcastEvents.removed, + prevData: userData, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + event: BroadcastEvents.removed, + prevData: userData2, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: updatedUser2, + event: BroadcastEvents.added, + prevData: null, + ), + DocumentChangeSnapshot( + doc: userDoc3, + data: userData3, + event: BroadcastEvents.added, + prevData: null, + ), + ] ], - [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - event: BroadcastEvents.removed, - prevData: userData, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - event: BroadcastEvents.removed, - prevData: userData2, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: updatedUser2, - event: BroadcastEvents.added, - prevData: null, - ), - DocumentChangeSnapshot( - doc: userDoc3, - data: userData3, - event: BroadcastEvents.added, - prevData: null, - ), - ] - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -1315,532 +1352,581 @@ void main() { group('stream', () { test( 'Returns a stream of updates to the query', - () async { - final user = TestUserModel('User 1'); - final userUpdated = TestUserModel('User 1 updated'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - - final queryStream = TestUserModel.store - .where((snap) => snap.id == '1') - .stream(); + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final userUpdated = TestUserModel('User 1 updated'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - await asyncEvent(); - userDoc.create(user); - userDoc2.create(user2); + final querySnaps = >>[]; + final sub = TestUserModel.store + .where((snap) => snap.id == '1') + .stream() + .listen(querySnaps.add); - await asyncEvent(); - userDoc.update(userUpdated); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); - userDoc.create(user); - await asyncEvent(); - TestUserModel.store.delete(); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(user); + userDoc2.create(user2); - final querySnaps = await queryStream.take(6).toList(); + flushBroadcasts(async); + userDoc.update(userUpdated); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); + userDoc.create(user); + flushBroadcasts(async); + TestUserModel.store.delete(); + flushBroadcasts(async); - expect( - querySnaps, - [ - // No data - [], - // User 1 created - [ - DocumentSnapshot(doc: userDoc, data: user), - ], - // User 1 updated - [ - DocumentSnapshot(doc: userDoc, data: userUpdated), - ], - // User 1 deleted - [], - // User 1 recreated + expect( + querySnaps, [ - DocumentSnapshot(doc: userDoc, data: user), + // No data + [], + // User 1 created + [ + DocumentSnapshot(doc: userDoc, data: user), + ], + // User 1 updated + [ + DocumentSnapshot(doc: userDoc, data: userUpdated), + ], + // User 1 deleted + [], + // User 1 recreated + [ + DocumentSnapshot(doc: userDoc, data: user), + ], + // User collection deleted + [], ], - // User collection deleted - [], - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); test( - 'Handles the scenario where a collection is removed and written to in the same task', - () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - - userDoc.create(user); - userDoc2.create(user2); + 'Handles the scenario where a collection is removed and written to in the same task', + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - await asyncEvent(); + userDoc.create(user); + userDoc2.create(user2); - final queryStream = TestUserModel.store.stream().take(2); + flushBroadcasts(async); - await asyncEvent(); + final querySnaps = >>[]; + final sub = + TestUserModel.store.stream().listen(querySnaps.add); + flushBroadcasts(async); - TestUserModel.store.delete(); - userDoc.create(user); + TestUserModel.store.delete(); + userDoc.create(user); + flushBroadcasts(async); - final querySnaps = await queryStream.toList(); + expect( + querySnaps, + [ + [ + DocumentSnapshot(doc: userDoc, data: user), + DocumentSnapshot(doc: userDoc2, data: user2), + ], + // Rather than emitting an empty result set when the store is deleted, + // the stream batches changes in the same task together and emits a single + // update with the re-created user. + [ + DocumentSnapshot(doc: userDoc, data: user), + ], + ], + ); - expect( - querySnaps, - [ - [ - DocumentSnapshot(doc: userDoc, data: user), - DocumentSnapshot(doc: userDoc2, data: user2), - ], - // Rather than emitting an empty result set when the store is deleted, - // the stream batches changes in the same task together and emits a single - // update with the re-created user. - [ - DocumentSnapshot(doc: userDoc, data: user), - ], - ], - ); - }); + sub.cancel(); + async.flushMicrotasks(); + }); + }, + ); }); group( 'streamChanges', () { - test('Returns a stream of changes to the query', () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - final updatedUser = TestUserModel('User 1 updated'); + test('Returns a stream of changes to the query', () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); + final updatedUser = TestUserModel('User 1 updated'); - final events = - TestUserModel.store.streamChanges().take(4).toList(); + final events = + >>[]; + final sub = + TestUserModel.store.streamChanges().listen(events.add); - await asyncEvent(); - userDoc.create(user); - userDoc2.create(user2); - await asyncEvent(); - userDoc.update(updatedUser); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); - TestUserModel.store.delete(); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(user); + userDoc2.create(user2); + flushBroadcasts(async); + userDoc.update(updatedUser); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); + TestUserModel.store.delete(); + flushBroadcasts(async); - expect( - await events, - [ - // Create documents - [ - DocumentChangeSnapshot( - doc: userDoc, - data: user, - event: BroadcastEvents.added, - prevData: null, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: user2, - event: BroadcastEvents.added, - prevData: null, - ), - ], - // Update document - [ - DocumentChangeSnapshot( - doc: userDoc, - data: updatedUser, - event: BroadcastEvents.modified, - prevData: user, - ), - ], - // Remove document + expect( + events, [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - event: BroadcastEvents.removed, - prevData: updatedUser, - ), + // Create documents + [ + DocumentChangeSnapshot( + doc: userDoc, + data: user, + event: BroadcastEvents.added, + prevData: null, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: user2, + event: BroadcastEvents.added, + prevData: null, + ), + ], + // Update document + [ + DocumentChangeSnapshot( + doc: userDoc, + data: updatedUser, + event: BroadcastEvents.modified, + prevData: user, + ), + ], + // Remove document + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + event: BroadcastEvents.removed, + prevData: updatedUser, + ), + ], + // Delete user collection + [ + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + event: BroadcastEvents.removed, + prevData: user2, + ), + ] ], - // Delete user collection - [ - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - event: BroadcastEvents.removed, - prevData: user2, - ), - ] - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }); - test('Localizes broadcast event types to the query', () async { - final user = TestUserModel('User 1'); - final userDoc = TestUserModel.store.doc('1'); + test('Localizes broadcast event types to the query', () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final userDoc = TestUserModel.store.doc('1'); - final changeSnaps = TestUserModel.store - .where((snap) => snap.data.name == 'User 1 updated') - .streamChanges() - .take(1) - .toList(); + final changeSnaps = + >>[]; + final sub = TestUserModel.store + .where((snap) => snap.data.name == 'User 1 updated') + .streamChanges() + .listen(changeSnaps.add); - userDoc.create(user); - await asyncEvent(); - final updatedUser = TestUserModel('User 1 updated'); - userDoc.update(updatedUser); + userDoc.create(user); + flushBroadcasts(async); + final updatedUser = TestUserModel('User 1 updated'); + userDoc.update(updatedUser); + flushBroadcasts(async); - expect( - await changeSnaps, - [ + expect( + changeSnaps, [ - DocumentChangeSnapshot( - doc: userDoc, - data: updatedUser, - // The global event is a [BroadcastBroadcastEvents.modified] when the user is updated, - // but to this query, it should be a [BroadcastBroadcastEvents.added] event since previously - // it was not included and now it is. - event: BroadcastEvents.added, - prevData: null, - ), + [ + DocumentChangeSnapshot( + doc: userDoc, + data: updatedUser, + // The global event is a [BroadcastBroadcastEvents.modified] when the user is updated, + // but to this query, it should be a [BroadcastBroadcastEvents.added] event since previously + // it was not included and now it is. + event: BroadcastEvents.added, + prevData: null, + ), + ], ], - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }); test( - 'Handles the scenario where a collection is removed and written to in the same task', - () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); + 'Handles the scenario where a collection is removed and written to in the same task', + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - final changeSnaps = - TestUserModel.store.streamChanges().take(2).toList(); + final changeSnaps = + >>[]; + final sub = TestUserModel.store + .streamChanges() + .listen(changeSnaps.add); - userDoc.create(user); - userDoc2.create(user2); + userDoc.create(user); + userDoc2.create(user2); - await asyncEvent(); + flushBroadcasts(async); - TestUserModel.store.delete(); - userDoc.create(user); + TestUserModel.store.delete(); + userDoc.create(user); + flushBroadcasts(async); - expect( - await changeSnaps, - [ - [ - DocumentChangeSnapshot( - doc: userDoc, - data: user, - prevData: null, - event: BroadcastEvents.added, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: user2, - prevData: null, - event: BroadcastEvents.added, - ) - ], - // The deletion of the collection and the re-creation of the document - // are batched together into a single change event that includes the correct events - // for each document. - [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - prevData: user, - event: BroadcastEvents.removed, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - prevData: user2, - event: BroadcastEvents.removed, - ), - DocumentChangeSnapshot( - doc: userDoc, - data: user, - prevData: null, - event: BroadcastEvents.added, - ), - ], - ], - ); - }); + expect( + changeSnaps, + [ + [ + DocumentChangeSnapshot( + doc: userDoc, + data: user, + prevData: null, + event: BroadcastEvents.added, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: user2, + prevData: null, + event: BroadcastEvents.added, + ) + ], + // The deletion of the collection and the re-creation of the document + // are batched together into a single change event that includes the correct events + // for each document. + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + prevData: user, + event: BroadcastEvents.removed, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + prevData: user2, + event: BroadcastEvents.removed, + ), + DocumentChangeSnapshot( + doc: userDoc, + data: user, + prevData: null, + event: BroadcastEvents.added, + ), + ], + ], + ); + + sub.cancel(); + async.flushMicrotasks(); + }); + }, + ); test( - 'Discards changes that occur to a collection in the same task as a subsequent deletion of that collection', - () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); + 'Discards changes that occur to a collection in the same task as a subsequent deletion of that collection', + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - final changeSnaps = - TestUserModel.store.streamChanges().take(2).toList(); + final changeSnaps = + >>[]; + final sub = TestUserModel.store + .streamChanges() + .listen(changeSnaps.add); - userDoc.create(user); - userDoc2.create(user2); + userDoc.create(user); + userDoc2.create(user2); - await asyncEvent(); + flushBroadcasts(async); - userDoc.update(TestUserModel('User 1 updated')); - TestUserModel.store.delete(); + userDoc.update(TestUserModel('User 1 updated')); + TestUserModel.store.delete(); + flushBroadcasts(async); - expect( - await changeSnaps, - [ - [ - DocumentChangeSnapshot( - doc: userDoc, - data: user, - prevData: null, - event: BroadcastEvents.added, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: user2, - prevData: null, - event: BroadcastEvents.added, - ) - ], - [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - prevData: user, - event: BroadcastEvents.removed, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - prevData: user2, - event: BroadcastEvents.removed, - ), - ], - ], - ); - }); + expect( + changeSnaps, + [ + [ + DocumentChangeSnapshot( + doc: userDoc, + data: user, + prevData: null, + event: BroadcastEvents.added, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: user2, + prevData: null, + event: BroadcastEvents.added, + ) + ], + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + prevData: user, + event: BroadcastEvents.removed, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + prevData: user2, + event: BroadcastEvents.removed, + ), + ], + ], + ); + + sub.cancel(); + async.flushMicrotasks(); + }); + }, + ); }, ); test( "Maintains its dependency/snapshot caches correctly", - () async { - final usersCollection = Loon.collection('users'); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - if (snap.data['userId'] != null) { - return { - usersCollection.doc(snap.data['userId'].toString()), - }; - } - return null; - }, - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + if (snap.data['userId'] != null) { + return { + usersCollection.doc(snap.data['userId'].toString()), + }; + } + return null; + }, + ); - final postDoc = postsCollection.doc('1'); - final post1Data = {"id": 1, "text": "Post 1", "userId": 1}; - final post1Data2 = {"id": 1, "text": "Post 1 updated"}; - final post1Data3 = { - "id": 1, - "text": "Post 1 updated", - "userId": 3 - }; - final postDoc2 = postsCollection.doc('2'); - final post2Data = {"id": 2, "text": "Post 2", "userId": 2}; - final userDoc = usersCollection.doc('1'); - final userDoc2 = usersCollection.doc('2'); - final userDoc3 = usersCollection.doc('3'); - final postsObs = postsCollection.toQuery().observe(); - - postDoc.create(post1Data); - await asyncEvent(); - - // After creating the post document, the query should have a global and document level - // dependency. - expect(postsObs.inspect(), { - "deps": { - "__ref": 1, - "users": { + final postDoc = postsCollection.doc('1'); + final post1Data = {"id": 1, "text": "Post 1", "userId": 1}; + final post1Data2 = {"id": 1, "text": "Post 1 updated"}; + final post1Data3 = { + "id": 1, + "text": "Post 1 updated", + "userId": 3 + }; + final postDoc2 = postsCollection.doc('2'); + final post2Data = {"id": 2, "text": "Post 2", "userId": 2}; + final userDoc = usersCollection.doc('1'); + final userDoc2 = usersCollection.doc('2'); + final userDoc3 = usersCollection.doc('3'); + final postsObs = postsCollection.toQuery().observe(); + + postDoc.create(post1Data); + flushBroadcasts(async); + + // After creating the post document, the query should have a global and document level + // dependency. + expect(postsObs.inspect(), { + "deps": { "__ref": 1, - "1": 1, + "users": { + "__ref": 1, + "1": 1, + }, }, - }, - "docDeps": { - postDoc: { - userDoc, + "docDeps": { + postDoc: { + userDoc, + }, }, - }, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), - } - }); + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), + } + }); - postDoc.update(post1Data2); - await asyncEvent(); + postDoc.update(post1Data2); + flushBroadcasts(async); - // After updating the document, it should have removed the user dependency from the observable's - // dep tree and document level dependency cache. - expect(postsObs.inspect(), { - "deps": {}, - "docDeps": {}, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data2), - } - }); + // After updating the document, it should have removed the user dependency from the observable's + // dep tree and document level dependency cache. + expect(postsObs.inspect(), { + "deps": {}, + "docDeps": {}, + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data2), + } + }); - postDoc.update(post1Data); - postDoc2.create(post2Data); - await asyncEvent(); + postDoc.update(post1Data); + postDoc2.create(post2Data); + flushBroadcasts(async); - // After updating the first post to have a user dependency again, and creating a second - // post with another user dependency, the query's dependencies should have two entries. - expect(postsObs.inspect(), { - "deps": { - "__ref": 2, - "users": { + // After updating the first post to have a user dependency again, and creating a second + // post with another user dependency, the query's dependencies should have two entries. + expect(postsObs.inspect(), { + "deps": { "__ref": 2, - "1": 1, - "2": 1, - }, - }, - "docDeps": { - postDoc: { - userDoc, + "users": { + "__ref": 2, + "1": 1, + "2": 1, + }, }, - postDoc2: { - userDoc2, + "docDeps": { + postDoc: { + userDoc, + }, + postDoc2: { + userDoc2, + }, }, - }, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), - postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), - } - }); + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), + postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), + } + }); - postDoc.update(post1Data3); - await asyncEvent(); + postDoc.update(post1Data3); + flushBroadcasts(async); - // After updating the post to be dependent on a different user, the observable query - // should have removed the reference to the previous user (user 1) and replaced it with the - // dependency on user 3. - expect(postsObs.inspect(), { - "deps": { - "__ref": 2, - "users": { + // After updating the post to be dependent on a different user, the observable query + // should have removed the reference to the previous user (user 1) and replaced it with the + // dependency on user 3. + expect(postsObs.inspect(), { + "deps": { "__ref": 2, - "2": 1, - "3": 1, - }, - }, - "docDeps": { - postDoc: { - userDoc3, + "users": { + "__ref": 2, + "2": 1, + "3": 1, + }, }, - postDoc2: { - userDoc2, + "docDeps": { + postDoc: { + userDoc3, + }, + postDoc2: { + userDoc2, + }, }, - }, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data3), - postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), - } - }); + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data3), + postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), + } + }); - postsCollection.delete(); - await asyncEvent(); + postsCollection.delete(); + flushBroadcasts(async); - // After deleting the posts collection, the query should have cleared its global - // and document level dependencies. - expect( - postsObs.inspect(), - {"deps": {}, "docDeps": {}, "docSnaps": {}}, - ); + // After deleting the posts collection, the query should have cleared its global + // and document level dependencies. + expect( + postsObs.inspect(), + {"deps": {}, "docDeps": {}, "docSnaps": {}}, + ); + }); }, ); test( 'Invalidates its cached value when its collection is updated', - () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); - final userDoc = usersCollection.doc('1'); - final userDoc2 = usersCollection.doc('2'); - final userData = TestUserModel('User 1'); - final userDataUpdated = TestUserModel('User 1 updated'); - final user2Data = TestUserModel('User 2'); + final userDoc = usersCollection.doc('1'); + final userDoc2 = usersCollection.doc('2'); + final userData = TestUserModel('User 1'); + final userDataUpdated = TestUserModel('User 1 updated'); + final user2Data = TestUserModel('User 2'); - userDoc.create(userData); - userDoc2.create(user2Data); - final usersObs = usersCollection.observe(); + userDoc.create(userData); + userDoc2.create(user2Data); + final usersObs = usersCollection.observe(); - await asyncEvent(); + flushBroadcasts(async); - List> snaps = usersObs.get(); + List> snaps = usersObs.get(); - // The observable's value is cached and is not recomputed unless invalidated by - // a change to its collection. - expect(identical(snaps, usersObs.get()), true); - expect( - snaps, - [ - DocumentSnapshot(doc: userDoc, data: userData), - DocumentSnapshot(doc: userDoc2, data: user2Data), - ], - ); + // The observable's value is cached and is not recomputed unless invalidated by + // a change to its collection. + expect(identical(snaps, usersObs.get()), true); + expect( + snaps, + [ + DocumentSnapshot(doc: userDoc, data: userData), + DocumentSnapshot(doc: userDoc2, data: user2Data), + ], + ); - userDoc.update(userDataUpdated); - snaps = usersObs.get(); + userDoc.update(userDataUpdated); + snaps = usersObs.get(); - // The updated document has not been broadcast yet, however the observable's cached - // value has been invalidated by the update to one of its documents and should return - // an up-to-date recalculated value. - expect(snaps, [ - DocumentSnapshot(doc: userDoc, data: userDataUpdated), - DocumentSnapshot(doc: userDoc2, data: user2Data), - ]); - // It should then cache that value until it is invalidated again. - expect(identical(snaps, usersObs.get()), true); + // The updated document has not been broadcast yet, however the observable's cached + // value has been invalidated by the update to one of its documents and should return + // an up-to-date recalculated value. + expect(snaps, [ + DocumentSnapshot(doc: userDoc, data: userDataUpdated), + DocumentSnapshot(doc: userDoc2, data: user2Data), + ]); + // It should then cache that value until it is invalidated again. + expect(identical(snaps, usersObs.get()), true); - // Deleting a document in the observable's collection should invalidate its cached value. - userDoc.delete(); + // Deleting a document in the observable's collection should invalidate its cached value. + userDoc.delete(); - snaps = usersObs.get(); - expect(snaps, [ - DocumentSnapshot(doc: userDoc2, data: user2Data), - ]); - expect(identical(snaps, usersObs.get()), true); + snaps = usersObs.get(); + expect(snaps, [ + DocumentSnapshot(doc: userDoc2, data: user2Data), + ]); + expect(identical(snaps, usersObs.get()), true); - // Deleting the observable's associated collection should invalidate its cached value. - usersCollection.delete(); - snaps = usersObs.get(); - expect(snaps, []); - expect(identical(snaps, usersObs.get()), true); + // Deleting the observable's associated collection should invalidate its cached value. + usersCollection.delete(); + snaps = usersObs.get(); + expect(snaps, []); + expect(identical(snaps, usersObs.get()), true); - userDoc.create(userData); - snaps = usersObs.get(); + userDoc.create(userData); + snaps = usersObs.get(); - await asyncEvent(); + flushBroadcasts(async); - // After waiting for the broadcast, the observable's cached value should not have changed - // from its previous recalculation when it was accessed with [get()]. The broadcast should have reused - // that value. - expect(identical(snaps, usersObs.get()), true); + // After waiting for the broadcast, the observable's cached value should not have changed + // from its previous recalculation when it was accessed with [get()]. The broadcast should have reused + // that value. + expect(identical(snaps, usersObs.get()), true); + }); }, ); @@ -1966,82 +2052,95 @@ void main() { test( 'Broadcasts the clear to observers', - () async { - final userDoc = TestUserModel.store.doc('1'); - final userData = TestUserModel('User 1'); - final userDocStream = userDoc.stream(); - final userCollectionStream = TestUserModel.store.stream(); - - userDoc.create(userData); + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userData = TestUserModel('User 1'); + final userDocStreamData = ?>[]; + final userCollectionStreamData = + >>[]; + final docSub = userDoc.stream().listen(userDocStreamData.add); + final collectionSub = TestUserModel.store + .stream() + .listen(userCollectionStreamData.add); + flushBroadcasts(async); - await asyncEvent(); + userDoc.create(userData); - Loon.clearAll(); + flushBroadcasts(async); - final userDocStreamData = await userDocStream.take(3).toList(); - final userCollectionStreamData = - await userCollectionStream.take(3).toList(); + Loon.clearAll(); - expect( - userDocStreamData, - [ - null, - DocumentSnapshot( - doc: userDoc, - data: userData, - ), - null, - ], - ); + flushBroadcasts(async); - expect( - userCollectionStreamData, - [ - [], + expect( + userDocStreamData, [ + null, DocumentSnapshot( doc: userDoc, data: userData, ), + null, + ], + ); + + expect( + userCollectionStreamData, + [ + [], + [ + DocumentSnapshot( + doc: userDoc, + data: userData, + ), + ], + [], ], - [], - ], - ); + ); + + docSub.cancel(); + collectionSub.cancel(); + async.flushMicrotasks(); + }); }, ); test( 'Does not broadcast the clear to observers when broadcast: false', - () async { - final userDoc = TestUserModel.store.doc('1'); - final userData = TestUserModel('User 1'); + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userData = TestUserModel('User 1'); - final docEvents = ?>[]; - final collectionEvents = - >>[]; - final docSub = userDoc.stream().listen(docEvents.add); - final collectionSub = - TestUserModel.store.stream().listen(collectionEvents.add); - addTearDown(() { - docSub.cancel(); - collectionSub.cancel(); - }); + final docEvents = ?>[]; + final collectionEvents = + >>[]; + final docSub = userDoc.stream().listen(docEvents.add); + final collectionSub = + TestUserModel.store.stream().listen(collectionEvents.add); + flushBroadcasts(async); - userDoc.create(userData); - await asyncEvent(); + userDoc.create(userData); + flushBroadcasts(async); - await Loon.clearAll(broadcast: false); - await asyncEvent(); + Loon.clearAll(broadcast: false); + flushBroadcasts(async); - expect(Loon.inspect()['store'], {}); - expect(docEvents, [ - null, - DocumentSnapshot(doc: userDoc, data: userData), - ]); - expect(collectionEvents, [ - [], - [DocumentSnapshot(doc: userDoc, data: userData)], - ]); + expect(Loon.inspect()['store'], {}); + expect(docEvents, [ + null, + DocumentSnapshot(doc: userDoc, data: userData), + ]); + expect(collectionEvents, [ + [], + [DocumentSnapshot(doc: userDoc, data: userData)], + ]); + + docSub.cancel(); + collectionSub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -2309,81 +2408,9 @@ void main() { ); }); - test("Rebroadcasts an observable document on dependency changes", - () async { - final usersCollection = Loon.collection('users'); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - if (snap.data['text'] == 'Post 1') { - return { - usersCollection.doc('1'), - }; - } - return {}; - }, - ); - - final postDoc = postsCollection.doc('1'); - final postData = {"id": 1, "text": "Post 1"}; - final updatedPostData1 = {"text": "Post 1 updated"}; - final userDoc = usersCollection.doc('1'); - final userData = {"id": 1, "name": "User 1"}; - final updatedUserData = {"id": 1, "name": "User 1 updated"}; - final postStream = postDoc.stream(); - - postDoc.create(postData); - await asyncEvent(); - usersCollection.doc('1').create(userData); - await asyncEvent(); - userDoc.update(updatedUserData); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); - usersCollection.doc('1').create(userData); - await asyncEvent(); - usersCollection.delete(); - await asyncEvent(); - usersCollection.doc('1').create(userData); - await asyncEvent(); - postDoc.update(updatedPostData1); - await asyncEvent(); - // Skips this update to user doc, since the last update to the post - // caused the user doc to be removed as a dependency. - userDoc.update(updatedUserData); - await asyncEvent(); - postDoc.delete(); - - final snaps = await postStream.take(10).toList(); - - expect(snaps, [ - // No post yet - null, - // Post created - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user created - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user updated - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user deleted - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user re-added (ensures dependencies remain across deletion/re-creation) - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user collection deleted - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user re-added - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast when post data is updated - DocumentSnapshot(doc: postDoc, data: updatedPostData1), - // Rebroadcast when post deleted - null, - ]); - }); - - test( - "Rebroadcasts an observable query on dependency changes", - () async { - final usersCollection = Loon.collection('users'); + test("Rebroadcasts an observable document on dependency changes", () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); final postsCollection = Loon.collection( 'posts', dependenciesBuilder: (snap) { @@ -2402,158 +2429,238 @@ void main() { final userDoc = usersCollection.doc('1'); final userData = {"id": 1, "name": "User 1"}; final updatedUserData = {"id": 1, "name": "User 1 updated"}; - final postsStream = postsCollection.stream(); + final snaps = ?>[]; + final sub = postDoc.stream().listen(snaps.add); postDoc.create(postData); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); + usersCollection.doc('1').create(userData); + flushBroadcasts(async); userDoc.update(updatedUserData); - await asyncEvent(); + flushBroadcasts(async); userDoc.delete(); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); + usersCollection.doc('1').create(userData); + flushBroadcasts(async); usersCollection.delete(); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); + usersCollection.doc('1').create(userData); + flushBroadcasts(async); postDoc.update(updatedPostData1); - await asyncEvent(); + flushBroadcasts(async); // Skips this update to user doc, since the last update to the post // caused the user doc to be removed as a dependency. userDoc.update(updatedUserData); - await asyncEvent(); - postsCollection.delete(); - await asyncEvent(); - - final snaps = await postsStream.take(10).toList(); + flushBroadcasts(async); + postDoc.delete(); + flushBroadcasts(async); expect(snaps, [ // No post yet - [], + null, // Post created - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user created - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user updated - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user deleted - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user recreated (ensures dependencies remain across deletion/re-creation) - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user collection deleted - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user recreated - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when post updated - [DocumentSnapshot(doc: postDoc, data: updatedPostData1)], - // Rebroadcast posts when posts collection deleted - [], + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user created + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user updated + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user deleted + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user re-added (ensures dependencies remain across deletion/re-creation) + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user collection deleted + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user re-added + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast when post data is updated + DocumentSnapshot(doc: postDoc, data: updatedPostData1), + // Rebroadcast when post deleted + null, ]); + + sub.cancel(); + async.flushMicrotasks(); + }); + }); + + test( + "Rebroadcasts an observable query on dependency changes", + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + if (snap.data['text'] == 'Post 1') { + return { + usersCollection.doc('1'), + }; + } + return {}; + }, + ); + + final postDoc = postsCollection.doc('1'); + final postData = {"id": 1, "text": "Post 1"}; + final updatedPostData1 = {"text": "Post 1 updated"}; + final userDoc = usersCollection.doc('1'); + final userData = {"id": 1, "name": "User 1"}; + final updatedUserData = {"id": 1, "name": "User 1 updated"}; + final snaps = >>[]; + final sub = postsCollection.stream().listen(snaps.add); + + postDoc.create(postData); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + userDoc.update(updatedUserData); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + usersCollection.delete(); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + postDoc.update(updatedPostData1); + flushBroadcasts(async); + // Skips this update to user doc, since the last update to the post + // caused the user doc to be removed as a dependency. + userDoc.update(updatedUserData); + flushBroadcasts(async); + postsCollection.delete(); + flushBroadcasts(async); + + expect(snaps, [ + // No post yet + [], + // Post created + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user created + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user updated + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user deleted + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user recreated (ensures dependencies remain across deletion/re-creation) + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user collection deleted + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user recreated + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when post updated + [DocumentSnapshot(doc: postDoc, data: updatedPostData1)], + // Rebroadcast posts when posts collection deleted + [], + ]); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); - test("Cyclical dependencies do not cause infinite rebroadcasts", - () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - dependenciesBuilder: (snap) { - return { - Loon.collection('posts').doc('1'), - }; - }, - ); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - return { - usersCollection.doc('1'), - }; - }, - ); + test("Cyclical dependencies do not cause infinite rebroadcasts", () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + dependenciesBuilder: (snap) { + return { + Loon.collection('posts').doc('1'), + }; + }, + ); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + return { + usersCollection.doc('1'), + }; + }, + ); - final userDoc = usersCollection.doc('1'); - final postDoc = postsCollection.doc('1'); - final userData = TestUserModel('Test user 1'); - final updatedUserData = TestUserModel('Test user 1 updated'); - final userObservable = userDoc.observe(); - final userStream = userObservable.stream(); + final userDoc = usersCollection.doc('1'); + final postDoc = postsCollection.doc('1'); + final userData = TestUserModel('Test user 1'); + final updatedUserData = TestUserModel('Test user 1 updated'); + final userObservable = userDoc.observe(); + final userEvents = ?>[]; + final sub = userObservable.stream().listen(userEvents.add); - userDoc.create(userData); + userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); - expect( - Loon.inspect()['dependencyStore'], - { - "users": { - "__values": { - "1": { - postDoc, - }, + expect( + Loon.inspect()['dependencyStore'], + { + "users": { + "__values": { + "1": { + postDoc, + }, + } + }, + }, + ); + expect( + Loon.inspect()['dependentsStore'], + { + postDoc: { + userDoc, } }, - }, - ); - expect( - Loon.inspect()['dependentsStore'], - { - postDoc: { - userDoc, - } - }, - ); + ); - postDoc.create({ - "id": 1, - "name": "Post 1", - }); + postDoc.create({ + "id": 1, + "name": "Post 1", + }); - expect( - Loon.inspect()['dependencyStore'], - { - "users": { - "__values": { - "1": { - postDoc, + expect( + Loon.inspect()['dependencyStore'], + { + "users": { + "__values": { + "1": { + postDoc, + }, }, }, - }, - "posts": { - "__values": { - "1": { - userDoc, + "posts": { + "__values": { + "1": { + userDoc, + }, }, }, }, - }, - ); - expect( - Loon.inspect()['dependentsStore'], - { - postDoc: { - userDoc, + ); + expect( + Loon.inspect()['dependentsStore'], + { + postDoc: { + userDoc, + }, + userDoc: { + postDoc, + } }, - userDoc: { - postDoc, - } - }, - ); + ); - await asyncEvent(); + flushBroadcasts(async); - userDoc.update(updatedUserData); + userDoc.update(updatedUserData); - await asyncEvent(); + flushBroadcasts(async); - userObservable.dispose(); + userObservable.dispose(); + async.flushMicrotasks(); - expectLater( - userStream, - emitsInOrder([ + expect(userEvents, [ // First emits null when no user has been written. null, // Emits the initially created user. @@ -2564,38 +2671,39 @@ void main() { DocumentSnapshot(doc: userDoc, data: userData), // Emits the updated user. DocumentSnapshot(doc: userDoc, data: updatedUserData), - emitsDone, - ]), - ); + ]); + + sub.cancel(); + async.flushMicrotasks(); + }); }); - test("Deleting a collection clears its dependencies", () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (snap) => snap.toJson(), - ); - final friendsCollection = Loon.collection( - 'friends', - fromJson: TestUserModel.fromJson, - toJson: (snap) => snap.toJson(), - dependenciesBuilder: (snap) { - return { - usersCollection.doc(snap.doc.id), - }; - }, - ); + test("Deleting a collection clears its dependencies", () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (snap) => snap.toJson(), + ); + final friendsCollection = Loon.collection( + 'friends', + fromJson: TestUserModel.fromJson, + toJson: (snap) => snap.toJson(), + dependenciesBuilder: (snap) { + return { + usersCollection.doc(snap.doc.id), + }; + }, + ); - final userDoc = usersCollection.doc('1'); - final friendDoc = friendsCollection.doc('1'); - userDoc.create(TestUserModel('User 1')); - friendDoc.create(TestUserModel('Friend 1')); + final userDoc = usersCollection.doc('1'); + final friendDoc = friendsCollection.doc('1'); + userDoc.create(TestUserModel('User 1')); + friendDoc.create(TestUserModel('Friend 1')); - await asyncEvent(); + flushBroadcasts(async); - expect( - Loon.inspect()['dependencyStore'], - { + expect(Loon.inspect()['dependencyStore'], { "friends": { "__values": { "1": { @@ -2603,43 +2711,43 @@ void main() { } } } - }, - ); - expect( - Loon.inspect()['dependentsStore'], - { - userDoc: { - friendDoc, - } - }, - ); + }); + expect( + Loon.inspect()['dependentsStore'], + { + userDoc: { + friendDoc, + } + }, + ); - friendsCollection.delete(); + friendsCollection.delete(); - await asyncEvent(); + flushBroadcasts(async); - expect( - Loon.inspect()['dependencyStore'], - {}, - ); - expect( - Loon.inspect()['dependentsStore'], - { - // The dependents are not cleared when a collection is cleared, instead - // the dependents are lazily cleared when the dependent is updated. - userDoc: { - friendDoc, + expect( + Loon.inspect()['dependencyStore'], + {}, + ); + expect( + Loon.inspect()['dependentsStore'], + { + // The dependents are not cleared when a collection is cleared, instead + // the dependents are lazily cleared when the dependent is updated. + userDoc: { + friendDoc, + }, }, - }, - ); + ); - // Now that the user has been updated, it has cleared its friend dependent. - userDoc.update(TestUserModel('User 1 updated')); + // Now that the user has been updated, it has cleared its friend dependent. + userDoc.update(TestUserModel('User 1 updated')); - await asyncEvent(); + flushBroadcasts(async); - expect(Loon.inspect()['dependencyStore'], {}); - expect(Loon.inspect()['dependentsStore'], {}); + expect(Loon.inspect()['dependencyStore'], {}); + expect(Loon.inspect()['dependentsStore'], {}); + }); }); }); diff --git a/test/core/observable_query_equivalence_test.dart b/test/core/observable_query_equivalence_test.dart index a1815ea..3ef1757 100644 --- a/test/core/observable_query_equivalence_test.dart +++ b/test/core/observable_query_equivalence_test.dart @@ -1,4 +1,5 @@ import 'dart:math'; +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; @@ -24,11 +25,8 @@ import '../utils.dart'; /// so it can be compared as an ordered list; the unsorted variant is compared /// as a set since its order is unspecified. /// -/// Notes on determinism: a single long walk is used rather than many short -/// trials because resetting the global store between trials schedules broadcasts -/// that can race the next trial's observer. A 1ms settle (rather than a -/// zero-duration one) guarantees the broadcast's zero-duration timer has fired -/// and its microtask delivery completed before each comparison. +/// Tests run under fake time so the broadcast timer and its stream delivery are +/// flushed explicitly before each comparison. int _cmp(DocumentSnapshot a, DocumentSnapshot b) { final byValue = a.data.compareTo(b.data); @@ -41,21 +39,22 @@ List _ordered(List> snaps) => List _asSet(List> snaps) => _ordered(snaps)..sort(); -Future _reset() async { +void _reset(FakeAsync async) { Loon.unsubscribe(); // broadcast: false so the reset doesn't schedule a broadcast that could // race the next test's observer. - await Loon.clearAll(broadcast: false); - await asyncEvent(); + Loon.clearAll(broadcast: false); + async.flushMicrotasks(); } -Future _walk({ +void _walk({ + required FakeAsync async, required bool sorted, required int seed, required int threshold, int rounds = 400, -}) async { - await _reset(); +}) { + _reset(async); final r = Random(seed); final col = Loon.collection('items'); @@ -66,7 +65,7 @@ Future _walk({ final emissions = >>[]; final sub = obs.stream().listen(emissions.add); - await asyncEvent(); + flushBroadcasts(async); final present = {}; for (var round = 0; round < rounds; round++) { @@ -83,7 +82,7 @@ Future _walk({ } } - await asyncEvent(); + flushBroadcasts(async); final oracle = (sorted ? col.where(filter).sortBy(_cmp) : col.where(filter)).get(); @@ -97,33 +96,37 @@ Future _walk({ } } - await sub.cancel(); + sub.cancel(); + async.flushMicrotasks(); } void main() { - tearDown(_reset); + tearDown(() { + Loon.unsubscribe(); + Loon.clearAll(broadcast: false); + }); - test('Coalesced delete and recreate evicts a cached query result', () async { - await _reset(); + test('Coalesced delete and recreate evicts a cached query result', () { + fakeAsync((async) { + _reset(async); - final col = Loon.collection('items'); - final doc = col.doc('1'); - doc.create(5); - await asyncEvent(); + final col = Loon.collection('items'); + final doc = col.doc('1'); + doc.create(5); + flushBroadcasts(async); - final query = col.where((snap) => snap.data >= 4).observe(); - final emissions = >>[]; - final changes = >>[]; - final valueSub = query.stream().listen(emissions.add); - final changeSub = query.streamChanges().listen(changes.add); - await asyncEvent(); + final query = col.where((snap) => snap.data >= 4).observe(); + final emissions = >>[]; + final changes = >>[]; + final valueSub = query.stream().listen(emissions.add); + final changeSub = query.streamChanges().listen(changes.add); + flushBroadcasts(async); - try { expect(_ordered(emissions.last), ['1=5']); doc.delete(); doc.create(0); - await asyncEvent(); + flushBroadcasts(async); expect(_ordered(emissions.last), isEmpty); expect(changes, [ @@ -136,10 +139,10 @@ void main() { ), ], ]); - } finally { - await valueSub.cancel(); - await changeSub.cancel(); - } + valueSub.cancel(); + changeSub.cancel(); + async.flushMicrotasks(); + }); }); group('ObservableQuery equivalence', () { @@ -150,12 +153,15 @@ void main() { test( '${sorted ? 'sorted' : 'unsorted'} query, threshold $threshold ' 'matches a full recompute', - () async { - await _walk( - sorted: sorted, - seed: 1000 + threshold, - threshold: threshold, - ); + () { + fakeAsync((async) { + _walk( + async: async, + sorted: sorted, + seed: 1000 + threshold, + threshold: threshold, + ); + }); }, ); } diff --git a/test/core/persistor/lock_test.dart b/test/core/persistor/lock_test.dart index 231cea4..3c16093 100644 --- a/test/core/persistor/lock_test.dart +++ b/test/core/persistor/lock_test.dart @@ -1,85 +1,129 @@ +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/persistor/lock.dart'; +import '../../utils.dart'; + +const _workDuration = Duration(milliseconds: 10); +const _burstWorkDuration = Duration(milliseconds: 5); + void main() { group('Lock', () { test( 'Serializes a single waiter behind the holder', - () async { - final lock = Lock(); - final order = []; + () { + fakeAsync((async) { + final lock = Lock(); + final order = []; + var aComplete = false; + var bComplete = false; - final a = lock.run(() async { - order.add('A-start'); - await Future.delayed(const Duration(milliseconds: 10)); - order.add('A-end'); - }); + lock.run(() async { + order.add('A-start'); + await Future.delayed(_workDuration); + order.add('A-end'); + }).then((_) => aComplete = true); - // Give A time to acquire before B arrives. - await Future.delayed(Duration.zero); + async.flushMicrotasks(); - final b = lock.run(() async { - order.add('B-start'); - await Future.delayed(const Duration(milliseconds: 10)); - order.add('B-end'); + lock.run(() async { + order.add('B-start'); + await Future.delayed(_workDuration); + order.add('B-end'); + }).then((_) => bComplete = true); + + async.flushMicrotasks(); + expect(order, ['A-start']); + expect(aComplete, false); + expect(bComplete, false); + + elapseAndFlush(async, _workDuration); + expect(order, ['A-start', 'A-end', 'B-start']); + expect(aComplete, true); + expect(bComplete, false); + + elapseAndFlush(async, _workDuration); + expect(order, ['A-start', 'A-end', 'B-start', 'B-end']); + expect(bComplete, true); }); - - await Future.wait([a, b]); - - expect(order, ['A-start', 'A-end', 'B-start', 'B-end']); }, ); test( 'Serializes multiple waiters that all queue on the same holder', - () async { - final lock = Lock(); - int inFlight = 0; - int maxInFlight = 0; - - Future work() { - return lock.run(() async { - inFlight++; - if (inFlight > maxInFlight) maxInFlight = inFlight; - await Future.delayed(const Duration(milliseconds: 10)); - inFlight--; - }); - } - - final a = work(); - await Future.delayed(Duration.zero); - final b = work(); - final c = work(); - - await Future.wait([a, b, c]); - - expect(maxInFlight, 1); + () { + fakeAsync((async) { + final lock = Lock(); + var inFlight = 0; + var maxInFlight = 0; + var completed = 0; + + void work() { + lock.run(() async { + inFlight++; + if (inFlight > maxInFlight) maxInFlight = inFlight; + await Future.delayed(_workDuration); + inFlight--; + }).then((_) => completed++); + } + + work(); + async.flushMicrotasks(); + work(); + work(); + async.flushMicrotasks(); + + expect(maxInFlight, 1); + expect(completed, 0); + + elapseAndFlush(async, _workDuration); + expect(maxInFlight, 1); + expect(completed, 1); + + elapseAndFlush(async, _workDuration); + expect(maxInFlight, 1); + expect(completed, 2); + + elapseAndFlush(async, _workDuration); + expect(maxInFlight, 1); + expect(completed, 3); + }); }, ); test( 'Serializes a burst of concurrent acquires', - () async { - final lock = Lock(); - int inFlight = 0; - int maxInFlight = 0; - int completed = 0; - - Future work() { - return lock.run(() async { - inFlight++; - if (inFlight > maxInFlight) maxInFlight = inFlight; - await Future.delayed(const Duration(milliseconds: 5)); - inFlight--; - completed++; - }); - } - - final futures = List.generate(10, (_) => work()); - await Future.wait(futures); - - expect(maxInFlight, 1); - expect(completed, 10); + () { + fakeAsync((async) { + final lock = Lock(); + var inFlight = 0; + var maxInFlight = 0; + var completed = 0; + + void work() { + lock.run(() async { + inFlight++; + if (inFlight > maxInFlight) maxInFlight = inFlight; + await Future.delayed(_burstWorkDuration); + inFlight--; + completed++; + }); + } + + for (var i = 0; i < 10; i++) { + work(); + } + + async.flushMicrotasks(); + expect(maxInFlight, 1); + + for (var i = 0; i < 10; i++) { + elapseAndFlush(async, _burstWorkDuration); + expect(maxInFlight, 1); + } + + expect(completed, 10); + }); }, ); diff --git a/test/core/persistor/persist_manager_test.dart b/test/core/persistor/persist_manager_test.dart index 7425c7b..a3aca49 100644 --- a/test/core/persistor/persist_manager_test.dart +++ b/test/core/persistor/persist_manager_test.dart @@ -1,12 +1,20 @@ +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; import '../../models/test_persistor.dart'; import '../../models/test_user_model.dart'; +import '../../utils.dart'; + +void _flushPersistManagerOperation(FakeAsync async) { + elapseAndFlush(async, const Duration(milliseconds: 1)); +} void main() { - tearDown(() { - Loon.clearAll(); + tearDown(() async { + Loon.configure(persistor: null); + Loon.unsubscribe(); + await Loon.clearAll(broadcast: false); }); /// Tests the behavior of the [PersistManager] to check that it properly batches @@ -14,159 +22,172 @@ void main() { group('Persist Manager', () { test( 'Batches contiguous persistence operations', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onPersist: (docs) { - batches.add(docs); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onPersist: (docs) { + batches.add(docs); + }, + ), + ); + _flushPersistManagerOperation(async); + + final userCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); + + userCollection.doc('1').create(TestUserModel('User 1')); + userCollection.doc('2').create(TestUserModel('User 2')); + + _flushPersistManagerOperation(async); + + userCollection.doc('3').create(TestUserModel('User 3')); + + _flushPersistManagerOperation(async); + + // There should be two persistence calls: + // 1. The first two writes should be grouped together into a single batch in the first 1ms throttle. + // 2. The third write is grouped into its own batch. + expect(batches.length, 2); + expect( + batches.first, + { + userCollection.doc('1'), + userCollection.doc('2'), }, - ), - ); - - final userCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); - - userCollection.doc('1').create(TestUserModel('User 1')); - userCollection.doc('2').create(TestUserModel('User 2')); - - await TestPersistor.completer.onPersist; - - userCollection.doc('3').create(TestUserModel('User 3')); - - await TestPersistor.completer.onPersist; - - // There should be two persistence calls: - // 1. The first two writes should be grouped together into a single batch in the first 1ms throttle. - // 2. The third write is grouped into its own batch. - expect(batches.length, 2); - expect( - batches.first, - { - userCollection.doc('1'), - userCollection.doc('2'), - }, - ); - expect(batches.last, {userCollection.doc('3')}); + ); + expect(batches.last, {userCollection.doc('3')}); + }); }, ); test( 'Does not batch non-contiguous operations', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onPersist: (docs) { - batches.add(docs); - }, - ), - ); - - final userCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); - - userCollection.doc('1').create(TestUserModel('User 1')); - userCollection.delete(); - userCollection.doc('2').create(TestUserModel('User 2')); - - await TestPersistor.completer.onPersist; - await TestPersistor.completer.onClear; - await TestPersistor.completer.onPersist; - - // There should be a separate persistence call for each document in this scenario since the operation - // order must be persist->clear->persist in order to ensure the correct sequencing of events. - expect(batches.length, 2); - expect( - batches.first, - {userCollection.doc('1')}, - ); - expect(batches.last, {userCollection.doc('2')}); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onPersist: (docs) { + batches.add(docs); + }, + ), + ); + _flushPersistManagerOperation(async); + + final userCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); + + userCollection.doc('1').create(TestUserModel('User 1')); + userCollection.delete(); + userCollection.doc('2').create(TestUserModel('User 2')); + + _flushPersistManagerOperation(async); + _flushPersistManagerOperation(async); + _flushPersistManagerOperation(async); + + // There should be a separate persistence call for each document in this scenario since the operation + // order must be persist->clear->persist in order to ensure the correct sequencing of events. + expect(batches.length, 2); + expect( + batches.first, + {userCollection.doc('1')}, + ); + expect(batches.last, {userCollection.doc('2')}); + }); }, ); test( 'De-dupes persisting the same document', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onPersist: (docs) { - batches.add(docs); - }, - ), - ); - - final userCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); - - userCollection.doc('1').create(TestUserModel('User 1')); - userCollection.doc('1').update(TestUserModel('User 1 updated')); - - await TestPersistor.completer.onPersist; - - userCollection.doc('2').create(TestUserModel('User 2')); - - await TestPersistor.completer.onPersist; - - expect(batches.length, 2); - - // The multiple updates to user doc 1 are grouped and de-duped in the same batch. Persistors can read the current - // value of the documents in the batch so there is no need to include document updates again. - expect(batches.first, [userCollection.doc('1')]); - expect(batches.last, [userCollection.doc('2')]); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onPersist: (docs) { + batches.add(docs); + }, + ), + ); + _flushPersistManagerOperation(async); + + final userCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); + + userCollection.doc('1').create(TestUserModel('User 1')); + userCollection.doc('1').update(TestUserModel('User 1 updated')); + + _flushPersistManagerOperation(async); + + userCollection.doc('2').create(TestUserModel('User 2')); + + _flushPersistManagerOperation(async); + + expect(batches.length, 2); + + // The multiple updates to user doc 1 are grouped and de-duped in the same batch. Persistors can read the current + // value of the documents in the batch so there is no need to include document updates again. + expect(batches.first, [userCollection.doc('1')]); + expect(batches.last, [userCollection.doc('2')]); + }); }, ); test( 'Batches clearing of collections', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onClear: (refs) { - batches.add(refs); - }, - ), - ); - - Loon.collection('users').doc('1').create(true); - Loon.collection('posts').doc('1').create(true); - Loon.collection('messages').doc('1').create(true); - - Loon.collection('users').delete(); - Loon.collection('posts').delete(); - - await TestPersistor.completer.onClear; - - Loon.collection('messages').delete(); - - await TestPersistor.completer.onClear; - - expect(batches.length, 2); - - expect(batches.first, [ - Loon.collection('users'), - Loon.collection('posts'), - ]); - expect(batches.last, [Loon.collection('messages')]); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onClear: (refs) { + batches.add(refs); + }, + ), + ); + _flushPersistManagerOperation(async); + + Loon.collection('users').doc('1').create(true); + Loon.collection('posts').doc('1').create(true); + Loon.collection('messages').doc('1').create(true); + + Loon.collection('users').delete(); + Loon.collection('posts').delete(); + + _flushPersistManagerOperation(async); + _flushPersistManagerOperation(async); + + Loon.collection('messages').delete(); + + _flushPersistManagerOperation(async); + + expect(batches.length, 2); + + expect(batches.first, [ + Loon.collection('users'), + Loon.collection('posts'), + ]); + expect(batches.last, [Loon.collection('messages')]); + }); }, ); }); diff --git a/test/utils.dart b/test/utils.dart index 459c372..e089dc1 100644 --- a/test/utils.dart +++ b/test/utils.dart @@ -1,6 +1,6 @@ -import 'dart:async'; import 'dart:convert'; import 'package:encrypt/encrypt.dart'; +import 'package:fake_async/fake_async.dart'; import 'package:loon/loon.dart'; final testEncryptionKey = Key.fromSecureRandom(32); @@ -22,6 +22,14 @@ Json decryptData(String encrypted) { ); } -Future asyncEvent() { - return Future.delayed(const Duration(milliseconds: 1), () => null); +/// Advances past Loon's zero-duration broadcast timer and drains stream delivery. +void flushBroadcasts(FakeAsync async) { + elapseAndFlush(async, const Duration(milliseconds: 1)); +} + +/// Advances fake time and drains microtasks around any timer callbacks. +void elapseAndFlush(FakeAsync async, Duration duration) { + async.flushMicrotasks(); + async.elapse(duration); + async.flushMicrotasks(); } From a9f065e69f3659e0e67ef8c7484ed91d89df7e25 Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 10:55:20 -0400 Subject: [PATCH 6/9] fix: avoid path delimiters in generated ids --- lib/utils/id.dart | 5 +++-- test/core/id_test.dart | 13 +++++++++++++ 2 files changed, 16 insertions(+), 2 deletions(-) create mode 100644 test/core/id_test.dart diff --git a/lib/utils/id.dart b/lib/utils/id.dart index 16eb2ad..b27a186 100644 --- a/lib/utils/id.dart +++ b/lib/utils/id.dart @@ -1,9 +1,10 @@ import 'dart:math'; import 'dart:typed_data'; -// 64-characters (2^6) +// 64 characters (2^6). This intentionally excludes "_" because Loon paths use +// "__" as their segment delimiter. const String _alphabet = - '_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; + '-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ~'; final Uint8List _alphabytes = Uint8List.fromList(_alphabet.codeUnits); const int _u32 = 0x100000000; // 2^32 diff --git a/test/core/id_test.dart b/test/core/id_test.dart new file mode 100644 index 0000000..6a34d69 --- /dev/null +++ b/test/core/id_test.dart @@ -0,0 +1,13 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:loon/utils/id.dart'; + +void main() { + group('IDs', () { + test('generated IDs do not contain the path delimiter', () { + for (var i = 0; i < 1000; i++) { + expect(generateSecureId(), isNot(contains('__'))); + expect(generateFastId(), isNot(contains('__'))); + } + }); + }); +} From 6b96da1cb326de9096af303f913a6699ea1f5f79 Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 13:18:09 -0400 Subject: [PATCH 7/9] fix: address persistence review feedback --- lib/broadcast_manager.dart | 14 +++++++- .../file_data_store_config.dart | 30 ++++++++--------- pubspec.yaml | 1 - test/core/loon_test.dart | 1 - test/core/persistor/persist_manager_test.dart | 32 ++++++++----------- test/native/file_persistor_test.dart | 19 +++++++---- 6 files changed, 52 insertions(+), 45 deletions(-) diff --git a/lib/broadcast_manager.dart b/lib/broadcast_manager.dart index b0b6825..e64984c 100644 --- a/lib/broadcast_manager.dart +++ b/lib/broadcast_manager.dart @@ -38,6 +38,13 @@ class BroadcastManager { /// Whether the broadcast store is dirty and has a pending broadcast scheduled. bool _pendingBroadcast = false; + Timer? _broadcastTimer; + + void _cancelBroadcast() { + _broadcastTimer?.cancel(); + _broadcastTimer = null; + _pendingBroadcast = false; + } void _scheduleBroadcast() { if (!_pendingBroadcast) { @@ -45,11 +52,13 @@ class BroadcastManager { // The broadcast is run async so that multiple broadcast events can be batched // together into one update across all changes that occur in the current task of the event loop. - Future.delayed(Duration.zero, _broadcast); + _broadcastTimer = Timer(Duration.zero, _broadcast); } } void _broadcast() { + _broadcastTimer = null; + _depObservers.clear(); for (final observer in _observers.toList()) { @@ -136,6 +145,7 @@ class BroadcastManager { } void clear({bool broadcast = true}) { + _cancelBroadcast(); eventStore.clear(); observerValueStore.clear(); @@ -166,6 +176,8 @@ class BroadcastManager { } void unsubscribe() { + _cancelBroadcast(); + eventStore.clear(); for (final observer in _observers.toList()) { observer.dispose(); } diff --git a/lib/persistor/file_persistor/file_data_store_config.dart b/lib/persistor/file_persistor/file_data_store_config.dart index 9bb632b..61dea97 100644 --- a/lib/persistor/file_persistor/file_data_store_config.dart +++ b/lib/persistor/file_persistor/file_data_store_config.dart @@ -3,7 +3,6 @@ import 'dart:io'; import 'package:loon/loon.dart'; import 'package:loon/persistor/data_store.dart'; import 'package:loon/persistor/data_store_resolver.dart'; -import 'package:pointycastle/api.dart' show InvalidCipherTextException; void _logHydrationFailure( File file, @@ -111,8 +110,20 @@ class FileDataStoreConfig extends DataStoreConfig { hydrate: () async { try { final value = await file.readAsString(); + String contents; + if (encrypted) { + try { + contents = encrypter.decrypt(value); + } catch (error, stackTrace) { + await _recoverCorruptFile(file, logger, error, stackTrace); + return null; + } + } else { + contents = value; + } + final json = _decodeJsonObject( - encrypted ? encrypter.decrypt(value) : value, + contents, 'persisted data store', ); final store = ValueStore(); @@ -137,15 +148,6 @@ class FileDataStoreConfig extends DataStoreConfig { } on FormatException catch (error, stackTrace) { await _recoverCorruptFile(file, logger, error, stackTrace); return null; - } on InvalidCipherTextException catch (error, stackTrace) { - await _recoverCorruptFile(file, logger, error, stackTrace); - return null; - } on ArgumentError catch (error, stackTrace) { - if (!encrypted) { - rethrow; - } - await _recoverCorruptFile(file, logger, error, stackTrace); - return null; } }, persist: (store) async { @@ -181,12 +183,6 @@ class FileDataStoreResolverConfig extends DataStoreResolverConfig { ); } on PathNotFoundException { return null; - } on FileSystemException catch (error, stackTrace) { - _logHydrationFailure(file, logger, error, stackTrace); - return null; - } on FormatException catch (error, stackTrace) { - await _recoverCorruptFile(file, logger, error, stackTrace); - return null; } }, persist: (store) => diff --git a/pubspec.yaml b/pubspec.yaml index 5ac6285..49761fc 100644 --- a/pubspec.yaml +++ b/pubspec.yaml @@ -14,7 +14,6 @@ dependencies: flutter_secure_storage: ^10.0.0 path: ^1.8.3 path_provider: ^2.1.5 - pointycastle: ^3.9.1 web: ^1.1.0 sqflite: ^2.4.1 sqflite_common_ffi: ^2.3.4 diff --git a/test/core/loon_test.dart b/test/core/loon_test.dart index ff11ca0..0027379 100644 --- a/test/core/loon_test.dart +++ b/test/core/loon_test.dart @@ -16,7 +16,6 @@ void main() { }); tearDown(() async { - await Future.delayed(Duration.zero); Loon.unsubscribe(); await Loon.clearAll(broadcast: false); }); diff --git a/test/core/persistor/persist_manager_test.dart b/test/core/persistor/persist_manager_test.dart index a3aca49..75438d5 100644 --- a/test/core/persistor/persist_manager_test.dart +++ b/test/core/persistor/persist_manager_test.dart @@ -6,10 +6,6 @@ import '../../models/test_persistor.dart'; import '../../models/test_user_model.dart'; import '../../utils.dart'; -void _flushPersistManagerOperation(FakeAsync async) { - elapseAndFlush(async, const Duration(milliseconds: 1)); -} - void main() { tearDown(() async { Loon.configure(persistor: null); @@ -34,7 +30,7 @@ void main() { }, ), ); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); final userCollection = Loon.collection( 'users', @@ -45,11 +41,11 @@ void main() { userCollection.doc('1').create(TestUserModel('User 1')); userCollection.doc('2').create(TestUserModel('User 2')); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); userCollection.doc('3').create(TestUserModel('User 3')); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); // There should be two persistence calls: // 1. The first two writes should be grouped together into a single batch in the first 1ms throttle. @@ -81,7 +77,7 @@ void main() { }, ), ); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); final userCollection = Loon.collection( 'users', @@ -93,9 +89,9 @@ void main() { userCollection.delete(); userCollection.doc('2').create(TestUserModel('User 2')); - _flushPersistManagerOperation(async); - _flushPersistManagerOperation(async); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); + elapseAndFlush(async, const Duration(milliseconds: 1)); + elapseAndFlush(async, const Duration(milliseconds: 1)); // There should be a separate persistence call for each document in this scenario since the operation // order must be persist->clear->persist in order to ensure the correct sequencing of events. @@ -123,7 +119,7 @@ void main() { }, ), ); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); final userCollection = Loon.collection( 'users', @@ -134,11 +130,11 @@ void main() { userCollection.doc('1').create(TestUserModel('User 1')); userCollection.doc('1').update(TestUserModel('User 1 updated')); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); userCollection.doc('2').create(TestUserModel('User 2')); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); expect(batches.length, 2); @@ -164,7 +160,7 @@ void main() { }, ), ); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); Loon.collection('users').doc('1').create(true); Loon.collection('posts').doc('1').create(true); @@ -173,12 +169,12 @@ void main() { Loon.collection('users').delete(); Loon.collection('posts').delete(); - _flushPersistManagerOperation(async); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); + elapseAndFlush(async, const Duration(milliseconds: 1)); Loon.collection('messages').delete(); - _flushPersistManagerOperation(async); + elapseAndFlush(async, const Duration(milliseconds: 1)); expect(batches.length, 2); diff --git a/test/native/file_persistor_test.dart b/test/native/file_persistor_test.dart index 60b0c5c..ba4963b 100644 --- a/test/native/file_persistor_test.dart +++ b/test/native/file_persistor_test.dart @@ -224,22 +224,27 @@ void main() { ); }); - test('A corrupt resolver file does not fail hydration', () async { + test('A corrupt resolver file fails hydration', () async { await writeDefaultStore(); await File('${loonDir.path}/__resolver__.json') .writeAsString('}{ broken'); - Loon.configure(persistor: newPersistor()); - - await Loon.hydrate(); + await expectLater( + newPersistor().init(), + throwsA( + predicate( + (error) => error.toString().contains('FormatException'), + ), + ), + ); expect( - Loon.collection('users').doc('1').get()?.data, - {"name": "User 1"}, + await File('${loonDir.path}/__resolver__.json').exists(), + true, ); expect( await File('${loonDir.path}/__resolver__.json.corrupt').exists(), - true, + false, ); }); }); From 3b4c9c4e97efb77b906430ed579c3e28ed99c795 Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 13:29:44 -0400 Subject: [PATCH 8/9] refactor: simplify file persistor hydration --- .../file_data_store_config.dart | 37 ++----------------- 1 file changed, 4 insertions(+), 33 deletions(-) diff --git a/lib/persistor/file_persistor/file_data_store_config.dart b/lib/persistor/file_persistor/file_data_store_config.dart index 61dea97..0dc3a42 100644 --- a/lib/persistor/file_persistor/file_data_store_config.dart +++ b/lib/persistor/file_persistor/file_data_store_config.dart @@ -43,23 +43,6 @@ Future _recoverCorruptFile( } } -Json _decodeJsonObject(String value, String description) { - final decoded = jsonDecode(value); - if (decoded is! Json) { - throw FormatException('Expected $description to be a JSON object.'); - } - - return decoded; -} - -Json _readNestedJsonObject(Object? value, String description) { - if (value is! Json) { - throw FormatException('Expected $description to be a JSON object.'); - } - - return value; -} - /// Writes [contents] to [file] atomically: the data is written to a sibling /// temporary file, flushed to disk, then renamed over the target. A rename on /// the same filesystem is atomic, so an interrupted write (crash, OOM kill, @@ -122,20 +105,12 @@ class FileDataStoreConfig extends DataStoreConfig { contents = value; } - final json = _decodeJsonObject( - contents, - 'persisted data store', - ); + final json = jsonDecode(contents); final store = ValueStore(); for (final entry in json.entries) { final resolverPath = entry.key; - final valueStore = ValueStore.fromJson( - _readNestedJsonObject( - entry.value, - 'persisted data store entry "$resolverPath"', - ), - ); + final valueStore = ValueStore.fromJson(entry.value); store.write(resolverPath, valueStore); } @@ -175,12 +150,8 @@ class FileDataStoreResolverConfig extends DataStoreResolverConfig { }) : super( hydrate: () async { try { - return ValueRefStore( - _decodeJsonObject( - await file.readAsString(), - 'persisted resolver', - ), - ); + final json = jsonDecode(await file.readAsString()); + return ValueRefStore(json); } on PathNotFoundException { return null; } From 67215ee758b05ea70d7bcd20d13e473cd61f78a2 Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 13:39:28 -0400 Subject: [PATCH 9/9] refactor: derive pending broadcasts from timer --- lib/broadcast_manager.dart | 12 ++++-------- 1 file changed, 4 insertions(+), 8 deletions(-) diff --git a/lib/broadcast_manager.dart b/lib/broadcast_manager.dart index e64984c..8d09dec 100644 --- a/lib/broadcast_manager.dart +++ b/lib/broadcast_manager.dart @@ -36,20 +36,18 @@ class BroadcastManager { /// The subset of broadcast observers from [_observers] with dependencies. final Set _depObservers = {}; - /// Whether the broadcast store is dirty and has a pending broadcast scheduled. - bool _pendingBroadcast = false; + /// Non-null while a broadcast is scheduled or currently draining. Timer? _broadcastTimer; + bool get _pendingBroadcast => _broadcastTimer != null; + void _cancelBroadcast() { _broadcastTimer?.cancel(); _broadcastTimer = null; - _pendingBroadcast = false; } void _scheduleBroadcast() { if (!_pendingBroadcast) { - _pendingBroadcast = true; - // The broadcast is run async so that multiple broadcast events can be batched // together into one update across all changes that occur in the current task of the event loop. _broadcastTimer = Timer(Duration.zero, _broadcast); @@ -57,8 +55,6 @@ class BroadcastManager { } void _broadcast() { - _broadcastTimer = null; - _depObservers.clear(); for (final observer in _observers.toList()) { @@ -72,7 +68,7 @@ class BroadcastManager { } eventStore.clear(); - _pendingBroadcast = false; + _broadcastTimer = null; } /// Schedules all dependents of the given document for broadcast.