Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
5 changes: 5 additions & 0 deletions .changeset/add-atom-persisted-swr.md
Original file line number Diff line number Diff line change
@@ -0,0 +1,5 @@
---
"effect": minor
---

Add `Atom.persistedSwr` to combine key-value persistence with stale-while-revalidate, preferring valid registry values over fresh persisted cache for SSR and hydration, and `Atom.invalidatePersisted` to clear persisted SWR cache entries and refetch.
304 changes: 304 additions & 0 deletions packages/effect/src/unstable/reactivity/Atom.ts
Original file line number Diff line number Diff line change
Expand Up @@ -1834,6 +1834,234 @@ const shouldRevalidateSWR = <A, E>(result: AsyncResult.AsyncResult<A, E>, staleT
return !isFreshWithin(timestamp, staleTime, Date.now())
}

function preferValidRegistryValue<A, E>(
registry: AtomContext["registry"],
self: Atom<AsyncResult.AsyncResult<A, E>> & Serializable<AsyncResult.Schema<Schema.Codec<A>, Schema.Codec<E>>>,
persisted: AsyncResult.AsyncResult<A, E>
): AsyncResult.AsyncResult<A, E> {
const selfKey = self[SerializableTypeId].key
const selfNode = registry.getNodes().get(selfKey)
if (selfNode?.currentState() !== "valid") return persisted

const live = selfNode.value()
const liveTs = Option.getOrUndefined(swrTimestamp(live))
const persistedTs = Option.getOrUndefined(swrTimestamp(persisted))
if (liveTs !== undefined && (persistedTs === undefined || liveTs >= persistedTs)) return live

return persisted
}

/**
* Options for {@link Atom.persistedSwr}. Focus and mount options match {@link Atom.swr}.
*/
export type PersistedSwrOptions<A extends Schema.Codec<any>, E extends Schema.Codec<any>> = {
/**
* Key-value store key for persisted cache entries (for example `todos-cache:${id}`).
*/
readonly key: string

/**
* Runtime providing {@link KeyValueStore.KeyValueStore}.
*/
readonly runtime: AtomRuntime<KeyValueStore.KeyValueStore, any>

/**
* Schema for persisted {@link AsyncResult.AsyncResult} values.
*/
readonly schema: AsyncResult.Schema<A, E>

/**
* Value written when the store key is missing. Defaults to `AsyncResult.initial(true)`.
*/
readonly defaultValue?:
| LazyArg<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>
| undefined

/**
* Duration after which cached data is stale.
*/
readonly staleTime: Duration.Input

/**
* Skip background revalidation on the first read when persisted data is stale.
*/
readonly revalidateOnMount?: boolean | undefined

/**
* Revalidate on focus. `true` respects `staleTime`; `"always"` forces refetch. Requires {@link focusSignal}.
*/
readonly revalidateOnFocus?: boolean | "always" | undefined

/**
* Focus signal atom (for example a window-focus counter).
*/
readonly focusSignal?: Atom<any> | undefined
}

