Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
6 changes: 6 additions & 0 deletions packages/envio/src/Env.res
Original file line number Diff line number Diff line change
Expand Up @@ -11,6 +11,12 @@ let maxAddrInPartition = envSafe->EnvSafe.get("MAX_PARTITION_SIZE", S.int, ~fall
let maxPartitionConcurrency =
envSafe->EnvSafe.get("ENVIO_MAX_PARTITION_CONCURRENCY", S.int, ~fallback=10)

// Soft cap on how many objects the store keeps in memory: uncommitted
// entity/effect changes plus batch items that have not been written yet.
// Once the cap is reached, processing waits for the write cycle to catch up.
// Read from ENVIO_IN_MEMORY_OBJECTS_TARGET (default 100_000) and exposed as a
// float.
let inMemoryObjectsTarget = {
  let target = envSafe->EnvSafe.get("ENVIO_IN_MEMORY_OBJECTS_TARGET", S.int, ~fallback=100_000)
  target->Belt.Int.toFloat
}

// FIXME: This broke HS grafana dashboard. Should investigate it later. Maybe we should use :: as a default value?
// We want to be able to set it to 0.0.0.0
// to allow passing the port through from a Docker container
Expand Down
131 changes: 101 additions & 30 deletions packages/envio/src/InMemoryStore.res
Original file line number Diff line number Diff line change
Expand Up @@ -21,12 +21,15 @@ module EntityTables = {
}

type effectCacheInMemTable = {
// Cache keys whose handler output is persisted on the next write. Drained
// each write; eviction is driven by the per-entry checkpointId instead.
mutable idsToStore: array<string>,
mutable invalidationsCount: int,
mutable dict: dict<Internal.effectOutput>,
// The live dict swapped here while its write is in flight, kept readable so
// reads stay warm until the write completes (see snapshotEffects).
mutable pendingDict: dict<Internal.effectOutput>,
// Each entry is stamped with the checkpoint that referenced it (or
// loadedFromDbCheckpointId for db reads), so committed entries can be
// dropped once persisted/re-derivable, mirroring entity changes.
mutable dict: dict<Change.t<Internal.effectOutput>>,
mutable changesCount: float,
effect: Internal.effect,
}

Expand Down Expand Up @@ -100,9 +103,9 @@ let make = (
}
}

// Max uncommitted entity changes plus unwritten batch items before processing
// must wait for the cycle to free capacity.
let keepLatestChangesLimit = 100_000.
// Max uncommitted entity/effect changes plus unwritten batch items before
// processing must wait for the cycle to free capacity.
let keepLatestChangesLimit = Env.inMemoryObjectsTarget

