Skip to content

Commit a44764f

Browse files
committed
Guard delayed timer re-arm against stale callbacks
1 parent 1cd9640 commit a44764f

7 files changed

Lines changed: 247 additions & 71 deletions

File tree

Source/Task/TaskQueue.cpp

Lines changed: 113 additions & 24 deletions
Original file line numberDiff line numberDiff line change
@@ -362,7 +362,10 @@ HRESULT __stdcall TaskQueuePortImpl::QueueItem(
362362
}
363363
else
364364
{
365-
entry.enqueueTime = m_timer.GetAbsoluteTime(waitMs);
365+
// Delayed callbacks are ordered by a monotonic due time so stale timer
366+
// callbacks and wall-clock adjustments cannot make one pending entry
367+
// masquerade as another.
368+
entry.enqueueTime = m_timer.GetDueTime(waitMs);
366369
RETURN_HR_IF(E_OUTOFMEMORY, !m_pendingList->push_back(entry));
367370

368371
// If the entry's enqueue time is < our current time,
@@ -959,12 +962,17 @@ void TaskQueuePortImpl::CancelPendingEntries(
959962
// share this port's delayed-callback timer state, so leave m_timer and
960963
// m_timerDue alone; if we removed the armed earliest entry, the existing
961964
// timer simply takes one blank fire and re-arms for the next real item.
965+
LocklessQueue<QueueEntry> entriesToAppend(*m_queueList.get());
962966

963967
m_pendingList->remove_if([&](auto& entry, auto address)
964968
{
965969
if (entry.portContext == portContext)
966970
{
967-
if (!appendToQueue || !AppendEntry(entry, address))
971+
if (appendToQueue)
972+
{
973+
entriesToAppend.push_back(std::move(entry), address);
974+
}
975+
else
968976
{
969977
entry.portContext->Release();
970978
m_pendingList->free_node(address);
@@ -976,6 +984,22 @@ void TaskQueuePortImpl::CancelPendingEntries(
976984
return false;
977985
});
978986

987+
while (appendToQueue)
988+
{
989+
QueueEntry entry = {};
990+
uint64_t address = 0;
991+
if (!entriesToAppend.pop_front(entry, address))
992+
{
993+
break;
994+
}
995+
996+
if (!AppendEntry(entry, address))
997+
{
998+
entry.portContext->Release();
999+
m_queueList->free_node(address);
1000+
}
1001+
}
1002+
9791003
#ifdef HC_UNITTEST_API
9801004
// Test hook: let unit tests enqueue a sibling delayed callback while this
9811005
// termination path still owns the interleaving window that used to race
@@ -1034,29 +1058,46 @@ void TaskQueuePortImpl::EraseQueue(
10341058
}
10351059
}
10361060

1037-
// Examines the pending callback list, optionally popping the entry off the
1038-
// list that matches m_timerDue, and schedules the timer for the next entry.
1039-
bool TaskQueuePortImpl::ScheduleNextPendingCallback(
1061+
// Promotes every delayed entry whose deadline has already arrived and then
1062+
// arms the timer for the next future deadline, if one remains.
1063+
//
1064+
// This replaces the older "pop exactly one entry whose enqueueTime matches the
1065+
// currently armed due time" flow. That older model made correctness depend on
1066+
// timestamps behaving like unique identities. By sweeping everything with
1067+
// enqueueTime <= now, equal-deadline siblings and stale timer callbacks both
1068+
// collapse into the same simple rule: if a callback is due, move it now; if it
1069+
// is still in the future, leave it pending and re-arm for the earliest future
1070+
// item.
1071+
void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
10401072
_In_ uint64_t dueTime,
1041-
_Out_ QueueEntry& dueEntry,
1042-
_Out_ uint64_t& dueEntryNode)
1043-
{
1073+
_In_ uint64_t now)
1074+
{
1075+
// Collect due entries locally first and only touch the active queue after
1076+
// remove_if completes. The callback passed to LocklessQueue::remove_if owns
1077+
// any removed node addresses, but mutating the ready queue and signaling
1078+
// dispatchers from inside that walk makes the pending-list sweep interleave
1079+
// with new work publication. Keeping the sweep phase and the publish phase
1080+
// separate preserves the "promote all ready entries" behavior without
1081+
// asking remove_if to coexist with queue wakeups and cross-queue node reuse
1082+
// at the same time.
1083+
LocklessQueue<QueueEntry> readyEntries(*m_queueList.get());
1084+
10441085
QueueEntry nextItem = {};
1045-
bool hasDueEntry = false;
10461086
bool hasNextItem = false;
10471087

1048-
dueEntryNode = 0;
1049-
10501088
m_pendingList->remove_if([&](auto& entry, auto address)
10511089
{
1052-
if (!hasDueEntry && entry.enqueueTime == dueTime)
1090+
// Any entry whose deadline has passed is ready right now, regardless of
1091+
// whether its timestamp aliases another entry or whether this timer fire
1092+
// is the original notification or a stale callback that arrived late.
1093+
if (entry.enqueueTime <= now)
10531094
{
1054-
dueEntry = entry;
1055-
dueEntryNode = address;
1056-
hasDueEntry = true;
1095+
readyEntries.push_back(std::move(entry), address);
1096+
10571097
return true;
10581098
}
1059-
else if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
1099+
1100+
if (!hasNextItem || nextItem.enqueueTime > entry.enqueueTime)
10601101
{
10611102
// remove_if works by removing items from the list and
10621103
// re-adding them if this callback returns false. If we
@@ -1077,12 +1118,30 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
10771118
return false;
10781119
});
10791120

1121+
// Publish the ready entries after the pending-list walk finishes. This
1122+
// keeps queue wakeups and threadpool submissions out of remove_if's
1123+
// critical section while still publishing each promoted ready entry
1124+
// individually once the sweep is complete.
1125+
QueueEntry readyEntry = {};
1126+
uint64_t readyEntryNode = 0;
1127+
while (readyEntries.pop_front(readyEntry, readyEntryNode))
1128+
{
1129+
if (!AppendEntry(readyEntry, readyEntryNode))
1130+
{
1131+
readyEntry.portContext->Release();
1132+
m_queueList->free_node(readyEntryNode);
1133+
}
1134+
}
1135+
10801136
if (hasNextItem)
10811137
{
10821138
if (nextItem.portContext->GetStatus() == TaskQueuePortStatus::Active)
10831139
{
10841140
while (true)
10851141
{
1142+
// Publish the earliest future deadline that survived the ready
1143+
// sweep. If another thread already armed an even earlier timer,
1144+
// leave that earlier deadline in place and do not overwrite it.
10861145
if (m_timerDue.compare_exchange_weak(dueTime, nextItem.enqueueTime))
10871146
{
10881147
m_timer.Start(nextItem.enqueueTime);
@@ -1139,21 +1198,51 @@ bool TaskQueuePortImpl::ScheduleNextPendingCallback(
11391198
}
11401199
}
11411200

1142-
return hasDueEntry;
11431201
}
11441202

11451203
void TaskQueuePortImpl::SubmitPendingCallback()
11461204
{
1147-
QueueEntry dueEntry;
1148-
uint64_t dueEntryNode;
1149-
1150-
if (ScheduleNextPendingCallback(m_timerDue.load(), dueEntry, dueEntryNode))
1205+
while (true)
11511206
{
1152-
if (!AppendEntry(dueEntry, dueEntryNode))
1207+
uint64_t dueTime = m_timerDue.load();
1208+
1209+
if (dueTime == UINT64_MAX)
11531210
{
1154-
dueEntry.portContext->Release();
1155-
m_queueList->free_node(dueEntryNode);
1211+
return;
11561212
}
1213+
1214+
// Threadpool timer callbacks that were already queued can still arrive
1215+
// after the timer has been retargeted. Treat the callback as advisory and
1216+
// only sweep ready entries once the currently armed monotonic deadline has
1217+
// actually arrived.
1218+
//
1219+
// Important: do not just return on an "early" callback. On Win32 the
1220+
// threadpool timer's relative wait source is not the same clock object as
1221+
// std::chrono::steady_clock, so a legitimate one-shot fire can arrive a
1222+
// little before the stored steady-clock deadline. If we drop that callback
1223+
// without re-arming the timer, the pending entry can remain stranded until
1224+
// some unrelated later timer fire or termination path happens to flush it.
1225+
//
1226+
// Also do not blindly re-arm the due time we just read. Another thread can
1227+
// publish an earlier pending entry between the load above and Start() below.
1228+
// If this stale callback then overwrites the timer with the older deadline,
1229+
// the newer earlier entry can stay stranded until the older deadline fires.
1230+
// Only re-arm when m_timerDue still matches the due time we observed.
1231+
const uint64_t now = m_timer.GetCurrentTime();
1232+
if (now < dueTime)
1233+
{
1234+
uint64_t expectedDueTime = dueTime;
1235+
if (m_timerDue.compare_exchange_weak(expectedDueTime, dueTime))
1236+
{
1237+
m_timer.Start(dueTime);
1238+
return;
1239+
}
1240+
1241+
continue;
1242+
}
1243+
1244+
PromoteReadyPendingCallbacks(dueTime, now);
1245+
return;
11571246
}
11581247
}
11591248

Source/Task/TaskQueueImpl.h

Lines changed: 2 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -311,10 +311,9 @@ class TaskQueuePortImpl: public Api<ApiId::TaskQueuePort, ITaskQueuePort>
311311
static void EraseQueue(
312312
_In_opt_ LocklessQueue<QueueEntry>* queue);
313313

314-
bool ScheduleNextPendingCallback(
314+
void PromoteReadyPendingCallbacks(
315315
_In_ uint64_t dueTime,
316-
_Out_ QueueEntry& dueEntry,
317-
_Out_ uint64_t& dueEntryNode);
316+
_In_ uint64_t now);
318317

319318
void SubmitPendingCallback();
320319

Source/Task/WaitTimer.h

Lines changed: 10 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ namespace OS
66

77
class WaitTimerImpl;
88

9-
// A wait timer holds a single timeout in absolute
10-
// time. Calling Start will reset any pending timeout.
9+
// A wait timer holds a single timeout expressed as a monotonic due time.
10+
// Calling Start will reset any pending timeout.
1111
class WaitTimer
1212
{
1313
public:
@@ -17,10 +17,16 @@ namespace OS
1717
HRESULT Initialize(_In_opt_ void* context, _In_ WaitTimerCallback* callback) noexcept;
1818
void Terminate() noexcept;
1919

20-
void Start(_In_ uint64_t absoluteTime) noexcept;
20+
// Arms the one-shot timer for the provided monotonic due time.
21+
void Start(_In_ uint64_t dueTime) noexcept;
2122
void Cancel() noexcept;
2223

23-
uint64_t GetAbsoluteTime(_In_ uint32_t msFromNow) noexcept;
24+
// Returns the current monotonic time used for delayed-callback
25+
// ordering and stale-timer validation.
26+
uint64_t GetCurrentTime() noexcept;
27+
28+
// Returns a monotonic due time msFromNow milliseconds in the future.
29+
uint64_t GetDueTime(_In_ uint32_t msFromNow) noexcept;
2430

2531
private:
2632
std::atomic<WaitTimerImpl*> m_impl;

Source/Task/WaitTimer_stl.cpp

Lines changed: 34 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,26 @@
11
#include "pch.h"
22
#include "WaitTimer.h"
33

4-
using Deadline = std::chrono::high_resolution_clock::time_point;
4+
using Clock = std::chrono::steady_clock;
5+
using Deadline = Clock::time_point;
6+
using TimerDuration = std::chrono::nanoseconds;
7+
8+
namespace
9+
{
10+
// Keep the public WaitTimer surface on a plain integer so TaskQueue can use
11+
// atomics without dragging chrono types through its state. The integer still
12+
// represents steady-clock time, not wall-clock time.
13+
Deadline DeadlineFromDueTime(uint64_t dueTime) noexcept
14+
{
15+
return Deadline(std::chrono::duration_cast<Clock::duration>(TimerDuration(dueTime)));
16+
}
17+
18+
uint64_t DueTimeFromDeadline(Deadline deadline) noexcept
19+
{
20+
return static_cast<uint64_t>(
21+
std::chrono::duration_cast<TimerDuration>(deadline.time_since_epoch()).count());
22+
}
23+
}
524

625
namespace OS
726
{
@@ -12,7 +31,7 @@ namespace OS
1231
public:
1332
~WaitTimerImpl();
1433
HRESULT Initialize(_In_opt_ void* context, _In_ WaitTimerCallback* callback);
15-
void Start(_In_ uint64_t absoluteTime);
34+
void Start(_In_ uint64_t dueTime);
1635
void Cancel();
1736
void InvokeCallback();
1837

@@ -144,7 +163,7 @@ namespace OS
144163
while (!m_queue.empty())
145164
{
146165
Deadline next = Peek().When;
147-
if (std::chrono::high_resolution_clock::now() < next)
166+
if (Clock::now() < next)
148167
{
149168
break;
150169
}
@@ -235,9 +254,9 @@ namespace OS
235254
return S_OK;
236255
}
237256

238-
void WaitTimerImpl::Start(_In_ uint64_t absoluteTime)
257+
void WaitTimerImpl::Start(_In_ uint64_t dueTime)
239258
{
240-
m_timerQueue->Set(this, Deadline(Deadline::duration(absoluteTime)));
259+
m_timerQueue->Set(this, DeadlineFromDueTime(dueTime));
241260
}
242261

243262
void WaitTimerImpl::Cancel()
@@ -285,19 +304,24 @@ namespace OS
285304
}
286305
}
287306

288-
void WaitTimer::Start(_In_ uint64_t absoluteTime) noexcept
307+
void WaitTimer::Start(_In_ uint64_t dueTime) noexcept
289308
{
290-
m_impl.load()->Start(absoluteTime);
309+
m_impl.load()->Start(dueTime);
291310
}
292311

293312
void WaitTimer::Cancel() noexcept
294313
{
295314
m_impl.load()->Cancel();
296315
}
297316

298-
uint64_t WaitTimer::GetAbsoluteTime(_In_ uint32_t msFromNow) noexcept
317+
uint64_t WaitTimer::GetCurrentTime() noexcept
318+
{
319+
return DueTimeFromDeadline(Clock::now());
320+
}
321+
322+
uint64_t WaitTimer::GetDueTime(_In_ uint32_t msFromNow) noexcept
299323
{
300-
Deadline d = std::chrono::high_resolution_clock::now() + std::chrono::milliseconds(msFromNow);
301-
return d.time_since_epoch().count();
324+
Deadline deadline = Clock::now() + std::chrono::milliseconds(msFromNow);
325+
return DueTimeFromDeadline(deadline);
302326
}
303327
} // Namespace

0 commit comments

Comments
 (0)