/**
* Combines {@link Atom.kvs} persistence with {@link Atom.swr} for an `AsyncResult` atom.
*
* Reads the key-value store when fresh. If the wrapped atom already has a valid registry
* value with a newer or equal timestamp, that value is returned instead (for SSR and
* hydration). Only `Success` values are persisted; newer successes sync the store
* immediately.
*
* When persisted data is fresh, the inner source is not read automatically. Manual
* `registry.refresh` refetches the inner source while keeping persisted storage.
* Use {@link Atom.invalidatePersisted} to remove the store entry and force a hard reset.
*
* @example
* ```ts
* import { Effect, Schema } from "effect"
* import { BrowserKeyValueStore } from "@effect/platform-browser"
* import { AtomRegistry, AsyncResult, Atom } from "effect/unstable/reactivity"
* import { Todo } from "@/features/todos/schemas"
* import { ApiError } from "@/lib/api/definitions"
*
* const runtime = Atom.runtime(BrowserKeyValueStore.layerLocalStorage)
*
* class ApiClient extends AtomHttpApi.Service<ApiClient>()("ApiClient", { ... }) {}
*
* const schema = AsyncResult.Schema({ success: Schema.Array(Todo), error: ApiError })
*
* const todosAtom = ApiClient.query("todos", "getAll", {}).pipe(
* Atom.persistedSwr({
* key: "todos-getAll-cache",
* runtime,
* schema,
* staleTime: "1 minute",
* revalidateOnMount: false,
* })
* )
*
* // Force a network refetch:
* // registry.refresh(todos)
*
* // Clear persisted storage and refetch:
* // yield* Atom.invalidatePersisted(todos)
* ```
*
* @see {@link Atom.swr}
* @see {@link Atom.kvs}
*
* @category combinators
* @since 4.0.0
*/
export const persistedSwr: {
<A extends Schema.Codec<any>, E extends Schema.Codec<any>>(
options: PersistedSwrOptions<A, E>
): (
self: Atom<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>
) =>
& Atom<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>
& Serializable<AsyncResult.Schema<A, E>>
& Persisted
<A extends Schema.Codec<any>, E extends Schema.Codec<any>>(
self: Atom<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>,
options: PersistedSwrOptions<A, E>
):
& Atom<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>
& Serializable<AsyncResult.Schema<A, E>>
& Persisted
} = dual(2, function<
A extends Schema.Codec<any>,
E extends Schema.Codec<any>
>(
_self: Atom<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>,
options: PersistedSwrOptions<A, E>
):
& Atom<
AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>
>
& Serializable<AsyncResult.Schema<A, E>>
& Persisted
{
const staleTime = Duration.toMillis(Duration.fromInputUnsafe(options.staleTime))

const persistenceAtom = kvs({
runtime: options.runtime,
key: options.key,
schema: options.schema,
defaultValue: options.defaultValue ?? (() => AsyncResult.initial(true))
})

const self:
& Atom<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>
& Serializable<AsyncResult.Schema<A, E>> = hasProperty(_self, SerializableTypeId)
? (_self as any)
: _self.pipe(serializable({ key: `inner-serializable-${options.key}`, schema: options.schema }))

let revalidating = false

const outer = transform(
self,
function(get) {
const persisted = get(persistenceAtom)

get.subscribe(self, function(value) {
if (AsyncResult.isInitial(value) || value.waiting) return
if (AsyncResult.isSuccess(value)) get.set(persistenceAtom, value)
revalidating = false
get.setSelf(value)
})

const firstRead = Option.isNone(
get.self<AsyncResult.AsyncResult<Schema.Schema.Type<A>, Schema.Schema.Type<E>>>()
)

const timestamp = Option.getOrUndefined(swrTimestamp(persisted))

const displayed = preferValidRegistryValue(get.registry, self, persisted)

const shouldRevalidate = !AsyncResult.isSuccess(persisted) ||
(
timestamp !== undefined &&
!isFreshWithin(timestamp, staleTime, Date.now()) &&
displayed === persisted &&
!(firstRead && options.revalidateOnMount === false)
)

if (shouldRevalidate && !revalidating) {
revalidating = true
get.refresh(self)
}

if (options.revalidateOnFocus && options.focusSignal) {
get.once(options.focusSignal)
get.subscribe(
options.focusSignal,
options.revalidateOnFocus === "always"
? () => get.refresh(self)
: () => {
const current = get.once(self)
const displayed = AsyncResult.isInitial(current) ? persisted : current
if (shouldRevalidateSWR(displayed, staleTime)) get.refresh(self)
}
)
}

if (
AsyncResult.isSuccess(displayed) &&
displayed !== persisted
) {
get.set(persistenceAtom, displayed)
}

return displayed
},
{ initialValueTarget: self }
).pipe(serializable({ key: `outer-serializable-${options.key}`, schema: options.schema }))

return Object.assign(Object.create(Object.getPrototypeOf(outer)), {
...outer,
[PersistedTypeId]: {
key: options.key,
persistence: persistenceAtom,
source: self
}
}) as typeof outer & Persisted
})

