Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
16 changes: 16 additions & 0 deletions include/mp/proxy-io.h
Original file line number Diff line number Diff line change
Expand Up @@ -340,6 +340,22 @@ class EventLoop

//! External context pointer.
void* m_context;

//! Hook called when ProxyServer<ThreadMap>::makeThread() is called.
std::function<void()> testing_hook_makethread;

//! Hook called on the worker thread inside makeThread(), after the thread
//! context is set up and thread_context promise is fulfilled, but before it
//! starts waiting for requests.
std::function<void()> testing_hook_makethread_created;

//! Hook called on the worker thread when it starts to execute an async
//! request. Used by tests to control timing or inject behavior at this
//! point in execution.
std::function<void()> testing_hook_async_request_start;

//! Hook called on the worker thread just before returning results.
std::function<void()> testing_hook_async_request_done;
};

//! Single element task queue used to handle recursive capnp calls. (If the
Expand Down
20 changes: 16 additions & 4 deletions include/mp/type-context.h
Original file line number Diff line number Diff line change
Expand Up @@ -61,8 +61,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value,
kj::Promise<typename ServerContext::CallContext>>::type
{
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto& server = server_context.proxy_server;
int req = server_context.req;
// Keep a reference to the ProxyServer instance by assigning it to the self
Expand All @@ -74,8 +72,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
auto self = server.thisCap();
auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable {
MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req;
const auto& params = call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
EventLoop& loop = *server.m_context.loop;
if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start();
ServerContext server_context{server, call_context, req};
{
// Before invoking the function, store a reference to the
Expand Down Expand Up @@ -127,6 +125,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
server_context.request_canceled = true;
};
// Update requests_threads map if not canceled.
const auto& params = call_context.getParams();
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In e69b6bf race fix: getParams() called after request cancel

Perhaps add a comment here (also trying to test my understanding of the fix)

// It is safe to call getParams() here because this code runs inside sync(),
// which executes on the event loop thread. Even if cancellation is initiated
// by the client at this point, the params structs will only be freed after
// sync() returns and the event loop resumes processing the cancellation.

Context::Reader context_arg = Accessor::get(params);
std::tie(request_thread, inserted) = SetThread(
GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection,
[&] { return context_arg.getCallbackThread(); });
Expand All @@ -153,6 +153,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
// the disconnect handler trying to destroy the thread
// client object.
server.m_context.loop->sync([&] {
// Clear cancellation callback. At this point the
// method invocation finished and the result is
// either being returned, or discarded if a
// cancellation happened. So we do not need to be
// notified of cancellations after this point. Also
// we do not want to be notified because
// cancel_mutex and server_context could be out of
// scope when it happens.
cancel_monitor.m_on_cancel = nullptr;
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

Gemini wrote a mermaid diagram

Before 2fb97e8

sequenceDiagram
    participant EL as Event Loop Thread
    participant W as Worker Thread
    participant CM as CancelMonitor (m_on_cancel)
    Note over W: --- PHASE: EXECUTION ---
    W->>W: fn.invoke(server_context, cancel_mutex)
    Note over W: invocation finished
    W->>W: [Pop Stack Frame]
    Note right of W: server_context & cancel_mutex are now FREED
    Note over EL, W: --- THE RACE WINDOW ---
    Note right of EL: Client Disconnects NOW
    EL->>CM: Trigger ~CancelProbe()
    CM->>CM: Execute m_on_cancel()
    CM-->>W: Access freed server_context / cancel_mutex
    Note over CM: CRASH: Use-After-Free
    W->>EL: m_loop->sync() (Too late!)
Loading

After 2fb97e8

sequenceDiagram
    participant EL as Event Loop Thread
    participant W as Worker Thread
    participant CM as CancelMonitor (m_on_cancel)
    Note over W: --- PHASE: EXECUTION ---
    W->>W: fn.invoke(server_context, cancel_mutex)
    Note over W: --- PHASE: CLEANUP ---
    W->>EL: m_loop->sync() [START]
    Note right of EL: SYNC BARRIER (On Event Loop)
    EL->>CM: m_on_cancel = nullptr
    Note right of EL: DISARMED: Callback is gone
    W->>W: [Pop Stack Frame]
    Note right of W: server_context & cancel_mutex are now FREED
    Note over EL, W: --- THE DISCONNECT WINDOW ---
    EL->>EL: Trigger ~CancelProbe()
    EL->>EL: Check m_on_cancel (is NULL)
    Note right of EL: SAFE: No callback to trigger
    EL-->>W: sync() returns [END]
    W->>W: Worker exits safely
Loading

auto self_dispose{kj::mv(self)};
if (erase_thread) {
// Look up the thread again without using existing
Expand Down Expand Up @@ -183,12 +192,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn&
}
// End of scope: if KJ_DEFER was reached, it runs here
}
if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done();
return call_context;
};

// Lookup Thread object specified by the client. The specified thread should
// be a local Thread::Server object, but it needs to be looked up
// asynchronously with getLocalServer().
const auto& params = server_context.call_context.getParams();
Context::Reader context_arg = Accessor::get(params);
auto thread_client = context_arg.getThread();
auto result = server.m_context.connection->m_threads.getLocalServer(thread_client)
.then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable {
Expand Down
7 changes: 5 additions & 2 deletions src/mp/proxy.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -411,13 +411,16 @@ ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(conne

kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context)
{
if (m_connection.m_loop->testing_hook_makethread) m_connection.m_loop->testing_hook_makethread();
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, from, this]() {
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
EventLoop& loop{*m_connection.m_loop};
g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
Lock lock(g_thread_context.waiter->m_mutex);
thread_context.set_value(&g_thread_context);
if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();
Comment on lines +414 to +422
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In "test: worker thread destroyed before it is initialized" 88cacd4

Why not initialize the loop at the top of the method and capture it in the thread, which will make the first if statement less verbose and more readable?

Suggested change
if (m_connection.m_loop->testing_hook_makethread) m_connection.m_loop->testing_hook_makethread();
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, from, this]() {
g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")";
EventLoop& loop{*m_connection.m_loop};
g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();
EventLoop& loop{*m_connection.m_loop};
if (loop.testing_hook_makethread) loop.testing_hook_makethread();
const std::string from = context.getParams().getName();
std::promise<ThreadContext*> thread_context;
std::thread thread([&thread_context, &loop, from, this]() {
g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")";
g_thread_context.waiter = std::make_unique<Waiter>();
thread_context.set_value(&g_thread_context);
if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created();

// Wait for shutdown signal from ProxyServer<Thread> destructor (signal
// is just waiter getting set to null.)
g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; });
Expand Down
99 changes: 99 additions & 0 deletions test/mp/test/test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -9,6 +9,7 @@
#include <capnp/capability.h>
#include <capnp/rpc.h>
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstring>
Expand Down Expand Up @@ -63,6 +64,7 @@ class TestSetup
{
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In "test: worker thread destroyed before it is initialized" 88cacd4

Comment can be adjusted with the disconnect_later addition

- * Provides client_disconnect and server_disconnect lambdas that can be used to
- * trigger disconnects and test handling of broken and closed connections.
+ * Provides disconnection lambdas that can be used to trigger
+ * disconnects and test handling of broken and closed connections.
  *

public:
std::function<void()> server_disconnect;
std::function<void()> server_disconnect_later;
std::function<void()> client_disconnect;
std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise;
std::unique_ptr<ProxyClient<messages::FooInterface>> client;
Expand All @@ -88,6 +90,10 @@ class TestSetup
return capnp::Capability::Client(kj::mv(server_proxy));
});
server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); };
server_disconnect_later = [&] {
assert(std::this_thread::get_id() == loop.m_thread_id);
loop.m_task_set->add(kj::evalLater([&] { server_connection.reset(); }));
};
// Set handler to destroy the server when the client disconnects. This
// is ignored if server_disconnect() is called instead.
server_connection->onDisconnect([&] { server_connection.reset(); });
Expand Down Expand Up @@ -325,6 +331,99 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call")
signal.set_value();
}

