diff --git a/replication/replicationConnection.ts b/replication/replicationConnection.ts index 7c8d02ee8..02b7e9c6c 100644 --- a/replication/replicationConnection.ts +++ b/replication/replicationConnection.ts @@ -1547,6 +1547,7 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: Prom } const id = auditRecord.recordId; event = undefined; // reset before each decode attempt + let receivedBlobs: any[] | undefined; try { decodeBlobsWithWrites( () => { @@ -1565,7 +1566,11 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: Prom }; }, auditStore?.rootStore, - (blob) => receiveBlobs(blob, id) + (blob) => { + const localBlob = receiveBlobs(blob, id); + (receivedBlobs ??= []).push(localBlob); + return localBlob; + } ); } catch (error) { logger.error?.( @@ -1578,6 +1583,11 @@ export function replicateOverWS(ws: WebSocket, options: any, authorization: Prom error ); } + if (!event && receivedBlobs) { + // decode failed mid-message; the blobs that were already accepted will never be referenced. Give in-flight reads + // a window to complete, then unlink the files. (mirrors the pattern at the relocate path above.) + setTimeout(() => receivedBlobs.forEach(deleteBlob), 60000).unref(); + } replicationSharedStatus[RECEIVED_VERSION_POSITION] = Math.max( // ensure monotonicity auditRecord.version,