From ad0d9ef929808dfff51c72429e0c8348045fdf03 Mon Sep 17 00:00:00 2001 From: Claude Date: Tue, 26 May 2026 17:10:09 +0000 Subject: [PATCH 1/2] test: add ObservableQuery equivalence fuzzer; fix stale result on coalesced delete+recreate Adds a randomized equivalence fuzzer that drives long random create/update/delete walks against an observed query and asserts the value it emits incrementally always equals a fresh full recompute (Query.get()), the obviously-correct oracle. Covers sorted and unsorted queries across a spread of filter selectivities. The fuzzer found a real bug. When a document already in a query's result set is deleted and recreated within the same broadcast batch, the two events coalesce in the event store into a single `added` event. The added handler only *added* documents that pass the filter; it never *evicted* an already-cached document whose recreated value now fails the filter, so a stale snapshot lingered in the result. Effect: a query showing a stale row after a delete+recreate that should have removed it. Fixed by evicting the cached entry in that case, symmetric with the modified handler. Confirmed the fuzzer fails against the unfixed code. The fuzzer uses one long walk per test (resetting the global store between many short trials schedules broadcasts that race the next trial's observer) and a 1ms settle so the zero-duration broadcast timer and its microtask delivery complete before each comparison. 
--- lib/observable_query.dart | 18 +++ .../observable_query_equivalence_test.dart | 118 ++++++++++++++++++ 2 files changed, 136 insertions(+) create mode 100644 test/core/observable_query_equivalence_test.dart diff --git a/lib/observable_query.dart b/lib/observable_query.dart index a39052f..4cf155e 100644 --- a/lib/observable_query.dart +++ b/lib/observable_query.dart @@ -143,6 +143,24 @@ class ObservableQuery extends Query ), ); } + } else if (_snapCache.containsKey(doc)) { + // The document was in the result set but a delete + recreate that + // coalesced into a single added event (or a re-add) now fails the + // filter, so the stale entry must be evicted. + _evictDoc(doc); + + shouldRebroadcast = true; + + if (hasChangeListener) { + changeSnaps.add( + DocumentChangeSnapshot( + doc: doc, + event: BroadcastEvents.removed, + prevData: prevSnap?.data, + data: null, + ), + ); + } } break; case BroadcastEvents.removed: diff --git a/test/core/observable_query_equivalence_test.dart b/test/core/observable_query_equivalence_test.dart new file mode 100644 index 0000000..a73d1c7 --- /dev/null +++ b/test/core/observable_query_equivalence_test.dart @@ -0,0 +1,118 @@ +import 'dart:math'; +import 'package:flutter_test/flutter_test.dart'; +import 'package:loon/loon.dart'; + +/// Equivalence fuzzer for [ObservableQuery]. +/// +/// An observable query maintains its result incrementally: on each broadcast it +/// inspects only the changed documents and patches a cached result, rather than +/// recomputing from scratch. The property under test is that this incremental +/// result always equals a fresh full recompute of the same query +/// (`Query.get()`), which filters and sorts the whole collection and is the +/// obviously-correct oracle. 
+/// +/// Each test drives one long random walk of create/update/delete operations +/// over a small id/value space — so documents repeatedly cross the filter +/// boundary, exercising the added/removed/modified transitions in +/// `_onBroadcast` — and compares the value most recently emitted on the query's +/// stream against the oracle after every step. A failing case replays from its +/// seed, sorted flag, and threshold. +/// +/// A total-order comparator (value then id) keeps the sorted result unambiguous +/// so it can be compared as an ordered list; the unsorted variant is compared +/// as a set since its order is unspecified. +/// +/// Notes on determinism: a single long walk is used rather than many short +/// trials because resetting the global store between trials schedules broadcasts +/// that can race the next trial's observer. A 1ms settle (rather than a +/// zero-duration one) guarantees the broadcast's zero-duration timer has fired +/// and its microtask delivery completed before each comparison. + +int _cmp(DocumentSnapshot a, DocumentSnapshot b) { + final byValue = a.data.compareTo(b.data); + return byValue != 0 ? byValue : a.doc.id.compareTo(b.doc.id); +} + +List _ordered(List> snaps) => + [for (final s in snaps) '${s.doc.id}=${s.data}']; + +List _asSet(List> snaps) => + _ordered(snaps)..sort(); + +Future _settle() => Future.delayed(const Duration(milliseconds: 1)); + +Future _reset() async { + Loon.unsubscribe(); + // broadcast: false so the reset doesn't schedule a broadcast that could + // race the next test's observer. + await Loon.clearAll(broadcast: false); + await _settle(); +} + +Future _walk({ + required bool sorted, + required int seed, + required int threshold, + int rounds = 400, +}) async { + await _reset(); + + final r = Random(seed); + final col = Loon.collection('items'); + bool filter(DocumentSnapshot s) => s.data >= threshold; + + final query = sorted ? 
col.where(filter).sortBy(_cmp) : col.where(filter); + final obs = query.observe(); + + final emissions = >>[]; + final sub = obs.stream().listen(emissions.add); + await _settle(); + + final present = {}; + for (var round = 0; round < rounds; round++) { + final opsThisRound = 1 + r.nextInt(3); + for (var k = 0; k < opsThisRound; k++) { + if (present.isEmpty || r.nextInt(10) < 7) { + final id = '${r.nextInt(8)}'; + col.doc(id).createOrUpdate(r.nextInt(10)); + present.add(id); + } else { + final id = present.elementAt(r.nextInt(present.length)); + col.doc(id).delete(); + present.remove(id); + } + } + + await _settle(); + + final oracle = + (sorted ? col.where(filter).sortBy(_cmp) : col.where(filter)).get(); + final latest = emissions.last; + final reason = + 'seed=$seed sorted=$sorted threshold=$threshold round=$round'; + if (sorted) { + expect(_ordered(latest), _ordered(oracle), reason: reason); + } else { + expect(_asSet(latest), _asSet(oracle), reason: reason); + } + } + + await sub.cancel(); +} + +void main() { + tearDown(_reset); + + group('ObservableQuery equivalence', () { + // A spread of selectivities: low threshold (most docs pass), middle, and + // high (most fail), for both sorted and unsorted queries. + for (final sorted in [true, false]) { + for (final threshold in [1, 4, 8]) { + test('${sorted ? 
'sorted' : 'unsorted'} query, threshold $threshold ' + 'matches a full recompute', () async { + await _walk(sorted: sorted, seed: 1000 + threshold, threshold: threshold); + }); + } + } + }); +} From fdb411cd42f1b7431ad14b03f8d8fb1b0647ff2a Mon Sep 17 00:00:00 2001 From: Dan Reynolds Date: Sun, 31 May 2026 09:31:04 -0400 Subject: [PATCH 2/2] Cover ObservableQuery coalesced change event --- .../observable_query_equivalence_test.dart | 70 +++++++++++++++---- 1 file changed, 58 insertions(+), 12 deletions(-) diff --git a/test/core/observable_query_equivalence_test.dart b/test/core/observable_query_equivalence_test.dart index a73d1c7..a1815ea 100644 --- a/test/core/observable_query_equivalence_test.dart +++ b/test/core/observable_query_equivalence_test.dart @@ -2,6 +2,8 @@ import 'dart:math'; import 'package:flutter_test/flutter_test.dart'; import 'package:loon/loon.dart'; +import '../utils.dart'; + /// Equivalence fuzzer for [ObservableQuery]. /// /// An observable query maintains its result incrementally: on each broadcast it @@ -9,12 +11,12 @@ import 'package:loon/loon.dart'; /// recomputing from scratch. The property under test is that this incremental /// result always equals a fresh full recompute of the same query /// (`Query.get()`), which filters and sorts the whole collection and is the -/// obviously-correct oracle. +/// simple oracle. /// /// Each test drives one long random walk of create/update/delete operations -/// over a small id/value space — so documents repeatedly cross the filter +/// over a small id/value space, so documents repeatedly cross the filter /// boundary, exercising the added/removed/modified transitions in -/// `_onBroadcast` — and compares the value most recently emitted on the query's +/// `_onBroadcast`, and compares the value most recently emitted on the query's /// stream against the oracle after every step. A failing case replays from its /// seed, sorted flag, and threshold. 
/// @@ -39,14 +41,12 @@ List _ordered(List> snaps) => List _asSet(List> snaps) => _ordered(snaps)..sort(); -Future _settle() => Future.delayed(const Duration(milliseconds: 1)); - Future _reset() async { Loon.unsubscribe(); // broadcast: false so the reset doesn't schedule a broadcast that could // race the next test's observer. await Loon.clearAll(broadcast: false); - await _settle(); + await asyncEvent(); } Future _walk({ @@ -66,7 +66,7 @@ Future _walk({ final emissions = >>[]; final sub = obs.stream().listen(emissions.add); - await _settle(); + await asyncEvent(); final present = {}; for (var round = 0; round < rounds; round++) { @@ -83,7 +83,7 @@ Future _walk({ } } - await _settle(); + await asyncEvent(); final oracle = (sorted ? col.where(filter).sortBy(_cmp) : col.where(filter)).get(); @@ -103,15 +103,61 @@ Future _walk({ void main() { tearDown(_reset); + test('Coalesced delete and recreate evicts a cached query result', () async { + await _reset(); + + final col = Loon.collection('items'); + final doc = col.doc('1'); + doc.create(5); + await asyncEvent(); + + final query = col.where((snap) => snap.data >= 4).observe(); + final emissions = >>[]; + final changes = >>[]; + final valueSub = query.stream().listen(emissions.add); + final changeSub = query.streamChanges().listen(changes.add); + await asyncEvent(); + + try { + expect(_ordered(emissions.last), ['1=5']); + + doc.delete(); + doc.create(0); + await asyncEvent(); + + expect(_ordered(emissions.last), isEmpty); + expect(changes, [ + [ + DocumentChangeSnapshot( + doc: doc, + data: null, + event: BroadcastEvents.removed, + prevData: 5, + ), + ], + ]); + } finally { + await valueSub.cancel(); + await changeSub.cancel(); + } + }); + group('ObservableQuery equivalence', () { // A spread of selectivities: low threshold (most docs pass), middle, and // high (most fail), for both sorted and unsorted queries. for (final sorted in [true, false]) { for (final threshold in [1, 4, 8]) { - test('${sorted ? 
'sorted' : 'unsorted'} query, threshold $threshold ' - 'matches a full recompute', () async { - await _walk(sorted: sorted, seed: 1000 + threshold, threshold: threshold); - }); + test( + '${sorted ? 'sorted' : 'unsorted'} query, threshold $threshold ' + 'matches a full recompute', + () async { + await _walk( + sorted: sorted, + seed: 1000 + threshold, + threshold: threshold, + ); + }, + ); } } });