Skip to content

Commit 3239b2a

Browse files
Fix XTaskQueue delayed-callback timing and wait-timer teardown races (#975)
* Compile existing task queue test hooks only in unit-test builds * Add regression for early promotion during queue termination * Avoid early promotion when terminating shared task queues * Add stale delayed callback repro * Fix stale delayed callback, empty-sweep wake, and STL timer teardown races Retarget delayed-callback wake handling so stale timer notifications cannot promote future work too early, keep the same-port empty-sweep rescue path iterative, and harden the STL wait timer so teardown cannot race an in-flight callback after a due heap entry is popped. The regression and hook changes remain grouped here because they cover the same delayed-callback scheduling line and the Linux timer backend that services it. * Fix iOS WaitTimer ceiling rounding and teardown Keep the iOS backend on monotonic due times without truncating sub-millisecond delays, and implement the missing WaitTimer termination path so the Apple backend matches the shared TaskQueue lifecycle contract. --------- Co-authored-by: Jason Sandlin <jasonsa@microsoft.com>
1 parent 0341897 commit 3239b2a

11 files changed

Lines changed: 996 additions & 186 deletions

Source/Task/TaskQueue.cpp

Lines changed: 217 additions & 75 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,10 @@ HRESULT __stdcall TaskQueuePortImpl::QueueItem(
362362
}
363363
else
364364
{
365-
entry.enqueueTime = m_timer.GetAbsoluteTime(waitMs);
365+
// Delayed callbacks are ordered by a monotonic due time so stale timer
366+
// callbacks and wall-clock adjustments cannot make one pending entry
367+
// masquerade as another.
368+
entry.enqueueTime = m_timer.GetDueTime(waitMs);
366369
RETURN_HR_IF(E_OUTOFMEMORY, !m_pendingList->push_back(entry));
367370

368371
// If the entry's enqueue time is < our current time,
@@ -955,18 +958,21 @@ void TaskQueuePortImpl::CancelPendingEntries(
955958
_In_ ITaskQueuePortContext* portContext,
956959
_In_ bool appendToQueue)
957960
{
958-
// Stop wait timer and promote pending callbacks that are used
959-
// by the queue that invoked this termination. Other callbacks
960-
// are placed back on the pending list.
961-
962-
m_timer.Cancel();
963-
m_timerDue = UINT64_MAX;
961+
// Only move entries owned by the terminating queue. Sibling delegates
962+
// share this port's delayed-callback timer state, so leave m_timer and
963+
// m_timerDue alone; if we removed the armed earliest entry, the existing
964+
// timer simply takes one blank fire and re-arms for the next real item.
965+
LocklessQueue<QueueEntry> entriesToAppend(*m_queueList.get());
964966

965967
m_pendingList->remove_if([&](auto& entry, auto address)
966968
{
967969
if (entry.portContext == portContext)
968970
{
969-
if (!appendToQueue || !AppendEntry(entry, address))
971+
if (appendToQueue)
972+
{
973+
entriesToAppend.push_back(std::move(entry), address);
974+
}
975+
else
970976
{
971977
entry.portContext->Release();
972978
m_pendingList->free_node(address);
@@ -978,7 +984,31 @@ void TaskQueuePortImpl::CancelPendingEntries(
978984
return false;
979985
});
980986

981-
SubmitPendingCallback();
987+
while (appendToQueue)
988+
{
989+
QueueEntry entry = {};
990+
uint64_t address = 0;
991+
if (!entriesToAppend.pop_front(entry, address))
992+
{
993+
break;
994+
}
995+
996+
if (!AppendEntry(entry, address))
997+
{
998+
entry.portContext->Release();
999+
m_queueList->free_node(address);
1000+
}
1001+
}
1002+
1003+
#ifdef HC_UNITTEST_API
1004+
// Test hook: let unit tests enqueue a sibling delayed callback while this
1005+
// termination path still owns the interleaving window that used to race
1006+
// with SubmitPendingCallback().
1007+
if (auto hooks = portContext->GetQueue()->GetTestHooks(); hooks != nullptr)
1008+
{
1009+
hooks->PendingEntriesRemovedDuringTermination(portContext->GetType());
1010+
}
1011+
#endif
9821012

9831013
#ifdef _WIN32
9841014

@@ -1028,83 +1058,130 @@ void TaskQueuePortImpl::EraseQueue(
10281058
}
10291059
}
10301060

1031-
// Examines the pending callback list, optionally popping the entry off the
1032-
// list that matches m_timerDue, and schedules the timer for the next entry.
1033-
bool TaskQueuePortImpl::ScheduleNextPendingCallback(
1061+
// Promotes every delayed entry whose deadline has already arrived and then
1062+
// arms the timer for the next future deadline, if one remains.
1063+
//
1064+
// This replaces the older "pop exactly one entry whose enqueueTime matches the
1065+
// currently armed due time" flow. That older model made correctness depend on
1066+
// timestamps behaving like unique identities. By sweeping everything with
1067+
// enqueueTime <= now, equal-deadline siblings and stale timer callbacks both
1068+
// collapse into the same simple rule: if a callback is due, move it now; if it
1069+
// is still in the future, leave it pending and re-arm for the earliest future
1070+
// item.
1071+
void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
10341072
_In_ uint64_t dueTime,
1035-
_Out_ QueueEntry& dueEntry,
1036-
_Out_ uint64_t& dueEntryNode)
1073+
_In_ uint64_t now)
10371074
{
1038-
QueueEntry nextItem = {};
1039-
bool hasDueEntry = false;
1040-
bool hasNextItem = false;
1075+
for (;;)
1076+
{
1077+
// Collect due entries locally first and only touch the active queue
1078+
// after remove_if completes. Keeping the sweep phase and the publish
1079+
// phase separate preserves the "promote all ready entries" behavior
1080+
// without asking remove_if to coexist with queue wakeups and
1081+
// cross-queue node reuse at the same time.
1082+
LocklessQueue<QueueEntry> readyEntries(*m_queueList.get());
10411083

1042-
dueEntryNode = 0;
1084+
QueueEntry nextItem = {};
1085+
bool hasNextItem = false;
10431086

1044-
m_pendingList->remove_if([&](auto& entry, auto address)
1045-
{
1046-
if (!hasDueEntry && entry.enqueueTime == dueTime)
1047-
{
1048-
dueEntry = entry;
1049-
dueEntryNode = address;
1050-
hasDueEntry = true;
1051-
return true;
1052-
}
1053-
else if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
1087+
m_pendingList->remove_if([&](auto& entry, auto address)
10541088
{
1055-
// remove_if works by removing items from the list and
1056-
// re-adding them if this callback returns false. If we
1057-
// are going to keep an item beyond this callback we need
1058-
// to make sure fields we're using stay valid. Only the
1059-
// port context is a risk.
1089+
// Any entry whose deadline has passed is ready right now,
1090+
// regardless of whether its timestamp aliases another entry or
1091+
// whether this timer fire is the original notification or a
1092+
// stale callback that arrived late.
1093+
if (entry.enqueueTime <= now)
1094+
{
1095+
readyEntries.push_back(std::move(entry), address);
1096+
1097+
return true;
1098+
}
10601099

1061-
if (hasNextItem)
1100+
if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
10621101
{
1063-
nextItem.portContext->Release();
1102+
// remove_if works by removing items from the list and
1103+
// re-adding them if this callback returns false. If we
1104+
// are going to keep an item beyond this callback we need
1105+
// to make sure fields we're using stay valid. Only the
1106+
// port context is a risk.
1107+
1108+
if (hasNextItem)
1109+
{
1110+
nextItem.portContext->Release();
1111+
}
1112+
1113+
nextItem = entry;
1114+
nextItem.portContext->AddRef();
1115+
hasNextItem = true;
10641116
}
10651117

1066-
nextItem = entry;
1067-
nextItem.portContext->AddRef();
1068-
hasNextItem = true;
1069-
}
1118+
return false;
1119+
});
10701120

1071-
return false;
1072-
});
1121+
// Publish the ready entries after the pending-list walk finishes.
1122+
QueueEntry readyEntry = {};
1123+
uint64_t readyEntryNode = 0;
1124+
while (readyEntries.pop_front(readyEntry, readyEntryNode))
1125+
{
1126+
if (!AppendEntry(readyEntry, readyEntryNode))
1127+
{
1128+
readyEntry.portContext->Release();
1129+
m_queueList->free_node(readyEntryNode);
1130+
}
1131+
}
10731132

1074-
if (hasNextItem)
1075-
{
1076-
if (nextItem.portContext->GetStatus() == TaskQueuePortStatus::Active)
1133+
if (hasNextItem)
10771134
{
1078-
while (true)
1135+
if (nextItem.portContext->GetStatus() == TaskQueuePortStatus::Active)
10791136
{
1080-
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
1137+
while (true)
10811138
{
1082-
m_timer.Start(nextItem.enqueueTime);
1083-
break;
1084-
}
1139+
// Publish the earliest future deadline that survived the
1140+
// ready sweep. If another thread already armed an even
1141+
// earlier timer, leave that earlier deadline in place.
1142+
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
1143+
{
1144+
m_timer.Start(nextItem.enqueueTime);
1145+
break;
1146+
}
10851147

1086-
dueTime = m_timerDue.load();
1148+
dueTime = m_timerDue.load();
10871149

1088-
if (dueTime <= nextItem.enqueueTime)
1089-
{
1090-
break;
1150+
if (dueTime <= nextItem.enqueueTime)
1151+
{
1152+
break;
1153+
}
10911154
}
10921155
}
1093-
}
1094-
else
1095-
{
1096-
// The port is no longer active. Pending entries are canceled
1097-
// when the port is terminated, but if we were iterating above
1098-
// it's possible that we removed an item while the termination was
1099-
// being processed and it got missed.
1100-
CancelPendingEntries(nextItem.portContext, true);
1156+
else
1157+
{
1158+
// The port is no longer active. Pending entries are canceled
1159+
// when the port is terminated, but if we were iterating above
1160+
// it's possible that we removed an item while the termination
1161+
// was being processed and it got missed.
1162+
CancelPendingEntries(nextItem.portContext, true);
1163+
}
1164+
1165+
nextItem.portContext->Release();
1166+
return;
11011167
}
11021168

1103-
nextItem.portContext->Release();
1104-
}
1105-
else
1106-
{
1169+
// No future entries remain in the pending list.
11071170
uint64_t noDueTime = UINT64_MAX;
1171+
1172+
#ifdef HC_UNITTEST_API
1173+
m_attachedContexts.Visit([&](ITaskQueuePortContext* portContext)
1174+
{
1175+
auto hooks = portContext->GetQueue()->GetTestHooks();
1176+
if (hooks != nullptr)
1177+
{
1178+
hooks->NoNextPendingCallbackFound(
1179+
portContext->GetType(),
1180+
dueTime);
1181+
}
1182+
});
1183+
#endif
1184+
11081185
if (m_timerDue.compare_exchange_strong(dueTime, noDueTime))
11091186
{
11101187
// Bug fix: ScheduleNextPendingCallback timer race results
@@ -1118,6 +1195,7 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
11181195
// See VerifyDelayedCallbackTimerRaceOnManualQueue for full
11191196
// analysis. The test hook here allows unit tests to verify
11201197
// there is no race.
1198+
#ifdef HC_UNITTEST_API
11211199
m_attachedContexts.Visit([&](ITaskQueuePortContext* portContext)
11221200
{
11231201
auto hooks = portContext->GetQueue()->GetTestHooks();
@@ -1128,24 +1206,69 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
11281206
noDueTime);
11291207
}
11301208
});
1209+
#endif
1210+
1211+
// A concurrent QueueItem can append a future entry after our
1212+
// sweep has already concluded there is no next item, but before
1213+
// we publish UINT64_MAX here. Instead of recursing (which has
1214+
// no tail-call guarantee and risks stack growth under sustained
1215+
// contention), loop back for a rescue sweep. If nothing landed,
1216+
// the second pass is a cheap no-op.
1217+
if (dueTime != noDueTime)
1218+
{
1219+
now = m_timer.GetCurrentTime();
1220+
dueTime = noDueTime;
1221+
continue;
1222+
}
11311223
}
1132-
}
11331224

1134-
return hasDueEntry;
1225+
return;
1226+
}
11351227
}
11361228

11371229
void TaskQueuePortImpl::SubmitPendingCallback()
11381230
{
1139-
QueueEntry dueEntry;
1140-
uint64_t dueEntryNode;
1141-
1142-
if (ScheduleNextPendingCallback(m_timerDue.load(), dueEntry, dueEntryNode))
1231+
while (true)
11431232
{
1144-
if (!AppendEntry(dueEntry, dueEntryNode))
1233+
uint64_t dueTime = m_timerDue.load();
1234+
1235+
if (dueTime == UINT64_MAX)
1236+
{
1237+
return;
1238+
}
1239+
1240+
// Threadpool timer callbacks that were already queued can still arrive
1241+
// after the timer has been retargeted. Treat the callback as advisory and
1242+
// only sweep ready entries once the currently armed monotonic deadline has
1243+
// actually arrived.
1244+
//
1245+
// Important: do not just return on an "early" callback. On Win32 the
1246+
// threadpool timer's relative wait source is not the same clock object as
1247+
// std::chrono::steady_clock, so a legitimate one-shot fire can arrive a
1248+
// little before the stored steady-clock deadline. If we drop that callback
1249+
// without re-arming the timer, the pending entry can remain stranded until
1250+
// some unrelated later timer fire or termination path happens to flush it.
1251+
//
1252+
// Also do not blindly re-arm the due time we just read. Another thread can
1253+
// publish an earlier pending entry between the load above and Start() below.
1254+
// If this stale callback then overwrites the timer with the older deadline,
1255+
// the newer earlier entry can stay stranded until the older deadline fires.
1256+
// Only re-arm when m_timerDue still matches the due time we observed.
1257+
const uint64_t now = m_timer.GetCurrentTime();
1258+
if (now < dueTime)
11451259
{
1146-
dueEntry.portContext->Release();
1147-
m_queueList->free_node(dueEntryNode);
1260+
uint64_t expectedDueTime = dueTime;
1261+
if (m_timerDue.compare_exchange_weak(expectedDueTime, dueTime))
1262+
{
1263+
m_timer.Start(dueTime);
1264+
return;
1265+
}
1266+
1267+
continue;
11481268
}
1269+
1270+
PromoteReadyPendingCallbacks(dueTime, now);
1271+
return;
11491272
}
11501273
}
11511274

@@ -2341,6 +2464,7 @@ STDAPI_(bool) XTaskQueueUninitialize(
23412464
return ApiRefs::WaitZeroRefs(timeoutMilliseconds);
23422465
}
23432466

2467+
#ifdef HC_UNITTEST_API
23442468
/// <summary>
23452469
/// Sets or clears test hooks on a task queue.
23462470
/// </summary>
@@ -2353,4 +2477,22 @@ STDAPI XTaskQueueSetTestHooks(
23532477
RETURN_HR_IF(E_GAMERUNTIME_INVALID_HANDLE, aq == nullptr);
23542478
aq->SetTestHooks(hooks);
23552479
return S_OK;
2356-
}
2480+
}
2481+
2482+
STDAPI XTaskQueueSubmitPendingCallbackForTests(
2483+
_In_ XTaskQueueHandle queue,
2484+
_In_ XTaskQueuePort port
2485+
) noexcept
2486+
{
2487+
referenced_ptr<ITaskQueue> aq(GetQueue(queue));
2488+
RETURN_HR_IF(E_GAMERUNTIME_INVALID_HANDLE, aq == nullptr);
2489+
2490+
referenced_ptr<ITaskQueuePortContext> portContext;
2491+
RETURN_IF_FAILED(aq->GetPortContext(port, portContext.address_of()));
2492+
2493+
auto* portImpl = static_cast<TaskQueuePortImpl*>(portContext->GetPort());
2494+
portImpl->SubmitPendingCallbackForTests();
2495+
return S_OK;
2496+
}
2497+
#endif
2498+

0 commit comments

Comments
 (0)