From f44f442f6b4c76e6419c765a94efab3355407759 Mon Sep 17 00:00:00 2001 From: Kris Zyp Date: Sat, 2 May 2026 18:47:55 -0600 Subject: [PATCH] fix(replication): clean up blobs accepted during failed decode In the replication ingest loop, `decodeBlobsWithWrites` may invoke the blob callback (which kicks off `saveBlob`) for one or more blobs and then throw before the surrounding callback can build the `event`. The `catch` only logs, so the partial blobs end up on disk with no record ever referencing them. Track the blobs as they are received and, if `event` is still undefined after the catch (decode failed), schedule deletion after the 60s window used by the relocate path. The successful path is unchanged. Co-Authored-By: Claude Opus 4.7 (1M context) --- replication/replicationConnection.ts | 12 +++++++++++- 1 file changed, 11 insertions(+), 1 deletion(-) diff --git a/replication/replicationConnection.ts b/replication/replicationConnection.ts index 7c8d02ee8..02b7e9c6c 100644 --- a/replication/replicationConnection.ts +++ b/replication/replicationConnection.ts @@ -1547,6 +1547,7 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: Prom } const id = auditRecord.recordId; event = undefined; // reset before each decode attempt + let receivedBlobs: any[] | undefined; try { decodeBlobsWithWrites( () => { @@ -1565,7 +1566,11 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: Prom }; }, auditStore?.rootStore, - (blob) => receiveBlobs(blob, id) + (blob) => { + const localBlob = receiveBlobs(blob, id); + (receivedBlobs ??= []).push(localBlob); + return localBlob; + } ); } catch (error) { logger.error?.( @@ -1578,6 +1583,11 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: Prom error ); } + if (!event && receivedBlobs) { + // decode failed mid-message; the blobs that were already accepted will never be referenced. Give in-flight reads + // a window to complete, then unlink the files. (mirrors the pattern at the relocate path above.) + setTimeout(() => receivedBlobs.forEach(deleteBlob), 60000).unref(); + } replicationSharedStatus[RECEIVED_VERSION_POSITION] = Math.max( // ensure monotonicity auditRecord.version,