Skip to content

Commit 6159bbc

Browse files
Cleanup, shutdown local participants before FFI disconnect
1 parent dd91cd9 commit 6159bbc

6 files changed

Lines changed: 56 additions & 93 deletions

File tree

.github/workflows/tests.yml

Lines changed: 1 addition & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -261,11 +261,7 @@ jobs:
261261
timeout-minutes: 5
262262
shell: bash
263263
env:
264-
# Bump livekit_ffi to debug so handle-table errors (the exact Rust
265-
# message that produces our INVALID_HANDLE → terminate path) land
266-
# in the CI log. Keep livekit at info to avoid drowning out the
267-
# interesting lines.
268-
RUST_LOG: "metrics=debug,livekit_ffi=debug,livekit=info"
264+
RUST_LOG: "metrics=debug"
269265
run: |
270266
set -euo pipefail
271267
source .token_helpers/set_data_track_test_tokens.bash

src/ffi_client.cpp

Lines changed: 0 additions & 4 deletions
Original file line number | Diff line number | Diff line change
@@ -202,10 +202,6 @@ proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) cons
202202
const FfiHandleId handle =
203203
livekit_ffi_request(reinterpret_cast<const uint8_t*>(bytes.data()), bytes.size(), &resp_ptr, &resp_len);
204204
if (handle == INVALID_HANDLE) {
205-
LK_LOG_ERROR(
206-
"FfiClient::sendRequest: livekit_ffi_request returned INVALID_HANDLE; "
207-
"request.message_case={}, bytes_len={}",
208-
static_cast<int>(request.message_case()), bytes.size());
209205
throw std::runtime_error("failed to send request, received an invalid handle");
210206
}
211207

src/ffi_handle.cpp

Lines changed: 0 additions & 2 deletions
Original file line number | Diff line number | Diff line change
@@ -17,7 +17,6 @@
1717
#include "livekit/ffi_handle.h"
1818

1919
#include "livekit_ffi.h"
20-
#include "lk_log.h"
2120

