Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,10 @@ protected List<RpcObjectData> handle(GetObject request) throws Exception {

RpcSendQueue sendQueue = new RpcSendQueue(batchSize.get(), batch::put, localRefs, request.getSourceFileType(), traceGetObject.get());
TREE_TRAVERSAL_POOL.submit(() -> {
// Snapshot the current ref count so we can roll back on failure.
// Ref IDs are assigned sequentially as localRefs.size() + 1,
// so any ref > savedRefCount was added during this exchange.
int savedRefCount = localRefs.size();
try {
sendQueue.send(after, before, null);

Expand All @@ -96,6 +100,13 @@ protected List<RpcObjectData> handle(GetObject request) throws Exception {
// forces a full object sync (ADD) instead of a delta (CHANGE)
// against the stale, partially-sent baseline.
remoteObjects.remove(id);

// Roll back localRefs to remove refs assigned during this failed
// exchange. Without this, subsequent exchanges would send pure
// references for objects the remote never received, causing
// "Received a reference to an object that was not previously sent".
localRefs.values().removeIf(ref -> ref > savedRefCount);

PrintStream logFile = log.get();
//noinspection ConstantValue
if (logFile != null) {
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -181,9 +181,14 @@ void sendFailureCleansUpRemoteObjects() {
}

// Step 4: verify the sender cleaned up its stale remoteObjects entry
// and rolled back any refs assigned during the failed exchange
assertThat(server.remoteObjects)
.describedAs("Sender should remove stale remoteObjects entry after send failure")
.doesNotContainKey(id);
int refsAfterFailure = server.localRefs.size();
assertThat(refsAfterFailure)
.describedAs("Sender should roll back localRefs assigned during failed exchange")
.isEqualTo(0);

// Step 5: put back a valid tree and retry — should succeed via full ADD
PlainText fixed = original.withText("Fixed");
Expand Down
17 changes: 16 additions & 1 deletion rewrite-javascript/rewrite/src/reference.ts
Original file line number Diff line number Diff line change
Expand Up @@ -84,4 +84,19 @@ export class ReferenceMap {
this.refs.set(ref, refId);
this.refsById.set(refId, ref);
}
}

snapshot(): number {
return this.refCount;
}

rollbackTo(savedRefCount: number): void {
for (let i = savedRefCount; i < this.refCount; i++) {
const obj = this.refsById.get(i);
if (obj) {
this.refs.delete(obj);
this.refsById.delete(i);
}
}
this.refCount = savedRefCount;
}
}
19 changes: 14 additions & 5 deletions rewrite-javascript/rewrite/src/rpc/request/get-object.ts
Original file line number Diff line number Diff line change
Expand Up @@ -63,11 +63,20 @@ export class GetObject {
const after = obj;
const before = remoteObjects.get(objId);

allData = await new RpcSendQueue(localRefs, request.sourceFileType, trace())
.generate(after, before);
pendingData.set(objId, allData);

remoteObjects.set(objId, after);
// Snapshot ref count so we can roll back on failure.
// Ref IDs are assigned sequentially, so any ref >= savedRefCount
// was added during this exchange.
const savedRefCount = localRefs.snapshot();
try {
allData = await new RpcSendQueue(localRefs, request.sourceFileType, trace())
.generate(after, before);
pendingData.set(objId, allData);
remoteObjects.set(objId, after);
} catch (e) {
remoteObjects.delete(objId);
localRefs.rollbackTo(savedRefCount);
throw e;
}
}

const batch = allData.splice(0, batchSize);
Expand Down