-
Notifications
You must be signed in to change notification settings - Fork 41
fixes for race conditions on disconnects #249
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: master
Are you sure you want to change the base?
Changes from all commits
88cacd4
f09731e
75c5425
e69b6bf
846a43a
2fb97e8
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -61,8 +61,6 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& | |
| std::is_same<decltype(Accessor::get(server_context.call_context.getParams())), Context::Reader>::value, | ||
| kj::Promise<typename ServerContext::CallContext>>::type | ||
| { | ||
| const auto& params = server_context.call_context.getParams(); | ||
| Context::Reader context_arg = Accessor::get(params); | ||
| auto& server = server_context.proxy_server; | ||
| int req = server_context.req; | ||
| // Keep a reference to the ProxyServer instance by assigning it to the self | ||
|
|
@@ -74,8 +72,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& | |
| auto self = server.thisCap(); | ||
| auto invoke = [self = kj::mv(self), call_context = kj::mv(server_context.call_context), &server, req, fn, args...](CancelMonitor& cancel_monitor) mutable { | ||
| MP_LOG(*server.m_context.loop, Log::Debug) << "IPC server executing request #" << req; | ||
| const auto& params = call_context.getParams(); | ||
| Context::Reader context_arg = Accessor::get(params); | ||
| EventLoop& loop = *server.m_context.loop; | ||
| if (loop.testing_hook_async_request_start) loop.testing_hook_async_request_start(); | ||
| ServerContext server_context{server, call_context, req}; | ||
| { | ||
| // Before invoking the function, store a reference to the | ||
|
|
@@ -127,6 +125,8 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& | |
| server_context.request_canceled = true; | ||
| }; | ||
| // Update requests_threads map if not canceled. | ||
| const auto& params = call_context.getParams(); | ||
| Context::Reader context_arg = Accessor::get(params); | ||
| std::tie(request_thread, inserted) = SetThread( | ||
| GuardedRef{thread_context.waiter->m_mutex, request_threads}, server.m_context.connection, | ||
| [&] { return context_arg.getCallbackThread(); }); | ||
|
|
@@ -153,6 +153,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& | |
| // the disconnect handler trying to destroy the thread | ||
| // client object. | ||
| server.m_context.loop->sync([&] { | ||
| // Clear cancellation callback. At this point the | ||
| // method invocation finished and the result is | ||
| // either being returned, or discarded if a | ||
| // cancellation happened. So we do not need to be | ||
| // notified of cancellations after this point. Also | ||
| // we do not want to be notified because | ||
| // cancel_mutex and server_context could be out of | ||
| // scope when it happens. | ||
| cancel_monitor.m_on_cancel = nullptr; | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. Gemini wrote a mermaid diagram Before 2fb97e8 sequenceDiagram
participant EL as Event Loop Thread
participant W as Worker Thread
participant CM as CancelMonitor (m_on_cancel)
Note over W: --- PHASE: EXECUTION ---
W->>W: fn.invoke(server_context, cancel_mutex)
Note over W: invocation finished
W->>W: [Pop Stack Frame]
Note right of W: server_context & cancel_mutex are now FREED
Note over EL, W: --- THE RACE WINDOW ---
Note right of EL: Client Disconnects NOW
EL->>CM: Trigger ~CancelProbe()
CM->>CM: Execute m_on_cancel()
CM-->>W: Access freed server_context / cancel_mutex
Note over CM: CRASH: Use-After-Free
W->>EL: m_loop->sync() (Too late!)
After 2fb97e8 sequenceDiagram
participant EL as Event Loop Thread
participant W as Worker Thread
participant CM as CancelMonitor (m_on_cancel)
Note over W: --- PHASE: EXECUTION ---
W->>W: fn.invoke(server_context, cancel_mutex)
Note over W: --- PHASE: CLEANUP ---
W->>EL: m_loop->sync() [START]
Note right of EL: SYNC BARRIER (On Event Loop)
EL->>CM: m_on_cancel = nullptr
Note right of EL: DISARMED: Callback is gone
W->>W: [Pop Stack Frame]
Note right of W: server_context & cancel_mutex are now FREED
Note over EL, W: --- THE DISCONNECT WINDOW ---
EL->>EL: Trigger ~CancelProbe()
EL->>EL: Check m_on_cancel (is NULL)
Note right of EL: SAFE: No callback to trigger
EL-->>W: sync() returns [END]
W->>W: Worker exits safely
|
||
| auto self_dispose{kj::mv(self)}; | ||
| if (erase_thread) { | ||
| // Look up the thread again without using existing | ||
|
|
@@ -183,12 +192,15 @@ auto PassField(Priority<1>, TypeList<>, ServerContext& server_context, const Fn& | |
| } | ||
| // End of scope: if KJ_DEFER was reached, it runs here | ||
| } | ||
| if (loop.testing_hook_async_request_done) loop.testing_hook_async_request_done(); | ||
| return call_context; | ||
| }; | ||
|
|
||
| // Lookup Thread object specified by the client. The specified thread should | ||
| // be a local Thread::Server object, but it needs to be looked up | ||
| // asynchronously with getLocalServer(). | ||
| const auto& params = server_context.call_context.getParams(); | ||
| Context::Reader context_arg = Accessor::get(params); | ||
| auto thread_client = context_arg.getThread(); | ||
| auto result = server.m_context.connection->m_threads.getLocalServer(thread_client) | ||
| .then([&server, invoke = kj::mv(invoke), req](const kj::Maybe<Thread::Server&>& perhaps) mutable { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||||||||||||||||||||||||||||||||||||
|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|---|
|
|
@@ -411,13 +411,16 @@ ProxyServer<ThreadMap>::ProxyServer(Connection& connection) : m_connection(conne | |||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| kj::Promise<void> ProxyServer<ThreadMap>::makeThread(MakeThreadContext context) | ||||||||||||||||||||||||||||||||||||||||
| { | ||||||||||||||||||||||||||||||||||||||||
| if (m_connection.m_loop->testing_hook_makethread) m_connection.m_loop->testing_hook_makethread(); | ||||||||||||||||||||||||||||||||||||||||
| const std::string from = context.getParams().getName(); | ||||||||||||||||||||||||||||||||||||||||
| std::promise<ThreadContext*> thread_context; | ||||||||||||||||||||||||||||||||||||||||
| std::thread thread([&thread_context, from, this]() { | ||||||||||||||||||||||||||||||||||||||||
| g_thread_context.thread_name = ThreadName(m_connection.m_loop->m_exe_name) + " (from " + from + ")"; | ||||||||||||||||||||||||||||||||||||||||
| EventLoop& loop{*m_connection.m_loop}; | ||||||||||||||||||||||||||||||||||||||||
| g_thread_context.thread_name = ThreadName(loop.m_exe_name) + " (from " + from + ")"; | ||||||||||||||||||||||||||||||||||||||||
| g_thread_context.waiter = std::make_unique<Waiter>(); | ||||||||||||||||||||||||||||||||||||||||
| thread_context.set_value(&g_thread_context); | ||||||||||||||||||||||||||||||||||||||||
| Lock lock(g_thread_context.waiter->m_mutex); | ||||||||||||||||||||||||||||||||||||||||
| thread_context.set_value(&g_thread_context); | ||||||||||||||||||||||||||||||||||||||||
| if (loop.testing_hook_makethread_created) loop.testing_hook_makethread_created(); | ||||||||||||||||||||||||||||||||||||||||
|
Comment on lines
+414
to
+422
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In "test: worker thread destroyed before it is initialized" 88cacd4 Why not initialize the loop at the top and of the method and capture it in the thread, which will make the first if statement less verbose and more readable?
Suggested change
|
||||||||||||||||||||||||||||||||||||||||
| // Wait for shutdown signal from ProxyServer<Thread> destructor (signal | ||||||||||||||||||||||||||||||||||||||||
| // is just waiter getting set to null.) | ||||||||||||||||||||||||||||||||||||||||
| g_thread_context.waiter->wait(lock, [] { return !g_thread_context.waiter; }); | ||||||||||||||||||||||||||||||||||||||||
|
|
||||||||||||||||||||||||||||||||||||||||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -9,6 +9,7 @@ | |
| #include <capnp/capability.h> | ||
| #include <capnp/rpc.h> | ||
| #include <cassert> | ||
| #include <chrono> | ||
| #include <condition_variable> | ||
| #include <cstdint> | ||
| #include <cstring> | ||
|
|
@@ -63,6 +64,7 @@ class TestSetup | |
| { | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In "test: worker thread destroyed before it is initialized" 88cacd4 Comment can be adjusted to with - * Provides client_disconnect and server_disconnect lambdas that can be used to
- * trigger disconnects and test handling of broken and closed connections.
+ * Provides disconnection lambdas that can be used to trigger
+ * disconnects and test handling of broken and closed connections.
* |
||
| public: | ||
| std::function<void()> server_disconnect; | ||
| std::function<void()> server_disconnect_later; | ||
| std::function<void()> client_disconnect; | ||
| std::promise<std::unique_ptr<ProxyClient<messages::FooInterface>>> client_promise; | ||
| std::unique_ptr<ProxyClient<messages::FooInterface>> client; | ||
|
|
@@ -88,6 +90,10 @@ class TestSetup | |
| return capnp::Capability::Client(kj::mv(server_proxy)); | ||
| }); | ||
| server_disconnect = [&] { loop.sync([&] { server_connection.reset(); }); }; | ||
| server_disconnect_later = [&] { | ||
| assert(std::this_thread::get_id() == loop.m_thread_id); | ||
| loop.m_task_set->add(kj::evalLater([&] { server_connection.reset(); })); | ||
| }; | ||
| // Set handler to destroy the server when the client disconnects. This | ||
| // is ignored if server_disconnect() is called instead. | ||
| server_connection->onDisconnect([&] { server_connection.reset(); }); | ||
|
|
@@ -325,6 +331,99 @@ KJ_TEST("Calling IPC method, disconnecting and blocking during the call") | |
| signal.set_value(); | ||
| } | ||
|
|
||
| KJ_TEST("Worker thread destroyed before it is initialized") | ||
| { | ||
| // Regression test for bitcoin/bitcoin#34711, bitcoin/bitcoin#34756 | ||
| // where worker thread is destroyed before it starts. | ||
| // | ||
| // The test works by using the `makethread` hook to start a disconnect as | ||
| // soon as ProxyServer<ThreadMap>::makeThread is called, and using the | ||
| // `makethread_created` hook to sleep 10ms after the thread is created but | ||
|
Member
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. In 88cacd4 test: worker thread destroyed before it is initialized: nit, it's only waiting 10ms |
||
| // before it starts waiting, so without the bugfix, | ||
| // ProxyServer<Thread>::~ProxyServer would run and destroy the waiter, | ||
| // causing a SIGSEGV in the worker thread after the sleep. | ||
| TestSetup setup; | ||
| ProxyClient<messages::FooInterface>* foo = setup.client.get(); | ||
| foo->initThreadMap(); | ||
| setup.server->m_impl->m_fn = [] {}; | ||
|
|
||
| EventLoop& loop = *setup.server->m_context.connection->m_loop; | ||
| loop.testing_hook_makethread = [&] { | ||
| setup.server_disconnect_later(); | ||
| }; | ||
| loop.testing_hook_makethread_created = [&] { | ||
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||
| }; | ||
|
|
||
| bool disconnected{false}; | ||
| try { | ||
| foo->callFnAsync(); | ||
| } catch (const std::runtime_error& e) { | ||
| KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); | ||
| disconnected = true; | ||
| } | ||
| KJ_EXPECT(disconnected); | ||
| } | ||
|
|
||
| KJ_TEST("Calling async IPC method, with server disconnect racing the call") | ||
| { | ||
| // Regression test for bitcoin/bitcoin#34777 heap-use-after-free where | ||
| // an async request is canceled before it starts to execute. | ||
| // | ||
| // Use testing_hook_async_request_start to trigger a disconnect from the | ||
| // worker thread as soon as it begins to execute an async request. Without | ||
| // the bugfix, the worker thread would trigger a SIGSEGV after this by | ||
| // calling call_context.getParams(). | ||
| TestSetup setup; | ||
| ProxyClient<messages::FooInterface>* foo = setup.client.get(); | ||
| foo->initThreadMap(); | ||
| setup.server->m_impl->m_fn = [] {}; | ||
|
|
||
| EventLoop& loop = *setup.server->m_context.connection->m_loop; | ||
| loop.testing_hook_async_request_start = [&] { | ||
| setup.server_disconnect(); | ||
| // Sleep is necessary to let the event loop fully clean up after the | ||
| // disconnect and trigger the SIGSEGV. | ||
| std::this_thread::sleep_for(std::chrono::milliseconds(10)); | ||
| }; | ||
|
|
||
| try { | ||
| foo->callFnAsync(); | ||
| KJ_EXPECT(false); | ||
| } catch (const std::runtime_error& e) { | ||
| KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); | ||
| } | ||
| } | ||
|
|
||
| KJ_TEST("Calling async IPC method, with server disconnect after cleanup") | ||
| { | ||
| // Regression test for bitcoin/bitcoin#34782 stack-use-after-return where | ||
| // an async request is canceled after it finishes executing but before the | ||
| // response is sent. | ||
| // | ||
| // Use testing_hook_async_request_done to trigger a disconnect from the | ||
| // worker thread after it executes an async request but before it returns. | ||
| // Without the bugfix, the m_on_cancel callback would be called at this | ||
| // point accessing the cancel_mutex stack variable that had gone out of | ||
| // scope. | ||
| TestSetup setup; | ||
| ProxyClient<messages::FooInterface>* foo = setup.client.get(); | ||
| foo->initThreadMap(); | ||
| setup.server->m_impl->m_fn = [] {}; | ||
|
|
||
| EventLoop& loop = *setup.server->m_context.connection->m_loop; | ||
| loop.testing_hook_async_request_done = [&] { | ||
| setup.server_disconnect(); | ||
| }; | ||
|
|
||
| try { | ||
| foo->callFnAsync(); | ||
| KJ_EXPECT(false); | ||
| } catch (const std::runtime_error& e) { | ||
| KJ_EXPECT(std::string_view{e.what()} == "IPC client method call interrupted by disconnect."); | ||
| } | ||
| } | ||
|
|
||
| KJ_TEST("Make simultaneous IPC calls on single remote thread") | ||
| { | ||
| TestSetup setup; | ||
|
|
||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
In e69b6bf race fix: getParams() called after request cancel
Perhaps add a comment here (also trying to test my understanding of the fix)