2221
namespace livekit {
2322

@@ -36,7 +35,6 @@ FfiHandle& FfiHandle::operator=(FfiHandle&& other) noexcept {
3635

3736
void FfiHandle::reset(uintptr_t new_handle) noexcept {
3837
if (handle_) {
39-
LK_LOG_INFO("FfiHandle::reset: dropping handle {} (replacement={})", handle_, new_handle);
4038
livekit_ffi_drop_handle(handle_);
4139
}
4240
handle_ = new_handle;

src/local_participant.cpp

Lines changed: 1 addition & 12 deletions
Original file line number | Diff line number | Diff line change
@@ -370,17 +370,13 @@ void LocalParticipant::shutdown() {
370370
void LocalParticipant::handleRpcMethodInvocation(uint64_t invocation_id, const std::string& method,
371371
const std::string& request_id, const std::string& caller_identity,
372372
const std::string& payload, double response_timeout_sec) {
373-
LK_LOG_DEBUG("LocalParticipant::handleRpcMethodInvocation: entry (handle={}, invocation_id={}, method={})",
374-
ffiHandleId(), invocation_id, method);
375373
// Capture shared state so it outlives LocalParticipant if needed
376374
auto state = rpc_state_;
377375

378376
// Track this invocation and check if we're shutting down
379377
{
380378
const std::scoped_lock<std::mutex> lock(state->mutex);
381379
if (state->shutting_down) {
382-
LK_LOG_DEBUG("LocalParticipant::handleRpcMethodInvocation: shutting_down, skipping (invocation_id={})",
383-
invocation_id);
384380
// Already shutting down, don't process new invocations
385381
return;
386382
}
@@ -427,19 +423,14 @@ void LocalParticipant::handleRpcMethodInvocation(uint64_t invocation_id, const s
427423
{
428424
const std::scoped_lock<std::mutex> lock(state->mutex);
429425
if (state->shutting_down) {
430-
LK_LOG_DEBUG(
431-
"LocalParticipant::handleRpcMethodInvocation: shutdown during handler, skipping response "
432-
"(invocation_id={})",
433-
invocation_id);
434426
// Shutdown started, don't send response - handle may be invalid
435427
return;
436428
}
437429
}
438430

439431
FfiRequest req;
440432
auto* msg = req.mutable_rpc_method_invocation_response();
441-
const auto handle_id_at_send = ffiHandleId();
442-
msg->set_local_participant_handle(handle_id_at_send);
433+
msg->set_local_participant_handle(ffiHandleId());
443434
msg->set_invocation_id(invocation_id);
444435
if (response_error.has_value()) {
445436
auto* err_proto = msg->mutable_error();
@@ -448,8 +439,6 @@ void LocalParticipant::handleRpcMethodInvocation(uint64_t invocation_id, const s
448439
if (response_payload.has_value()) {
449440
msg->set_payload(*response_payload);
450441
}
451-
LK_LOG_DEBUG("LocalParticipant::handleRpcMethodInvocation: sending response (handle={}, invocation_id={})",
452-
handle_id_at_send, invocation_id);
453442
FfiClient::instance().sendRequest(req);
454443
}
455444

src/room.cpp

Lines changed: 50 additions & 65 deletions
Original file line number | Diff line number | Diff line change
@@ -76,42 +76,17 @@ void readyForRoomEvent(std::uint64_t room_handle) {
7676
Room::Room() : subscription_thread_dispatcher_(std::make_unique<SubscriptionThreadDispatcher>()) {}
7777

7878
Room::~Room() {
79-
LK_LOG_INFO("Room::~Room: entry (this={})", static_cast<const void*>(this));
80-
// Issue a graceful disconnect so the server sees us leave instead of
81-
// timing out (RAII expectation; see issue #118). disconnect() does the
82-
// full teardown including subscription threads, listener, and local
83-
// participant, so the destructor only needs to handle the
84-
// already-disconnected path.
79+
// disconnect() is used for all teardown cases: it handles the
80+
// already-disconnected case (returns false, no-op), the partial/Reconnecting
81+
// case, and the FFI-failure case (local teardown still runs). Nothing else
82+
// needs to live in the destructor.
8583
try {
86-
disconnect();
84+
(void)disconnect(); // Don't need return value
8785
} catch (const std::exception& e) {
8886
LK_LOG_ERROR("Room::~Room: graceful disconnect failed: {}", e.what());
8987
} catch (...) {
9088
LK_LOG_ERROR("Room::~Room: graceful disconnect failed: unknown exception");
9189
}
92-
93-
// Defensive: if disconnect() bailed early (e.g. never connected), still
94-
// tear down any state that may have leaked.
95-
if (subscription_thread_dispatcher_) {
96-
subscription_thread_dispatcher_->stopAll();
97-
}
98-
99-
int listener_to_remove = 0;
100-
std::unique_ptr<LocalParticipant> local_participant_to_cleanup;
101-
{
102-
const std::scoped_lock<std::mutex> g(lock_);
103-
listener_to_remove = listener_id_;
104-
listener_id_ = 0;
105-
local_participant_to_cleanup = std::move(local_participant_);
106-
}
107-
108-
if (local_participant_to_cleanup) {
109-
local_participant_to_cleanup->shutdown();
110-
}
111-
112-
if (listener_to_remove != 0) {
113-
FfiClient::instance().removeListener(listener_to_remove);
114-
}
11590
}
11691

11792
void Room::setDelegate(RoomDelegate* delegate) {
@@ -246,66 +221,79 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO
246221

247222
bool Room::disconnect(DisconnectReason reason) {
248223
TRACE_EVENT0("livekit", "Room::disconnect");
249-
LK_LOG_INFO("Room::disconnect: entry (this={}, reason={})", static_cast<const void*>(this), static_cast<int>(reason));
250224

251-
// Hold onto this in case the
252-
auto prev_connection_state = connection_state_;
225+
// Canonical teardown path. Move all owned state out under the lock, then
226+
// operate on it outside the lock. The destructor (and any caller) gets
227+
// the same behavior: once this returns, the Room is fully torn down.
228+
//
229+
// Return value:
230+
// true - we owned live state and tore it down (FFI disconnect succeeded)
231+
// false - either already disconnected (no-op; nothing left to tear down)
232+
// or the FFI disconnect failed (local-side teardown still completed).
253233

254234
std::shared_ptr<FfiHandle> handle;
255235
RoomDelegate* delegate_snapshot = nullptr;
236+
std::unique_ptr<LocalParticipant> local_participant_to_cleanup;
237+
std::unordered_map<std::string, std::shared_ptr<RemoteParticipant>> remote_participants_to_clear;
238+
std::unique_ptr<E2EEManager> e2ee_manager_to_clear;
239+
std::unordered_map<std::string, std::shared_ptr<TextStreamReader>> text_stream_readers_to_clear;
240+
std::unordered_map<std::string, std::shared_ptr<ByteStreamReader>> byte_stream_readers_to_clear;
241+
int listener_to_remove = 0;
242+
256243
{
257244
const std::scoped_lock<std::mutex> g(lock_);
258245
if (connection_state_ == ConnectionState::Disconnected) {
259-
LK_LOG_INFO("Room::disconnect: already disconnected, returning false (this={})", static_cast<const void*>(this));
246+
// Already torn down (or never connected). Nothing to do.
260247
return false;
261248
}
262249
handle = room_handle_;
263250
delegate_snapshot = delegate_;
251+
// Take ownership of everything under the lock so the kEos handler (which
252+
// also tries to move it out) loses any race here — only one teardown
253+
// path operates on this state.
254+
local_participant_to_cleanup = std::move(local_participant_);
255+
remote_participants_to_clear = std::move(remote_participants_);
256+
e2ee_manager_to_clear = std::move(e2ee_manager_);
257+
text_stream_readers_to_clear = std::move(text_stream_readers_);
258+
byte_stream_readers_to_clear = std::move(byte_stream_readers_);
259+
listener_to_remove = listener_id_;
260+
listener_id_ = 0;
261+
room_handle_.reset();
264262
// Flip state immediately so the in-flight Disconnected room-event we'll
265263
// get back doesn't double-fire onDisconnected. Mirrors Python's
266264
// Room.disconnect(), which also flips state before sending the request.
267265
connection_state_ = ConnectionState::Disconnected;
268266
}
269267

270-
// Tell the FFI to close the room and wait for the callback. Catch the
271-
// exception so we still run teardown below; the caller learns about the
272-
// failure via the returned bool / logs.
268+
// Drain in-flight RPC handlers BEFORE telling Rust to tear down the room.
269+
// Mirrors client-sdk-python's Room.disconnect() ordering: once the FFI
270+
// dispatches the Disconnect, Rust starts invalidating participant handles
271+
// in its table, and any listener-thread RPC handler still mid-flight
272+
// would race with that invalidation and send to a dead handle →
273+
// INVALID_HANDLE → terminate.
274+
if (local_participant_to_cleanup) {
275+
local_participant_to_cleanup->shutdown();
276+
}
277+
278+
// Tell the FFI to close the room and wait for the callback. If this fails
279+
// we still complete local-side teardown below — releasing the listener,
280+
// dropping handles, and notifying the delegate — so the Room is fully
281+
// cleaned up regardless of whether the FFI round-trip succeeded.
273282
bool ffi_ok = true;
274283
if (handle) {
275284
try {
276285
FfiClient::instance().disconnectAsync(handle->get(), reason).get();
277286
} catch (const std::exception& e) {
278-
LK_LOG_ERROR("Room::disconnect: FFI disconnect failed: {}", e.what());
287+
LK_LOG_ERROR("Room::disconnect: FFI disconnect failed (continuing local teardown): {}", e.what());
279288
ffi_ok = false;
280289
}
281290
}
282291

283-
// Stop dispatcher first so no track callbacks fire mid-teardown.
292+
// Stop dispatcher so no track callbacks fire mid-teardown.
284293
if (subscription_thread_dispatcher_) {
285294
subscription_thread_dispatcher_->stopAll();
286295
}
287296

288-
int listener_to_remove = 0;
289-
std::unique_ptr<LocalParticipant> local_participant_to_cleanup;
290-
{
291-
const std::scoped_lock<std::mutex> g(lock_);
292-
listener_to_remove = listener_id_;
293-
listener_id_ = 0;
294-
local_participant_to_cleanup = std::move(local_participant_);
295-
remote_participants_.clear();
296-
room_handle_.reset();
297-
e2ee_manager_.reset();
298-
text_stream_readers_.clear();
299-
byte_stream_readers_.clear();
300-
}
301-
302-
// Shut down local participant (unregisters RPC handlers, etc.) before
303-
// removing the listener, so in-flight RPC responses don't reach a
304-
// destroyed handle.
305-
if (local_participant_to_cleanup) {
306-
local_participant_to_cleanup->shutdown();
307-
}
308-
309297
if (listener_to_remove != 0) {
310298
FfiClient::instance().removeListener(listener_to_remove);
311299
}
@@ -323,6 +311,8 @@ bool Room::disconnect(DisconnectReason reason) {
323311
}
324312
}
325313

314+
// Moved-out state (local participant, remote participants, e2ee manager,
315+
// stream readers) destructs here, releasing FFI handles.
326316
return ffi_ok;
327317
}
328318

@@ -1231,8 +1221,6 @@ void Room::onEvent(const FfiEvent& event) {
12311221
break;
12321222
}
12331223
case proto::RoomEvent::kDisconnected: {
1234-
LK_LOG_INFO("Room::onFfiEvent: kDisconnected received (this={}, reason={})", static_cast<const void*>(this),
1235-
static_cast<int>(re.disconnected().reason()));
12361224
// If disconnect() was driven from our side, it already flipped state
12371225
// to Disconnected and fires the delegate itself; skip the duplicate here.
12381226
bool already_disconnected = false;
@@ -1266,7 +1254,6 @@ void Room::onEvent(const FfiEvent& event) {
12661254
break;
12671255
}
12681256
case proto::RoomEvent::kEos: {
1269-
LK_LOG_INFO("Room::onFfiEvent: kEos received (this={})", static_cast<const void*>(this));
12701257
if (subscription_thread_dispatcher_) {
12711258
subscription_thread_dispatcher_->stopAll();
12721259
}
@@ -1304,8 +1291,6 @@ void Room::onEvent(const FfiEvent& event) {
13041291
// without this, a listener-thread RPC handler can race with handle
13051292
// disposal and send to a dead handle → INVALID_HANDLE → terminate.
13061293
if (old_local_participant) {
1307-
LK_LOG_INFO("Room::onFfiEvent: kEos shutting down local participant (handle={})",
1308-
old_local_participant->ffiHandleId());
13091294
old_local_participant->shutdown();
13101295
}
13111296

src/tests/integration/test_room.cpp

Lines changed: 4 additions & 5 deletions
Original file line number | Diff line number | Diff line change
@@ -96,19 +96,19 @@ TEST_F(RoomTest, UserDisconnect) {
9696
ASSERT_EQ(room.connectionState(), ConnectionState::Connected);
9797
ASSERT_NE(room.localParticipant(), nullptr);
9898

99-
EXPECT_TRUE(room.disconnect()) << "disconnect should report success on a connected room";
99+
EXPECT_NO_THROW(room.disconnect()) << "disconnect should not throw on a connected room";
100100
EXPECT_EQ(room.connectionState(), ConnectionState::Disconnected);
101101
EXPECT_EQ(room.localParticipant(), nullptr) << "local participant should be cleared after disconnect";
102102
EXPECT_EQ(delegate.count.load(), 1) << "onDisconnected should fire exactly once";
103103
EXPECT_EQ(delegate.last_reason, DisconnectReason::ClientInitiated);
104104

105105
// Calling again on an already-disconnected room is a no-op
106-
EXPECT_FALSE(room.disconnect()) << "second disconnect should report no-op";
106+
EXPECT_NO_THROW(room.disconnect()) << "second disconnect should not throw on an already-disconnected room";
107107
EXPECT_EQ(delegate.count.load(), 1) << "delegate must not double-fire";
108108
}
109109

110110
// Case: Room goes out of scope while still connected
111-
TEST_F(RoomTest, RoomDestructorDisconnect) {
111+
TEST_F(RoomTest, DestructorDisconnect) {
112112
std::unique_ptr<Room> room = std::make_unique<Room>();
113113

114114
DisconnectTrackingDelegate delegate;
@@ -117,9 +117,8 @@ TEST_F(RoomTest, RoomDestructorDisconnect) {
117117
ASSERT_TRUE(room->connect(server_url_, token_, options));
118118
ASSERT_EQ(room->connectionState(), ConnectionState::Connected);
119119

120-
room.reset();
120+
room.reset(); // invokes destructor which calls disconnect()
121121

122-
// Let room go out of scope while still connected
123122
EXPECT_EQ(delegate.count.load(), 1) << "destructor should fire onDisconnected exactly once";
124123
EXPECT_EQ(delegate.last_reason, DisconnectReason::ClientInitiated);
125124
}

0 commit comments

Comments
 (0)