let getEffectInMemTable = (inMemoryStore: t, ~effect: Internal.effect) => {
let key = effect.name
Expand All @@ -112,7 +115,7 @@ let getEffectInMemTable = (inMemoryStore: t, ~effect: Internal.effect) => {
let table = {
idsToStore: [],
dict: Dict.make(),
pendingDict: Dict.make(),
changesCount: 0.,
invalidationsCount: 0,
effect,
}
Expand All @@ -121,13 +124,64 @@ let getEffectInMemTable = (inMemoryStore: t, ~effect: Internal.effect) => {
}
}

// Effect cache read that also consults the in-flight write's pending values.
let getEffectOutput = (inMemTable: effectCacheInMemTable, key) =>
switch inMemTable.dict->Utils.Dict.dangerouslyGetNonOption(key) {
| Some(_) as found => found
| None => inMemTable.pendingDict->Utils.Dict.dangerouslyGetNonOption(key)
| Some(Set({entity: output})) => Some(output)
| Some(Delete(_)) | None => None
}

// Stores a handler output in the in-memory effect table under cacheKey,
// stamped with the checkpoint that produced it.
// - changesCount grows only when the key was not already present.
// - With shouldCache, the key is also queued in idsToStore so the next write
//   persists it; otherwise the output lives in memory only.
let setEffectOutput = (
  inMemTable: effectCacheInMemTable,
  ~checkpointId,
  ~cacheKey,
  ~output,
  ~shouldCache,
) => {
  let isNewKey = inMemTable.dict->Utils.Dict.dangerouslyGetNonOption(cacheKey)->Option.isNone
  if isNewKey {
    inMemTable.changesCount = inMemTable.changesCount +. 1.
  }
  let change: Change.t<Internal.effectOutput> = Set({
    entityId: cacheKey,
    entity: output,
    checkpointId,
  })
  inMemTable.dict->Dict.set(cacheKey, change)
  if shouldCache {
    inMemTable.idsToStore->Array.push(cacheKey)->ignore
  }
}

// Seeds the in-memory table with an output read from the db. An existing entry
// for the same key is left untouched. The new entry is stamped with
// loadedFromDbCheckpointId and is not queued in idsToStore, so it is never
// written back.
let initEffectOutputFromDb = (inMemTable: effectCacheInMemTable, ~cacheKey, ~output) =>
  switch inMemTable.dict->Utils.Dict.dangerouslyGetNonOption(cacheKey) {
  | Some(_) => ()
  | None =>
    inMemTable.dict->Dict.set(
      cacheKey,
      Set({entityId: cacheKey, entity: output, checkpointId: Internal.loadedFromDbCheckpointId}),
    )
    inMemTable.changesCount = inMemTable.changesCount +. 1.
  }

// Evicts effect entries whose checkpoint is at or below committedCheckpointId.
// Entries stamped with a later checkpoint are kept. When keepLoadedFromDb is
// set, entries stamped with loadedFromDbCheckpointId are kept as well.
// changesCount is lowered by the number of evicted entries. Same shape as the
// entity-side dropCommittedChanges.
let dropCommittedEffects = (
  inMemTable: effectCacheInMemTable,
  ~committedCheckpointId,
  ~keepLoadedFromDb,
) => {
  // Collect first, delete afterwards, so the dict is not mutated mid-iteration.
  let evictedKeys = []
  inMemTable.dict->Utils.Dict.forEachWithKey((change, key) => {
    let checkpointId = change->Change.getCheckpointId
    let isUncommitted = checkpointId > committedCheckpointId
    let isSparedDbLoad = keepLoadedFromDb && checkpointId == Internal.loadedFromDbCheckpointId
    if !(isUncommitted || isSparedDbLoad) {
      evictedKeys->Array.push(key)
    }
  })
  let evictedCount = evictedKeys->Array.length->Int.toFloat
  evictedKeys->Array.forEach(key => inMemTable.dict->Utils.Dict.deleteInPlace(key))
  inMemTable.changesCount = inMemTable.changesCount -. evictedCount
}

let getInMemTable = (
inMemoryStore: t,
~entityConfig: Internal.entityConfig,
Expand All @@ -142,6 +196,9 @@ let getChangesCount = (inMemoryStore: t) => {
inMemoryStore.allEntities->Array.forEach(entityConfig => {
total := total.contents +. (inMemoryStore->getInMemTable(~entityConfig)).changesCount
})
inMemoryStore.effects->Utils.Dict.forEach(inMemTable => {
total := total.contents +. inMemTable.changesCount
})
inMemoryStore.processedBatches->Array.forEach(batch => {
total := total.contents +. batch.totalBatchSize->Int.toFloat
})
Expand Down Expand Up @@ -207,19 +264,21 @@ let drainBatchRun = (inMemoryStore: t): Batch.t => {
}
}

// Captures the effects to persist, parking each live dict in pendingDict so
// reads stay warm during the write, then starts fresh live dicts.
// Captures the cache:true outputs to persist. The dict is left intact — entries
// stay warm and are reclaimed later by dropCommittedEffects once committed.
let snapshotEffects = (inMemoryStore: t, ~cache): array<Persistence.updatedEffectCache> => {
let acc = []
inMemoryStore.effects->Utils.Dict.forEach(inMemTable => {
let {idsToStore, dict, effect, invalidationsCount} = inMemTable
switch idsToStore {
| [] => ()
| ids =>
let items = ids->Array.map((id): Internal.effectCacheItem => {
id,
output: dict->Dict.getUnsafe(id),
})
let items = ids->Array.filterMap((id): option<Internal.effectCacheItem> =>
switch dict->Dict.getUnsafe(id) {
| Set({entity: output}) => Some({id, output})
| Delete(_) => None
}
)
let effectName = effect.name
let effectCacheRecord = switch cache->Utils.Dict.dangerouslyGetNonOption(effectName) {
| Some(c) => c
Expand All @@ -233,8 +292,6 @@ let snapshotEffects = (inMemoryStore: t, ~cache): array<Persistence.updatedEffec
Prometheus.EffectCacheCount.set(~count=effectCacheRecord.count, ~effectName)
acc->Array.push(({effect, items, shouldInitialize}: Persistence.updatedEffectCache))->ignore
}
inMemTable.pendingDict = dict
inMemTable.dict = Dict.make()
inMemTable.idsToStore = []
inMemTable.invalidationsCount = 0
})
Expand Down Expand Up @@ -302,7 +359,6 @@ let runOneWrite = async (inMemoryStore: t, ~persistence: Persistence.t, ~config)
)

inMemoryStore.committedCheckpointId = upToCheckpointId
inMemoryStore.effects->Utils.Dict.forEach(inMemTable => inMemTable.pendingDict = Dict.make())
}
}

Expand Down Expand Up @@ -376,23 +432,38 @@ let commitBatch = (inMemoryStore: t, ~batch: Batch.t) => {
inMemoryStore->kick
}

// Runs the committed-entry eviction over every entity table and every effect
// table, using the store's current committedCheckpointId. keepLoadedFromDb is
// forwarded unchanged to both per-table eviction routines.
let dropCommitted = (inMemoryStore: t, ~keepLoadedFromDb) => {
  let committedCheckpointId = inMemoryStore.committedCheckpointId
  inMemoryStore.allEntities->Array.forEach(entityConfig => {
    let entityTable = inMemoryStore->getInMemTable(~entityConfig)
    entityTable->InMemoryTable.Entity.dropCommittedChanges(
      ~committedCheckpointId,
      ~keepLoadedFromDb,
    )
  })
  inMemoryStore.effects->Utils.Dict.forEach(effectTable => {
    effectTable->dropCommittedEffects(~committedCheckpointId, ~keepLoadedFromDb)
  })
}

// Blocks until the store holds fewer than keepLatestChangesLimit changes,
// freeing committed changes first and awaiting commits as a last resort.
let rec awaitCapacity = async (inMemoryStore: t) => {
// After a failed write nothing will free capacity, so bail instead of waiting
// on a commit that won't come (the error already went to onError).
if !inMemoryStore.hasFailedWrite && inMemoryStore->getChangesCount >= keepLatestChangesLimit {
inMemoryStore.allEntities->Array.forEach(entityConfig =>
inMemoryStore
->getInMemTable(~entityConfig)
->InMemoryTable.Entity.dropCommittedChanges(
~committedCheckpointId=inMemoryStore.committedCheckpointId,
)
)
// Drop committed writes first, sparing db-loaded entries (explicitly
// requested, so likelier to be read again).
inMemoryStore->dropCommitted(~keepLoadedFromDb=true)

// Still over: drop the db-loaded entries too.
if inMemoryStore->getChangesCount >= keepLatestChangesLimit {
inMemoryStore->dropCommitted(~keepLoadedFromDb=false)
}

// What's left is uncommitted. Only wait if a queued batch can free it;
// otherwise (e.g. a large rollback diff with no batch) waiting would
// deadlock, so let processing proceed.
// Still over: what's left is uncommitted. Only wait if a queued batch can
// free it; otherwise (e.g. a large rollback diff with no batch) waiting
// would deadlock, so let processing proceed.
if (
inMemoryStore->getChangesCount >= keepLatestChangesLimit &&
inMemoryStore.processedBatches->Utils.Array.notEmpty
Expand Down
15 changes: 10 additions & 5 deletions packages/envio/src/InMemoryTable.res
Original file line number Diff line number Diff line change
Expand Up @@ -84,14 +84,19 @@ module Entity = {

// Frees committed changes: drops latest entries at or below committedCheckpointId
// (re-readable from the db) and clears the per-batch indices (rebuilt on the next
// getWhere). Uncommitted changes are kept.
let dropCommittedChanges = (self: t, ~committedCheckpointId) => {
// getWhere). Uncommitted changes are kept. With keepLoadedFromDb, entries seeded
// from a db read are spared so the cheaper-to-re-derive writes are dropped first.
let dropCommittedChanges = (self: t, ~committedCheckpointId, ~keepLoadedFromDb) => {
let keysToDelete = []
self.latestEntityChangeById->Utils.Dict.forEachWithKey((change, key) =>
if !(change->Change.getCheckpointId > committedCheckpointId) {
self.latestEntityChangeById->Utils.Dict.forEachWithKey((change, key) => {
let checkpointId = change->Change.getCheckpointId
if (
!(checkpointId > committedCheckpointId) &&
!(keepLoadedFromDb && checkpointId == Internal.loadedFromDbCheckpointId)
) {
keysToDelete->Array.push(key)
}
)
})
keysToDelete->Array.forEach(key => self.latestEntityChangeById->Utils.Dict.deleteInPlace(key))
self.changesCount = self.changesCount -. keysToDelete->Array.length->Int.toFloat
self.indicesByEntityId = Dict.make()
Expand Down
3 changes: 3 additions & 0 deletions packages/envio/src/Internal.res
Original file line number Diff line number Diff line change
Expand Up @@ -537,6 +537,9 @@ type effectArgs = {
input: effectInput,
context: effectContext,
cacheKey: string,
// The processing checkpoint that referenced this effect; stamped on the
// in-memory cache entry so it's evicted once the checkpoint commits.
checkpointId: bigint,
}
type effectCacheItem = {id: string, output: effectOutput}
type effectCacheStorageMeta = {
Expand Down
12 changes: 7 additions & 5 deletions packages/envio/src/LoadLayer.res
Original file line number Diff line number Diff line change
Expand Up @@ -89,10 +89,12 @@ let callEffect = (

effect.handler(arg)
->Promise.thenResolve(output => {
inMemTable.dict->Dict.set(arg.cacheKey, output)
if arg.context.cache {
inMemTable.idsToStore->Array.push(arg.cacheKey)->ignore
}
inMemTable->InMemoryStore.setEffectOutput(
~checkpointId=arg.checkpointId,
~cacheKey=arg.cacheKey,
~output,
~shouldCache=arg.context.cache,
)
})
->Utils.Promise.catchResolve(exn => {
onError(~inputKey=arg.cacheKey, ~exn)
Expand Down Expand Up @@ -283,7 +285,7 @@ let loadEffect = (
try {
let output = dbEntity.output->S.parseOrThrow(outputSchema)
idsFromCache->Utils.Set.add(dbEntity.id)->ignore
inMemTable.dict->Dict.set(dbEntity.id, output)
inMemTable->InMemoryStore.initEffectOutputFromDb(~cacheKey=dbEntity.id, ~output)
} catch {
| S.Raised(error) =>
inMemTable.invalidationsCount = inMemTable.invalidationsCount + 1
Expand Down
1 change: 1 addition & 0 deletions packages/envio/src/UserContext.res
Original file line number Diff line number Diff line change
Expand Up @@ -55,6 +55,7 @@ let initEffect = (params: contextParams) => {
input,
context: effectContext,
cacheKey: input->S.reverseConvertOrThrow(effect.input)->Utils.Hash.makeOrThrow,
checkpointId: params.checkpointId,
}
LoadLayer.loadEffect(
~loadManager=params.loadManager,
Expand Down
Loading
Loading