diff --git a/example/pubspec.lock b/example/pubspec.lock index 7d7c442..cae909c 100644 --- a/example/pubspec.lock +++ b/example/pubspec.lock @@ -339,10 +339,10 @@ packages: dependency: transitive description: name: pointycastle - sha256: "7c1e5f0d23c9016c5bbd8b1473d0d3fb3fc851b876046039509e18e0c7485f2c" + sha256: "4be0097fcf3fd3e8449e53730c631200ebc7b88016acecab2b0da2f0149222fe" url: "https://pub.dev" source: hosted - version: "3.7.3" + version: "3.9.1" process: dependency: transitive description: diff --git a/lib/broadcast_manager.dart b/lib/broadcast_manager.dart index b0b6825..8d09dec 100644 --- a/lib/broadcast_manager.dart +++ b/lib/broadcast_manager.dart @@ -36,16 +36,21 @@ class BroadcastManager { /// The subset of broadcast observers from [_observers] with dependencies. final Set _depObservers = {}; - /// Whether the broadcast store is dirty and has a pending broadcast scheduled. - bool _pendingBroadcast = false; + /// Non-null while a broadcast is scheduled or currently draining. + Timer? _broadcastTimer; + + bool get _pendingBroadcast => _broadcastTimer != null; + + void _cancelBroadcast() { + _broadcastTimer?.cancel(); + _broadcastTimer = null; + } void _scheduleBroadcast() { if (!_pendingBroadcast) { - _pendingBroadcast = true; - // The broadcast is run async so that multiple broadcast events can be batched // together into one update across all changes that occur in the current task of the event loop. - Future.delayed(Duration.zero, _broadcast); + _broadcastTimer = Timer(Duration.zero, _broadcast); } } @@ -63,7 +68,7 @@ class BroadcastManager { } eventStore.clear(); - _pendingBroadcast = false; + _broadcastTimer = null; } /// Schedules all dependents of the given document for broadcast. @@ -136,6 +141,7 @@ class BroadcastManager { } void clear({bool broadcast = true}) { + _cancelBroadcast(); eventStore.clear(); observerValueStore.clear(); @@ -166,6 +172,8 @@ class BroadcastManager { } void unsubscribe() { + _cancelBroadcast(); + eventStore.clear(); for (final observer in _observers.toList()) { observer.dispose(); } diff --git a/lib/persistor/file_persistor/file_data_store_config.dart b/lib/persistor/file_persistor/file_data_store_config.dart index 4135d75..0dc3a42 100644 --- a/lib/persistor/file_persistor/file_data_store_config.dart +++ b/lib/persistor/file_persistor/file_data_store_config.dart @@ -4,6 +4,45 @@ import 'package:loon/loon.dart'; import 'package:loon/persistor/data_store.dart'; import 'package:loon/persistor/data_store_resolver.dart'; +void _logHydrationFailure( + File file, + Logger logger, + Object error, + StackTrace stackTrace, +) { + logger.log( + 'Failed to hydrate ${file.path}, recovering without quarantine: ' + '$error\n$stackTrace', + ); +} + +/// Moves a file that failed to hydrate (corrupt JSON, failed decryption) aside +/// to `.corrupt` and recovers by returning null (an empty store) so that +/// one unreadable file cannot fail hydration of the entire store. The data is +/// preserved for inspection, the `.corrupt` suffix is ignored by the data store +/// file listing, and the next persist for the partition writes a fresh file. +Future _recoverCorruptFile( + File file, + Logger logger, + Object error, + StackTrace stackTrace, +) async { + logger.log( + 'Failed to hydrate ${file.path}, quarantining as corrupt: ' + '$error\n$stackTrace', + ); + + try { + final quarantine = File('${file.path}.corrupt'); + if (await quarantine.exists()) { + await quarantine.delete(); + } + await file.rename(quarantine.path); + } catch (error, stackTrace) { + logger.log('Failed to quarantine ${file.path}: $error\n$stackTrace'); + } +} + /// Writes [contents] to [file] atomically: the data is written to a sibling /// temporary file, flushed to disk, then renamed over the target. A rename on /// the same filesystem is atomic, so an interrupted write (crash, OOM kill, @@ -54,9 +93,19 @@ class FileDataStoreConfig extends DataStoreConfig { hydrate: () async { try { final value = await file.readAsString(); - final json = jsonDecode( - encrypted ? encrypter.decrypt(value) : value, - ); + String contents; + if (encrypted) { + try { + contents = encrypter.decrypt(value); + } catch (error, stackTrace) { + await _recoverCorruptFile(file, logger, error, stackTrace); + return null; + } + } else { + contents = value; + } + + final json = jsonDecode(contents); final store = ValueStore(); for (final entry in json.entries) { @@ -68,6 +117,12 @@ class FileDataStoreConfig extends DataStoreConfig { return store; } on PathNotFoundException { return null; + } on FileSystemException catch (error, stackTrace) { + _logHydrationFailure(file, logger, error, stackTrace); + return null; + } on FormatException catch (error, stackTrace) { + await _recoverCorruptFile(file, logger, error, stackTrace); + return null; } }, persist: (store) async { @@ -95,8 +150,8 @@ class FileDataStoreResolverConfig extends DataStoreResolverConfig { }) : super( hydrate: () async { try { - return ValueRefStore( - jsonDecode(await file.readAsString())); + final json = jsonDecode(await file.readAsString()); + return ValueRefStore(json); } on PathNotFoundException { return null; } diff --git a/lib/utils/id.dart b/lib/utils/id.dart index 16eb2ad..b27a186 100644 --- a/lib/utils/id.dart +++ b/lib/utils/id.dart @@ -1,9 +1,10 @@ import 'dart:math'; import 'dart:typed_data'; -// 64-characters (2^6) +// 64 characters (2^6). This intentionally excludes "_" because Loon paths use +// "__" as their segment delimiter. const String _alphabet = - '_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ'; + '-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ~'; final Uint8List _alphabytes = Uint8List.fromList(_alphabet.codeUnits); const int _u32 = 0x100000000; // 2^32 diff --git a/test/core/broadcast_timing_test.dart b/test/core/broadcast_timing_test.dart index 9b3e6b6..4e4df25 100644 --- a/test/core/broadcast_timing_test.dart +++ b/test/core/broadcast_timing_test.dart @@ -2,6 +2,8 @@ import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; +import '../utils.dart'; + /// Deterministic tests for broadcast batching, coalescing, and ordering. /// /// Loon schedules a broadcast on a zero-duration timer so that all writes in a @@ -16,12 +18,6 @@ import 'package:loon/loon.dart'; /// (A dedicated file runs in its own test isolate, so the global store starts /// clean and these virtual-time tests are isolated from the rest of the suite.) -/// Advances virtual time past the broadcast's zero-duration timer and drains -/// the microtasks that deliver stream events. -void _flush(FakeAsync async) { - async.elapse(const Duration(milliseconds: 1)); -} - void _reset(FakeAsync async) { Loon.unsubscribe(); Loon.clearAll(broadcast: false); @@ -39,12 +35,12 @@ void main() { final sub = col .stream() .listen((snaps) => emissions.add([for (final s in snaps) s.data])); - _flush(async); // initial emission + flushBroadcasts(async); // initial emission col.doc('1').create(1); col.doc('2').create(2); col.doc('3').create(3); - _flush(async); + flushBroadcasts(async); // One emission for the initial value and exactly one for the batch. expect(emissions.length, 2); @@ -62,11 +58,11 @@ void main() { final emissions = []; final sub = doc.stream().listen((snap) => emissions.add(snap?.data)); - _flush(async); // initial null + flushBroadcasts(async); // initial null doc.create(1); doc.update(2); - _flush(async); + flushBroadcasts(async); // The create and update collapse into a single emission of the final value. expect(emissions, [null, 2]); @@ -83,14 +79,14 @@ void main() { final emissions = []; final sub = doc.stream().listen((snap) => emissions.add(snap?.data)); - _flush(async); // initial null + flushBroadcasts(async); // initial null doc.create(1); - _flush(async); + flushBroadcasts(async); doc.update(2); - _flush(async); + flushBroadcasts(async); doc.update(3); - _flush(async); + flushBroadcasts(async); expect(emissions, [null, 1, 2, 3]); @@ -106,12 +102,12 @@ void main() { final emissions = []; final sub = doc.stream().listen((snap) => emissions.add(snap?.data)); - _flush(async); // initial null + flushBroadcasts(async); // initial null doc.create(1); - _flush(async); + flushBroadcasts(async); doc.update(1); // same value - _flush(async); + flushBroadcasts(async); // No emission for the no-op update. expect(emissions, [null, 1]); diff --git a/test/core/id_test.dart b/test/core/id_test.dart new file mode 100644 index 0000000..6a34d69 --- /dev/null +++ b/test/core/id_test.dart @@ -0,0 +1,13 @@ +import 'package:flutter_test/flutter_test.dart'; +import 'package:loon/utils/id.dart'; + +void main() { + group('IDs', () { + test('generated IDs do not contain the path delimiter', () { + for (var i = 0; i < 1000; i++) { + expect(generateSecureId(), isNot(contains('__'))); + expect(generateFastId(), isNot(contains('__'))); + } + }); + }); +} diff --git a/test/core/loon_test.dart b/test/core/loon_test.dart index 7fe885c..0027379 100644 --- a/test/core/loon_test.dart +++ b/test/core/loon_test.dart @@ -1,3 +1,4 @@ +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; @@ -16,7 +17,7 @@ void main() { tearDown(() async { Loon.unsubscribe(); - await Loon.clearAll(); + await Loon.clearAll(broadcast: false); }); group( @@ -408,34 +409,39 @@ void main() { ); }); - test('Skips broadcasting unchanged data by default', () async { - final userDoc = Loon.collection('users').doc('1'); - final user = TestUserModel('User 1'); - final updatedUser = TestUserModel('User 1 updated'); + test('Skips broadcasting unchanged data by default', () { + fakeAsync((async) { + final userDoc = Loon.collection('users').doc('1'); + final user = TestUserModel('User 1'); + final updatedUser = TestUserModel('User 1 updated'); - userDoc.create(user); + userDoc.create(user); - await asyncEvent(); + flushBroadcasts(async); - final stream = userDoc.stream(); + final events = []; + final sub = userDoc.stream().listen(events.add); + flushBroadcasts(async); - userDoc.update(user); + userDoc.update(user); - await asyncEvent(); + flushBroadcasts(async); - userDoc.update(updatedUser); + userDoc.update(updatedUser); - await asyncEvent(); + flushBroadcasts(async); - expectLater( - stream, - emitsInOrder( + expect( + events, [ DocumentSnapshot(doc: userDoc, data: user), DocumentSnapshot(doc: userDoc, data: updatedUser), ], - ), - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }); }, ); @@ -658,28 +664,33 @@ void main() { () { test( 'rebroadcasts the document to its stream listeners', - () async { - final usersCollection = Loon.collection('users'); - final userDoc = usersCollection.doc('1'); - - final stream = userDoc.stream(); + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final userDoc = usersCollection.doc('1'); - userDoc.create('Test'); + final events = []; + final sub = userDoc.stream().listen(events.add); + flushBroadcasts(async); - await asyncEvent(); + userDoc.create('Test'); + flushBroadcasts(async); - userDoc.rebroadcast(); + userDoc.rebroadcast(); + flushBroadcasts(async); - expectLater( - stream, - emitsInOrder( + expect( + events, [ null, DocumentSnapshot(doc: userDoc, data: 'Test'), DocumentSnapshot(doc: userDoc, data: 'Test'), ], - ), - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -690,54 +701,59 @@ void main() { () { test( "Rebuilds the document's dependencies", - () async { - bool hasDependencies = false; - - final usersCollection = Loon.collection( - 'users', - dependenciesBuilder: (snap) { - if (hasDependencies) { - return { - Loon.collection('friends').doc(snap.id), - }; - } - - return {}; - }, - ); - - final userDoc = usersCollection.doc('1'); - userDoc.create('User 1'); + () { + fakeAsync((async) { + bool hasDependencies = false; + + final usersCollection = Loon.collection( + 'users', + dependenciesBuilder: (snap) { + if (hasDependencies) { + return { + Loon.collection('friends').doc(snap.id), + }; + } - await asyncEvent(); + return {}; + }, + ); - final stream = userDoc.stream(); + final userDoc = usersCollection.doc('1'); + userDoc.create('User 1'); + flushBroadcasts(async); - final friendDoc = Loon.collection('friends').doc(userDoc.id); - // This event is ignored since the user does not yet depend on the friend document. - friendDoc.create('Friend 1'); + final events = ?>[]; + final sub = userDoc.stream().listen(events.add); + flushBroadcasts(async); - userDoc.update('User 1 updated'); + final friendDoc = + Loon.collection('friends').doc(userDoc.id); + // This event is ignored since the user does not yet depend on the friend document. + friendDoc.create('Friend 1'); - await asyncEvent(); + userDoc.update('User 1 updated'); + flushBroadcasts(async); - hasDependencies = true; + hasDependencies = true; - userDoc.rebuildDependencies(); + userDoc.rebuildDependencies(); - // This event causes a rebroadcast of the user, as they now depend on the friend document. - friendDoc.update('Friend 1 updated'); + // This event causes a rebroadcast of the user, as they now depend on the friend document. + friendDoc.update('Friend 1 updated'); + flushBroadcasts(async); - expect( - stream, - emitsInOrder( + expect( + events, [ DocumentSnapshot(doc: userDoc, data: 'User 1'), DocumentSnapshot(doc: userDoc, data: 'User 1 updated'), DocumentSnapshot(doc: userDoc, data: 'User 1 updated'), ], - ), - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -751,31 +767,35 @@ void main() { group( 'stream', () { - test('Returns a stream of document snapshots', () async { - final user = TestUserModel('User 1'); - final userUpdated = TestUserModel('User 1 updated'); - final userDoc = TestUserModel.store.doc('1'); - final stream = userDoc.stream(); + test('Returns a stream of document snapshots', () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final userUpdated = TestUserModel('User 1 updated'); + final userDoc = TestUserModel.store.doc('1'); + final events = ?>[]; + final sub = userDoc.stream().listen(events.add); - await asyncEvent(); - userDoc.create(user); - await asyncEvent(); - userDoc.update(userUpdated); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(user); + flushBroadcasts(async); + userDoc.update(userUpdated); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); - final events = await stream.take(4).toList(); + expect( + events, + [ + null, + DocumentSnapshot(doc: userDoc, data: user), + DocumentSnapshot(doc: userDoc, data: userUpdated), + null, + ], + ); - expect( - events, - [ - null, - DocumentSnapshot(doc: userDoc, data: user), - DocumentSnapshot(doc: userDoc, data: userUpdated), - null, - ], - ); + sub.cancel(); + async.flushMicrotasks(); + }); }); }, ); @@ -783,149 +803,158 @@ void main() { group( 'streamChanges', () { - test('Returns a stream of document change snapshots', () async { - final userDoc = TestUserModel.store.doc('1'); - final userData = TestUserModel('User 1'); - final userDataUpdated = TestUserModel('User 1 updated'); - final userObs = userDoc.observe(); - final future = userObs.streamChanges().take(2).toList(); + test('Returns a stream of document change snapshots', () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userData = TestUserModel('User 1'); + final userDataUpdated = TestUserModel('User 1 updated'); + final userObs = userDoc.observe(); + final events = >[]; + final sub = userObs.streamChanges().listen(events.add); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); - userDoc.update(userDataUpdated); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + userDoc.update(userDataUpdated); + flushBroadcasts(async); - final events = await future; + expect( + events, + [ + DocumentChangeSnapshot( + doc: userObs, + data: userData, + prevData: null, + event: BroadcastEvents.added, + ), + DocumentChangeSnapshot( + doc: userObs, + data: userDataUpdated, + prevData: userData, + event: BroadcastEvents.modified, + ), + ], + ); - expect( - events, - [ - DocumentChangeSnapshot( - doc: userObs, - data: userData, - prevData: null, - event: BroadcastEvents.added, - ), - DocumentChangeSnapshot( - doc: userObs, - data: userDataUpdated, - prevData: userData, - event: BroadcastEvents.modified, - ), - ], - ); + sub.cancel(); + async.flushMicrotasks(); + }); }); }, ); test( "Maintains its dependency cache correctly", - () async { - final usersCollection = Loon.collection('users'); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - if (snap.data['userId'] != null) { - return { - usersCollection.doc(snap.data['userId'].toString()), - }; - } - return null; - }, - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + if (snap.data['userId'] != null) { + return { + usersCollection.doc(snap.data['userId'].toString()), + }; + } + return null; + }, + ); - final postDoc = postsCollection.doc('1'); - final postData = {"id": 1, "text": "Post 1", "userId": 1}; - final postData2 = {"id": 1, "text": "Post 1 updated"}; - final postData3 = { - "id": 1, - "text": "Post 1 updated", - "userId": 3 - }; - final postObs = postDoc.observe(); + final postDoc = postsCollection.doc('1'); + final postData = {"id": 1, "text": "Post 1", "userId": 1}; + final postData2 = {"id": 1, "text": "Post 1 updated"}; + final postData3 = { + "id": 1, + "text": "Post 1 updated", + "userId": 3 + }; + final postObs = postDoc.observe(); - postDoc.create(postData); - await asyncEvent(); + postDoc.create(postData); + flushBroadcasts(async); - // After creating the post document, it should have added its user dependency into the observable's - // dep tree. - expect(postObs.inspect(), { - "deps": { - "__ref": 1, - "users": { + // After creating the post document, it should have added its user dependency into the observable's + // dep tree. + expect(postObs.inspect(), { + "deps": { "__ref": 1, - "1": 1, + "users": { + "__ref": 1, + "1": 1, + }, }, - }, - }); + }); - postDoc.update(postData2); - await asyncEvent(); + postDoc.update(postData2); + flushBroadcasts(async); - // After updating the document, it should have removed the user dependency from the observable's - // dep tree. - expect(postObs.inspect(), { - "deps": {}, - }); + // After updating the document, it should have removed the user dependency from the observable's + // dep tree. + expect(postObs.inspect(), { + "deps": {}, + }); - postDoc.update(postData3); - await asyncEvent(); + postDoc.update(postData3); + flushBroadcasts(async); - // The observable should have been updated to have a user dependency again. - expect(postObs.inspect(), { - "deps": { - "__ref": 1, - "users": { + // The observable should have been updated to have a user dependency again. + expect(postObs.inspect(), { + "deps": { "__ref": 1, - "3": 1, + "users": { + "__ref": 1, + "3": 1, + }, }, - }, - }); + }); - postsCollection.delete(); - await asyncEvent(); + postsCollection.delete(); + flushBroadcasts(async); - // After deleting the posts collection, observable should have cleared its dependencies. - expect(postObs.inspect(), {"deps": {}}); + // After deleting the posts collection, observable should have cleared its dependencies. + expect(postObs.inspect(), {"deps": {}}); + }); }, ); test( 'Invalidates its cached value when the document is updated', - () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); - final userDoc = usersCollection.doc('1'); - final userData = TestUserModel('User 1'); - final userDataUpdated = TestUserModel('User 1 updated'); + final userDoc = usersCollection.doc('1'); + final userData = TestUserModel('User 1'); + final userDataUpdated = TestUserModel('User 1 updated'); - userDoc.create(userData); - final userObs = userDoc.observe(); + userDoc.create(userData); + final userObs = userDoc.observe(); - await asyncEvent(); + flushBroadcasts(async); - DocumentSnapshot? snap = userObs.get(); + DocumentSnapshot? snap = userObs.get(); - // The observable's value is cached and is not recomputed unless invalidated by - // a change to its document. - expect(identical(snap, userObs.get()), true); - expect(snap, DocumentSnapshot(doc: userDoc, data: userData)); + // The observable's value is cached and is not recomputed unless invalidated by + // a change to its document. + expect(identical(snap, userObs.get()), true); + expect(snap, DocumentSnapshot(doc: userDoc, data: userData)); - userDoc.update(userDataUpdated); - snap = userObs.get(); + userDoc.update(userDataUpdated); + snap = userObs.get(); - // The updated document has not been broadcast yet, however the observable's cached - // value has been invalidated by the update to its documents and should return - // an up-to-date recalculated value. - expect( - snap, - DocumentSnapshot(doc: userDoc, data: userDataUpdated), - ); + // The updated document has not been broadcast yet, however the observable's cached + // value has been invalidated by the update to its documents and should return + // an up-to-date recalculated value. + expect( + snap, + DocumentSnapshot(doc: userDoc, data: userDataUpdated), + ); + flushBroadcasts(async); + }); }, ); }, @@ -1024,108 +1053,106 @@ void main() { test( 'Broadcasts the delete to observers of the collection and subcollections', - () async { - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - - final userData = TestUserModel('User 1'); - final userData2 = TestUserModel('User 2'); - - final friendDoc = userDoc - .subcollection( - 'friends', - fromJson: TestUserModel.fromJson, - toJson: (snap) => snap.toJson(), - ) - .doc('2'); - - userDoc.create(userData); - friendDoc.create(userData2); - userDoc2.create(userData2); - - final userDocStream = userDoc.stream(); - final userCollectionStream = TestUserModel.store.stream(); - final friendDocStream = friendDoc.stream(); - final friendCollectionStream = - userDoc.subcollection('friends').stream(); - - TestUserModel.store.delete(); - - final userDocData = await userDocStream.take(2).toList(); - final userCollectionData = - await userCollectionStream.take(2).toList(); - final friendDocData = await friendDocStream.take(2).toList(); - final friendCollectionData = - await friendCollectionStream.take(2).toList(); - - expect( - userDocData, - [ - DocumentSnapshot( - doc: userDoc, - data: userData, - ), - null, - ], - ); - - expect( - userCollectionData, - [ + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); + + final userData = TestUserModel('User 1'); + final userData2 = TestUserModel('User 2'); + + final friendDoc = userDoc + .subcollection( + 'friends', + fromJson: TestUserModel.fromJson, + toJson: (snap) => snap.toJson(), + ) + .doc('2'); + + userDoc.create(userData); + friendDoc.create(userData2); + userDoc2.create(userData2); + flushBroadcasts(async); + + final userDocData = ?>[]; + final userCollectionData = + >>[]; + final friendDocData = ?>[]; + final friendCollectionData = + >>[]; + final userDocSub = userDoc.stream().listen(userDocData.add); + final userCollectionSub = TestUserModel.store + .stream() + .listen(userCollectionData.add); + final friendDocSub = + friendDoc.stream().listen(friendDocData.add); + final friendCollectionSub = userDoc + .subcollection('friends') + .stream() + .listen(friendCollectionData.add); + flushBroadcasts(async); + + TestUserModel.store.delete(); + flushBroadcasts(async); + + expect( + userDocData, [ DocumentSnapshot( doc: userDoc, data: userData, ), - DocumentSnapshot( - doc: userDoc2, - data: userData2, - ), + null, ], - [], - ], - ); - - expect( - friendDocData, - [ - DocumentSnapshot( - doc: friendDoc, - data: userData2, - ), - null, - ], - ); + ); - expect( - userCollectionData, - [ + expect( + userCollectionData, [ - DocumentSnapshot( - doc: userDoc, - data: userData, - ), - DocumentSnapshot( - doc: userDoc2, - data: userData2, - ), + [ + DocumentSnapshot( + doc: userDoc, + data: userData, + ), + DocumentSnapshot( + doc: userDoc2, + data: userData2, + ), + ], + [], ], - [], - ], - ); + ); - expect( - friendCollectionData, - [ + expect( + friendDocData, [ DocumentSnapshot( doc: friendDoc, data: userData2, ), + null, ], - [], - ], - ); + ); + + expect( + friendCollectionData, + [ + [ + DocumentSnapshot( + doc: friendDoc, + data: userData2, + ), + ], + [], + ], + ); + + userDocSub.cancel(); + userCollectionSub.cancel(); + friendDocSub.cancel(); + friendCollectionSub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -1198,81 +1225,90 @@ void main() { test( "Broadcasts the expected change events", - () async { - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - final userData = TestUserModel('User 1'); - final userData2 = TestUserModel('User 2'); + final userData = TestUserModel('User 1'); + final userData2 = TestUserModel('User 2'); - final querySnaps = - TestUserModel.store.streamChanges().take(2).toList(); + final querySnaps = + >>[]; + final sub = TestUserModel.store + .streamChanges() + .listen(querySnaps.add); - userDoc.create(userData); - userDoc2.create(userData2); + userDoc.create(userData); + userDoc2.create(userData2); - await asyncEvent(); + flushBroadcasts(async); - final updatedUser2 = TestUserModel('User 2 updated'); - final userDoc3 = TestUserModel.store.doc('3'); - final userData3 = TestUserModel('User 3'); + final updatedUser2 = TestUserModel('User 2 updated'); + final userDoc3 = TestUserModel.store.doc('3'); + final userData3 = TestUserModel('User 3'); - TestUserModel.store.replace([ - DocumentSnapshot( - doc: userDoc2, - data: updatedUser2, - ), - DocumentSnapshot( - doc: userDoc3, - data: userData3, - ), - ]); + TestUserModel.store.replace([ + DocumentSnapshot( + doc: userDoc2, + data: updatedUser2, + ), + DocumentSnapshot( + doc: userDoc3, + data: userData3, + ), + ]); + flushBroadcasts(async); - expect( - await querySnaps, - [ + expect( + querySnaps, [ - DocumentChangeSnapshot( - doc: userDoc, - data: userData, - event: BroadcastEvents.added, - prevData: null, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: userData2, - event: BroadcastEvents.added, - prevData: null, - ), + [ + DocumentChangeSnapshot( + doc: userDoc, + data: userData, + event: BroadcastEvents.added, + prevData: null, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: userData2, + event: BroadcastEvents.added, + prevData: null, + ), + ], + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + event: BroadcastEvents.removed, + prevData: userData, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + event: BroadcastEvents.removed, + prevData: userData2, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: updatedUser2, + event: BroadcastEvents.added, + prevData: null, + ), + DocumentChangeSnapshot( + doc: userDoc3, + data: userData3, + event: BroadcastEvents.added, + prevData: null, + ), + ] ], - [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - event: BroadcastEvents.removed, - prevData: userData, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - event: BroadcastEvents.removed, - prevData: userData2, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: updatedUser2, - event: BroadcastEvents.added, - prevData: null, - ), - DocumentChangeSnapshot( - doc: userDoc3, - data: userData3, - event: BroadcastEvents.added, - prevData: null, - ), - ] - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -1315,532 +1351,581 @@ void main() { group('stream', () { test( 'Returns a stream of updates to the query', - () async { - final user = TestUserModel('User 1'); - final userUpdated = TestUserModel('User 1 updated'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - - final queryStream = TestUserModel.store - .where((snap) => snap.id == '1') - .stream(); + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final userUpdated = TestUserModel('User 1 updated'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - await asyncEvent(); - userDoc.create(user); - userDoc2.create(user2); + final querySnaps = >>[]; + final sub = TestUserModel.store + .where((snap) => snap.id == '1') + .stream() + .listen(querySnaps.add); - await asyncEvent(); - userDoc.update(userUpdated); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); - userDoc.create(user); - await asyncEvent(); - TestUserModel.store.delete(); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(user); + userDoc2.create(user2); - final querySnaps = await queryStream.take(6).toList(); + flushBroadcasts(async); + userDoc.update(userUpdated); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); + userDoc.create(user); + flushBroadcasts(async); + TestUserModel.store.delete(); + flushBroadcasts(async); - expect( - querySnaps, - [ - // No data - [], - // User 1 created - [ - DocumentSnapshot(doc: userDoc, data: user), - ], - // User 1 updated - [ - DocumentSnapshot(doc: userDoc, data: userUpdated), - ], - // User 1 deleted - [], - // User 1 recreated + expect( + querySnaps, [ - DocumentSnapshot(doc: userDoc, data: user), + // No data + [], + // User 1 created + [ + DocumentSnapshot(doc: userDoc, data: user), + ], + // User 1 updated + [ + DocumentSnapshot(doc: userDoc, data: userUpdated), + ], + // User 1 deleted + [], + // User 1 recreated + [ + DocumentSnapshot(doc: userDoc, data: user), + ], + // User collection deleted + [], ], - // User collection deleted - [], - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); test( - 'Handles the scenario where a collection is removed and written to in the same task', - () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - - userDoc.create(user); - userDoc2.create(user2); + 'Handles the scenario where a collection is removed and written to in the same task', + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - await asyncEvent(); + userDoc.create(user); + userDoc2.create(user2); - final queryStream = TestUserModel.store.stream().take(2); + flushBroadcasts(async); - await asyncEvent(); + final querySnaps = >>[]; + final sub = + TestUserModel.store.stream().listen(querySnaps.add); + flushBroadcasts(async); - TestUserModel.store.delete(); - userDoc.create(user); + TestUserModel.store.delete(); + userDoc.create(user); + flushBroadcasts(async); - final querySnaps = await queryStream.toList(); + expect( + querySnaps, + [ + [ + DocumentSnapshot(doc: userDoc, data: user), + DocumentSnapshot(doc: userDoc2, data: user2), + ], + // Rather than emitting an empty result set when the store is deleted, + // the stream batches changes in the same task together and emits a single + // update with the re-created user. + [ + DocumentSnapshot(doc: userDoc, data: user), + ], + ], + ); - expect( - querySnaps, - [ - [ - DocumentSnapshot(doc: userDoc, data: user), - DocumentSnapshot(doc: userDoc2, data: user2), - ], - // Rather than emitting an empty result set when the store is deleted, - // the stream batches changes in the same task together and emits a single - // update with the re-created user. - [ - DocumentSnapshot(doc: userDoc, data: user), - ], - ], - ); - }); + sub.cancel(); + async.flushMicrotasks(); + }); + }, + ); }); group( 'streamChanges', () { - test('Returns a stream of changes to the query', () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); - final updatedUser = TestUserModel('User 1 updated'); + test('Returns a stream of changes to the query', () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); + final updatedUser = TestUserModel('User 1 updated'); - final events = - TestUserModel.store.streamChanges().take(4).toList(); + final events = + >>[]; + final sub = + TestUserModel.store.streamChanges().listen(events.add); - await asyncEvent(); - userDoc.create(user); - userDoc2.create(user2); - await asyncEvent(); - userDoc.update(updatedUser); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); - TestUserModel.store.delete(); - await asyncEvent(); + flushBroadcasts(async); + userDoc.create(user); + userDoc2.create(user2); + flushBroadcasts(async); + userDoc.update(updatedUser); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); + TestUserModel.store.delete(); + flushBroadcasts(async); - expect( - await events, - [ - // Create documents - [ - DocumentChangeSnapshot( - doc: userDoc, - data: user, - event: BroadcastEvents.added, - prevData: null, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: user2, - event: BroadcastEvents.added, - prevData: null, - ), - ], - // Update document - [ - DocumentChangeSnapshot( - doc: userDoc, - data: updatedUser, - event: BroadcastEvents.modified, - prevData: user, - ), - ], - // Remove document + expect( + events, [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - event: BroadcastEvents.removed, - prevData: updatedUser, - ), + // Create documents + [ + DocumentChangeSnapshot( + doc: userDoc, + data: user, + event: BroadcastEvents.added, + prevData: null, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: user2, + event: BroadcastEvents.added, + prevData: null, + ), + ], + // Update document + [ + DocumentChangeSnapshot( + doc: userDoc, + data: updatedUser, + event: BroadcastEvents.modified, + prevData: user, + ), + ], + // Remove document + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + event: BroadcastEvents.removed, + prevData: updatedUser, + ), + ], + // Delete user collection + [ + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + event: BroadcastEvents.removed, + prevData: user2, + ), + ] ], - // Delete user collection - [ - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - event: BroadcastEvents.removed, - prevData: user2, - ), - ] - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }); - test('Localizes broadcast event types to the query', () async { - final user = TestUserModel('User 1'); - final userDoc = TestUserModel.store.doc('1'); + test('Localizes broadcast event types to the query', () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final userDoc = TestUserModel.store.doc('1'); - final changeSnaps = TestUserModel.store - .where((snap) => snap.data.name == 'User 1 updated') - .streamChanges() - .take(1) - .toList(); + final changeSnaps = + >>[]; + final sub = TestUserModel.store + .where((snap) => snap.data.name == 'User 1 updated') + .streamChanges() + .listen(changeSnaps.add); - userDoc.create(user); - await asyncEvent(); - final updatedUser = TestUserModel('User 1 updated'); - userDoc.update(updatedUser); + userDoc.create(user); + flushBroadcasts(async); + final updatedUser = TestUserModel('User 1 updated'); + userDoc.update(updatedUser); + flushBroadcasts(async); - expect( - await changeSnaps, - [ + expect( + changeSnaps, [ - DocumentChangeSnapshot( - doc: userDoc, - data: updatedUser, - // The global event is a [BroadcastBroadcastEvents.modified] when the user is updated, - // but to this query, it should be a [BroadcastBroadcastEvents.added] event since previously - // it was not included and now it is. - event: BroadcastEvents.added, - prevData: null, - ), + [ + DocumentChangeSnapshot( + doc: userDoc, + data: updatedUser, + // The global event is a [BroadcastBroadcastEvents.modified] when the user is updated, + // but to this query, it should be a [BroadcastBroadcastEvents.added] event since previously + // it was not included and now it is. + event: BroadcastEvents.added, + prevData: null, + ), + ], ], - ], - ); + ); + + sub.cancel(); + async.flushMicrotasks(); + }); }); test( - 'Handles the scenario where a collection is removed and written to in the same task', - () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); + 'Handles the scenario where a collection is removed and written to in the same task', + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - final changeSnaps = - TestUserModel.store.streamChanges().take(2).toList(); + final changeSnaps = + >>[]; + final sub = TestUserModel.store + .streamChanges() + .listen(changeSnaps.add); - userDoc.create(user); - userDoc2.create(user2); + userDoc.create(user); + userDoc2.create(user2); - await asyncEvent(); + flushBroadcasts(async); - TestUserModel.store.delete(); - userDoc.create(user); + TestUserModel.store.delete(); + userDoc.create(user); + flushBroadcasts(async); - expect( - await changeSnaps, - [ - [ - DocumentChangeSnapshot( - doc: userDoc, - data: user, - prevData: null, - event: BroadcastEvents.added, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: user2, - prevData: null, - event: BroadcastEvents.added, - ) - ], - // The deletion of the collection and the re-creation of the document - // are batched together into a single change event that includes the correct events - // for each document. - [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - prevData: user, - event: BroadcastEvents.removed, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - prevData: user2, - event: BroadcastEvents.removed, - ), - DocumentChangeSnapshot( - doc: userDoc, - data: user, - prevData: null, - event: BroadcastEvents.added, - ), - ], - ], - ); - }); + expect( + changeSnaps, + [ + [ + DocumentChangeSnapshot( + doc: userDoc, + data: user, + prevData: null, + event: BroadcastEvents.added, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: user2, + prevData: null, + event: BroadcastEvents.added, + ) + ], + // The deletion of the collection and the re-creation of the document + // are batched together into a single change event that includes the correct events + // for each document. + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + prevData: user, + event: BroadcastEvents.removed, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + prevData: user2, + event: BroadcastEvents.removed, + ), + DocumentChangeSnapshot( + doc: userDoc, + data: user, + prevData: null, + event: BroadcastEvents.added, + ), + ], + ], + ); + + sub.cancel(); + async.flushMicrotasks(); + }); + }, + ); test( - 'Discards changes that occur to a collection in the same task as a subsequent deletion of that collection', - () async { - final user = TestUserModel('User 1'); - final user2 = TestUserModel('User 2'); - final userDoc = TestUserModel.store.doc('1'); - final userDoc2 = TestUserModel.store.doc('2'); + 'Discards changes that occur to a collection in the same task as a subsequent deletion of that collection', + () { + fakeAsync((async) { + final user = TestUserModel('User 1'); + final user2 = TestUserModel('User 2'); + final userDoc = TestUserModel.store.doc('1'); + final userDoc2 = TestUserModel.store.doc('2'); - final changeSnaps = - TestUserModel.store.streamChanges().take(2).toList(); + final changeSnaps = + >>[]; + final sub = TestUserModel.store + .streamChanges() + .listen(changeSnaps.add); - userDoc.create(user); - userDoc2.create(user2); + userDoc.create(user); + userDoc2.create(user2); - await asyncEvent(); + flushBroadcasts(async); - userDoc.update(TestUserModel('User 1 updated')); - TestUserModel.store.delete(); + userDoc.update(TestUserModel('User 1 updated')); + TestUserModel.store.delete(); + flushBroadcasts(async); - expect( - await changeSnaps, - [ - [ - DocumentChangeSnapshot( - doc: userDoc, - data: user, - prevData: null, - event: BroadcastEvents.added, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: user2, - prevData: null, - event: BroadcastEvents.added, - ) - ], - [ - DocumentChangeSnapshot( - doc: userDoc, - data: null, - prevData: user, - event: BroadcastEvents.removed, - ), - DocumentChangeSnapshot( - doc: userDoc2, - data: null, - prevData: user2, - event: BroadcastEvents.removed, - ), - ], - ], - ); - }); + expect( + changeSnaps, + [ + [ + DocumentChangeSnapshot( + doc: userDoc, + data: user, + prevData: null, + event: BroadcastEvents.added, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: user2, + prevData: null, + event: BroadcastEvents.added, + ) + ], + [ + DocumentChangeSnapshot( + doc: userDoc, + data: null, + prevData: user, + event: BroadcastEvents.removed, + ), + DocumentChangeSnapshot( + doc: userDoc2, + data: null, + prevData: user2, + event: BroadcastEvents.removed, + ), + ], + ], + ); + + sub.cancel(); + async.flushMicrotasks(); + }); + }, + ); }, ); test( "Maintains its dependency/snapshot caches correctly", - () async { - final usersCollection = Loon.collection('users'); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - if (snap.data['userId'] != null) { - return { - usersCollection.doc(snap.data['userId'].toString()), - }; - } - return null; - }, - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + if (snap.data['userId'] != null) { + return { + usersCollection.doc(snap.data['userId'].toString()), + }; + } + return null; + }, + ); - final postDoc = postsCollection.doc('1'); - final post1Data = {"id": 1, "text": "Post 1", "userId": 1}; - final post1Data2 = {"id": 1, "text": "Post 1 updated"}; - final post1Data3 = { - "id": 1, - "text": "Post 1 updated", - "userId": 3 - }; - final postDoc2 = postsCollection.doc('2'); - final post2Data = {"id": 2, "text": "Post 2", "userId": 2}; - final userDoc = usersCollection.doc('1'); - final userDoc2 = usersCollection.doc('2'); - final userDoc3 = usersCollection.doc('3'); - final postsObs = postsCollection.toQuery().observe(); - - postDoc.create(post1Data); - await asyncEvent(); - - // After creating the post document, the query should have a global and document level - // dependency. - expect(postsObs.inspect(), { - "deps": { - "__ref": 1, - "users": { + final postDoc = postsCollection.doc('1'); + final post1Data = {"id": 1, "text": "Post 1", "userId": 1}; + final post1Data2 = {"id": 1, "text": "Post 1 updated"}; + final post1Data3 = { + "id": 1, + "text": "Post 1 updated", + "userId": 3 + }; + final postDoc2 = postsCollection.doc('2'); + final post2Data = {"id": 2, "text": "Post 2", "userId": 2}; + final userDoc = usersCollection.doc('1'); + final userDoc2 = usersCollection.doc('2'); + final userDoc3 = usersCollection.doc('3'); + final postsObs = postsCollection.toQuery().observe(); + + postDoc.create(post1Data); + flushBroadcasts(async); + + // After creating the post document, the query should have a global and document level + // dependency. + expect(postsObs.inspect(), { + "deps": { "__ref": 1, - "1": 1, + "users": { + "__ref": 1, + "1": 1, + }, }, - }, - "docDeps": { - postDoc: { - userDoc, + "docDeps": { + postDoc: { + userDoc, + }, }, - }, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), - } - }); + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), + } + }); - postDoc.update(post1Data2); - await asyncEvent(); + postDoc.update(post1Data2); + flushBroadcasts(async); - // After updating the document, it should have removed the user dependency from the observable's - // dep tree and document level dependency cache. - expect(postsObs.inspect(), { - "deps": {}, - "docDeps": {}, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data2), - } - }); + // After updating the document, it should have removed the user dependency from the observable's + // dep tree and document level dependency cache. + expect(postsObs.inspect(), { + "deps": {}, + "docDeps": {}, + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data2), + } + }); - postDoc.update(post1Data); - postDoc2.create(post2Data); - await asyncEvent(); + postDoc.update(post1Data); + postDoc2.create(post2Data); + flushBroadcasts(async); - // After updating the first post to have a user dependency again, and creating a second - // post with another user dependency, the query's dependencies should have two entries. - expect(postsObs.inspect(), { - "deps": { - "__ref": 2, - "users": { + // After updating the first post to have a user dependency again, and creating a second + // post with another user dependency, the query's dependencies should have two entries. + expect(postsObs.inspect(), { + "deps": { "__ref": 2, - "1": 1, - "2": 1, - }, - }, - "docDeps": { - postDoc: { - userDoc, + "users": { + "__ref": 2, + "1": 1, + "2": 1, + }, }, - postDoc2: { - userDoc2, + "docDeps": { + postDoc: { + userDoc, + }, + postDoc2: { + userDoc2, + }, }, - }, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), - postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), - } - }); + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data), + postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), + } + }); - postDoc.update(post1Data3); - await asyncEvent(); + postDoc.update(post1Data3); + flushBroadcasts(async); - // After updating the post to be dependent on a different user, the observable query - // should have removed the reference to the previous user (user 1) and replaced it with the - // dependency on user 3. - expect(postsObs.inspect(), { - "deps": { - "__ref": 2, - "users": { + // After updating the post to be dependent on a different user, the observable query + // should have removed the reference to the previous user (user 1) and replaced it with the + // dependency on user 3. + expect(postsObs.inspect(), { + "deps": { "__ref": 2, - "2": 1, - "3": 1, - }, - }, - "docDeps": { - postDoc: { - userDoc3, + "users": { + "__ref": 2, + "2": 1, + "3": 1, + }, }, - postDoc2: { - userDoc2, + "docDeps": { + postDoc: { + userDoc3, + }, + postDoc2: { + userDoc2, + }, }, - }, - "docSnaps": { - postDoc: DocumentSnapshot(doc: postDoc, data: post1Data3), - postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), - } - }); + "docSnaps": { + postDoc: DocumentSnapshot(doc: postDoc, data: post1Data3), + postDoc2: DocumentSnapshot(doc: postDoc2, data: post2Data), + } + }); - postsCollection.delete(); - await asyncEvent(); + postsCollection.delete(); + flushBroadcasts(async); - // After deleting the posts collection, the query should have cleared its global - // and document level dependencies. - expect( - postsObs.inspect(), - {"deps": {}, "docDeps": {}, "docSnaps": {}}, - ); + // After deleting the posts collection, the query should have cleared its global + // and document level dependencies. + expect( + postsObs.inspect(), + {"deps": {}, "docDeps": {}, "docSnaps": {}}, + ); + }); }, ); test( 'Invalidates its cached value when its collection is updated', - () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); + () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); - final userDoc = usersCollection.doc('1'); - final userDoc2 = usersCollection.doc('2'); - final userData = TestUserModel('User 1'); - final userDataUpdated = TestUserModel('User 1 updated'); - final user2Data = TestUserModel('User 2'); + final userDoc = usersCollection.doc('1'); + final userDoc2 = usersCollection.doc('2'); + final userData = TestUserModel('User 1'); + final userDataUpdated = TestUserModel('User 1 updated'); + final user2Data = TestUserModel('User 2'); - userDoc.create(userData); - userDoc2.create(user2Data); - final usersObs = usersCollection.observe(); + userDoc.create(userData); + userDoc2.create(user2Data); + final usersObs = usersCollection.observe(); - await asyncEvent(); + flushBroadcasts(async); - List> snaps = usersObs.get(); + List> snaps = usersObs.get(); - // The observable's value is cached and is not recomputed unless invalidated by - // a change to its collection. - expect(identical(snaps, usersObs.get()), true); - expect( - snaps, - [ - DocumentSnapshot(doc: userDoc, data: userData), - DocumentSnapshot(doc: userDoc2, data: user2Data), - ], - ); + // The observable's value is cached and is not recomputed unless invalidated by + // a change to its collection. + expect(identical(snaps, usersObs.get()), true); + expect( + snaps, + [ + DocumentSnapshot(doc: userDoc, data: userData), + DocumentSnapshot(doc: userDoc2, data: user2Data), + ], + ); - userDoc.update(userDataUpdated); - snaps = usersObs.get(); + userDoc.update(userDataUpdated); + snaps = usersObs.get(); - // The updated document has not been broadcast yet, however the observable's cached - // value has been invalidated by the update to one of its documents and should return - // an up-to-date recalculated value. - expect(snaps, [ - DocumentSnapshot(doc: userDoc, data: userDataUpdated), - DocumentSnapshot(doc: userDoc2, data: user2Data), - ]); - // It should then cache that value until it is invalidated again. - expect(identical(snaps, usersObs.get()), true); + // The updated document has not been broadcast yet, however the observable's cached + // value has been invalidated by the update to one of its documents and should return + // an up-to-date recalculated value. + expect(snaps, [ + DocumentSnapshot(doc: userDoc, data: userDataUpdated), + DocumentSnapshot(doc: userDoc2, data: user2Data), + ]); + // It should then cache that value until it is invalidated again. + expect(identical(snaps, usersObs.get()), true); - // Deleting a document in the observable's collection should invalidate its cached value. - userDoc.delete(); + // Deleting a document in the observable's collection should invalidate its cached value. + userDoc.delete(); - snaps = usersObs.get(); - expect(snaps, [ - DocumentSnapshot(doc: userDoc2, data: user2Data), - ]); - expect(identical(snaps, usersObs.get()), true); + snaps = usersObs.get(); + expect(snaps, [ + DocumentSnapshot(doc: userDoc2, data: user2Data), + ]); + expect(identical(snaps, usersObs.get()), true); - // Deleting the observable's associated collection should invalidate its cached value. - usersCollection.delete(); - snaps = usersObs.get(); - expect(snaps, []); - expect(identical(snaps, usersObs.get()), true); + // Deleting the observable's associated collection should invalidate its cached value. + usersCollection.delete(); + snaps = usersObs.get(); + expect(snaps, []); + expect(identical(snaps, usersObs.get()), true); - userDoc.create(userData); - snaps = usersObs.get(); + userDoc.create(userData); + snaps = usersObs.get(); - await asyncEvent(); + flushBroadcasts(async); - // After waiting for the broadcast, the observable's cached value should not have changed - // from its previous recalculation when it was accessed with [get()]. The broadcast should have reused - // that value. - expect(identical(snaps, usersObs.get()), true); + // After waiting for the broadcast, the observable's cached value should not have changed + // from its previous recalculation when it was accessed with [get()]. The broadcast should have reused + // that value. + expect(identical(snaps, usersObs.get()), true); + }); }, ); @@ -1966,82 +2051,95 @@ void main() { test( 'Broadcasts the clear to observers', - () async { - final userDoc = TestUserModel.store.doc('1'); - final userData = TestUserModel('User 1'); - final userDocStream = userDoc.stream(); - final userCollectionStream = TestUserModel.store.stream(); - - userDoc.create(userData); + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userData = TestUserModel('User 1'); + final userDocStreamData = ?>[]; + final userCollectionStreamData = + >>[]; + final docSub = userDoc.stream().listen(userDocStreamData.add); + final collectionSub = TestUserModel.store + .stream() + .listen(userCollectionStreamData.add); + flushBroadcasts(async); - await asyncEvent(); + userDoc.create(userData); - Loon.clearAll(); + flushBroadcasts(async); - final userDocStreamData = await userDocStream.take(3).toList(); - final userCollectionStreamData = - await userCollectionStream.take(3).toList(); + Loon.clearAll(); - expect( - userDocStreamData, - [ - null, - DocumentSnapshot( - doc: userDoc, - data: userData, - ), - null, - ], - ); + flushBroadcasts(async); - expect( - userCollectionStreamData, - [ - [], + expect( + userDocStreamData, [ + null, DocumentSnapshot( doc: userDoc, data: userData, ), + null, + ], + ); + + expect( + userCollectionStreamData, + [ + [], + [ + DocumentSnapshot( + doc: userDoc, + data: userData, + ), + ], + [], ], - [], - ], - ); + ); + + docSub.cancel(); + collectionSub.cancel(); + async.flushMicrotasks(); + }); }, ); test( 'Does not broadcast the clear to observers when broadcast: false', - () async { - final userDoc = TestUserModel.store.doc('1'); - final userData = TestUserModel('User 1'); + () { + fakeAsync((async) { + final userDoc = TestUserModel.store.doc('1'); + final userData = TestUserModel('User 1'); - final docEvents = ?>[]; - final collectionEvents = - >>[]; - final docSub = userDoc.stream().listen(docEvents.add); - final collectionSub = - TestUserModel.store.stream().listen(collectionEvents.add); - addTearDown(() { - docSub.cancel(); - collectionSub.cancel(); - }); + final docEvents = ?>[]; + final collectionEvents = + >>[]; + final docSub = userDoc.stream().listen(docEvents.add); + final collectionSub = + TestUserModel.store.stream().listen(collectionEvents.add); + flushBroadcasts(async); - userDoc.create(userData); - await asyncEvent(); + userDoc.create(userData); + flushBroadcasts(async); - await Loon.clearAll(broadcast: false); - await asyncEvent(); + Loon.clearAll(broadcast: false); + flushBroadcasts(async); - expect(Loon.inspect()['store'], {}); - expect(docEvents, [ - null, - DocumentSnapshot(doc: userDoc, data: userData), - ]); - expect(collectionEvents, [ - [], - [DocumentSnapshot(doc: userDoc, data: userData)], - ]); + expect(Loon.inspect()['store'], {}); + expect(docEvents, [ + null, + DocumentSnapshot(doc: userDoc, data: userData), + ]); + expect(collectionEvents, [ + [], + [DocumentSnapshot(doc: userDoc, data: userData)], + ]); + + docSub.cancel(); + collectionSub.cancel(); + async.flushMicrotasks(); + }); }, ); }, @@ -2309,81 +2407,9 @@ void main() { ); }); - test("Rebroadcasts an observable document on dependency changes", - () async { - final usersCollection = Loon.collection('users'); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - if (snap.data['text'] == 'Post 1') { - return { - usersCollection.doc('1'), - }; - } - return {}; - }, - ); - - final postDoc = postsCollection.doc('1'); - final postData = {"id": 1, "text": "Post 1"}; - final updatedPostData1 = {"text": "Post 1 updated"}; - final userDoc = usersCollection.doc('1'); - final userData = {"id": 1, "name": "User 1"}; - final updatedUserData = {"id": 1, "name": "User 1 updated"}; - final postStream = postDoc.stream(); - - postDoc.create(postData); - await asyncEvent(); - usersCollection.doc('1').create(userData); - await asyncEvent(); - userDoc.update(updatedUserData); - await asyncEvent(); - userDoc.delete(); - await asyncEvent(); - usersCollection.doc('1').create(userData); - await asyncEvent(); - usersCollection.delete(); - await asyncEvent(); - usersCollection.doc('1').create(userData); - await asyncEvent(); - postDoc.update(updatedPostData1); - await asyncEvent(); - // Skips this update to user doc, since the last update to the post - // caused the user doc to be removed as a dependency. - userDoc.update(updatedUserData); - await asyncEvent(); - postDoc.delete(); - - final snaps = await postStream.take(10).toList(); - - expect(snaps, [ - // No post yet - null, - // Post created - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user created - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user updated - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user deleted - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user re-added (ensures dependencies remain across deletion/re-creation) - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user collection deleted - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast post when user re-added - DocumentSnapshot(doc: postDoc, data: postData), - // Rebroadcast when post data is updated - DocumentSnapshot(doc: postDoc, data: updatedPostData1), - // Rebroadcast when post deleted - null, - ]); - }); - - test( - "Rebroadcasts an observable query on dependency changes", - () async { - final usersCollection = Loon.collection('users'); + test("Rebroadcasts an observable document on dependency changes", () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); final postsCollection = Loon.collection( 'posts', dependenciesBuilder: (snap) { @@ -2402,158 +2428,238 @@ void main() { final userDoc = usersCollection.doc('1'); final userData = {"id": 1, "name": "User 1"}; final updatedUserData = {"id": 1, "name": "User 1 updated"}; - final postsStream = postsCollection.stream(); + final snaps = ?>[]; + final sub = postDoc.stream().listen(snaps.add); postDoc.create(postData); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); + usersCollection.doc('1').create(userData); + flushBroadcasts(async); userDoc.update(updatedUserData); - await asyncEvent(); + flushBroadcasts(async); userDoc.delete(); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); + usersCollection.doc('1').create(userData); + flushBroadcasts(async); usersCollection.delete(); - await asyncEvent(); - userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); + usersCollection.doc('1').create(userData); + flushBroadcasts(async); postDoc.update(updatedPostData1); - await asyncEvent(); + flushBroadcasts(async); // Skips this update to user doc, since the last update to the post // caused the user doc to be removed as a dependency. userDoc.update(updatedUserData); - await asyncEvent(); - postsCollection.delete(); - await asyncEvent(); - - final snaps = await postsStream.take(10).toList(); + flushBroadcasts(async); + postDoc.delete(); + flushBroadcasts(async); expect(snaps, [ // No post yet - [], + null, // Post created - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user created - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user updated - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user deleted - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user recreated (ensures dependencies remain across deletion/re-creation) - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user collection deleted - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when user recreated - [DocumentSnapshot(doc: postDoc, data: postData)], - // Rebroadcast posts when post updated - [DocumentSnapshot(doc: postDoc, data: updatedPostData1)], - // Rebroadcast posts when posts collection deleted - [], + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user created + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user updated + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user deleted + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user re-added (ensures dependencies remain across deletion/re-creation) + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user collection deleted + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast post when user re-added + DocumentSnapshot(doc: postDoc, data: postData), + // Rebroadcast when post data is updated + DocumentSnapshot(doc: postDoc, data: updatedPostData1), + // Rebroadcast when post deleted + null, ]); + + sub.cancel(); + async.flushMicrotasks(); + }); + }); + + test( + "Rebroadcasts an observable query on dependency changes", + () { + fakeAsync((async) { + final usersCollection = Loon.collection('users'); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + if (snap.data['text'] == 'Post 1') { + return { + usersCollection.doc('1'), + }; + } + return {}; + }, + ); + + final postDoc = postsCollection.doc('1'); + final postData = {"id": 1, "text": "Post 1"}; + final updatedPostData1 = {"text": "Post 1 updated"}; + final userDoc = usersCollection.doc('1'); + final userData = {"id": 1, "name": "User 1"}; + final updatedUserData = {"id": 1, "name": "User 1 updated"}; + final snaps = >>[]; + final sub = postsCollection.stream().listen(snaps.add); + + postDoc.create(postData); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + userDoc.update(updatedUserData); + flushBroadcasts(async); + userDoc.delete(); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + usersCollection.delete(); + flushBroadcasts(async); + userDoc.create(userData); + flushBroadcasts(async); + postDoc.update(updatedPostData1); + flushBroadcasts(async); + // Skips this update to user doc, since the last update to the post + // caused the user doc to be removed as a dependency. + userDoc.update(updatedUserData); + flushBroadcasts(async); + postsCollection.delete(); + flushBroadcasts(async); + + expect(snaps, [ + // No post yet + [], + // Post created + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user created + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user updated + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user deleted + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user recreated (ensures dependencies remain across deletion/re-creation) + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user collection deleted + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when user recreated + [DocumentSnapshot(doc: postDoc, data: postData)], + // Rebroadcast posts when post updated + [DocumentSnapshot(doc: postDoc, data: updatedPostData1)], + // Rebroadcast posts when posts collection deleted + [], + ]); + + sub.cancel(); + async.flushMicrotasks(); + }); }, ); - test("Cyclical dependencies do not cause infinite rebroadcasts", - () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - dependenciesBuilder: (snap) { - return { - Loon.collection('posts').doc('1'), - }; - }, - ); - final postsCollection = Loon.collection( - 'posts', - dependenciesBuilder: (snap) { - return { - usersCollection.doc('1'), - }; - }, - ); + test("Cyclical dependencies do not cause infinite rebroadcasts", () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + dependenciesBuilder: (snap) { + return { + Loon.collection('posts').doc('1'), + }; + }, + ); + final postsCollection = Loon.collection( + 'posts', + dependenciesBuilder: (snap) { + return { + usersCollection.doc('1'), + }; + }, + ); - final userDoc = usersCollection.doc('1'); - final postDoc = postsCollection.doc('1'); - final userData = TestUserModel('Test user 1'); - final updatedUserData = TestUserModel('Test user 1 updated'); - final userObservable = userDoc.observe(); - final userStream = userObservable.stream(); + final userDoc = usersCollection.doc('1'); + final postDoc = postsCollection.doc('1'); + final userData = TestUserModel('Test user 1'); + final updatedUserData = TestUserModel('Test user 1 updated'); + final userObservable = userDoc.observe(); + final userEvents = ?>[]; + final sub = userObservable.stream().listen(userEvents.add); - userDoc.create(userData); + userDoc.create(userData); - await asyncEvent(); + flushBroadcasts(async); - expect( - Loon.inspect()['dependencyStore'], - { - "users": { - "__values": { - "1": { - postDoc, - }, + expect( + Loon.inspect()['dependencyStore'], + { + "users": { + "__values": { + "1": { + postDoc, + }, + } + }, + }, + ); + expect( + Loon.inspect()['dependentsStore'], + { + postDoc: { + userDoc, } }, - }, - ); - expect( - Loon.inspect()['dependentsStore'], - { - postDoc: { - userDoc, - } - }, - ); + ); - postDoc.create({ - "id": 1, - "name": "Post 1", - }); + postDoc.create({ + "id": 1, + "name": "Post 1", + }); - expect( - Loon.inspect()['dependencyStore'], - { - "users": { - "__values": { - "1": { - postDoc, + expect( + Loon.inspect()['dependencyStore'], + { + "users": { + "__values": { + "1": { + postDoc, + }, }, }, - }, - "posts": { - "__values": { - "1": { - userDoc, + "posts": { + "__values": { + "1": { + userDoc, + }, }, }, }, - }, - ); - expect( - Loon.inspect()['dependentsStore'], - { - postDoc: { - userDoc, + ); + expect( + Loon.inspect()['dependentsStore'], + { + postDoc: { + userDoc, + }, + userDoc: { + postDoc, + } }, - userDoc: { - postDoc, - } - }, - ); + ); - await asyncEvent(); + flushBroadcasts(async); - userDoc.update(updatedUserData); + userDoc.update(updatedUserData); - await asyncEvent(); + flushBroadcasts(async); - userObservable.dispose(); + userObservable.dispose(); + async.flushMicrotasks(); - expectLater( - userStream, - emitsInOrder([ + expect(userEvents, [ // First emits null when no user has been written. null, // Emits the initially created user. @@ -2564,38 +2670,39 @@ void main() { DocumentSnapshot(doc: userDoc, data: userData), // Emits the updated user. DocumentSnapshot(doc: userDoc, data: updatedUserData), - emitsDone, - ]), - ); + ]); + + sub.cancel(); + async.flushMicrotasks(); + }); }); - test("Deleting a collection clears its dependencies", () async { - final usersCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (snap) => snap.toJson(), - ); - final friendsCollection = Loon.collection( - 'friends', - fromJson: TestUserModel.fromJson, - toJson: (snap) => snap.toJson(), - dependenciesBuilder: (snap) { - return { - usersCollection.doc(snap.doc.id), - }; - }, - ); + test("Deleting a collection clears its dependencies", () { + fakeAsync((async) { + final usersCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (snap) => snap.toJson(), + ); + final friendsCollection = Loon.collection( + 'friends', + fromJson: TestUserModel.fromJson, + toJson: (snap) => snap.toJson(), + dependenciesBuilder: (snap) { + return { + usersCollection.doc(snap.doc.id), + }; + }, + ); - final userDoc = usersCollection.doc('1'); - final friendDoc = friendsCollection.doc('1'); - userDoc.create(TestUserModel('User 1')); - friendDoc.create(TestUserModel('Friend 1')); + final userDoc = usersCollection.doc('1'); + final friendDoc = friendsCollection.doc('1'); + userDoc.create(TestUserModel('User 1')); + friendDoc.create(TestUserModel('Friend 1')); - await asyncEvent(); + flushBroadcasts(async); - expect( - Loon.inspect()['dependencyStore'], - { + expect(Loon.inspect()['dependencyStore'], { "friends": { "__values": { "1": { @@ -2603,43 +2710,43 @@ void main() { } } } - }, - ); - expect( - Loon.inspect()['dependentsStore'], - { - userDoc: { - friendDoc, - } - }, - ); + }); + expect( + Loon.inspect()['dependentsStore'], + { + userDoc: { + friendDoc, + } + }, + ); - friendsCollection.delete(); + friendsCollection.delete(); - await asyncEvent(); + flushBroadcasts(async); - expect( - Loon.inspect()['dependencyStore'], - {}, - ); - expect( - Loon.inspect()['dependentsStore'], - { - // The dependents are not cleared when a collection is cleared, instead - // the dependents are lazily cleared when the dependent is updated. - userDoc: { - friendDoc, + expect( + Loon.inspect()['dependencyStore'], + {}, + ); + expect( + Loon.inspect()['dependentsStore'], + { + // The dependents are not cleared when a collection is cleared, instead + // the dependents are lazily cleared when the dependent is updated. + userDoc: { + friendDoc, + }, }, - }, - ); + ); - // Now that the user has been updated, it has cleared its friend dependent. - userDoc.update(TestUserModel('User 1 updated')); + // Now that the user has been updated, it has cleared its friend dependent. + userDoc.update(TestUserModel('User 1 updated')); - await asyncEvent(); + flushBroadcasts(async); - expect(Loon.inspect()['dependencyStore'], {}); - expect(Loon.inspect()['dependentsStore'], {}); + expect(Loon.inspect()['dependencyStore'], {}); + expect(Loon.inspect()['dependentsStore'], {}); + }); }); }); diff --git a/test/core/observable_query_equivalence_test.dart b/test/core/observable_query_equivalence_test.dart index a1815ea..3ef1757 100644 --- a/test/core/observable_query_equivalence_test.dart +++ b/test/core/observable_query_equivalence_test.dart @@ -1,4 +1,5 @@ import 'dart:math'; +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; @@ -24,11 +25,8 @@ import '../utils.dart'; /// so it can be compared as an ordered list; the unsorted variant is compared /// as a set since its order is unspecified. /// -/// Notes on determinism: a single long walk is used rather than many short -/// trials because resetting the global store between trials schedules broadcasts -/// that can race the next trial's observer. A 1ms settle (rather than a -/// zero-duration one) guarantees the broadcast's zero-duration timer has fired -/// and its microtask delivery completed before each comparison. +/// Tests run under fake time so the broadcast timer and its stream delivery are +/// flushed explicitly before each comparison. int _cmp(DocumentSnapshot a, DocumentSnapshot b) { final byValue = a.data.compareTo(b.data); @@ -41,21 +39,22 @@ List _ordered(List> snaps) => List _asSet(List> snaps) => _ordered(snaps)..sort(); -Future _reset() async { +void _reset(FakeAsync async) { Loon.unsubscribe(); // broadcast: false so the reset doesn't schedule a broadcast that could // race the next test's observer. - await Loon.clearAll(broadcast: false); - await asyncEvent(); + Loon.clearAll(broadcast: false); + async.flushMicrotasks(); } -Future _walk({ +void _walk({ + required FakeAsync async, required bool sorted, required int seed, required int threshold, int rounds = 400, -}) async { - await _reset(); +}) { + _reset(async); final r = Random(seed); final col = Loon.collection('items'); @@ -66,7 +65,7 @@ Future _walk({ final emissions = >>[]; final sub = obs.stream().listen(emissions.add); - await asyncEvent(); + flushBroadcasts(async); final present = {}; for (var round = 0; round < rounds; round++) { @@ -83,7 +82,7 @@ Future _walk({ } } - await asyncEvent(); + flushBroadcasts(async); final oracle = (sorted ? col.where(filter).sortBy(_cmp) : col.where(filter)).get(); @@ -97,33 +96,37 @@ Future _walk({ } } - await sub.cancel(); + sub.cancel(); + async.flushMicrotasks(); } void main() { - tearDown(_reset); + tearDown(() { + Loon.unsubscribe(); + Loon.clearAll(broadcast: false); + }); - test('Coalesced delete and recreate evicts a cached query result', () async { - await _reset(); + test('Coalesced delete and recreate evicts a cached query result', () { + fakeAsync((async) { + _reset(async); - final col = Loon.collection('items'); - final doc = col.doc('1'); - doc.create(5); - await asyncEvent(); + final col = Loon.collection('items'); + final doc = col.doc('1'); + doc.create(5); + flushBroadcasts(async); - final query = col.where((snap) => snap.data >= 4).observe(); - final emissions = >>[]; - final changes = >>[]; - final valueSub = query.stream().listen(emissions.add); - final changeSub = query.streamChanges().listen(changes.add); - await asyncEvent(); + final query = col.where((snap) => snap.data >= 4).observe(); + final emissions = >>[]; + final changes = >>[]; + final valueSub = query.stream().listen(emissions.add); + final changeSub = query.streamChanges().listen(changes.add); + flushBroadcasts(async); - try { expect(_ordered(emissions.last), ['1=5']); doc.delete(); doc.create(0); - await asyncEvent(); + flushBroadcasts(async); expect(_ordered(emissions.last), isEmpty); expect(changes, [ @@ -136,10 +139,10 @@ void main() { ), ], ]); - } finally { - await valueSub.cancel(); - await changeSub.cancel(); - } + valueSub.cancel(); + changeSub.cancel(); + async.flushMicrotasks(); + }); }); group('ObservableQuery equivalence', () { @@ -150,12 +153,15 @@ void main() { test( '${sorted ? 'sorted' : 'unsorted'} query, threshold $threshold ' 'matches a full recompute', - () async { - await _walk( - sorted: sorted, - seed: 1000 + threshold, - threshold: threshold, - ); + () { + fakeAsync((async) { + _walk( + async: async, + sorted: sorted, + seed: 1000 + threshold, + threshold: threshold, + ); + }); }, ); } diff --git a/test/core/persistor/lock_test.dart b/test/core/persistor/lock_test.dart index 231cea4..3c16093 100644 --- a/test/core/persistor/lock_test.dart +++ b/test/core/persistor/lock_test.dart @@ -1,85 +1,129 @@ +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/persistor/lock.dart'; +import '../../utils.dart'; + +const _workDuration = Duration(milliseconds: 10); +const _burstWorkDuration = Duration(milliseconds: 5); + void main() { group('Lock', () { test( 'Serializes a single waiter behind the holder', - () async { - final lock = Lock(); - final order = []; + () { + fakeAsync((async) { + final lock = Lock(); + final order = []; + var aComplete = false; + var bComplete = false; - final a = lock.run(() async { - order.add('A-start'); - await Future.delayed(const Duration(milliseconds: 10)); - order.add('A-end'); - }); + lock.run(() async { + order.add('A-start'); + await Future.delayed(_workDuration); + order.add('A-end'); + }).then((_) => aComplete = true); - // Give A time to acquire before B arrives. - await Future.delayed(Duration.zero); + async.flushMicrotasks(); - final b = lock.run(() async { - order.add('B-start'); - await Future.delayed(const Duration(milliseconds: 10)); - order.add('B-end'); + lock.run(() async { + order.add('B-start'); + await Future.delayed(_workDuration); + order.add('B-end'); + }).then((_) => bComplete = true); + + async.flushMicrotasks(); + expect(order, ['A-start']); + expect(aComplete, false); + expect(bComplete, false); + + elapseAndFlush(async, _workDuration); + expect(order, ['A-start', 'A-end', 'B-start']); + expect(aComplete, true); + expect(bComplete, false); + + elapseAndFlush(async, _workDuration); + expect(order, ['A-start', 'A-end', 'B-start', 'B-end']); + expect(bComplete, true); }); - - await Future.wait([a, b]); - - expect(order, ['A-start', 'A-end', 'B-start', 'B-end']); }, ); test( 'Serializes multiple waiters that all queue on the same holder', - () async { - final lock = Lock(); - int inFlight = 0; - int maxInFlight = 0; - - Future work() { - return lock.run(() async { - inFlight++; - if (inFlight > maxInFlight) maxInFlight = inFlight; - await Future.delayed(const Duration(milliseconds: 10)); - inFlight--; - }); - } - - final a = work(); - await Future.delayed(Duration.zero); - final b = work(); - final c = work(); - - await Future.wait([a, b, c]); - - expect(maxInFlight, 1); + () { + fakeAsync((async) { + final lock = Lock(); + var inFlight = 0; + var maxInFlight = 0; + var completed = 0; + + void work() { + lock.run(() async { + inFlight++; + if (inFlight > maxInFlight) maxInFlight = inFlight; + await Future.delayed(_workDuration); + inFlight--; + }).then((_) => completed++); + } + + work(); + async.flushMicrotasks(); + work(); + work(); + async.flushMicrotasks(); + + expect(maxInFlight, 1); + expect(completed, 0); + + elapseAndFlush(async, _workDuration); + expect(maxInFlight, 1); + expect(completed, 1); + + elapseAndFlush(async, _workDuration); + expect(maxInFlight, 1); + expect(completed, 2); + + elapseAndFlush(async, _workDuration); + expect(maxInFlight, 1); + expect(completed, 3); + }); }, ); test( 'Serializes a burst of concurrent acquires', - () async { - final lock = Lock(); - int inFlight = 0; - int maxInFlight = 0; - int completed = 0; - - Future work() { - return lock.run(() async { - inFlight++; - if (inFlight > maxInFlight) maxInFlight = inFlight; - await Future.delayed(const Duration(milliseconds: 5)); - inFlight--; - completed++; - }); - } - - final futures = List.generate(10, (_) => work()); - await Future.wait(futures); - - expect(maxInFlight, 1); - expect(completed, 10); + () { + fakeAsync((async) { + final lock = Lock(); + var inFlight = 0; + var maxInFlight = 0; + var completed = 0; + + void work() { + lock.run(() async { + inFlight++; + if (inFlight > maxInFlight) maxInFlight = inFlight; + await Future.delayed(_burstWorkDuration); + inFlight--; + completed++; + }); + } + + for (var i = 0; i < 10; i++) { + work(); + } + + async.flushMicrotasks(); + expect(maxInFlight, 1); + + for (var i = 0; i < 10; i++) { + elapseAndFlush(async, _burstWorkDuration); + expect(maxInFlight, 1); + } + + expect(completed, 10); + }); }, ); diff --git a/test/core/persistor/persist_manager_test.dart b/test/core/persistor/persist_manager_test.dart index 7425c7b..75438d5 100644 --- a/test/core/persistor/persist_manager_test.dart +++ b/test/core/persistor/persist_manager_test.dart @@ -1,12 +1,16 @@ +import 'package:fake_async/fake_async.dart'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; import '../../models/test_persistor.dart'; import '../../models/test_user_model.dart'; +import '../../utils.dart'; void main() { - tearDown(() { - Loon.clearAll(); + tearDown(() async { + Loon.configure(persistor: null); + Loon.unsubscribe(); + await Loon.clearAll(broadcast: false); }); /// Tests the behavior of the [PersistManager] to check that it properly batches @@ -14,159 +18,172 @@ void main() { group('Persist Manager', () { test( 'Batches contiguous persistence operations', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onPersist: (docs) { - batches.add(docs); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onPersist: (docs) { + batches.add(docs); + }, + ), + ); + elapseAndFlush(async, const Duration(milliseconds: 1)); + + final userCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); + + userCollection.doc('1').create(TestUserModel('User 1')); + userCollection.doc('2').create(TestUserModel('User 2')); + + elapseAndFlush(async, const Duration(milliseconds: 1)); + + userCollection.doc('3').create(TestUserModel('User 3')); + + elapseAndFlush(async, const Duration(milliseconds: 1)); + + // There should be two persistence calls: + // 1. The first two writes should be grouped together into a single batch in the first 1ms throttle. + // 2. The third write is grouped into its own batch. + expect(batches.length, 2); + expect( + batches.first, + { + userCollection.doc('1'), + userCollection.doc('2'), }, - ), - ); - - final userCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); - - userCollection.doc('1').create(TestUserModel('User 1')); - userCollection.doc('2').create(TestUserModel('User 2')); - - await TestPersistor.completer.onPersist; - - userCollection.doc('3').create(TestUserModel('User 3')); - - await TestPersistor.completer.onPersist; - - // There should be two persistence calls: - // 1. The first two writes should be grouped together into a single batch in the first 1ms throttle. - // 2. The third write is grouped into its own batch. - expect(batches.length, 2); - expect( - batches.first, - { - userCollection.doc('1'), - userCollection.doc('2'), - }, - ); - expect(batches.last, {userCollection.doc('3')}); + ); + expect(batches.last, {userCollection.doc('3')}); + }); }, ); test( 'Does not batch non-contiguous operations', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onPersist: (docs) { - batches.add(docs); - }, - ), - ); - - final userCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); - - userCollection.doc('1').create(TestUserModel('User 1')); - userCollection.delete(); - userCollection.doc('2').create(TestUserModel('User 2')); - - await TestPersistor.completer.onPersist; - await TestPersistor.completer.onClear; - await TestPersistor.completer.onPersist; - - // There should be a separate persistence call for each document in this scenario since the operation - // order must be persist->clear->persist in order to ensure the correct sequencing of events. - expect(batches.length, 2); - expect( - batches.first, - {userCollection.doc('1')}, - ); - expect(batches.last, {userCollection.doc('2')}); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onPersist: (docs) { + batches.add(docs); + }, + ), + ); + elapseAndFlush(async, const Duration(milliseconds: 1)); + + final userCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); + + userCollection.doc('1').create(TestUserModel('User 1')); + userCollection.delete(); + userCollection.doc('2').create(TestUserModel('User 2')); + + elapseAndFlush(async, const Duration(milliseconds: 1)); + elapseAndFlush(async, const Duration(milliseconds: 1)); + elapseAndFlush(async, const Duration(milliseconds: 1)); + + // There should be a separate persistence call for each document in this scenario since the operation + // order must be persist->clear->persist in order to ensure the correct sequencing of events. + expect(batches.length, 2); + expect( + batches.first, + {userCollection.doc('1')}, + ); + expect(batches.last, {userCollection.doc('2')}); + }); }, ); test( 'De-dupes persisting the same document', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onPersist: (docs) { - batches.add(docs); - }, - ), - ); - - final userCollection = Loon.collection( - 'users', - fromJson: TestUserModel.fromJson, - toJson: (user) => user.toJson(), - ); - - userCollection.doc('1').create(TestUserModel('User 1')); - userCollection.doc('1').update(TestUserModel('User 1 updated')); - - await TestPersistor.completer.onPersist; - - userCollection.doc('2').create(TestUserModel('User 2')); - - await TestPersistor.completer.onPersist; - - expect(batches.length, 2); - - // The multiple updates to user doc 1 are grouped and de-duped in the same batch. Persistors can read the current - // value of the documents in the batch so there is no need to include document updates again. - expect(batches.first, [userCollection.doc('1')]); - expect(batches.last, [userCollection.doc('2')]); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onPersist: (docs) { + batches.add(docs); + }, + ), + ); + elapseAndFlush(async, const Duration(milliseconds: 1)); + + final userCollection = Loon.collection( + 'users', + fromJson: TestUserModel.fromJson, + toJson: (user) => user.toJson(), + ); + + userCollection.doc('1').create(TestUserModel('User 1')); + userCollection.doc('1').update(TestUserModel('User 1 updated')); + + elapseAndFlush(async, const Duration(milliseconds: 1)); + + userCollection.doc('2').create(TestUserModel('User 2')); + + elapseAndFlush(async, const Duration(milliseconds: 1)); + + expect(batches.length, 2); + + // The multiple updates to user doc 1 are grouped and de-duped in the same batch. Persistors can read the current + // value of the documents in the batch so there is no need to include document updates again. + expect(batches.first, [userCollection.doc('1')]); + expect(batches.last, [userCollection.doc('2')]); + }); }, ); test( 'Batches clearing of collections', - () async { - List> batches = []; - - Loon.configure( - persistor: TestPersistor( - seedData: [], - onClear: (refs) { - batches.add(refs); - }, - ), - ); - - Loon.collection('users').doc('1').create(true); - Loon.collection('posts').doc('1').create(true); - Loon.collection('messages').doc('1').create(true); - - Loon.collection('users').delete(); - Loon.collection('posts').delete(); - - await TestPersistor.completer.onClear; - - Loon.collection('messages').delete(); - - await TestPersistor.completer.onClear; - - expect(batches.length, 2); - - expect(batches.first, [ - Loon.collection('users'), - Loon.collection('posts'), - ]); - expect(batches.last, [Loon.collection('messages')]); + () { + fakeAsync((async) { + final batches = >[]; + + Loon.configure( + persistor: TestPersistor( + seedData: [], + onClear: (refs) { + batches.add(refs); + }, + ), + ); + elapseAndFlush(async, const Duration(milliseconds: 1)); + + Loon.collection('users').doc('1').create(true); + Loon.collection('posts').doc('1').create(true); + Loon.collection('messages').doc('1').create(true); + + Loon.collection('users').delete(); + Loon.collection('posts').delete(); + + elapseAndFlush(async, const Duration(milliseconds: 1)); + elapseAndFlush(async, const Duration(milliseconds: 1)); + + Loon.collection('messages').delete(); + + elapseAndFlush(async, const Duration(milliseconds: 1)); + + expect(batches.length, 2); + + expect(batches.first, [ + Loon.collection('users'), + Loon.collection('posts'), + ]); + expect(batches.last, [Loon.collection('messages')]); + }); }, ); }); diff --git a/test/native/file_persistor_test.dart b/test/native/file_persistor_test.dart index 5cf223e..ba4963b 100644 --- a/test/native/file_persistor_test.dart +++ b/test/native/file_persistor_test.dart @@ -25,11 +25,23 @@ class MockPathProvider extends Fake } @override - getApplicationDocumentsPath() async { + Future getApplicationDocumentsPath() async { return testDirectory.path; } } +DataStoreEncrypter createTestEncrypter() { + // FlutterSecureStorage does not work in tests, so use a deterministic in-process key. + return DataStoreEncrypter( + Encrypter( + AES( + Key.fromUtf8('0123456789abcdef0123456789abcdef'), + mode: AESMode.cbc, + ), + ), + ); +} + void main() { testDirectory = Directory.systemTemp.createTempSync('test_dir'); Directory("${testDirectory.path}/loon").createSync(); @@ -62,14 +74,11 @@ void main() { group('FilePersistor durability', () { final loonDir = Directory('${testDirectory.path}/loon'); - - // FlutterSecureStorage does not work in tests, so supply a fixed encrypter. - final encrypter = DataStoreEncrypter( - Encrypter(AES(Key.fromSecureRandom(32), mode: AESMode.cbc)), - ); + final encrypter = createTestEncrypter(); tearDown(() async { await Loon.clearAll(); + Loon.unsubscribe(); }); test('Persists atomically, leaving no .tmp files behind', () async { @@ -136,4 +145,107 @@ void main() { expect(await orphanedResolverTmp.exists(), false); }); }); + + group('FilePersistor fault recovery', () { + final loonDir = Directory('${testDirectory.path}/loon'); + final encrypter = createTestEncrypter(); + + FilePersistor newPersistor() => FilePersistor( + persistenceThrottle: const Duration(milliseconds: 1), + encrypter: encrypter, + ); + + Future writeDefaultStore() { + return File('${loonDir.path}/__store__.json').writeAsString( + jsonEncode({ + "": { + "users": { + "__values": { + "1": {"name": "User 1"} + } + } + } + }), + ); + } + + setUp(() { + if (loonDir.existsSync()) { + for (final entity in loonDir.listSync()) { + entity.deleteSync(recursive: true); + } + } + Loon.unsubscribe(); + }); + + tearDown(() async { + await Loon.clearAll(); + Loon.unsubscribe(); + }); + + test('A corrupt data store file does not fail hydration of the rest', + () async { + await writeDefaultStore(); + await File('${loonDir.path}/corrupt.json') + .writeAsString('{ not valid json'); + + Loon.configure(persistor: newPersistor()); + + await Loon.hydrate(); + + expect( + Loon.collection('users').doc('1').get()?.data, + {"name": "User 1"}, + ); + expect(await File('${loonDir.path}/corrupt.json').exists(), false); + expect( + await File('${loonDir.path}/corrupt.json.corrupt').exists(), + true, + ); + }); + + test('A corrupt encrypted data store file does not fail hydration', + () async { + await writeDefaultStore(); + await File('${loonDir.path}/__store__.encrypted.json') + .writeAsString('not encrypted'); + + Loon.configure(persistor: newPersistor()); + + await Loon.hydrate(); + + expect( + Loon.collection('users').doc('1').get()?.data, + {"name": "User 1"}, + ); + expect( + await File('${loonDir.path}/__store__.encrypted.json.corrupt').exists(), + true, + ); + }); + + test('A corrupt resolver file fails hydration', () async { + await writeDefaultStore(); + await File('${loonDir.path}/__resolver__.json') + .writeAsString('}{ broken'); + + await expectLater( + newPersistor().init(), + throwsA( + predicate( + (error) => error.toString().contains('FormatException'), + ), + ), + ); + + expect( + await File('${loonDir.path}/__resolver__.json').exists(), + true, + ); + expect( + await File('${loonDir.path}/__resolver__.json.corrupt').exists(), + false, + ); + }); + }); } diff --git a/test/utils.dart b/test/utils.dart index 459c372..e089dc1 100644 --- a/test/utils.dart +++ b/test/utils.dart @@ -1,6 +1,6 @@ -import 'dart:async'; import 'dart:convert'; import 'package:encrypt/encrypt.dart'; +import 'package:fake_async/fake_async.dart'; import 'package:loon/loon.dart'; final testEncryptionKey = Key.fromSecureRandom(32); @@ -22,6 +22,14 @@ Json decryptData(String encrypted) { ); } -Future asyncEvent() { - return Future.delayed(const Duration(milliseconds: 1), () => null); +/// Advances past Loon's zero-duration broadcast timer and drains stream delivery. +void flushBroadcasts(FakeAsync async) { + elapseAndFlush(async, const Duration(milliseconds: 1)); +} + +/// Advances fake time and drains microtasks around any timer callbacks. +void elapseAndFlush(FakeAsync async, Duration duration) { + async.flushMicrotasks(); + async.elapse(duration); + async.flushMicrotasks(); }