KJ_TEST("Worker thread destroyed before it is initialized")
{
// Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756
// where worker thread is destroyed before it starts.
//
// The test works by using the `makethread` hook to start a disconnect as
// soon as ProxyServer<ThreadMap>::makeThread is called, and using the
// `makethread_created` hook to sleep 10ms after the thread is created but
Copy link
Member

Choose a reason for hiding this comment

The reason will be displayed to describe this comment to others. Learn more.

In 88cacd4 test: worker thread destroyed before it is initialized: nit, it's only waiting 10ms

// before it starts waiting, so without the bugfix,
// ProxyServer<Thread>::~ProxyServer would run and destroy the waiter,
// causing a SIGSEGV in the worker thread after the sleep.
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
foo->initThreadMap();
setup.server->m_impl->m_fn = [] {};

EventLoop& loop = *setup.server->m_context.connection->m_loop;
loop.testing_hook_makethread = [&] {
setup.server_disconnect_later();
};
loop.testing_hook_makethread_created = [&] {
std::this_thread::sleep_for(std::chrono::milliseconds(10));
};

bool disconnected{false};
try {
foo->callFnAsync();
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
disconnected = true;
}
KJ_EXPECT(disconnected);
}

KJ_TEST("Calling async IPC method, with server disconnect racing the call")
{
// Regression test for bitcoin/bitcoin#34777 heap-use-after-free where
// an async request is canceled before it starts to execute.
//
// Use testing_hook_async_request_start to trigger a disconnect from the
// worker thread as soon as it begins to execute an async request. Without
// the bugfix, the worker thread would trigger a SIGSEGV after this by
// calling call_context.getParams().
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
foo->initThreadMap();
// Server-side callback body is a no-op; the test only exercises the
// request setup/teardown path, not the invoked function itself.
setup.server->m_impl->m_fn = [] {};

EventLoop& loop = *setup.server->m_context.connection->m_loop;
loop.testing_hook_async_request_start = [&] {
setup.server_disconnect();
// Sleep is necessary to let the event loop fully clean up after the
// disconnect and trigger the SIGSEGV.
std::this_thread::sleep_for(std::chrono::milliseconds(10));
};

try {
foo->callFnAsync();
// The call must not succeed: the disconnect initiated above should
// always interrupt it before a result is returned.
KJ_EXPECT(false);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
}
}

KJ_TEST("Calling async IPC method, with server disconnect after cleanup")
{
// Regression test for bitcoin/bitcoin#34782 stack-use-after-return where
// an async request is canceled after it finishes executing but before the
// response is sent.
//
// Use testing_hook_async_request_done to trigger a disconnect from the
// worker thread after it executes an async request but before it returns.
// Without the bugfix, the m_on_cancel callback would be called at this
// point accessing the cancel_mutex stack variable that had gone out of
// scope.
TestSetup setup;
ProxyClient<messages::FooInterface>* foo = setup.client.get();
foo->initThreadMap();
// Server-side callback body is a no-op; the test only exercises the
// request setup/teardown path, not the invoked function itself.
setup.server->m_impl->m_fn = [] {};

EventLoop& loop = *setup.server->m_context.connection->m_loop;
loop.testing_hook_async_request_done = [&] {
setup.server_disconnect();
};

try {
foo->callFnAsync();
// The call must not succeed: the disconnect initiated above should
// always interrupt it before a result is returned.
KJ_EXPECT(false);
} catch (const std::runtime_error& e) {
KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect.");
}
}

KJ_TEST("Make simultaneous IPC calls on single remote thread")
{
TestSetup setup;
Expand Down
Loading