Skip to content

Commit 650b12e

Browse files
committed
feat: add support for external streams in UseRepoMixin
1 parent 7392ca9 commit 650b12e

3 files changed

Lines changed: 479 additions & 15 deletions

File tree

lib/src/repo/mixins/use_repo_mixin.dart

Lines changed: 165 additions & 14 deletions
Original file line numberDiff line numberDiff line change
@@ -5,14 +5,56 @@ import 'package:grumpy_annotations/grumpy_annotations.dart';
55
import 'package:meta/meta.dart';
66
import 'package:grumpy/grumpy.dart';
77

8-
/// Provides use hooks for watching and accessing data within [UseRepoMixin._onDependenciesReady].
9-
typedef UseHooks = ({
8+
/// Provides use hooks for watching and accessing data within [UseRepoMixin.onDependenciesReady].
9+
class UseHooks {
10+
/// Provides use hooks for watching and accessing data within [UseRepoMixin.onDependenciesReady].
11+
const UseHooks({required this.repo, required this.externalStream});
12+
13+
/// {@template UseHooks.repo}
1014
/// A function that allows you to watch a [Repo] of type [R] managing data of type [S] and
1115
/// returns a tuple containing the data from the repo and the repo itself.
1216
///
13-
/// Throws a [NoRepoDataError] if the repo's state does not contain data.
14-
Future<(S, R)> Function<S, R extends Repo<S>>() repo,
15-
});
17+
/// {@endtemplate}
18+
final Future<(S, R)> Function<S, R extends Repo<S>>() repo;
19+
20+
/// {@template UseHooks.externalStream}
21+
/// A function that watches an external change signal while reading the current
22+
/// value synchronously via [syncSnapshot].
23+
///
24+
/// The [changeSignal] is treated as invalidation-only. Emitted values are
25+
/// ignored and only used to trigger recomputation.
26+
/// {@endtemplate}
27+
final T Function<T>(
28+
Object key, {
29+
required Stream changeSignal,
30+
required T Function() syncSnapshot,
31+
})
32+
externalStream;
33+
}
34+
35+
final class _WatchedExternalDependency {
36+
_WatchedExternalDependency({
37+
required this.changeSignal,
38+
required this.subscription,
39+
});
40+
41+
Stream changeSignal;
42+
StreamSubscription subscription;
43+
Object? lastError;
44+
StackTrace? lastStackTrace;
45+
46+
bool get hasError => lastError != null;
47+
48+
void clearError() {
49+
lastError = null;
50+
lastStackTrace = null;
51+
}
52+
53+
void setError(Object error, StackTrace stackTrace) {
54+
lastError = error;
55+
lastStackTrace = stackTrace;
56+
}
57+
}
1658

