diff --git a/packages/envio/src/Env.res b/packages/envio/src/Env.res index 3396c63d5..c5dae16f4 100644 --- a/packages/envio/src/Env.res +++ b/packages/envio/src/Env.res @@ -11,6 +11,12 @@ let maxAddrInPartition = envSafe->EnvSafe.get("MAX_PARTITION_SIZE", S.int, ~fall let maxPartitionConcurrency = envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10) +// Target number of in-memory objects (uncommitted entity/effect changes plus +// unwritten batch items) the store holds before processing waits for the write +// cycle to catch up. +let inMemoryObjectsTarget = + envSafe->EnvSafe.get("ENVIO_IN_MEMORY_OBJECTS_TARGET", S.int, ~fallback=100_000)->Belt.Int.toFloat + // FIXME: This broke HS grafana dashboard. Should investigate it later. Maybe we should use :: as a default value? // We want to be able to set it to 0.0.0.0 // to allow to passthrough the port from a Docker container diff --git a/packages/envio/src/InMemoryStore.res b/packages/envio/src/InMemoryStore.res index 6fc582711..72f5911db 100644 --- a/packages/envio/src/InMemoryStore.res +++ b/packages/envio/src/InMemoryStore.res @@ -21,12 +21,15 @@ module EntityTables = { } type effectCacheInMemTable = { + // Cache keys whose handler output is persisted on the next write. Drained + // each write; eviction is driven by the per-entry checkpointId instead. mutable idsToStore: array, mutable invalidationsCount: int, - mutable dict: dict, - // The live dict swapped here while its write is in flight, kept readable so - // reads stay warm until the write completes (see snapshotEffects). - mutable pendingDict: dict, + // Each entry is stamped with the checkpoint that referenced it (or + // loadedFromDbCheckpointId for db reads), so committed entries can be + // dropped once persisted/re-derivable, mirroring entity changes. + mutable dict: dict>, + mutable changesCount: float, effect: Internal.effect, } @@ -100,9 +103,9 @@ let make = ( } } -// Max uncommitted entity changes plus unwritten batch items before processing -// must wait for the cycle to free capacity. -let keepLatestChangesLimit = 100_000. +// Max uncommitted entity/effect changes plus unwritten batch items before +// processing must wait for the cycle to free capacity. +let keepLatestChangesLimit = Env.inMemoryObjectsTarget let getEffectInMemTable = (inMemoryStore: t, ~effect: Internal.effect) => { let key = effect.name @@ -112,7 +115,7 @@ let getEffectInMemTable = (inMemoryStore: t, ~effect: Internal.effect) => { let table = { idsToStore: [], dict: Dict.make(), - pendingDict: Dict.make(), + changesCount: 0., invalidationsCount: 0, effect, } @@ -121,13 +124,64 @@ let getEffectInMemTable = (inMemoryStore: t, ~effect: Internal.effect) => { } } -// Effect cache read that also consults the in-flight write's pending values. let getEffectOutput = (inMemTable: effectCacheInMemTable, key) => switch inMemTable.dict->Utils.Dict.dangerouslyGetNonOption(key) { - | Some(_) as found => found - | None => inMemTable.pendingDict->Utils.Dict.dangerouslyGetNonOption(key) + | Some(Set({entity: output})) => Some(output) + | Some(Delete(_)) | None => None } +// Records a handler output. Persisted on the next write only when shouldCache; +// otherwise kept in memory (re-run on a later miss) but never written to the db. +let setEffectOutput = ( + inMemTable: effectCacheInMemTable, + ~checkpointId, + ~cacheKey, + ~output, + ~shouldCache, +) => { + switch inMemTable.dict->Utils.Dict.dangerouslyGetNonOption(cacheKey) { + | Some(_) => () + | None => inMemTable.changesCount = inMemTable.changesCount +. 1. + } + inMemTable.dict->Dict.set(cacheKey, Set({entityId: cacheKey, entity: output, checkpointId})) + if shouldCache { + inMemTable.idsToStore->Array.push(cacheKey)->ignore + } +} + +// Seeds an entry from a db read. Stamped with loadedFromDbCheckpointId so it's +// always droppable (re-readable from the db) and never re-persisted. +let initEffectOutputFromDb = (inMemTable: effectCacheInMemTable, ~cacheKey, ~output) => + if inMemTable.dict->Utils.Dict.dangerouslyGetNonOption(cacheKey)->Option.isNone { + inMemTable.changesCount = inMemTable.changesCount +. 1. + inMemTable.dict->Dict.set( + cacheKey, + Set({entityId: cacheKey, entity: output, checkpointId: Internal.loadedFromDbCheckpointId}), + ) + } + +// Frees committed entries (re-readable from the db, or re-runnable for +// cache:false). Uncommitted entries stay warm. With keepLoadedFromDb, entries +// seeded from a db read are spared. Mirrors entity dropCommittedChanges. +let dropCommittedEffects = ( + inMemTable: effectCacheInMemTable, + ~committedCheckpointId, + ~keepLoadedFromDb, +) => { + let keysToDelete = [] + inMemTable.dict->Utils.Dict.forEachWithKey((change, key) => { + let checkpointId = change->Change.getCheckpointId + if ( + !(checkpointId > committedCheckpointId) && + !(keepLoadedFromDb && checkpointId == Internal.loadedFromDbCheckpointId) + ) { + keysToDelete->Array.push(key) + } + }) + keysToDelete->Array.forEach(key => inMemTable.dict->Utils.Dict.deleteInPlace(key)) + inMemTable.changesCount = inMemTable.changesCount -. keysToDelete->Array.length->Int.toFloat +} + let getInMemTable = ( inMemoryStore: t, ~entityConfig: Internal.entityConfig, @@ -142,6 +196,9 @@ let getChangesCount = (inMemoryStore: t) => { inMemoryStore.allEntities->Array.forEach(entityConfig => { total := total.contents +. (inMemoryStore->getInMemTable(~entityConfig)).changesCount }) + inMemoryStore.effects->Utils.Dict.forEach(inMemTable => { + total := total.contents +. inMemTable.changesCount + }) inMemoryStore.processedBatches->Array.forEach(batch => { total := total.contents +. batch.totalBatchSize->Int.toFloat }) @@ -207,8 +264,8 @@ let drainBatchRun = (inMemoryStore: t): Batch.t => { } } -// Captures the effects to persist, parking each live dict in pendingDict so -// reads stay warm during the write, then starts fresh live dicts. +// Captures the cache:true outputs to persist. The dict is left intact — entries +// stay warm and are reclaimed later by dropCommittedEffects once committed. let snapshotEffects = (inMemoryStore: t, ~cache): array => { let acc = [] inMemoryStore.effects->Utils.Dict.forEach(inMemTable => { @@ -216,10 +273,12 @@ let snapshotEffects = (inMemoryStore: t, ~cache): array () | ids => - let items = ids->Array.map((id): Internal.effectCacheItem => { - id, - output: dict->Dict.getUnsafe(id), - }) + let items = ids->Array.filterMap((id): option => + switch dict->Dict.getUnsafe(id) { + | Set({entity: output}) => Some({id, output}) + | Delete(_) => None + } + ) let effectName = effect.name let effectCacheRecord = switch cache->Utils.Dict.dangerouslyGetNonOption(effectName) { | Some(c) => c @@ -233,8 +292,6 @@ let snapshotEffects = (inMemoryStore: t, ~cache): arrayArray.push(({effect, items, shouldInitialize}: Persistence.updatedEffectCache))->ignore } - inMemTable.pendingDict = dict - inMemTable.dict = Dict.make() inMemTable.idsToStore = [] inMemTable.invalidationsCount = 0 }) @@ -302,7 +359,6 @@ let runOneWrite = async (inMemoryStore: t, ~persistence: Persistence.t, ~config) ) inMemoryStore.committedCheckpointId = upToCheckpointId - inMemoryStore.effects->Utils.Dict.forEach(inMemTable => inMemTable.pendingDict = Dict.make()) } } @@ -376,23 +432,38 @@ let commitBatch = (inMemoryStore: t, ~batch: Batch.t) => { inMemoryStore->kick } +// Drops committed entity and effect entries across all tables. With +// keepLoadedFromDb, entries seeded from a db read are spared. +let dropCommitted = (inMemoryStore: t, ~keepLoadedFromDb) => { + let committedCheckpointId = inMemoryStore.committedCheckpointId + inMemoryStore.allEntities->Array.forEach(entityConfig => + inMemoryStore + ->getInMemTable(~entityConfig) + ->InMemoryTable.Entity.dropCommittedChanges(~committedCheckpointId, ~keepLoadedFromDb) + ) + inMemoryStore.effects->Utils.Dict.forEach(inMemTable => + inMemTable->dropCommittedEffects(~committedCheckpointId, ~keepLoadedFromDb) + ) +} + // Blocks until the store holds fewer than keepLatestChangesLimit changes, // freeing committed changes first and awaiting commits as a last resort. let rec awaitCapacity = async (inMemoryStore: t) => { // After a failed write nothing will free capacity, so bail instead of waiting // on a commit that won't come (the error already went to onError). if !inMemoryStore.hasFailedWrite && inMemoryStore->getChangesCount >= keepLatestChangesLimit { - inMemoryStore.allEntities->Array.forEach(entityConfig => - inMemoryStore - ->getInMemTable(~entityConfig) - ->InMemoryTable.Entity.dropCommittedChanges( - ~committedCheckpointId=inMemoryStore.committedCheckpointId, - ) - ) + // Drop committed writes first, sparing db-loaded entries (explicitly + // requested, so likelier to be read again). + inMemoryStore->dropCommitted(~keepLoadedFromDb=true) + + // Still over: drop the db-loaded entries too. + if inMemoryStore->getChangesCount >= keepLatestChangesLimit { + inMemoryStore->dropCommitted(~keepLoadedFromDb=false) + } - // What's left is uncommitted. Only wait if a queued batch can free it; - // otherwise (e.g. a large rollback diff with no batch) waiting would - // deadlock, so let processing proceed. + // Still over: what's left is uncommitted. Only wait if a queued batch can + // free it; otherwise (e.g. a large rollback diff with no batch) waiting + // would deadlock, so let processing proceed. if ( inMemoryStore->getChangesCount >= keepLatestChangesLimit && inMemoryStore.processedBatches->Utils.Array.notEmpty diff --git a/packages/envio/src/InMemoryTable.res b/packages/envio/src/InMemoryTable.res index 7b778d888..4790a29e3 100644 --- a/packages/envio/src/InMemoryTable.res +++ b/packages/envio/src/InMemoryTable.res @@ -84,14 +84,19 @@ module Entity = { // Frees committed changes: drops latest entries at or below committedCheckpointId // (re-readable from the db) and clears the per-batch indices (rebuilt on the next - // getWhere). Uncommitted changes are kept. - let dropCommittedChanges = (self: t, ~committedCheckpointId) => { + // getWhere). Uncommitted changes are kept. With keepLoadedFromDb, entries seeded + // from a db read are spared so the cheaper-to-re-derive writes are dropped first. + let dropCommittedChanges = (self: t, ~committedCheckpointId, ~keepLoadedFromDb) => { let keysToDelete = [] - self.latestEntityChangeById->Utils.Dict.forEachWithKey((change, key) => - if !(change->Change.getCheckpointId > committedCheckpointId) { + self.latestEntityChangeById->Utils.Dict.forEachWithKey((change, key) => { + let checkpointId = change->Change.getCheckpointId + if ( + !(checkpointId > committedCheckpointId) && + !(keepLoadedFromDb && checkpointId == Internal.loadedFromDbCheckpointId) + ) { keysToDelete->Array.push(key) } - ) + }) keysToDelete->Array.forEach(key => self.latestEntityChangeById->Utils.Dict.deleteInPlace(key)) self.changesCount = self.changesCount -. keysToDelete->Array.length->Int.toFloat self.indicesByEntityId = Dict.make() diff --git a/packages/envio/src/Internal.res b/packages/envio/src/Internal.res index 4d7a8fed7..069fbd957 100644 --- a/packages/envio/src/Internal.res +++ b/packages/envio/src/Internal.res @@ -537,6 +537,9 @@ type effectArgs = { input: effectInput, context: effectContext, cacheKey: string, + // The processing checkpoint that referenced this effect; stamped on the + // in-memory cache entry so it's evicted once the checkpoint commits. + checkpointId: bigint, } type effectCacheItem = {id: string, output: effectOutput} type effectCacheStorageMeta = { diff --git a/packages/envio/src/LoadLayer.res b/packages/envio/src/LoadLayer.res index 0ab995fa1..9706a31f8 100644 --- a/packages/envio/src/LoadLayer.res +++ b/packages/envio/src/LoadLayer.res @@ -89,10 +89,12 @@ let callEffect = ( effect.handler(arg) ->Promise.thenResolve(output => { - inMemTable.dict->Dict.set(arg.cacheKey, output) - if arg.context.cache { - inMemTable.idsToStore->Array.push(arg.cacheKey)->ignore - } + inMemTable->InMemoryStore.setEffectOutput( + ~checkpointId=arg.checkpointId, + ~cacheKey=arg.cacheKey, + ~output, + ~shouldCache=arg.context.cache, + ) }) ->Utils.Promise.catchResolve(exn => { onError(~inputKey=arg.cacheKey, ~exn) @@ -283,7 +285,7 @@ let loadEffect = ( try { let output = dbEntity.output->S.parseOrThrow(outputSchema) idsFromCache->Utils.Set.add(dbEntity.id)->ignore - inMemTable.dict->Dict.set(dbEntity.id, output) + inMemTable->InMemoryStore.initEffectOutputFromDb(~cacheKey=dbEntity.id, ~output) } catch { | S.Raised(error) => inMemTable.invalidationsCount = inMemTable.invalidationsCount + 1 diff --git a/packages/envio/src/UserContext.res b/packages/envio/src/UserContext.res index 338453237..d99b5f6ac 100644 --- a/packages/envio/src/UserContext.res +++ b/packages/envio/src/UserContext.res @@ -55,6 +55,7 @@ let initEffect = (params: contextParams) => { input, context: effectContext, cacheKey: input->S.reverseConvertOrThrow(effect.input)->Utils.Hash.makeOrThrow, + checkpointId: params.checkpointId, } LoadLayer.loadEffect( ~loadManager=params.loadManager, diff --git a/scenarios/test_codegen/test/E2E_test.res b/scenarios/test_codegen/test/E2E_test.res index 1140e3780..9a1477814 100644 --- a/scenarios/test_codegen/test/E2E_test.res +++ b/scenarios/test_codegen/test/E2E_test.res @@ -310,7 +310,12 @@ describe("E2E tests", () => { logIndex: 0, handler: async ({context}) => { t.expect(await context.effect(testEffect, "test")).toEqual("test-output") - t.expect(await context.effect(testEffectWithCache, "test")).toEqual("test-output") + t.expect( + await Promise.all2(( + context.effect(testEffectWithCache, "test"), + context.effect(testEffectWithCache, "test-2"), + )), + ).toEqual(("test-output", "test-2-output")) }, }, ], @@ -327,7 +332,7 @@ describe("E2E tests", () => { labels: Dict.fromArray([("effect", "testEffect")]), }, { - value: "1", + value: "2", labels: Dict.fromArray([("effect", "testEffectWithCache")]), }, ]) @@ -336,7 +341,7 @@ describe("E2E tests", () => { ~message="should increment effect cache count", ).toEqual([ { - value: "1", + value: "2", labels: Dict.fromArray([("effect", "testEffectWithCache")]), }, ]) @@ -346,8 +351,11 @@ describe("E2E tests", () => { ).toEqual([]) t.expect( await indexerMock.queryEffectCache("testEffectWithCache"), - ~message="should have the cache entry in db", - ).toEqual([{"id": `"test"`, "output": %raw(`"test-output"`)}]) + ~message="should have the cache entries in db", + ).toEqual([ + {"id": `"test"`, "output": %raw(`"test-output"`)}, + {"id": `"test-2"`, "output": %raw(`"test-2-output"`)}, + ]) let indexerMock = await indexerMock.restart() await Utils.delay(0) @@ -361,11 +369,34 @@ describe("E2E tests", () => { ~message="should resume effect cache count on restart", ).toEqual([ { - value: "1", + value: "2", labels: Dict.fromArray([("effect", "testEffectWithCache")]), }, ]) + // A changed effect output schema is a code change, so it only takes effect + // after a restart. The restart clears the warm in-memory cache, so the db + // entries are reloaded and re-validated against the new schema. "test-output" + // fails the new schema and is recomputed; "test-2-output" passes and is kept. + let testEffectWithCacheV2 = Envio.createEffect( + { + name: "testEffectWithCache", + input: S.string, + output: S.string->S.refine( + s => + v => + if !(v->String.includes("2")) { + s.fail(`Expected to include '2', got ${v}`) + }, + ), + rateLimit: Disable, + cache: true, + }, + async ({input}) => { + input ++ "-output-v2" + }, + ) + sourceMock.resolveGetHeightOrThrow(300) await Utils.delay(0) await Utils.delay(0) @@ -377,10 +408,10 @@ describe("E2E tests", () => { handler: async ({context}) => { t.expect( await Promise.all2(( - context.effect(testEffectWithCache, "test"), - context.effect(testEffectWithCache, "test-2"), + context.effect(testEffectWithCacheV2, "test"), + context.effect(testEffectWithCacheV2, "test-2"), )), - ).toEqual(("test-output", "test-2-output")) + ).toEqual(("test-output-v2", "test-2-output")) }, }, ], @@ -407,7 +438,7 @@ describe("E2E tests", () => { ], [ { - value: "1", + value: "2", labels: Dict.fromArray([ ("operation", "testEffectWithCache.effect"), ("storage", "postgres"), @@ -429,7 +460,7 @@ describe("E2E tests", () => { indexerMock.metric("envio_effect_call_total"), indexerMock.metric("envio_effect_cache"), )), - ~message="Should increment effect calls count and cache count", + ~message="Should recompute the invalidated entry and keep the cache count", ).toEqual(( [ { @@ -445,44 +476,6 @@ describe("E2E tests", () => { ], )) - let testEffectWithCacheV2 = Envio.createEffect( - { - name: "testEffectWithCache", - input: S.string, - output: S.string->S.refine( - s => - v => - if !(v->String.includes("2")) { - s.fail(`Expected to include '2', got ${v}`) - }, - ), - rateLimit: Disable, - cache: true, - }, - async ({input}) => { - input ++ "-output-v2" - }, - ) - - sourceMock.resolveGetItemsOrThrow( - [ - { - blockNumber: 102, - logIndex: 0, - handler: async ({context}) => { - t.expect( - await Promise.all2(( - context.effect(testEffectWithCacheV2, "test"), - context.effect(testEffectWithCacheV2, "test-2"), - )), - ).toEqual(("test-output-v2", "test-2-output")) - }, - }, - ], - ~latestFetchedBlockNumber=102, - ) - await indexerMock.getBatchWritePromise() - t.expect( await indexerMock.queryEffectCache("testEffectWithCache"), ~message="Should invalidate loaded cache and store new one", @@ -490,15 +483,6 @@ describe("E2E tests", () => { {"id": `"test-2"`, "output": %raw(`"test-2-output"`)}, {"id": `"test"`, "output": %raw(`"test-output-v2"`)}, ]) - t.expect( - await indexerMock.metric("envio_effect_cache"), - ~message="Shouldn't increment on invalidation", - ).toEqual([ - { - value: "2", - labels: Dict.fromArray([("effect", "testEffectWithCache")]), - }, - ]) }) // Reproduction for https://github.com/enviodev/hyperindex/issues/1173 diff --git a/scenarios/test_codegen/test/WriteRead_test.res b/scenarios/test_codegen/test/WriteRead_test.res index 77a13429c..f1ffa29f6 100644 --- a/scenarios/test_codegen/test/WriteRead_test.res +++ b/scenarios/test_codegen/test/WriteRead_test.res @@ -241,7 +241,7 @@ breaking precicion on big values. https://github.com/enviodev/hyperindex/issues/ add("committed", 5n) add("uncommitted", 6n) - table->InMemoryTable.Entity.dropCommittedChanges(~committedCheckpointId=5n) + table->InMemoryTable.Entity.dropCommittedChanges(~committedCheckpointId=5n, ~keepLoadedFromDb=false) t.expect(( table.changesCount, @@ -249,6 +249,28 @@ breaking precicion on big values. https://github.com/enviodev/hyperindex/issues/ )).toEqual((1., ["uncommitted"])) }) + it("dropCommittedChanges with keepLoadedFromDb spares db-loaded entries", t => { + let makeEntity = (id): Internal.entity => + {"id": id}->(Utils.magic: {"id": string} => Internal.entity) + + let table = InMemoryTable.Entity.make() + let add = (id, checkpointId) => + table->InMemoryTable.Entity.set( + ~committedCheckpointId=Internal.initialCheckpointId, + Set({entityId: id, entity: makeEntity(id), checkpointId}), + ) + add("loaded", Internal.loadedFromDbCheckpointId) + add("committed", 5n) + add("uncommitted", 6n) + + table->InMemoryTable.Entity.dropCommittedChanges(~committedCheckpointId=5n, ~keepLoadedFromDb=true) + + t.expect(( + table.changesCount, + table.latestEntityChangeById->Dict.keysToArray->Array.toSorted(String.compare), + )).toEqual((2., ["loaded", "uncommitted"])) + }) + Async.it("Test getWhere queries with eq and gt operators", async t => { let sourceMock = MockIndexer.Source.make(~chain=#1337, [#getHeightOrThrow, #getItemsOrThrow]) let indexerMock = await MockIndexer.Indexer.make(