From af03b018d3339dcb42392e80d949ad657eb8b57d Mon Sep 17 00:00:00 2001 From: Rody Gosset Date: Tue, 19 May 2026 15:20:58 +0200 Subject: [PATCH] feat(reactivity): Add `Atom.persistedSwr` to combine key-value persistence with stale-while-revalidate, preferring valid registry values over fresh persisted cache for SSR and hydration. --- .changeset/add-atom-persisted-swr.md | 5 + .../effect/src/unstable/reactivity/Atom.ts | 304 ++++++++++++++++++ .../src/unstable/reactivity/AtomRegistry.ts | 5 + packages/effect/test/reactivity/Atom.test.ts | 226 +++++++++++++ 4 files changed, 540 insertions(+) create mode 100644 .changeset/add-atom-persisted-swr.md diff --git a/.changeset/add-atom-persisted-swr.md b/.changeset/add-atom-persisted-swr.md new file mode 100644 index 0000000000..e741b8072c --- /dev/null +++ b/.changeset/add-atom-persisted-swr.md @@ -0,0 +1,5 @@ +--- +"effect": minor +--- + +Add `Atom.persistedSwr` to combine key-value persistence with stale-while-revalidate, preferring valid registry values over fresh persisted cache for SSR and hydration, and `Atom.invalidatePersisted` to clear persisted SWR cache entries and refetch. diff --git a/packages/effect/src/unstable/reactivity/Atom.ts b/packages/effect/src/unstable/reactivity/Atom.ts index c9e7532e02..5303f2f158 100644 --- a/packages/effect/src/unstable/reactivity/Atom.ts +++ b/packages/effect/src/unstable/reactivity/Atom.ts @@ -1834,6 +1834,234 @@ const shouldRevalidateSWR = (result: AsyncResult.AsyncResult, staleT return !isFreshWithin(timestamp, staleTime, Date.now()) } +function preferValidRegistryValue( + registry: AtomContext["registry"], + self: Atom> & Serializable, Schema.Codec>>, + persisted: AsyncResult.AsyncResult +): AsyncResult.AsyncResult { + const selfKey = self[SerializableTypeId].key + const selfNode = registry.getNodes().get(selfKey) + if (selfNode?.currentState() !== "valid") return persisted + + const live = selfNode.value() + const liveTs = Option.getOrUndefined(swrTimestamp(live)) + const persistedTs = Option.getOrUndefined(swrTimestamp(persisted)) + if (liveTs !== undefined && (persistedTs === undefined || liveTs >= persistedTs)) return live + + return persisted +} + +/** + * Options for {@link Atom.persistedSwr}. Focus and mount options match {@link Atom.swr}. + */ +export type PersistedSwrOptions, E extends Schema.Codec> = { + /** + * Key-value store key for persisted cache entries (for example `todos-cache:${id}`). + */ + readonly key: string + + /** + * Runtime providing {@link KeyValueStore.KeyValueStore}. + */ + readonly runtime: AtomRuntime + + /** + * Schema for persisted {@link AsyncResult.AsyncResult} values. + */ + readonly schema: AsyncResult.Schema + + /** + * Value written when the store key is missing. Defaults to `AsyncResult.initial(true)`. + */ + readonly defaultValue?: + | LazyArg, Schema.Schema.Type>> + | undefined + + /** + * Duration after which cached data is stale. + */ + readonly staleTime: Duration.Input + + /** + * Skip background revalidation on the first read when persisted data is stale. + */ + readonly revalidateOnMount?: boolean | undefined + + /** + * Revalidate on focus. `true` respects `staleTime`; `"always"` forces refetch. Requires {@link focusSignal}. + */ + readonly revalidateOnFocus?: boolean | "always" | undefined + + /** + * Focus signal atom (for example a window-focus counter). + */ + readonly focusSignal?: Atom | undefined +} + +/** + * Combines {@link Atom.kvs} persistence with {@link Atom.swr} for an `AsyncResult` atom. + * + * Reads the key-value store when fresh. If the wrapped atom already has a valid registry + * value with a newer or equal timestamp, that value is returned instead (for SSR and + * hydration). Only `Success` values are persisted; newer successes sync the store + * immediately. + * + * When persisted data is fresh, the inner source is not read automatically. Manual + * `registry.refresh` refetches the inner source while keeping persisted storage. + * Use {@link Atom.invalidatePersisted} to remove the store entry and force a hard reset. + * + * @example + * ```ts + * import { Effect, Schema } from "effect" + * import { BrowserKeyValueStore } from "@effect/platform-browser" + * import { AtomRegistry, AsyncResult, Atom } from "effect/unstable/reactivity" + * import { Todo } from "@/features/todos/schemas" + * import { ApiError } from "@/lib/api/definitions" + * + * const runtime = Atom.runtime(BrowserKeyValueStore.layerLocalStorage) + * + * class ApiClient extends AtomHttpApi.Service()("ApiClient", { ... }) {} + * + * const schema = AsyncResult.Schema({ success: Schema.Array(Todo), error: ApiError }) + * + * const todosAtom = ApiClient.query("todos", "getAll", {}).pipe( + * Atom.persistedSwr({ + * key: "todos-getAll-cache", + * runtime, + * schema, + * staleTime: "1 minute", + * revalidateOnMount: false, + * }) + * ) + * + * // Force a network refetch: + * // registry.refresh(todos) + * + * // Clear persisted storage and refetch: + * // yield* Atom.invalidatePersisted(todos) + * ``` + * + * @see {@link Atom.swr} + * @see {@link Atom.kvs} + * + * @category combinators + * @since 4.0.0 + */ +export const persistedSwr: { + , E extends Schema.Codec>( + options: PersistedSwrOptions + ): ( + self: Atom, Schema.Schema.Type>> + ) => + & Atom, Schema.Schema.Type>> + & Serializable> + & Persisted + , E extends Schema.Codec>( + self: Atom, Schema.Schema.Type>>, + options: PersistedSwrOptions + ): + & Atom, Schema.Schema.Type>> + & Serializable> + & Persisted +} = dual(2, function< + A extends Schema.Codec, + E extends Schema.Codec +>( + _self: Atom, Schema.Schema.Type>>, + options: PersistedSwrOptions +): + & Atom< + AsyncResult.AsyncResult, Schema.Schema.Type> + > + & Serializable> + & Persisted +{ + const staleTime = Duration.toMillis(Duration.fromInputUnsafe(options.staleTime)) + + const persistenceAtom = kvs({ + runtime: options.runtime, + key: options.key, + schema: options.schema, + defaultValue: options.defaultValue ?? (() => AsyncResult.initial(true)) + }) + + const self: + & Atom, Schema.Schema.Type>> + & Serializable> = hasProperty(_self, SerializableTypeId) + ? (_self as any) + : _self.pipe(serializable({ key: `inner-serializable-${options.key}`, schema: options.schema })) + + let revalidating = false + + const outer = transform( + self, + function(get) { + const persisted = get(persistenceAtom) + + get.subscribe(self, function(value) { + if (AsyncResult.isInitial(value) || value.waiting) return + if (AsyncResult.isSuccess(value)) get.set(persistenceAtom, value) + revalidating = false + get.setSelf(value) + }) + + const firstRead = Option.isNone( + get.self, Schema.Schema.Type>>() + ) + + const timestamp = Option.getOrUndefined(swrTimestamp(persisted)) + + const displayed = preferValidRegistryValue(get.registry, self, persisted) + + const shouldRevalidate = !AsyncResult.isSuccess(persisted) || + ( + timestamp !== undefined && + !isFreshWithin(timestamp, staleTime, Date.now()) && + displayed === persisted && + !(firstRead && options.revalidateOnMount === false) + ) + + if (shouldRevalidate && !revalidating) { + revalidating = true + get.refresh(self) + } + + if (options.revalidateOnFocus && options.focusSignal) { + get.once(options.focusSignal) + get.subscribe( + options.focusSignal, + options.revalidateOnFocus === "always" + ? () => get.refresh(self) + : () => { + const current = get.once(self) + const displayed = AsyncResult.isInitial(current) ? persisted : current + if (shouldRevalidateSWR(displayed, staleTime)) get.refresh(self) + } + ) + } + + if ( + AsyncResult.isSuccess(displayed) && + displayed !== persisted + ) { + get.set(persistenceAtom, displayed) + } + + return displayed + }, + { initialValueTarget: self } + ).pipe(serializable({ key: `outer-serializable-${options.key}`, schema: options.schema })) + + return Object.assign(Object.create(Object.getPrototypeOf(outer)), { + ...outer, + [PersistedTypeId]: { + key: options.key, + persistence: persistenceAtom, + source: self + } + }) as typeof outer & Persisted +}) + /** * Wraps an atom in a writable optimistic atom. * @@ -2354,6 +2582,38 @@ export const getResult = ( export const refresh = (self: Atom): Effect.Effect => Effect.map(AtomRegistry, (_) => _.refresh(self)) +/** + * Invalidates an atom through the `AtomRegistry` service, forcing a re-read on + * the next access. + * + * @category converting + * @since 4.0.0 + */ +export const invalidate = (self: Atom): Effect.Effect => + Effect.map(AtomRegistry, (_) => _.invalidate(self)) + +/** + * Removes the persisted key-value store entry for a {@link Atom.persistedSwr} atom, + * invalidates its registry nodes, and refetches the inner source. + * + * Unlike {@link refresh}, this clears persisted storage before refetching. The + * `KeyValueStore` service must be available in the Effect context. + * + * @category converting + * @since 4.0.0 + */ +export const invalidatePersisted = ( + self: Atom & Persisted +): Effect.Effect => + Effect.gen(function*() { + const meta = self[PersistedTypeId] + const registry = yield* AtomRegistry + yield* KeyValueStore.KeyValueStore.use((store) => store.remove(meta.key)) + registry.invalidate(meta.persistence) + registry.invalidate(self) + registry.refresh(meta.source) + }) + /** * Mounts an atom in the `AtomRegistry` for the lifetime of the current scope. * @@ -2368,6 +2628,50 @@ export const refresh = (self: Atom): Effect.Effect(self: Atom): Effect.Effect => AtomRegistry.use((r) => Registry.mount(r, self)) +// ----------------------------------------------------------------------------- +// Persisted +// ----------------------------------------------------------------------------- + +/** + * The type id used to mark atoms created by {@link persistedSwr}. + * + * @category Persisted + * @since 4.0.0 + */ +export const PersistedTypeId: PersistedTypeId = "~effect-atom/atom/Atom/Persisted" + +/** + * The literal type of the persisted atom marker. + * + * @category Persisted + * @since 4.0.0 + */ +export type PersistedTypeId = "~effect-atom/atom/Atom/Persisted" + +interface PersistedMetadata { + readonly key: string + readonly persistence: Writable, AsyncResult.AsyncResult> + readonly source: Atom> +} + +/** + * Metadata attached to atoms created by {@link persistedSwr}. + * + * @category Persisted + * @since 4.0.0 + */ +export interface Persisted { + readonly [PersistedTypeId]: PersistedMetadata +} + +/** + * Returns `true` when an atom carries {@link Persisted} metadata. + * + * @category Persisted + * @since 4.0.0 + */ +export const isPersisted = (self: Atom): self is Atom & Persisted => PersistedTypeId in self + // ----------------------------------------------------------------------------- // Serializable // ----------------------------------------------------------------------------- diff --git a/packages/effect/src/unstable/reactivity/AtomRegistry.ts b/packages/effect/src/unstable/reactivity/AtomRegistry.ts index a6b80d8caf..3b7c71eab3 100644 --- a/packages/effect/src/unstable/reactivity/AtomRegistry.ts +++ b/packages/effect/src/unstable/reactivity/AtomRegistry.ts @@ -80,6 +80,7 @@ export interface AtomRegistry { readonly get: (atom: Atom.Atom) => A readonly mount: (atom: Atom.Atom) => () => void readonly refresh: (atom: Atom.Atom) => void + readonly invalidate: (atom: Atom.Atom) => void readonly set: (atom: Atom.Writable, value: W) => void readonly setSerializable: (key: string, encoded: unknown) => void readonly modify: (atom: Atom.Writable, f: (_: R) => [returnValue: A, nextValue: W]) => A @@ -403,6 +404,10 @@ class RegistryImpl implements AtomRegistry { } } + invalidate = (atom: Atom.Atom): void => { + this.invalidateAtom(atom) + } + subscribe(atom: Atom.Atom, f: (_: A) => void, options?: { readonly immediate?: boolean }): () => void { const node = this.ensureNode(atom) if (options?.immediate) { diff --git a/packages/effect/test/reactivity/Atom.test.ts b/packages/effect/test/reactivity/Atom.test.ts index efa4582bbb..674132983b 100644 --- a/packages/effect/test/reactivity/Atom.test.ts +++ b/packages/effect/test/reactivity/Atom.test.ts @@ -2319,6 +2319,232 @@ describe.sequential("Atom", () => { assert.strictEqual(runs, 2) }) + describe("persistedSwr", () => { + const cacheKey = "persisted-swr-cache" + const schema = AsyncResult.Schema({ success: Schema.Number, error: Schema.String }) + + const layerMemory = (storage: Map) => + Layer.succeed( + KeyValueStore.KeyValueStore, + KeyValueStore.makeStringOnly({ + get: (key) => Effect.sync(() => storage.get(key)), + set: (key, value) => Effect.sync(() => storage.set(key, value)), + remove: (key) => Effect.sync(() => storage.delete(key)), + clear: Effect.sync(() => storage.clear()), + size: Effect.sync(() => storage.size) + }) + ) + + const seedStorage = ( + storage: Map, + result: AsyncResult.AsyncResult + ) => { + storage.set(cacheKey, JSON.stringify(Schema.encodeSync(schema)(result))) + } + + const makeSetup = (storage: Map) => { + const runtime = Atom.runtime(layerMemory(storage)) + const makeSelf = (source: Atom.Atom>) => + source.pipe(Atom.serializable({ key: `inner-serializable-${cacheKey}`, schema })) + const makePersistedSwr = ( + self: Atom.Atom>, + staleTime = 10_000 + ) => self.pipe(Atom.persistedSwr({ key: cacheKey, runtime, schema, staleTime })) + return { makeSelf, makePersistedSwr } + } + + test("returns fresh persisted value without fetching", async () => { + const storage = new Map() + const { makeSelf, makePersistedSwr } = makeSetup(storage) + seedStorage(storage, AsyncResult.success(1, { timestamp: Date.now() })) + + let runs = 0 + const source = Atom.make(Effect.sync(() => ++runs)).pipe(Atom.keepAlive) + const atom = makePersistedSwr(makeSelf(source)) + + const r = AtomRegistry.make() + const unmount = r.mount(atom) + + const result = r.get(atom) + await Effect.runPromise(Effect.yieldNow) + + assert(AsyncResult.isSuccess(result)) + assert.strictEqual(result.value, 1) + assert.strictEqual(runs, 0) + + unmount() + }) + + test("prefers valid registry value over fresh persisted and syncs store", async () => { + const storage = new Map() + const { makeSelf, makePersistedSwr } = makeSetup(storage) + const now = Date.now() + seedStorage(storage, AsyncResult.success(1, { timestamp: now - 5_000 })) + + let runs = 0 + const source = Atom.make(Effect.sync(() => { + runs++ + return 2 + })).pipe(Atom.keepAlive) + const self = makeSelf(source) + const atom = makePersistedSwr(self) + + const r = AtomRegistry.make() + r.mount(self) + const live = r.get(self) + assert(AsyncResult.isSuccess(live)) + assert.strictEqual(live.value, 2) + assert.strictEqual(runs, 1) + + const unmount = r.mount(atom) + const result = r.get(atom) + await Effect.runPromise(Effect.yieldNow) + + assert(AsyncResult.isSuccess(result)) + assert.strictEqual(result.value, 2) + assert.strictEqual(runs, 1) + + const stored = storage.get(cacheKey) + assert(stored !== undefined) + const parsed = JSON.parse(stored) as { readonly value: number } + assert.strictEqual(parsed.value, 2) + + unmount() + }) + + test("revalidates when persisted is stale and registry has no live value", async () => { + const storage = new Map() + const { makeSelf, makePersistedSwr } = makeSetup(storage) + seedStorage(storage, AsyncResult.success(1, { timestamp: Date.now() - 2_000 })) + + let runs = 0 + const source = Atom.make(Effect.sync(() => ++runs)).pipe(Atom.keepAlive) + const atom = makePersistedSwr(makeSelf(source), 1_000) + + const r = AtomRegistry.make() + const unmount = r.mount(atom) + + r.get(atom) + await Effect.runPromise(Effect.yieldNow) + + assert.strictEqual(runs, 1) + + unmount() + }) + + test("does not revalidate when live registry value is already displayed", async () => { + const storage = new Map() + const { makeSelf, makePersistedSwr } = makeSetup(storage) + seedStorage(storage, AsyncResult.success(1, { timestamp: Date.now() - 2_000 })) + + let runs = 0 + const source = Atom.make(Effect.sync(() => ++runs)).pipe(Atom.keepAlive) + const self = makeSelf(source) + const atom = makePersistedSwr(self, 1_000) + + const r = AtomRegistry.make() + r.mount(self) + const live = r.get(self) + assert(AsyncResult.isSuccess(live)) + assert.strictEqual(live.value, 1) + assert.strictEqual(runs, 1) + + const unmount = r.mount(atom) + const result = r.get(atom) + await Effect.runPromise(Effect.yieldNow) + + assert(AsyncResult.isSuccess(result)) + assert.strictEqual(result.value, 1) + assert.strictEqual(runs, 1) + + unmount() + }) + + test("invalidatePersisted clears fresh persisted cache and refetches", async () => { + const storage = new Map() + let removed = false + const kvsLayer = Layer.succeed( + KeyValueStore.KeyValueStore, + KeyValueStore.makeStringOnly({ + get: (key) => Effect.sync(() => storage.get(key)), + set: (key, value) => Effect.sync(() => storage.set(key, value)), + remove: (key) => + Effect.sync(() => { + removed = true + storage.delete(key) + }), + clear: Effect.sync(() => storage.clear()), + size: Effect.sync(() => storage.size) + }) + ) + const runtime = Atom.runtime(kvsLayer) + const makeSelf = (source: Atom.Atom>) => + source.pipe(Atom.serializable({ key: `inner-serializable-${cacheKey}`, schema })) + const makePersistedSwr = ( + self: Atom.Atom>, + staleTime = 10_000 + ) => self.pipe(Atom.persistedSwr({ key: cacheKey, runtime, schema, staleTime })) + + seedStorage(storage, AsyncResult.success(1, { timestamp: Date.now() })) + + let runs = 0 + const source = Atom.make(Effect.sync(() => ++runs)).pipe(Atom.keepAlive) + const atom = makePersistedSwr(makeSelf(source)) + + const r = AtomRegistry.make() + const unmount = r.mount(atom) + + const initial = r.get(atom) + assert(AsyncResult.isSuccess(initial)) + assert.strictEqual(initial.value, 1) + assert.strictEqual(runs, 0) + + await Effect.runPromise( + Atom.invalidatePersisted(atom).pipe( + Effect.provideService(AtomRegistry.AtomRegistry, r), + Effect.provide(kvsLayer) + ) + ) + await Effect.runPromise(Effect.yieldNow) + + assert(removed) + assert.strictEqual(runs, 1) + const result = r.get(atom) + assert(AsyncResult.isSuccess(result)) + assert.strictEqual(result.value, 1) + + unmount() + }) + + test("refresh keeps KVS when fresh", async () => { + const storage = new Map() + const { makeSelf, makePersistedSwr } = makeSetup(storage) + seedStorage(storage, AsyncResult.success(1, { timestamp: Date.now() })) + const originalStored = storage.get(cacheKey) + + let runs = 0 + const source = Atom.make(Effect.sync(() => ++runs)).pipe(Atom.keepAlive) + const atom = makePersistedSwr(makeSelf(source)) + + const r = AtomRegistry.make() + const unmount = r.mount(atom) + + assert.strictEqual(runs, 0) + + await Effect.runPromise( + Atom.refresh(atom).pipe( + Effect.provideService(AtomRegistry.AtomRegistry, r) + ) + ) + await Effect.runPromise(Effect.yieldNow) + + assert.strictEqual(runs, 1) + assert.strictEqual(storage.get(cacheKey), originalStored) + + unmount() + }) + }) + describe("kvs", () => { it("memoizes defaultValue while loading empty storage", async () => { let calls = 0