1759
/// A mixin that provides functionality to watch and use multiple [Repo] instances.
1860
///
@@ -21,7 +63,7 @@ mixin UseRepoMixin<D, E, L> on LifecycleMixin, LifecycleHooksMixin {
2163
final _subs = <StreamSubscription>[];
2264
final _watchedRepos = <Type, Repo>{};
2365
final _pendingRepoResolutions = <Type, Future<Repo<dynamic>>>{};
24-
final _watchedStreams = <Stream, StreamSubscription>{};
66+
final _watchedExternalDependencies = <Object, _WatchedExternalDependency>{};
2567

2668
bool _installed = false;
2769
int _stateChangeVersion = 0;
@@ -40,7 +82,8 @@ mixin UseRepoMixin<D, E, L> on LifecycleMixin, LifecycleHooksMixin {
4082

4183
Future<void> _rebuildDependencyState(int version) async {
4284
var anyLoading = false;
43-
RepoErrorState? firstError;
85+
Object? firstError;
86+
StackTrace? firstErrorStackTrace;
4487

4588
D? nextData = _lastData;
4689
E? nextError = _lastError;
@@ -51,7 +94,9 @@ mixin UseRepoMixin<D, E, L> on LifecycleMixin, LifecycleHooksMixin {
5194
log(
5295
'Dependency of type ${repo.runtimeType} has error. Rebuilding error state...',
5396
);
54-
firstError = repo.state.asError;
97+
final repoError = repo.state.asError;
98+
firstError = repoError.error;
99+
firstErrorStackTrace = repoError.stackTrace;
55100
break;
56101
}
57102
if (repo.state.isLoading) {
@@ -62,14 +107,25 @@ mixin UseRepoMixin<D, E, L> on LifecycleMixin, LifecycleHooksMixin {
62107
}
63108
}
64109

110+
if (firstError == null) {
111+
for (final entry in _watchedExternalDependencies.entries) {
112+
final dependency = entry.value;
113+
if (!dependency.hasError) continue;
114+
115+
log(
116+
'External dependency with key ${entry.key.runtimeType} has error. Rebuilding error state...',
117+
);
118+
firstError = dependency.lastError;
119+
firstErrorStackTrace = dependency.lastStackTrace;
120+
break;
121+
}
122+
}
123+
65124
final allDataReady = !anyLoading && firstError == null;
66125

67126
try {
68127
if (firstError != null) {
69-
nextError = await onDependencyError(
70-
firstError.error,
71-
firstError.stackTrace,
72-
);
128+
nextError = await onDependencyError(firstError, firstErrorStackTrace);
73129
nextLoading = null;
74130
nextData = null;
75131
} else if (anyLoading) {
@@ -164,16 +220,35 @@ mixin UseRepoMixin<D, E, L> on LifecycleMixin, LifecycleHooksMixin {
164220
_subs.clear();
165221
});
166222
onDisposed(_watchedRepos.clear);
167-
onDisposed(_watchedStreams.clear);
223+
onDisposed(_watchedExternalDependencies.clear);
168224
}
169225

170226
/// Watches a [Repo] of type [R] managing data of type [S] and
171227
/// returns a tuple containing the data from the repo and the repo itself.
172228
///
173229
/// Throws a [NoRepoDataError] if the repo's state does not contain data.
174230
@Deprecated('Use the provided use arg in onDependenciesReady instead')
231+
@visibleForTesting
175232
Future<(S, R)> useRepo<S, R extends Repo<S>>() => _useRepo<S, R>();
176233

234+
/// Watches an external [changeSignal] while reading the latest value
235+
/// synchronously from [syncSnapshot].
236+
///
237+
/// The [changeSignal] is treated as invalidation-only. Emitted values are
238+
/// ignored. Use [key] as the stable identity for this dependency across
239+
/// rebuilds.
240+
@Deprecated('Use the provided use arg in onDependenciesReady instead')
241+
@visibleForTesting
242+
T watchExternal<T>(
243+
Object key, {
244+
required Stream changeSignal,
245+
required T Function() syncSnapshot,
246+
}) => _watchExternal(
247+
key,
248+
changeSignal: changeSignal,
249+
syncSnapshot: syncSnapshot,
250+
);
251+
177252
Future<(S, R)> _useRepo<S, R extends Repo<S>>() async {
178253
if (!_installed) {
179254
throw StateError(
@@ -225,7 +300,83 @@ mixin UseRepoMixin<D, E, L> on LifecycleMixin, LifecycleHooksMixin {
225300
return (repo.state.requireData, repo);
226301
}
227302

228-
FutureOr<D> _onDependenciesReady() => onDependenciesReady((repo: _useRepo));
303+
T _watchExternal<T>(
304+
Object key, {
305+
required Stream changeSignal,
306+
required T Function() syncSnapshot,
307+
}) {
308+
if (!_installed) {
309+
throw StateError(
310+
'UseRepoMixin not installed. Call installUseRepoHooks in the constructor.',
311+
);
312+
}
313+
314+
final watchedDependency = _watchedExternalDependencies[key];
315+
if (watchedDependency == null) {
316+
_watchedExternalDependencies[key] = _subscribeToExternalDependency(
317+
key,
318+
changeSignal,
319+
);
320+
} else if (!identical(watchedDependency.changeSignal, changeSignal)) {
321+
_replaceExternalDependency(
322+
key,
323+
watchedDependency: watchedDependency,
324+
changeSignal: changeSignal,
325+
);
326+
}
327+
328+
return syncSnapshot();
329+
}
330+
331+
_WatchedExternalDependency _subscribeToExternalDependency(
332+
Object key,
333+
Stream changeSignal,
334+
) {
335+
late final _WatchedExternalDependency watchedDependency;
336+
final sub = changeSignal.listen(
337+
(_) async {
338+
if (!identical(_watchedExternalDependencies[key], watchedDependency)) {
339+
return;
340+
}
341+
watchedDependency.clearError();
342+
final version = ++_stateChangeVersion;
343+
await _rebuildDependencyState(version);
344+
},
345+
onError: (Object error, StackTrace stackTrace) async {
346+
if (!identical(_watchedExternalDependencies[key], watchedDependency)) {
347+
return;
348+
}
349+
watchedDependency.setError(error, stackTrace);
350+
final version = ++_stateChangeVersion;
351+
await _rebuildDependencyState(version);
352+
},
353+
);
354+
355+
watchedDependency = _WatchedExternalDependency(
356+
changeSignal: changeSignal,
357+
subscription: sub,
358+
);
359+
_subs.add(sub);
360+
361+
return watchedDependency;
362+
}
363+
364+
void _replaceExternalDependency(
365+
Object key, {
366+
required _WatchedExternalDependency watchedDependency,
367+
required Stream changeSignal,
368+
}) {
369+
_subs.remove(watchedDependency.subscription);
370+
unawaited(watchedDependency.subscription.cancel());
371+
_watchedExternalDependencies[key] = _subscribeToExternalDependency(
372+
key,
373+
changeSignal,
374+
);
375+
}
376+
377+
FutureOr<D> _onDependenciesReady() => onDependenciesReady(
378+
UseHooks(repo: _useRepo, externalStream: _watchExternal),
379+
);
229380

230381
/// A callback function that is called when all watched repositories are ready.
231382
/// Call [use.useRepo] within this function to access repositories required to build the value.

test/repo/harness/use_repo_mixin_test_harness.dart

Lines changed: 58 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -83,6 +83,56 @@ class SlowSnapshotUseRepoConsumer
8383
String get logTag => 'SlowSnapshotUseRepoConsumer';
8484
}
8585

86+
class ExternalSignalConsumer
87+
with
88+
Disposable,
89+
LogMixin,
90+
LifecycleMixin,
91+
LifecycleHooksMixin,
92+
UseRepoMixin<String, String, String> {
93+
ExternalSignalConsumer({
94+
required this.key,
95+
required this.changeSignal,
96+
required this.syncSnapshot,
97+
}) {
98+
installUseRepoHooks();
99+
onDependenciesChanged(() => dependenciesChangedCalls++);
100+
initialize();
101+
}
102+
103+
final Object key;
104+
Stream changeSignal;
105+
String Function() syncSnapshot;
106+
107+
int dependenciesChangedCalls = 0;
108+
int readyCalls = 0;
109+
int errorCalls = 0;
110+
Object? lastError;
111+
112+
@override
113+
FutureOr<String> onDependenciesReady(UseHooks use) {
114+
readyCalls++;
115+
return use.externalStream<String>(
116+
key,
117+
changeSignal: changeSignal,
118+
syncSnapshot: syncSnapshot,
119+
);
120+
}
121+
122+
@override
123+
FutureOr<String> onDependencyError(Object error, StackTrace? _) {
124+
errorCalls++;
125+
lastError = error;
126+
return 'error:${error.toString()}';
127+
}
128+
129+
@override
130+
String onDependenciesLoading() => 'loading';
131+
132+
@override
133+
String get logTag => 'ExternalSignalConsumer';
134+
}
135+
86136
// for testing uninitialized usage
87137
// ignore: missing_required_constructor_call
88138
class UninitializedConsumer
@@ -93,7 +143,14 @@ class UninitializedConsumer
93143
LifecycleHooksMixin,
94144
UseRepoMixin<void, void, void> {
95145
@override
96-
FutureOr<void> onDependenciesReady(UseHooks use) {}
146+
FutureOr<void> onDependenciesReady(UseHooks use) {
147+
use.repo<int, IntRepo>();
148+
use.externalStream<void>(
149+
Object(),
150+
changeSignal: const Stream<void>.empty(),
151+
syncSnapshot: () {},
152+
);
153+
}
97154

98155
@override
99156
FutureOr<void> onDependencyError(Object _, StackTrace? _) {}

0 commit comments

Comments
 (0)