diff --git a/lib/observable_query.dart b/lib/observable_query.dart index a39052f..4cf155e 100644 --- a/lib/observable_query.dart +++ b/lib/observable_query.dart @@ -143,6 +143,24 @@ class ObservableQuery extends Query ), ); } + } else if (_snapCache.containsKey(doc)) { + // The document was in the result set but a delete + recreate that + // coalesced into a single added event (or a re-add) now fails the + // filter, so the stale entry must be evicted. + _evictDoc(doc); + + shouldRebroadcast = true; + + if (hasChangeListener) { + changeSnaps.add( + DocumentChangeSnapshot( + doc: doc, + event: BroadcastEvents.removed, + prevData: prevSnap?.data, + data: null, + ), + ); + } } break; case BroadcastEvents.removed: diff --git a/test/core/observable_query_equivalence_test.dart b/test/core/observable_query_equivalence_test.dart new file mode 100644 index 0000000..a1815ea --- /dev/null +++ b/test/core/observable_query_equivalence_test.dart @@ -0,0 +1,164 @@ +import 'dart:math'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:loon/loon.dart'; + +import '../utils.dart'; + +/// Equivalence fuzzer for [ObservableQuery]. +/// +/// An observable query maintains its result incrementally: on each broadcast it +/// inspects only the changed documents and patches a cached result, rather than +/// recomputing from scratch. The property under test is that this incremental +/// result always equals a fresh full recompute of the same query +/// (`Query.get()`), which filters and sorts the whole collection and is the +/// simple oracle. +/// +/// Each test drives one long random walk of create/update/delete operations +/// over a small id/value space, so documents repeatedly cross the filter +/// boundary, exercising the added/removed/modified transitions in +/// `_onBroadcast`, and compares the value most recently emitted on the query's +/// stream against the oracle after every step. A failing case replays from its +/// seed, sorted flag, and threshold. +/// +/// A total-order comparator (value then id) keeps the sorted result unambiguous +/// so it can be compared as an ordered list; the unsorted variant is compared +/// as a set since its order is unspecified. +/// +/// Notes on determinism: a single long walk is used rather than many short +/// trials because resetting the global store between trials schedules broadcasts +/// that can race the next trial's observer. A 1ms settle (rather than a +/// zero-duration one) guarantees the broadcast's zero-duration timer has fired +/// and its microtask delivery completed before each comparison. + +int _cmp(DocumentSnapshot a, DocumentSnapshot b) { + final byValue = a.data.compareTo(b.data); + return byValue != 0 ? byValue : a.doc.id.compareTo(b.doc.id); +} + +List _ordered(List> snaps) => + [for (final s in snaps) '${s.doc.id}=${s.data}']; + +List _asSet(List> snaps) => + _ordered(snaps)..sort(); + +Future _reset() async { + Loon.unsubscribe(); + // broadcast: false so the reset doesn't schedule a broadcast that could + // race the next test's observer. + await Loon.clearAll(broadcast: false); + await asyncEvent(); +} + +Future _walk({ + required bool sorted, + required int seed, + required int threshold, + int rounds = 400, +}) async { + await _reset(); + + final r = Random(seed); + final col = Loon.collection('items'); + bool filter(DocumentSnapshot s) => s.data >= threshold; + + final query = sorted ? col.where(filter).sortBy(_cmp) : col.where(filter); + final obs = query.observe(); + + final emissions = >>[]; + final sub = obs.stream().listen(emissions.add); + await asyncEvent(); + + final present = {}; + for (var round = 0; round < rounds; round++) { + final opsThisRound = 1 + r.nextInt(3); + for (var k = 0; k < opsThisRound; k++) { + if (present.isEmpty || r.nextInt(10) < 7) { + final id = '${r.nextInt(8)}'; + col.doc(id).createOrUpdate(r.nextInt(10)); + present.add(id); + } else { + final id = present.elementAt(r.nextInt(present.length)); + col.doc(id).delete(); + present.remove(id); + } + } + + await asyncEvent(); + + final oracle = + (sorted ? col.where(filter).sortBy(_cmp) : col.where(filter)).get(); + final latest = emissions.last; + final reason = + 'seed=$seed sorted=$sorted threshold=$threshold round=$round'; + if (sorted) { + expect(_ordered(latest), _ordered(oracle), reason: reason); + } else { + expect(_asSet(latest), _asSet(oracle), reason: reason); + } + } + + await sub.cancel(); +} + +void main() { + tearDown(_reset); + + test('Coalesced delete and recreate evicts a cached query result', () async { + await _reset(); + + final col = Loon.collection('items'); + final doc = col.doc('1'); + doc.create(5); + await asyncEvent(); + + final query = col.where((snap) => snap.data >= 4).observe(); + final emissions = >>[]; + final changes = >>[]; + final valueSub = query.stream().listen(emissions.add); + final changeSub = query.streamChanges().listen(changes.add); + await asyncEvent(); + + try { + expect(_ordered(emissions.last), ['1=5']); + + doc.delete(); + doc.create(0); + await asyncEvent(); + + expect(_ordered(emissions.last), isEmpty); + expect(changes, [ + [ + DocumentChangeSnapshot( + doc: doc, + data: null, + event: BroadcastEvents.removed, + prevData: 5, + ), + ], + ]); + } finally { + await valueSub.cancel(); + await changeSub.cancel(); + } + }); + + group('ObservableQuery equivalence', () { + // A spread of selectivities: low threshold (most docs pass), middle, and + // high (most fail), for both sorted and unsorted queries. + for (final sorted in [true, false]) { + for (final threshold in [1, 4, 8]) { + test( + '${sorted ? 'sorted' : 'unsorted'} query, threshold $threshold ' + 'matches a full recompute', + () async { + await _walk( + sorted: sorted, + seed: 1000 + threshold, + threshold: threshold, + ); + }, + ); + } + } + }); +}