Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
4 changes: 2 additions & 2 deletions example/pubspec.lock
Original file line number Diff line number Diff line change
Expand Up @@ -339,10 +339,10 @@ packages:
dependency: transitive
description:
name: pointycastle
sha256: "7c1e5f0d23c9016c5bbd8b1473d0d3fb3fc851b876046039509e18e0c7485f2c"
sha256: "4be0097fcf3fd3e8449e53730c631200ebc7b88016acecab2b0da2f0149222fe"
url: "https://pub.dev"
source: hosted
version: "3.7.3"
version: "3.9.1"
process:
dependency: transitive
description:
Expand Down
20 changes: 14 additions & 6 deletions lib/broadcast_manager.dart
Original file line number Diff line number Diff line change
Expand Up @@ -36,16 +36,21 @@ class BroadcastManager {
/// The subset of broadcast observers from [_observers] with dependencies.
final Set<BroadcastObserver> _depObservers = {};

/// Whether the broadcast store is dirty and has a pending broadcast scheduled.
bool _pendingBroadcast = false;
/// Non-null while a broadcast is scheduled or currently draining.
Timer? _broadcastTimer;

bool get _pendingBroadcast => _broadcastTimer != null;

void _cancelBroadcast() {
_broadcastTimer?.cancel();
_broadcastTimer = null;
}

void _scheduleBroadcast() {
if (!_pendingBroadcast) {
_pendingBroadcast = true;

// The broadcast is run async so that multiple broadcast events can be batched
// together into one update across all changes that occur in the current task of the event loop.
Future.delayed(Duration.zero, _broadcast);
_broadcastTimer = Timer(Duration.zero, _broadcast);
}
}

Expand All @@ -63,7 +68,7 @@ class BroadcastManager {
}

eventStore.clear();
_pendingBroadcast = false;
_broadcastTimer = null;
}

/// Schedules all dependents of the given document for broadcast.
Expand Down Expand Up @@ -136,6 +141,7 @@ class BroadcastManager {
}

