Commit 91bbfaa

Update TaskQueue with changes from Windows repo (#980)
This PR is the result of a round trip between libHttpClient and the Windows codebase. There are three areas of change:

1. General cleanup / "fit" into the Windows codebase (no behavioral changes)
2. Address a very minor race condition during a CAS operation
3. Eat task queue submits during process detach

## Changes

### General Cleanup

I removed the HC_UNITTEST_API #ifdefs. In the Windows codebase the task queue library is built once and then merged into either the runtime DLL or the unit test DLL, so the same library needs to link into both (we don't build it twice). The code added for unit tests is small enough that I think this is fine. I also renamed SubmitPendingCallback to the plural SubmitPendingCallbacks (since it now submits any callback that is ready) and unwrapped the test-specific API cover function.

### CAS Race

In SubmitPendingCallbacks there was a very narrow window between the CAS on m_timerDue succeeding and the timer being set. In that gap another thread could update m_timerDue after this thread "wins" the CAS but before this thread changes the timer. My fix is to return only if m_timerDue is still what we expect after the SetTimer call; otherwise the loop runs again.

### Process Detach

The PlayFab team and I were researching a game crash that looked like we were submitting a threadpool callback after the task queue handle was released. This turned out to be the wrong analysis. What was actually happening was that the game hit some failure and called ExitProcess while other parts of the game were still running. This ended up submitting task queue callbacks in late shutdown during process detach, and the Windows thread pool handles this by throwing an exception and crashing the process. The intent of this change is to stop masking the real problem with a process crash.

This uses RtlDllShutdownInProgress, which, while documented, is not exposed in any header. It does work if you just declare it and link to ntdll, but that causes a link-time break for all consumers of libHttpClient since ntdll is not in the current linkage requirements. Instead, this code does a GetModuleHandle of ntdll (which is loaded into every process on Windows). We don't do this for UWP because GetModuleHandle is not defined for that API set. We don't bother releasing the module handle because a) we don't want to re-acquire it on every task queue submit and b) ntdll is never unloaded anyway, so a leaked reference doesn't matter.

## Testing / Verification

* All unit tests pass in both the libHttpClient and Windows repos.
* Verified Xcode builds.
* Deployed runtime to Xbox console and verified RtlDllShutdownInProgress code works correctly in that environment.
1 parent ab6a11b commit 91bbfaa

6 files changed

Lines changed: 59 additions & 38 deletions

Source/Task/TaskQueue.cpp

Lines changed: 18 additions & 16 deletions
@@ -306,7 +306,7 @@ HRESULT TaskQueuePortImpl::Initialize(
     RETURN_IF_FAILED(m_timer.Initialize(this, [](void* context)
     {
         TaskQueuePortImpl* pthis = static_cast<TaskQueuePortImpl*>(context);
-        pthis->SubmitPendingCallback();
+        pthis->SubmitPendingCallbacks();
     }));

 #ifdef _WIN32
@@ -1000,15 +1000,13 @@ void TaskQueuePortImpl::CancelPendingEntries(
         }
     }

-#ifdef HC_UNITTEST_API
     // Test hook: let unit tests enqueue a sibling delayed callback while this
     // termination path still owns the interleaving window that used to race
-    // with SubmitPendingCallback().
+    // with SubmitPendingCallbacks().
     if (auto hooks = portContext->GetQueue()->GetTestHooks(); hooks != nullptr)
     {
         hooks->PendingEntriesRemovedDuringTermination(portContext->GetType());
     }
-#endif

 #ifdef _WIN32

@@ -1169,7 +1167,6 @@ void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
         // No future entries remain in the pending list.
         uint64_t noDueTime = UINT64_MAX;

-#ifdef HC_UNITTEST_API
         m_attachedContexts.Visit([&](ITaskQueuePortContext* portContext)
         {
             auto hooks = portContext->GetQueue()->GetTestHooks();
@@ -1180,22 +1177,20 @@ void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
                     dueTime);
             }
         });
-#endif

         if (m_timerDue.compare_exchange_strong(dueTime, noDueTime))
         {
             // Bug fix: ScheduleNextPendingCallback timer race results
             // in lost delayed task wakes. Don't cancel the timer here
             // as another scheduled callback could have been added.
             // The CAS above is sufficient: the timer has already fired
-            // (call site 1: SubmitPendingCallback) or was already
+            // (call site 1: SubmitPendingCallbacks) or was already
             // canceled (call site 2: CancelPendingEntries). A Cancel()
             // here raced with concurrent QueueItem/Start calls on other
             // threads, permanently stranding entries in m_pendingList.
             // See VerifyDelayedCallbackTimerRaceOnManualQueue for full
             // analysis. The test hook here allows unit tests to verify
             // there is no race.
-#ifdef HC_UNITTEST_API
             m_attachedContexts.Visit([&](ITaskQueuePortContext* portContext)
             {
                 auto hooks = portContext->GetQueue()->GetTestHooks();
@@ -1206,7 +1201,6 @@ void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
                         noDueTime);
                 }
             });
-#endif

             // A concurrent QueueItem can append a future entry after our
             // sweep has already concluded there is no next item, but before
@@ -1226,7 +1220,7 @@ void TaskQueuePortImpl::PromoteReadyPendingCallbacks(
     }
 }

-void TaskQueuePortImpl::SubmitPendingCallback()
+void TaskQueuePortImpl::SubmitPendingCallbacks()
 {
     while (true)
     {
@@ -1261,7 +1255,14 @@ void TaskQueuePortImpl::SubmitPendingCallback()
             if (m_timerDue.compare_exchange_weak(expectedDueTime, dueTime))
             {
                 m_timer.Start(dueTime);
-                return;
+
+                // It's possible someone snuck a change into m_timerDue after the CAS
+                // but before the start call, so we've just written the wrong value to
+                // the timer. Verify dueTime again before returning.
+                if (m_timerDue.load() == dueTime)
+                {
+                    return;
+                }
             }

             continue;
@@ -2464,7 +2465,6 @@ STDAPI_(bool) XTaskQueueUninitialize(
     return ApiRefs::WaitZeroRefs(timeoutMilliseconds);
 }

-#ifdef HC_UNITTEST_API
 /// <summary>
 /// Sets or clears test hooks on a task queue.
 /// </summary>
@@ -2479,7 +2479,11 @@ STDAPI XTaskQueueSetTestHooks(
     return S_OK;
 }

-STDAPI XTaskQueueSubmitPendingCallbackForTests(
+/// <summary>
+/// Submits any pending delayed callbacks that are due to run. This is
+/// intended for use in unit tests.
+/// </summary>
+STDAPI XTaskQueueSubmitPendingCallbacks(
     _In_ XTaskQueueHandle queue,
     _In_ XTaskQueuePort port
     ) noexcept
@@ -2490,9 +2494,7 @@ STDAPI XTaskQueueSubmitPendingCallbackForTests(
     referenced_ptr<ITaskQueuePortContext> portContext;
     RETURN_IF_FAILED(aq->GetPortContext(port, portContext.address_of()));

-    auto* portImpl = static_cast<TaskQueuePortImpl*>(portContext->GetPort());
-    portImpl->SubmitPendingCallbackForTests();
+    portContext->GetPort()->SubmitPendingCallbacks();
     return S_OK;
 }
-#endif

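The re-check added to SubmitPendingCallbacks can be modeled outside the task queue. The following is a minimal sketch, not the library's code: `FakeTimer`, `TimerModel`, `ArmTimer`, `earliestPending`, `afterCas`, and `DemoInterleaving` are all hypothetical names, and the timer is a plain struct that only records the last value passed to `Start`. It shows the shape of the pattern: publish the due time with a CAS, program the timer, then confirm the published value is unchanged before returning.

```cpp
#include <atomic>
#include <cassert>
#include <cstdint>
#include <functional>
#include <utility>

// Hypothetical stand-in for the real timer: it only remembers the last due time.
struct FakeTimer
{
    uint64_t lastStarted = UINT64_MAX;
    void Start(uint64_t dueTime) { lastStarted = dueTime; }
};

// Hypothetical model of a port that publishes the next due time in an atomic
// and then programs a timer to match it.
struct TimerModel
{
    std::atomic<uint64_t> timerDue{ UINT64_MAX };        // published deadline
    std::atomic<uint64_t> earliestPending{ UINT64_MAX }; // what the pending list wants
    FakeTimer timer;

    // Test seam (an assumption of this sketch): runs once, after the CAS
    // succeeds and before the timer is started, to simulate another thread.
    std::function<void()> afterCas;

    void ArmTimer()
    {
        while (true)
        {
            uint64_t dueTime = earliestPending.load();
            uint64_t expected = timerDue.load();

            // The commit uses compare_exchange_weak; strong keeps this
            // single-threaded demo deterministic.
            if (timerDue.compare_exchange_strong(expected, dueTime))
            {
                if (afterCas)
                {
                    auto hook = std::move(afterCas);
                    afterCas = nullptr;
                    hook();
                }

                timer.Start(dueTime);

                // Re-check: if the published value moved, the timer may now
                // hold a stale deadline, so loop again instead of returning.
                if (timerDue.load() == dueTime)
                {
                    return;
                }
            }
        }
    }
};

// Simulates a second writer that publishes an earlier deadline (50) and starts
// the timer inside the gap, then returns the deadline the timer ends up with.
inline uint64_t DemoInterleaving()
{
    TimerModel model;
    model.earliestPending = 100;
    model.afterCas = [&model]
    {
        model.earliestPending = 50;
        model.timerDue = 50;
        model.timer.Start(50);
    };

    model.ArmTimer();
    assert(model.timerDue.load() == model.timer.lastStarted);
    return model.timer.lastStarted;
}
```

In this model, returning immediately after `timer.Start(dueTime)` would leave the timer armed for 100 while the published deadline is 50. With the re-check, the second pass re-arms the timer for 50.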
Source/Task/TaskQueueImpl.h

Lines changed: 1 addition & 12 deletions
@@ -215,12 +215,7 @@ class TaskQueuePortImpl: public Api<ApiId::TaskQueuePort, ITaskQueuePort>
     void __stdcall SuspendPort();
     void __stdcall ResumePort();

-#ifdef HC_UNITTEST_API
-    void __stdcall SubmitPendingCallbackForTests()
-    {
-        SubmitPendingCallback();
-    }
-#endif
+    void __stdcall SubmitPendingCallbacks();

 private:

@@ -315,8 +310,6 @@ class TaskQueuePortImpl: public Api<ApiId::TaskQueuePort, ITaskQueuePort>
         _In_ uint64_t dueTime,
         _In_ uint64_t now);

-    void SubmitPendingCallback();
-
     void SignalTerminations();
     void ScheduleTermination(_In_ TerminationEntry* term);
     bool TerminationListEmpty();
@@ -408,10 +401,8 @@ class TaskQueueImpl : public Api<ApiId::TaskQueue, ITaskQueue>
         _In_ XTaskQueuePortHandle completionPort);

     XTaskQueueHandle __stdcall GetHandle() override { return &m_header; }
-#ifdef HC_UNITTEST_API
     XTaskQueueTestHooks* __stdcall GetTestHooks() override { return m_testHooks; }
     void __stdcall SetTestHooks(_In_ XTaskQueueTestHooks* testHooks) override { m_testHooks = testHooks; }
-#endif

     HRESULT __stdcall GetPortContext(
         _In_ XTaskQueuePort port,
@@ -483,9 +474,7 @@ class TaskQueueImpl : public Api<ApiId::TaskQueue, ITaskQueue>
     TerminationData m_termination;
     TaskQueuePortContextImpl m_work;
     TaskQueuePortContextImpl m_completion;
-#ifdef HC_UNITTEST_API
     XTaskQueueTestHooks* m_testHooks = nullptr;
-#endif

 #ifdef SUSPEND_API
     SuspendResumeHandler m_suspendHandler;

Source/Task/TaskQueueP.h

Lines changed: 1 addition & 2 deletions
@@ -83,6 +83,7 @@ struct ITaskQueuePort: IApi
     virtual void __stdcall SuspendPort() = 0;
     virtual void __stdcall ResumePort() = 0;

+    virtual void __stdcall SubmitPendingCallbacks() = 0;
 };

 // The status of a port on the queue. This status is used in
@@ -125,10 +126,8 @@ struct ITaskQueuePortContext : IApi
 struct ITaskQueue : IApi
 {
     virtual XTaskQueueHandle __stdcall GetHandle() = 0;
-#ifdef HC_UNITTEST_API
     virtual XTaskQueueTestHooks* __stdcall GetTestHooks() = 0;
     virtual void __stdcall SetTestHooks(_In_ XTaskQueueTestHooks* testHooks) = 0;
-#endif

     virtual HRESULT __stdcall GetPortContext(
         _In_ XTaskQueuePort port,
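
Promoting SubmitPendingCallbacks to the port interface is what lets the public entry point drop both the `static_cast` and the test-only wrapper. A minimal sketch of that shape, using simplified hypothetical types (`IPort`, `PortImpl`, `SubmitThroughInterface`, `DemoInterfaceCall`) rather than the real `ITaskQueuePort` and `TaskQueuePortImpl`:

```cpp
#include <cassert>

// Hypothetical, simplified stand-in for the port interface.
struct IPort
{
    virtual ~IPort() = default;
    virtual void SubmitPendingCallbacks() = 0; // part of the interface itself
};

// Hypothetical, simplified stand-in for the concrete port.
struct PortImpl final : IPort
{
    int submitCalls = 0;
    void SubmitPendingCallbacks() override { ++submitCalls; }
};

// A caller that only holds the interface needs neither a downcast to the
// concrete type nor a separate "ForTests" wrapper method.
inline void SubmitThroughInterface(IPort& port)
{
    port.SubmitPendingCallbacks();
}

// Calls through the interface once and returns how many calls reached the
// concrete implementation.
inline int DemoInterfaceCall()
{
    PortImpl impl;
    SubmitThroughInterface(impl);
    assert(impl.submitCalls == 1);
    return impl.submitCalls;
}
```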

Source/Task/ThreadPool_win32.cpp

Lines changed: 35 additions & 1 deletion
@@ -3,6 +3,37 @@

 namespace OS
 {
+    // This API is documented but not defined in the headers. Load it dynamically
+    // so the world doesn't have to add linkage to ntdll. Ntdll is not unloadable,
+    // so safe to leak the module ref here.
+    static inline BOOLEAN __stdcall RtlDllShutdownInProgress() noexcept
+    {
+        static decltype(RtlDllShutdownInProgress)* s_pfnRtlDllShutdownInProgress = nullptr;
+        static HMODULE s_ntdllModuleHandle = nullptr;
+
+        // No locking needed -- if these race the threads race to copy
+        // the same values, and worst case is we get an addl ref on
+        // a dll we'll never unload anyway. GetModuleHandle is not defined
+        // for UWP apps, so don't do this safety check for them.
+
+#ifdef GetModuleHandle
+        if (s_pfnRtlDllShutdownInProgress == nullptr)
+        {
+            if (s_ntdllModuleHandle == nullptr)
+            {
+                s_ntdllModuleHandle = GetModuleHandleW(L"ntdll.dll");
+            }
+
+            if (s_ntdllModuleHandle != nullptr)
+            {
+                s_pfnRtlDllShutdownInProgress = reinterpret_cast<decltype(RtlDllShutdownInProgress)*>(GetProcAddress(s_ntdllModuleHandle, "RtlDllShutdownInProgress"));
+            }
+        }
+#endif
+
+        return s_pfnRtlDllShutdownInProgress ? s_pfnRtlDllShutdownInProgress() : FALSE;
+    }
+
     class ThreadPoolImpl
     {
     public:
@@ -52,7 +83,10 @@ namespace OS

         void Submit() noexcept
         {
-            SubmitThreadpoolWork(m_work);
+            if (!RtlDllShutdownInProgress())
+            {
+                SubmitThreadpoolWork(m_work);
+            }
         }

     private:
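
The lazy lookup and the guarded submit in this file can be modeled without Windows headers. The following is a portable sketch under stated assumptions: `LookupShutdownQuery`, `QueryProcessExiting`, `ShutdownInProgress`, `SubmitWork`, `DemoSubmitDuringShutdown`, and the `g_` globals are hypothetical stand-ins for the GetModuleHandleW/GetProcAddress lookup, RtlDllShutdownInProgress, and SubmitThreadpoolWork. It illustrates two behaviors from the diff: an unresolved symbol is treated as "not shutting down", and submits are silently dropped once shutdown is reported.

```cpp
#include <cassert>

// Hypothetical stand-ins so the sketch builds anywhere; none of these are
// Windows or libHttpClient APIs.
using ShutdownQueryFn = bool (*)();

static bool g_symbolAvailable = true;  // pretend the export lookup succeeds
static bool g_processExiting = false;  // pretend process-detach state
static int g_lookupCount = 0;          // how many times the lookup ran
static int g_submitted = 0;            // how many work items were queued

static bool QueryProcessExiting() { return g_processExiting; }

// Models the module/export lookup: returns nullptr when the export
// cannot be found.
static ShutdownQueryFn LookupShutdownQuery()
{
    ++g_lookupCount;
    return g_symbolAvailable ? &QueryProcessExiting : nullptr;
}

// Resolves the query function on first use and caches it. As in the diff,
// a failed lookup is treated as "not shutting down".
static bool ShutdownInProgress()
{
    static ShutdownQueryFn s_query = nullptr;
    if (s_query == nullptr)
    {
        s_query = LookupShutdownQuery();
    }
    return s_query != nullptr ? s_query() : false;
}

// Models the guarded Submit: drop the work item instead of queueing it once
// shutdown has begun.
static void SubmitWork()
{
    if (!ShutdownInProgress())
    {
        ++g_submitted;
    }
}

// Submits twice while running, then once after "shutdown" begins, and returns
// the number of items that were actually queued.
static int DemoSubmitDuringShutdown()
{
    SubmitWork();
    SubmitWork();
    g_processExiting = true;
    SubmitWork();
    assert(g_lookupCount == 1); // resolved once, then served from the cache
    return g_submitted;
}
```

The sketch is single-threaded. The comment in the diff argues that leaving the cached pointer unsynchronized is acceptable because racing threads would store the same values.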

Source/Task/XTaskQueuePriv.h

Lines changed: 3 additions & 6 deletions
@@ -40,7 +40,6 @@ STDAPI_(void) XTaskQueueResumeTermination(
     _In_ XTaskQueueHandle queue
     ) noexcept;

-#ifdef HC_UNITTEST_API
 /// <summary>
 /// This structure can be passed as a pointer to the task queue so unit tests
 /// can hook into its behavior. Some race conditions are very difficult to get
@@ -84,15 +83,13 @@ STDAPI XTaskQueueSetTestHooks(
     ) noexcept;

 /// <summary>
-/// Directly invokes the delayed-callback notification path for unit tests.
-/// This is used to model stale threadpool timer callbacks that were already
-/// queued before the timer was retargeted.
+/// Submits any pending delayed callbacks that are due to run. This is
+/// intended for use in unit tests.
 /// </summary>
-STDAPI XTaskQueueSubmitPendingCallbackForTests(
+STDAPI XTaskQueueSubmitPendingCallbacks(
     _In_ XTaskQueueHandle queue,
     _In_ XTaskQueuePort port
     ) noexcept;
-#endif

 //----------------------------------------------------------------//
 //

Tests/UnitTests/Tests/TaskQueueTests.cpp

Lines changed: 1 addition & 1 deletion
@@ -2390,7 +2390,7 @@ DEFINE_TEST_CLASS(TaskQueueTests)
         // Simulate a stale delayed-callback notification that was already
         // queued before the timer was re-armed for secondState. This must not
         // promote the later pending entry before its own deadline.
-        VERIFY_SUCCEEDED(XTaskQueueSubmitPendingCallbackForTests(queue, XTaskQueuePort::Work));
+        VERIFY_SUCCEEDED(XTaskQueueSubmitPendingCallbacks(queue, XTaskQueuePort::Work));

         VERIFY_IS_FALSE(XTaskQueueDispatch(queue, XTaskQueuePort::Work, 0));
         VERIFY_IS_FALSE(XTaskQueueDispatch(queue, XTaskQueuePort::Work, 200));