/**
* Wraps an atom in a writable optimistic atom.
*
Expand Down Expand Up @@ -2354,6 +2582,38 @@ export const getResult = <A, E>(
export const refresh = <A>(self: Atom<A>): Effect.Effect<void, never, AtomRegistry> =>
Effect.map(AtomRegistry, (_) => _.refresh(self))

/**
* Invalidates an atom through the `AtomRegistry` service, forcing a re-read on
* the next access.
*
* @category converting
* @since 4.0.0
*/
export const invalidate = <A>(self: Atom<A>): Effect.Effect<void, never, AtomRegistry> =>
Effect.map(AtomRegistry, (_) => _.invalidate(self))

/**
* Removes the persisted key-value store entry for a {@link Atom.persistedSwr} atom,
* invalidates its registry nodes, and refetches the inner source.
*
* Unlike {@link refresh}, this clears persisted storage before refetching. The
* `KeyValueStore` service must be available in the Effect context.
*
* @category converting
* @since 4.0.0
*/
export const invalidatePersisted = (
self: Atom<any> & Persisted
): Effect.Effect<void, KeyValueStore.KeyValueStoreError, AtomRegistry | KeyValueStore.KeyValueStore> =>
Effect.gen(function*() {
const meta = self[PersistedTypeId]
const registry = yield* AtomRegistry
yield* KeyValueStore.KeyValueStore.use((store) => store.remove(meta.key))
registry.invalidate(meta.persistence)
registry.invalidate(self)
registry.refresh(meta.source)
})

/**
* Mounts an atom in the `AtomRegistry` for the lifetime of the current scope.
*
Expand All @@ -2368,6 +2628,50 @@ export const refresh = <A>(self: Atom<A>): Effect.Effect<void, never, AtomRegist
export const mount = <A>(self: Atom<A>): Effect.Effect<void, never, AtomRegistry | Scope.Scope> =>
AtomRegistry.use((r) => Registry.mount(r, self))

// -----------------------------------------------------------------------------
// Persisted
// -----------------------------------------------------------------------------

/**
* The type id used to mark atoms created by {@link persistedSwr}.
*
* @category Persisted
* @since 4.0.0
*/
export const PersistedTypeId: PersistedTypeId = "~effect-atom/atom/Atom/Persisted"

/**
* The literal type of the persisted atom marker.
*
* @category Persisted
* @since 4.0.0
*/
export type PersistedTypeId = "~effect-atom/atom/Atom/Persisted"

interface PersistedMetadata {
readonly key: string
readonly persistence: Writable<AsyncResult.AsyncResult<any, any>, AsyncResult.AsyncResult<any, any>>
readonly source: Atom<AsyncResult.AsyncResult<any, any>>
}

/**
* Metadata attached to atoms created by {@link persistedSwr}.
*
* @category Persisted
* @since 4.0.0
*/
export interface Persisted {
readonly [PersistedTypeId]: PersistedMetadata
}

/**
* Returns `true` when an atom carries {@link Persisted} metadata.
*
* @category Persisted
* @since 4.0.0
*/
export const isPersisted = (self: Atom<any>): self is Atom<any> & Persisted => PersistedTypeId in self

// -----------------------------------------------------------------------------
// Serializable
// -----------------------------------------------------------------------------
Expand Down
5 changes: 5 additions & 0 deletions packages/effect/src/unstable/reactivity/AtomRegistry.ts
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ export interface AtomRegistry {
readonly get: <A>(atom: Atom.Atom<A>) => A
readonly mount: <A>(atom: Atom.Atom<A>) => () => void
readonly refresh: <A>(atom: Atom.Atom<A>) => void
readonly invalidate: <A>(atom: Atom.Atom<A>) => void
readonly set: <R, W>(atom: Atom.Writable<R, W>, value: W) => void
readonly setSerializable: (key: string, encoded: unknown) => void
readonly modify: <R, W, A>(atom: Atom.Writable<R, W>, f: (_: R) => [returnValue: A, nextValue: W]) => A
Expand Down Expand Up @@ -403,6 +404,10 @@ class RegistryImpl implements AtomRegistry {
}
}

invalidate = <A>(atom: Atom.Atom<A>): void => {
this.invalidateAtom(atom)
}

subscribe<A>(atom: Atom.Atom<A>, f: (_: A) => void, options?: { readonly immediate?: boolean }): () => void {
const node = this.ensureNode(atom)
if (options?.immediate) {
Expand Down
Loading