void clear({bool broadcast = true}) {
_cancelBroadcast();
eventStore.clear();
observerValueStore.clear();

Expand Down Expand Up @@ -166,6 +172,8 @@ class BroadcastManager {
}

void unsubscribe() {
_cancelBroadcast();
eventStore.clear();
for (final observer in _observers.toList()) {
observer.dispose();
}
Expand Down
65 changes: 60 additions & 5 deletions lib/persistor/file_persistor/file_data_store_config.dart
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,45 @@ import 'package:loon/loon.dart';
import 'package:loon/persistor/data_store.dart';
import 'package:loon/persistor/data_store_resolver.dart';

void _logHydrationFailure(
File file,
Logger logger,
Object error,
StackTrace stackTrace,
) {
logger.log(
'Failed to hydrate ${file.path}, recovering without quarantine: '
'$error\n$stackTrace',
);
}

/// Moves a file that failed to hydrate (corrupt JSON, failed decryption) aside
/// to `<path>.corrupt` and recovers by returning null (an empty store) so that
/// one unreadable file cannot fail hydration of the entire store. The data is
/// preserved for inspection, the `.corrupt` suffix is ignored by the data store
/// file listing, and the next persist for the partition writes a fresh file.
Future<void> _recoverCorruptFile(
File file,
Logger logger,
Object error,
StackTrace stackTrace,
) async {
logger.log(
'Failed to hydrate ${file.path}, quarantining as corrupt: '
'$error\n$stackTrace',
);

try {
final quarantine = File('${file.path}.corrupt');
if (await quarantine.exists()) {
await quarantine.delete();
}
await file.rename(quarantine.path);
} catch (error, stackTrace) {
logger.log('Failed to quarantine ${file.path}: $error\n$stackTrace');
}
}

/// Writes [contents] to [file] atomically: the data is written to a sibling
/// temporary file, flushed to disk, then renamed over the target. A rename on
/// the same filesystem is atomic, so an interrupted write (crash, OOM kill,
Expand Down Expand Up @@ -54,9 +93,19 @@ class FileDataStoreConfig extends DataStoreConfig {
hydrate: () async {
try {
final value = await file.readAsString();
final json = jsonDecode(
encrypted ? encrypter.decrypt(value) : value,
);
String contents;
if (encrypted) {
try {
contents = encrypter.decrypt(value);
} catch (error, stackTrace) {
await _recoverCorruptFile(file, logger, error, stackTrace);
return null;
}
} else {
contents = value;
}

final json = jsonDecode(contents);
final store = ValueStore<ValueStore>();

for (final entry in json.entries) {
Expand All @@ -68,6 +117,12 @@ class FileDataStoreConfig extends DataStoreConfig {
return store;
} on PathNotFoundException {
return null;
} on FileSystemException catch (error, stackTrace) {
_logHydrationFailure(file, logger, error, stackTrace);
return null;
} on FormatException catch (error, stackTrace) {
await _recoverCorruptFile(file, logger, error, stackTrace);
return null;
}
Comment on lines 118 to 126
},
persist: (store) async {
Expand Down Expand Up @@ -95,8 +150,8 @@ class FileDataStoreResolverConfig extends DataStoreResolverConfig {
}) : super(
hydrate: () async {
try {
return ValueRefStore<String>(
jsonDecode(await file.readAsString()));
final json = jsonDecode(await file.readAsString());
return ValueRefStore<String>(json);
} on PathNotFoundException {
return null;
}
Expand Down
5 changes: 3 additions & 2 deletions lib/utils/id.dart
Original file line number Diff line number Diff line change
@@ -1,9 +1,10 @@
import 'dart:math';
import 'dart:typed_data';

// 64-characters (2^6)
// 64 characters (2^6). This intentionally excludes "_" because Loon paths use
// "__" as their segment delimiter.
const String _alphabet =
'_-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ';
'-0123456789abcdefghijklmnopqrstuvwxyzABCDEFGHIJKLMNOPQRSTUVWXYZ~';
final Uint8List _alphabytes = Uint8List.fromList(_alphabet.codeUnits);
const int _u32 = 0x100000000; // 2^32

Expand Down
30 changes: 13 additions & 17 deletions test/core/broadcast_timing_test.dart
Original file line number Diff line number Diff line change
Expand Up @@ -2,6 +2,8 @@ import 'package:fake_async/fake_async.dart';
import 'package:flutter_test/flutter_test.dart';
import 'package:loon/loon.dart';

import '../utils.dart';

/// Deterministic tests for broadcast batching, coalescing, and ordering.
///
/// Loon schedules a broadcast on a zero-duration timer so that all writes in a
Expand All @@ -16,12 +18,6 @@ import 'package:loon/loon.dart';
/// (A dedicated file runs in its own test isolate, so the global store starts
/// clean and these virtual-time tests are isolated from the rest of the suite.)

/// Advances virtual time past the broadcast's zero-duration timer and drains
/// the microtasks that deliver stream events.
void _flush(FakeAsync async) {
async.elapse(const Duration(milliseconds: 1));
}

void _reset(FakeAsync async) {
Loon.unsubscribe();
Loon.clearAll(broadcast: false);
Expand All @@ -39,12 +35,12 @@ void main() {
final sub = col
.stream()
.listen((snaps) => emissions.add([for (final s in snaps) s.data]));
_flush(async); // initial emission
flushBroadcasts(async); // initial emission

col.doc('1').create(1);
col.doc('2').create(2);
col.doc('3').create(3);
_flush(async);
flushBroadcasts(async);

// One emission for the initial value and exactly one for the batch.
expect(emissions.length, 2);
Expand All @@ -62,11 +58,11 @@ void main() {

final emissions = <int?>[];
final sub = doc.stream().listen((snap) => emissions.add(snap?.data));
_flush(async); // initial null
flushBroadcasts(async); // initial null

doc.create(1);
doc.update(2);
_flush(async);
flushBroadcasts(async);

// The create and update collapse into a single emission of the final value.
expect(emissions, [null, 2]);
Expand All @@ -83,14 +79,14 @@ void main() {

final emissions = <int?>[];
final sub = doc.stream().listen((snap) => emissions.add(snap?.data));
_flush(async); // initial null
flushBroadcasts(async); // initial null

doc.create(1);
_flush(async);
flushBroadcasts(async);
doc.update(2);
_flush(async);
flushBroadcasts(async);
doc.update(3);
_flush(async);
flushBroadcasts(async);

expect(emissions, [null, 1, 2, 3]);

Expand All @@ -106,12 +102,12 @@ void main() {

final emissions = <int?>[];
final sub = doc.stream().listen((snap) => emissions.add(snap?.data));
_flush(async); // initial null
flushBroadcasts(async); // initial null

doc.create(1);
_flush(async);
flushBroadcasts(async);
doc.update(1); // same value
_flush(async);
flushBroadcasts(async);

// No emission for the no-op update.
expect(emissions, [null, 1]);
Expand Down
13 changes: 13 additions & 0 deletions test/core/id_test.dart
Original file line number Diff line number Diff line change
@@ -0,0 +1,13 @@
import 'package:flutter_test/flutter_test.dart';
import 'package:loon/utils/id.dart';

void main() {
group('IDs', () {
test('generated IDs do not contain the path delimiter', () {
for (var i = 0; i < 1000; i++) {
expect(generateSecureId(), isNot(contains('__')));
expect(generateFastId(), isNot(contains('__')));
}
});
});
}
Loading
Loading