diff --git a/.changeset/olive-coins-sleep.md b/.changeset/olive-coins-sleep.md new file mode 100644 index 000000000..624ca768c --- /dev/null +++ b/.changeset/olive-coins-sleep.md @@ -0,0 +1,9 @@ +--- +'@tanstack/db': patch +'@tanstack/offline-transactions': patch +'@tanstack/query-db-collection': patch +--- + +fix: prevent stale query refreshes from overwriting optimistic offline changes on reconnect + +When reconnecting with pending offline transactions, query-backed collections now defer processing query refreshes until queued writes finish replaying, avoiding temporary reverts to stale server data. diff --git a/.github/workflows/e2e-tests.yml b/.github/workflows/e2e-tests.yml index 6558b4589..1716e94b2 100644 --- a/.github/workflows/e2e-tests.yml +++ b/.github/workflows/e2e-tests.yml @@ -41,6 +41,7 @@ jobs: pnpm --filter @tanstack/db-ivm build pnpm --filter @tanstack/db build pnpm --filter @tanstack/electric-db-collection build + pnpm --filter @tanstack/offline-transactions build pnpm --filter @tanstack/query-db-collection build - name: Run Electric E2E tests diff --git a/packages/db/src/collection/index.ts b/packages/db/src/collection/index.ts index d95103267..2cb975f91 100644 --- a/packages/db/src/collection/index.ts +++ b/packages/db/src/collection/index.ts @@ -301,6 +301,13 @@ export class CollectionImpl< // and for debugging public _state: CollectionStateManager + /** + * When set, collection consumers should defer processing incoming data + * refreshes until this promise resolves. This prevents stale data from + * overwriting optimistic state while pending writes are being applied. + */ + public deferDataRefresh: Promise | null = null + private comparisonOpts: StringCollationConfig /** diff --git a/packages/offline-transactions/src/OfflineExecutor.ts b/packages/offline-transactions/src/OfflineExecutor.ts index cc537b3a3..8f443277c 100644 --- a/packages/offline-transactions/src/OfflineExecutor.ts +++ b/packages/offline-transactions/src/OfflineExecutor.ts @@ -221,12 +221,36 @@ export class OfflineExecutor { this.unsubscribeOnline = this.onlineDetector.subscribe(() => { if (this.isOfflineEnabled && this.executor) { this.executor.resetRetryDelays() - this.executor.executeAll().catch((error) => { - console.warn( - `Failed to execute transactions on connectivity change:`, - error, - ) - }) + + if (this.scheduler.getPendingCount() > 0) { + const barrierPromise = this.executor.executeAll() + + for (const collection of Object.values(this.config.collections)) { + collection.deferDataRefresh = barrierPromise + } + + barrierPromise + .catch((error) => { + console.warn( + `Failed to execute transactions on connectivity change:`, + error, + ) + }) + .finally(() => { + for (const collection of Object.values(this.config.collections)) { + if (collection.deferDataRefresh === barrierPromise) { + collection.deferDataRefresh = null + } + } + }) + } else { + this.executor.executeAll().catch((error) => { + console.warn( + `Failed to execute transactions on connectivity change:`, + error, + ) + }) + } } }) } @@ -568,6 +592,10 @@ export class OfflineExecutor { } dispose(): void { + for (const collection of Object.values(this.config.collections)) { + collection.deferDataRefresh = null + } + if (this.unsubscribeOnline) { this.unsubscribeOnline() this.unsubscribeOnline = null diff --git a/packages/query-db-collection/e2e/offline-refresh.e2e.test.ts b/packages/query-db-collection/e2e/offline-refresh.e2e.test.ts new file mode 100644 index 000000000..db07f7048 --- /dev/null +++ b/packages/query-db-collection/e2e/offline-refresh.e2e.test.ts @@ -0,0 +1,293 @@ +/** + * Integration test: offline transactions + query collection refresh + * + * Verifies that a query-backed collection does not revert to stale server + * state when coming back online with pending offline transactions. + */ + +import { describe, expect, it, vi } from 'vitest' +import { createCollection } from '@tanstack/db' +import { QueryClient } from '@tanstack/query-core' +import { startOfflineExecutor } from '@tanstack/offline-transactions' +import { queryCollectionOptions } from '../src/query' +import type { Collection } from '@tanstack/db' +import type { + LeaderElection, + OfflineConfig, + OnlineDetector, + StorageAdapter, +} from '@tanstack/offline-transactions' + +// --- Browser API mocks needed by @tanstack/offline-transactions --- +// jsdom doesn't provide navigator.locks, which the WebLocksLeader uses. +// We pass custom implementations (FakeLeaderElection, ManualOnlineDetector, +// FakeStorageAdapter) so these mocks just prevent initialization errors. + +if (!(globalThis.navigator as any)?.locks) { + Object.defineProperty(globalThis.navigator, `locks`, { + value: { request: vi.fn().mockResolvedValue(false) }, + configurable: true, + }) +} + +// --- Test helpers --- + +const flushMicrotasks = () => new Promise((resolve) => setTimeout(resolve, 0)) + +class ManualOnlineDetector implements OnlineDetector { + private listeners = new Set<() => void>() + private online: boolean + + constructor(initialOnline: boolean) { + this.online = initialOnline + } + + subscribe(callback: () => void): () => void { + this.listeners.add(callback) + return () => { + this.listeners.delete(callback) + } + } + + notifyOnline(): void { + for (const listener of this.listeners) { + listener() + } + } + + isOnline(): boolean { + return this.online + } + + setOnline(isOnline: boolean): void { + this.online = isOnline + if (isOnline) { + this.notifyOnline() + } + } + + dispose(): void { + this.listeners.clear() + } +} + +class FakeStorageAdapter implements StorageAdapter { + private store = new Map() + + get(key: string): Promise { + return Promise.resolve(this.store.has(key) ? this.store.get(key)! : null) + } + + set(key: string, value: string): Promise { + this.store.set(key, value) + return Promise.resolve() + } + + delete(key: string): Promise { + this.store.delete(key) + return Promise.resolve() + } + + keys(): Promise> { + return Promise.resolve(Array.from(this.store.keys())) + } + + clear(): Promise { + this.store.clear() + return Promise.resolve() + } +} + +class FakeLeaderElection implements LeaderElection { + private listeners = new Set<(isLeader: boolean) => void>() + private leader = true + + requestLeadership(): Promise { + this.notify(this.leader) + return Promise.resolve(this.leader) + } + + releaseLeadership(): void { + this.leader = false + this.notify(false) + } + + isLeader(): boolean { + return this.leader + } + + onLeadershipChange(callback: (isLeader: boolean) => void): () => void { + this.listeners.add(callback) + return () => { + this.listeners.delete(callback) + } + } + + private notify(isLeader: boolean): void { + for (const listener of this.listeners) { + listener(isLeader) + } + } +} + +// --- Test item type --- + +interface TestItem { + id: string + value: string +} + +type OfflineMutationParams = Parameters[0] + +// --- Tests --- + +describe(`offline transactions + query collection refresh`, () => { + it(`should not revert optimistic state when query refetches before pending offline transactions complete`, async () => { + // This test verifies that when a user goes offline, queues a mutation, + // and comes back online, the collection does not temporarily lose the + // optimistic insert. In a query-backed collection, data flows through + // query refetches (queryFn), not directly from the mutation function. + // When refetchOnReconnect fires before the offline transaction reaches + // the server, the refetch returns stale data. The optimistic state + // should remain visible until the transaction completes and a fresh + // refetch confirms the data. + + const onlineDetector = new ManualOnlineDetector(false) // Start offline + const storage = new FakeStorageAdapter() + + // --- Mock server state --- + const serverItems: Array = [ + { id: `item-1`, value: `server-data` }, + ] + + // Control when the mutation fn resolves + let resolveMutation: (() => void) | null = null + + const queryClient = new QueryClient({ + defaultOptions: { + queries: { + staleTime: 0, + retry: false, + }, + }, + }) + + // queryFn reads from serverItems (simulating a real API GET endpoint) + const queryFn = vi.fn().mockImplementation(() => { + return Promise.resolve([...serverItems]) + }) + + // Create the query-backed collection + const collection = createCollection( + queryCollectionOptions({ + id: `offline-refresh-test`, + queryClient, + queryKey: [`offline-refresh-test`], + queryFn, + getKey: (item: TestItem) => item.id, + startSync: true, + }), + ) + + // Wait for initial query to populate the collection + await vi.waitFor(() => { + expect(queryFn).toHaveBeenCalledTimes(1) + expect(collection.size).toBe(1) + }) + expect(collection.get(`item-1`)?.value).toBe(`server-data`) + + // --- Set up offline executor --- + const mutationFnName = `syncData` + const offlineConfig: OfflineConfig = { + collections: { [`offline-refresh-test`]: collection as any }, + mutationFns: { + [mutationFnName]: async (params: OfflineMutationParams) => { + // Block until the test explicitly resolves (simulating slow API POST) + await new Promise((resolve) => { + resolveMutation = resolve + }) + + // Update server state (simulating the server processing the mutation) + for (const mutation of params.transaction.mutations) { + if (mutation.type === `insert`) { + serverItems.push(mutation.modified as unknown as TestItem) + } + } + + return { ok: true } + }, + }, + storage, + leaderElection: new FakeLeaderElection(), + onlineDetector, + } + + const executor = startOfflineExecutor(offlineConfig) + await executor.waitForInit() + + // --- Go offline and create an offline mutation --- + const offlineTx = executor.createOfflineTransaction({ + mutationFnName, + autoCommit: false, + }) + + offlineTx.mutate(() => { + ;(collection as Collection).insert({ + id: `item-2`, + value: `offline-insert`, + }) + }) + + // Commit while offline: persists to outbox, mutation fn NOT called yet + const commitPromise = offlineTx.commit() + await flushMicrotasks() + + // Verify: item-2 is visible through optimistic state + expect(collection.get(`item-2`)?.value).toBe(`offline-insert`) + expect(collection.get(`item-1`)?.value).toBe(`server-data`) + + // --- Come online --- + // This triggers both: + // 1. The offline executor replaying pending transactions (mutationFn called) + // 2. TanStack Query potentially refetching (refetchOnReconnect default) + onlineDetector.setOnline(true) + await flushMicrotasks() + + // Trigger a query refetch that returns stale server state. + // The server doesn't have item-2 yet (the mutation is still in progress). + // This simulates what refetchOnReconnect would do. + await collection.utils.refetch() + + // The refetch returned stale data (only item-1), but item-2 should + // still be visible because the offline transaction is still pending + // and the optimistic state should cover the gap. + expect(collection.get(`item-2`)?.value).toBe(`offline-insert`) + + // --- Complete the mutation (server processes it) --- + expect(resolveMutation).not.toBeNull() + resolveMutation!() + + // Wait for the transaction to fully complete + await commitPromise + + // After the transaction completes, item-2 should remain visible. + // + // Without the fix: the stale refetch overwrote syncedData with only + // item-1, the optimistic state was cleaned up, and item-2 is gone + // permanently (no fresh refetch is triggered). + // + // With the fix: the stale refetch was skipped (barrier), and a fresh + // refetch is triggered once the barrier resolves. The fresh refetch + // includes item-2 because the server now has it. We use waitFor to + // allow the barrier-triggered refetch to complete. + await vi.waitFor( + () => { + expect(collection.get(`item-2`)?.value).toBe(`offline-insert`) + }, + { timeout: 1000 }, + ) + + executor.dispose() + queryClient.clear() + }) +}) diff --git a/packages/query-db-collection/package.json b/packages/query-db-collection/package.json index 369250d82..87da1ac72 100644 --- a/packages/query-db-collection/package.json +++ b/packages/query-db-collection/package.json @@ -54,6 +54,7 @@ "typescript": ">=4.7" }, "devDependencies": { + "@tanstack/offline-transactions": "workspace:*", "@tanstack/query-core": "^5.90.20", "@vitest/coverage-istanbul": "^3.2.4" } diff --git a/packages/query-db-collection/src/query.ts b/packages/query-db-collection/src/query.ts index 71d78b61c..45faca8f7 100644 --- a/packages/query-db-collection/src/query.ts +++ b/packages/query-db-collection/src/query.ts @@ -1332,9 +1332,25 @@ export function queryCollectionOptions( // eslint-disable-next-line no-shadow const makeQueryResultHandler = (queryKey: QueryKey) => { + const hashedQueryKey = hashKey(queryKey) const handleQueryResult: UpdateHandler = (result) => { if (result.isSuccess) { - if (retainedQueriesPendingRevalidation.has(hashKey(queryKey))) { + // Skip processing this result while data refreshes are deferred. + // Optimistic state covers the gap. Once the barrier resolves, + // trigger a fresh refetch to get authoritative data. + if (collection.deferDataRefresh) { + collection.deferDataRefresh.then(() => { + const observer = state.observers.get(hashedQueryKey) + if (observer) { + observer.refetch().catch(() => { + // Errors handled by the next handleQueryResult invocation + }) + } + }) + return + } + + if (retainedQueriesPendingRevalidation.has(hashedQueryKey)) { void reconcileSuccessfulResult(queryKey, result).catch((error) => { console.error( `[QueryCollection] Error reconciling query ${String(queryKey)}:`, diff --git a/packages/query-db-collection/tsconfig.json b/packages/query-db-collection/tsconfig.json index 3d3c48bdd..a28b5d501 100644 --- a/packages/query-db-collection/tsconfig.json +++ b/packages/query-db-collection/tsconfig.json @@ -15,9 +15,17 @@ "@tanstack/store": ["../store/src"], "@tanstack/db": ["../db/src"], "@tanstack/db-ivm": ["../db-ivm/src"], + "@tanstack/offline-transactions": ["../offline-transactions/src"], "@tanstack/db-collection-e2e": ["../db-collection-e2e/src"] } }, - "include": ["src", "tests", "e2e", "vite.config.ts", "vitest.e2e.config.ts"], + "include": [ + "src", + "tests", + "e2e", + "vite.config.ts", + "vitest.config.ts", + "vitest.e2e.config.ts" + ], "exclude": ["node_modules", "dist"] } diff --git a/packages/query-db-collection/vitest.config.ts b/packages/query-db-collection/vitest.config.ts new file mode 100644 index 000000000..56aa740fc --- /dev/null +++ b/packages/query-db-collection/vitest.config.ts @@ -0,0 +1,11 @@ +import { defineConfig } from 'vitest/config' + +export default defineConfig({ + test: { + exclude: [`e2e/**`, `**/node_modules/**`], + typecheck: { + enabled: true, + include: [`tests/**/*.test.ts`], + }, + }, +}) diff --git a/pnpm-lock.yaml b/pnpm-lock.yaml index 87ffb126b..2e26fe256 100644 --- a/pnpm-lock.yaml +++ b/pnpm-lock.yaml @@ -1324,6 +1324,9 @@ importers: specifier: '>=4.7' version: 5.9.3 devDependencies: + '@tanstack/offline-transactions': + specifier: workspace:* + version: link:../offline-transactions '@tanstack/query-core': specifier: ^5.90.20 version: 5.90.20