Fire batch writes concurrently instead of blocking#1269
Conversation
Make the in-memory store fire its batch write into a single-slot
pendingPersistence and return control, so the next batch can process
while the previous one writes. At most one write is in flight: the next
write awaits the prior before firing, keeping writes in batch order.
- InMemoryStore: split writeBatch into a synchronous prepare (snapshot +
store reset + committedCheckpointId advance) and a fired storage write.
Concurrency is gated on keepLatestChanges; when the store drops its
latest changes the write is awaited inline so later DB reads stay
consistent. flushPendingPersistence awaits the in-flight write and is
called from prepareRollbackDiff before clearing the cache.
- LoadLayer: serve effect cache hits from the in-flight write's effects
snapshot before reading the not-yet-committed DB rows.
- GlobalState: flush the pending write before rollback DB reads and
before the success-exit paths.
- MockIndexer: getBatchWritePromise awaits the in-flight write.
Known pending: one realtime-ordering E2E test ("Live source should not
participate in initial height fetch but should after sync") asserts a
fetch-vs-write interleaving that legitimately shifts now that the write
no longer blocks EventBatchProcessed. Awaiting decision on how to update.
https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1
Reaching head now flips isRealtime before the first waitForNewBlock since the batch write no longer blocks EventBatchProcessed, so the first race already runs in realtime mode (Live primary, Sync secondary). https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1
Resolve InMemoryStore conflicts from main's committedCheckpointId move, changesCount sizing, plain-array rawEvents, and simplified effects map by layering the concurrent-write logic (prepareWriteBatch/scheduleWriteBatch/ flushPendingPersistence + effects snapshot) on top of the new shapes. Also process batches with items even when no chain progressed yet: concurrent writes let ProcessEventBatch run before a partition's fetch response lands, producing a non-progressing partial batch that the old isEmpty guard dropped without rescheduling. Skip only when the batch is genuinely empty (totalBatchSize == 0 && progressedChainsById empty). https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1
Resolve InMemoryStore conflict from #1267 (memory-limit handling): keep main's over-limit reset that preserves db-loaded entities and only drops everything when those alone exceed the limit, while layering the concurrent write logic on top (fire under the limit, await inline over it). https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1
…chema change Mock indexer restart now awaits the in-memory store's in-flight write before starting the new indexer on the same DB, so the old and new writes don't race. Fix the effect cache read-through to only serve from the in-flight write's snapshot when it's the same effect instance. A different effect sharing the name (e.g. an updated output schema) must go through the DB-load path, which re-validates and invalidates stale outputs; serving the raw pending value bypassed invalidation and fed handlers a stale result. https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1
|
Note Reviews pausedIt looks like this branch is under active development. To avoid overwhelming you with review comments due to an influx of new commits, CodeRabbit has automatically paused this review. You can configure this behavior by changing the Use the following commands to manage reviews:
Use the checkboxes below for quick actions:
📝 WalkthroughWalkthroughImplements a serialized async write loop in InMemoryStore with pending effect snapshots and staged chain metadata; EventProcessing commits to the in-memory queue, Persistence/PgStorage accept optional chainMetaData, GlobalState coordinates flushes on exit/rollback, throttler scheduling uses NodeJs.setImmediate, and tests/mock indexer updated to flush/wait. ChangesInMemoryStore async write loop & orchestration
Estimated code review effort 🎯 4 (Complex) | ⏱️ ~45 minutes Possibly related PRs
🚥 Pre-merge checks | ✅ 5✅ Passed checks (5 passed)
✏️ Tip: You can configure your own custom pre-merge checks in the settings. Comment |
There was a problem hiding this comment.
Actionable comments posted: 2
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/envio/src/InMemoryStore.res`:
- Around line 92-97: In flushPendingPersistence, don't clear
inMemoryStore.pendingPersistence before awaiting the in-flight promise because
that allows other callers to see None and proceed concurrently; instead capture
the pendingPersistence.promise to a local variable (or keep the Some record),
await that promise first, and only after the await set
inMemoryStore.pendingPersistence = None so the single-write ordering guarantee
remains intact (refer to flushPendingPersistence and
inMemoryStore.pendingPersistence).
In `@packages/envio/src/LoadLayer.res`:
- Around line 258-269: The pending read-through is pulling values from
pending.dict regardless of whether those outputs are cacheable; update the
branch in LoadLayer where you iterate pending.dict (the switch on
inMemoryStore.pendingPersistence / effects / pending) to only reuse entries that
were marked for persistence — e.g., check pending.idsToStore (or the per-arg
cacheable flag) before adding to idsFromCache and setting inMemTable.dict; in
short, only copy pending.dict[arg.cacheKey] into inMemTable.dict when that
arg/cacheKey was recorded as persistable by callEffect (idsToStore), otherwise
skip it.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: aee04841-b951-4bf4-8c46-dc0111011c5c
📒 Files selected for processing (5)
packages/envio/src/GlobalState.respackages/envio/src/InMemoryStore.respackages/envio/src/LoadLayer.resscenarios/test_codegen/test/E2E_test.resscenarios/test_codegen/test/helpers/MockIndexer.res
… cycle (#1276) * Make in-memory store persistence a standalone background cycle Decouple the database write from batch processing. Processing now only updates the in-memory store and continues; a persistence cycle owned by the in-memory store drains changes to Postgres on its own. - Split the checkpoint pointer into committedCheckpointId (last persisted to db) and processedCheckpointId (in-memory frontier). createBatch keys off processedCheckpointId; history retention still keys off committed. - commitBatch accumulates batch metadata and triggers a single-writer background loop (strictly one write in flight, overlapping processing). - Snapshot rawEvents/effects/entity changes synchronously at write start so the in-memory store is never reset before its changes are committed; effect outputs being written stay readable via a pending dict. - Capacity gate (50k changes) before each batch: drop committed changes, else await a commit. - Drain the cycle before a rollback and flush it before a successful exit. - Serialize chain-metadata writes with batch writes to avoid concurrent updates to the chains table. - MockIndexer awaits the full write (and settles) before returning. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Address PR review on EventProcessing - Await in-memory store capacity before starting the batch timer. - Drop the redundant comment over commitBatch. - Remove db-write duration from processing metrics; the write now happens off the processing path in the in-memory store cycle. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Fold chain-metadata write into the in-memory store cycle Persist chain metadata from the persistence cycle instead of a separate throttled write. Because the cycle is the single db writer, the metadata write no longer races batch writes on the chains table, so the throttler and the serializeDbWrite mutex are both removed. Also make the effect table's pendingDict always present instead of optional, for simplicity. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Write chain metadata via a separate throttler again Revert the in-cycle chain-metadata write back to a throttled, separate setChainMeta, serialized through the store's serializeDbWrite so it never overlaps a background batch write on the chains table. Also replace drainForRollback with flush - awaiting the write cycle already drains all pending batches, so the explicit resets were redundant. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Fold chain metadata into the batch write; tidy store fields - Persist chain metadata as part of the batch write transaction instead of a separate throttled write. The store keeps current vs committed metadata and only the stale per-chain diff is folded into writeBatch, so metadata never races the batch write and the throttler/serializeDbWrite are gone. - Make persistence and config immutable creation params of InMemoryStore instead of mutable fields set per batch. - Stop the ProcessEventBatch loop once an exit is decided, so the async exit flush doesn't let further batches process (fixes the auto-exit smoke test processing past the first event block). https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Carry isInReorgThreshold on the batch and split writes on its boundary Move isInReorgThreshold onto Batch.t (set at creation from the chain manager) instead of passing it separately into commitBatch. The persistence cycle no longer merges all queued batches blindly. It drains the leading run of processed batches that share isInReorgThreshold and writes only those, leaving the rest for the next write. Entity changes are snapshotted up to the run boundary so a single write never mixes history-saving modes (avoids over-saving history across the threshold transition). https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Make persistence/config required; single-pass batch-run drain - persistence and config are now non-optional fields of InMemoryStore.make. The in-memory-only test helper supplies a shared default persistence. - Drive the write cycle off processedBatches being non-empty, so drainBatchRun is never called with an empty array. - drainBatchRun now splits the run and accumulates checkpoints/progress in a single pass instead of one forEach plus five map+concatMany. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Review fixes: avoid capacity deadlock and remove dead code - awaitCapacity only waits for a commit when there is a queued batch to free capacity. A large rollback diff is staged without a batch, so waiting on it would deadlock; let processing proceed instead. - Remove resetButKeepLatestChanges/resetButKeepLoadedFromDbChanges, dead since the cycle uses snapshotChanges/dropCommittedChanges. Replace the obsolete unit test with one for dropCommittedChanges. - Remove the now-unused chain-metadata throttle env var. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Persist chain metadata on a throttled idle path with delta tracking Stage chain metadata as a per-chain dirty delta computed at stage time via structural comparison instead of a JSON-stringify diff on every write. A batch write folds the delta into its transaction for free; when no batch is flowing, a throttled standalone upsert flushes it, restoring idle freshness while keeping all writes serialized through the single write loop. * Track chain-meta dirtiness with a bool instead of a delta dict setChainMeta writes a single unnest upsert regardless of chain count, so a per-chain delta bought nothing at the db level. Replace dirtyChainMeta with a flag and write a shallow-copied snapshot of the latest per-chain metadata. * Defer Throttler execution to setImmediate Run scheduled functions on the next setImmediate instead of synchronously inside schedule, so work queued before them (e.g. a batch task) runs first. This makes chain-metadata fold into the imminent batch write by default and replaces the startThrottled priming. * Tighten comments; share setImmediate binding via NodeJs Condense the persistence-cycle and chain-metadata comments to one line where they earn it, and move the duplicated setImmediate external into NodeJs so Throttler and GlobalStateManager share a single binding. * Reuse NodeJs.setImmediate in Throttler test; retry timing tests Drop the duplicate setImmediate external from the test and reuse the shared NodeJs binding. Add retry to the two interval-timing tests, matching the others, since deferred execution adds macrotask jitter. --------- Co-authored-by: Claude <noreply@anthropic.com>
There was a problem hiding this comment.
Actionable comments posted: 1
🤖 Prompt for all review comments with AI agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
Inline comments:
In `@packages/envio/src/TestIndexerProxyStorage.res`:
- Around line 141-142: The proxy write function in TestIndexerProxyStorage.res
currently discards the chainMetaData parameter (seen in the anonymous function
handling writes), which can hide metadata-related regressions; update the write
path to either (A) include chainMetaData in the WriteBatch payload by adding it
to the serialized batch object sent to the proxy (preserve the existing
WriteBatch shape and add a chainMetaData field) or (B) fail fast by detecting a
non-null/undefined chainMetaData and throwing/logging an explicit error; adjust
the code around the function that constructs WriteBatch and the proxy write
handler so WriteBatch consumers (and tests) receive the metadata or the call
errors immediately.
🪄 Autofix (Beta)
Fix all unresolved CodeRabbit comments on this PR:
- Push a commit to this branch (recommended)
- Create a new PR with the fixes
ℹ️ Review info
⚙️ Run configuration
Configuration used: Organization UI
Review profile: CHILL
Plan: Pro
Run ID: 0600aab3-02d2-43ef-bce3-84fb3b6b7dbe
📒 Files selected for processing (22)
packages/envio/src/Batch.respackages/envio/src/ChainManager.respackages/envio/src/EventProcessing.respackages/envio/src/GlobalState.respackages/envio/src/GlobalStateManager.respackages/envio/src/InMemoryStore.respackages/envio/src/InMemoryTable.respackages/envio/src/LoadLayer.respackages/envio/src/Main.respackages/envio/src/Persistence.respackages/envio/src/PgStorage.respackages/envio/src/TestIndexerProxyStorage.respackages/envio/src/Throttler.respackages/envio/src/bindings/NodeJs.resscenarios/test_codegen/test/ChainManager_test.resscenarios/test_codegen/test/ChainMeta_test.resscenarios/test_codegen/test/E2E_test.resscenarios/test_codegen/test/EventOrigin_test.resscenarios/test_codegen/test/LoadLayer_test.resscenarios/test_codegen/test/WriteRead_test.resscenarios/test_codegen/test/helpers/MockIndexer.resscenarios/test_codegen/test/lib_tests/Throttler_test.res
| ~chainMetaData as _, | ||
| ) => { |
There was a problem hiding this comment.
Avoid silently dropping chainMetaData in proxy writes.
Line 141 discards chainMetaData, so proxy-mode writes can diverge from real Persistence.storage.writeBatch behavior and hide metadata-related regressions. Either serialize it in WriteBatch payload or fail fast when it is provided.
Suggested fail-fast patch
writeBatch: async (
~batch,
@@
- ~chainMetaData as _,
+ ~chainMetaData,
) => {
+ switch chainMetaData {
+ | Some(_) =>
+ JsError.throwWithMessage(
+ "TestIndexer: chainMetaData is provided but not serialized in proxy WriteBatch payload.",
+ )
+ | None => ()
+ }
// Encode entities to JSON for serialization across worker boundary🤖 Prompt for AI Agents
Verify each finding against current code. Fix only still-valid issues, skip the
rest with a brief reason, keep changes minimal, and validate.
In `@packages/envio/src/TestIndexerProxyStorage.res` around lines 141 - 142, The
proxy write function in TestIndexerProxyStorage.res currently discards the
chainMetaData parameter (seen in the anonymous function handling writes), which
can hide metadata-related regressions; update the write path to either (A)
include chainMetaData in the WriteBatch payload by adding it to the serialized
batch object sent to the proxy (preserve the existing WriteBatch shape and add a
chainMetaData field) or (B) fail fast by detecting a non-null/undefined
chainMetaData and throwing/logging an explicit error; adjust the code around the
function that constructs WriteBatch and the proxy write handler so WriteBatch
consumers (and tests) receive the metadata or the call errors immediately.
* Make in-memory store persistence a standalone background cycle Decouple the database write from batch processing. Processing now only updates the in-memory store and continues; a persistence cycle owned by the in-memory store drains changes to Postgres on its own. - Split the checkpoint pointer into committedCheckpointId (last persisted to db) and processedCheckpointId (in-memory frontier). createBatch keys off processedCheckpointId; history retention still keys off committed. - commitBatch accumulates batch metadata and triggers a single-writer background loop (strictly one write in flight, overlapping processing). - Snapshot rawEvents/effects/entity changes synchronously at write start so the in-memory store is never reset before its changes are committed; effect outputs being written stay readable via a pending dict. - Capacity gate (50k changes) before each batch: drop committed changes, else await a commit. - Drain the cycle before a rollback and flush it before a successful exit. - Serialize chain-metadata writes with batch writes to avoid concurrent updates to the chains table. - MockIndexer awaits the full write (and settles) before returning. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Address PR review on EventProcessing - Await in-memory store capacity before starting the batch timer. - Drop the redundant comment over commitBatch. - Remove db-write duration from processing metrics; the write now happens off the processing path in the in-memory store cycle. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Fold chain-metadata write into the in-memory store cycle Persist chain metadata from the persistence cycle instead of a separate throttled write. Because the cycle is the single db writer, the metadata write no longer races batch writes on the chains table, so the throttler and the serializeDbWrite mutex are both removed. Also make the effect table's pendingDict always present instead of optional, for simplicity. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Write chain metadata via a separate throttler again Revert the in-cycle chain-metadata write back to a throttled, separate setChainMeta, serialized through the store's serializeDbWrite so it never overlaps a background batch write on the chains table. Also replace drainForRollback with flush - awaiting the write cycle already drains all pending batches, so the explicit resets were redundant. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Fold chain metadata into the batch write; tidy store fields - Persist chain metadata as part of the batch write transaction instead of a separate throttled write. The store keeps current vs committed metadata and only the stale per-chain diff is folded into writeBatch, so metadata never races the batch write and the throttler/serializeDbWrite are gone. - Make persistence and config immutable creation params of InMemoryStore instead of mutable fields set per batch. - Stop the ProcessEventBatch loop once an exit is decided, so the async exit flush doesn't let further batches process (fixes the auto-exit smoke test processing past the first event block). https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Carry isInReorgThreshold on the batch and split writes on its boundary Move isInReorgThreshold onto Batch.t (set at creation from the chain manager) instead of passing it separately into commitBatch. The persistence cycle no longer merges all queued batches blindly. It drains the leading run of processed batches that share isInReorgThreshold and writes only those, leaving the rest for the next write. Entity changes are snapshotted up to the run boundary so a single write never mixes history-saving modes (avoids over-saving history across the threshold transition). https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Make persistence/config required; single-pass batch-run drain - persistence and config are now non-optional fields of InMemoryStore.make. The in-memory-only test helper supplies a shared default persistence. - Drive the write cycle off processedBatches being non-empty, so drainBatchRun is never called with an empty array. - drainBatchRun now splits the run and accumulates checkpoints/progress in a single pass instead of one forEach plus five map+concatMany. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Review fixes: avoid capacity deadlock and remove dead code - awaitCapacity only waits for a commit when there is a queued batch to free capacity. A large rollback diff is staged without a batch, so waiting on it would deadlock; let processing proceed instead. - Remove resetButKeepLatestChanges/resetButKeepLoadedFromDbChanges, dead since the cycle uses snapshotChanges/dropCommittedChanges. Replace the obsolete unit test with one for dropCommittedChanges. - Remove the now-unused chain-metadata throttle env var. https://claude.ai/code/session_01TuuFyaX6X8RzzDK2v6gfAt * Raise capacity limit to 100k incl. batch items; surface write errors via onError - keepLatestChangesLimit 50k -> 100k, now counts queued batch items alongside entity changes so a low-entity/high-item workload can't outrun persistence. - InMemoryStore.make takes a required onError callback; a failed background write reports through it immediately instead of being thrown at the next batch's awaitCapacity. Main wires it to dispatch ErrorExit. - awaitCapacity/flush no longer rethrow persistenceError; they stop draining since onError owns surfacing the failure. * Replace persistenceError option<exn> with a hasFailedWrite bool The stored exn was never read back - it's handed straight to onError at the failure site. The field only gates the write loop, so a plain bool says what it is. * Surface unexpected writes from in-memory-only test store instead of ignoring These stores never run the persistence cycle, so onError firing means a test is wired wrong - log and raise rather than swallow it. * Route fatal errors through a single onError handler Hold one onError callback (log + exit) on GlobalState and share it with the in-memory store. The store calls it directly on a background write failure instead of dispatching ErrorExit, and the ErrorExit action delegates to the same callback rather than inlining its own log + exit. * Tighten comments * Pass required onError to InMemoryStore in ChainMeta_test The merged store signature makes onError required; the in-memory test store raises on any unexpected persistence write. https://claude.ai/code/session_01Taw9xnp2tLPUvHiW1BSumS --------- Co-authored-by: Claude <noreply@anthropic.com>
* Build raw_events in PgStorage from batch items Move raw event row construction out of the per-event processing path and into PgStorage.writeBatch, which now derives the rows by iterating the batch items being written. Carry batch items through drainBatchRun so they reach the write, and drop the rawEvents accumulator and the ~rawEvents parameter from the storage interface. * Cover raw_events in the e2e indexer test Enable `raw_events: true` in the e2e_test config and assert the indexer writes one raw_events row per processed event, with the decoded params, src address and transaction fields matching the known first event. * Sanitize NUL bytes in raw_events writes (#1195) A NUL byte in event params made the raw_events jsonb INSERT fail with 22P05, poisoning the batch transaction and aborting unrelated entity writes. Route the raw_events write through the same escape-and-retry path used for entities: on a Postgres encoding error, escape the offending table and retry. The stripper now recurses into nested objects/arrays so a NUL buried inside an event param object (or a json entity field) is removed, and the classifier also recognizes the jsonb-specific error message in addition to the text-column one. --------- Co-authored-by: Claude <noreply@anthropic.com>
…1279) * Track effect cache entries by checkpoint for commit-gated eviction Store each effect cache entry as a Change stamped with the per-item checkpointId (mirroring entity changes) instead of a raw output in a dict that was wiped after every write. Committed entries are now reclaimed by dropCommittedEffects in awaitCapacity, and effect entries count toward the in-memory changes limit. cache:false outputs are stored in memory but not persisted, and are evictable (re-run on a later miss). Removes pendingDict and the per-write dict swap. Make the changes limit configurable via ENVIO_MAX_IN_MEMORY_CHANGES. Known open item: E2E "Track effects in prom metrics" fails. It swaps an effect's output schema mid-run (no restart) and expects the warm in-memory entry to be re-validated/invalidated. Under the new model committed entries stay warm and are only re-validated on a db reload (i.e. across restarts, the real-world schema-change path). Pending a decision on adapting the test. * Test effect-cache schema invalidation via the single restart Under commit-gated eviction a committed effect entry stays warm in memory, so a mid-run output-schema change isn't re-validated. Schema changes are code changes that take effect on restart, where the db cache is reloaded and re-validated. Restructure the test so both cache entries are written before the existing restart, then exercise the new schema in the post-restart batch (avoids a second restart, which collides on the checkpoint pkey). * Rename env to ENVIO_IN_MEMORY_OBJECTS_TARGET; inline mapChangeToEffectOutput * Evict committed changes before db-loaded ones in awaitCapacity Tiered backpressure: drop our committed writes first (cheap to re-derive), then db-loaded entries, and only then wait for a commit. Applies to both entity and effect tables via keepLoadedFromDb. * Fix doc comment placement for dropCommitted/awaitCapacity --------- Co-authored-by: Claude <noreply@anthropic.com>
Summary
Refactors batch persistence to fire asynchronously and return control immediately when entities remain in memory, enabling concurrent processing of the next batch while the previous write lands in the database. This improves throughput by overlapping I/O with computation.
Key Changes
Split
writeBatchinto two phases:prepareWriteBatchsynchronously snapshots the batch for writing and resets the store, whilewriteBatchfires the write concurrently when safe to do so.Track in-flight writes: Added
pendingPersistencefield to store the promise and effects snapshot of a write that has been fired but not yet awaited. This allows the next batch to serve effect cache hits from the in-flight snapshot instead of recomputing or reading uncommitted DB rows.Conditional awaiting: Only await the write immediately if entities were dropped from memory (forcing a DB read on the next batch). Otherwise, fire the write and return control, deferring the await until the next batch's
flushPendingPersistencecall or an operation that requires committed state (rollback, exit).Flush before committed reads: Added
flushPendingPersistencecalls before any operation that reads committed state from the database (rollback, exit, restart) to ensure the in-flight write lands first.Effect cache read-through: Updated
loadEffectto serve cache hits from the in-flight batch's effects snapshot before querying the database, avoiding redundant loads of not-yet-committed rows.Test updates: Adjusted E2E test expectations to reflect that
isRealtimenow flips before the firstwaitForNewBlock(since batch writes no longer blockEventBatchProcessed), and updated mock indexer to await in-flight writes when asserting DB state.Implementation Details
pendingPersistencespecifically to enable read-through cache hits for the next batch, avoiding the need to recompute or query the database for rows that are in-flight.flushPendingPersistenceensures writes land in order.https://claude.ai/code/session_01VEEEfkaYzNwoeb1iuqm9A1
Summary by CodeRabbit
New Features
Bug Fixes
Performance
Tests