Skip to content

Commit dd91cd9

Browse files
Drain RPC invocations on kEos + add lifecycle tracing
The kEos RoomEvent handler moves out and destroys the local participant without first calling shutdown(). If the listener thread is mid-RPC- invocation when kEos arrives, the participant's FfiHandle is dropped while a handler is still in flight; the handler's later sendRequest then hits an invalid Rust handle and the uncaught std::runtime_error on the listener thread aborts the process. Mirror the ordering used in disconnect(): call shutdown() on the moved-out participant before letting it destruct. Add INFO-level tracing at the key lifecycle points (~Room, disconnect, kDisconnected, kEos, FfiHandle drops, INVALID_HANDLE error path) and DEBUG tracing on the RPC invocation hot path. Bump RUST_LOG in the integration-test CI step so the Rust side's "handle not found" error message lands in the log. Co-Authored-By: Claude Opus 4.7 <noreply@anthropic.com>
1 parent 12946e2 commit dd91cd9

5 files changed

Lines changed: 39 additions & 2 deletions

File tree

.github/workflows/tests.yml

Lines changed: 5 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -261,7 +261,11 @@ jobs:
261261
timeout-minutes: 5
262262
shell: bash
263263
env:
264-
RUST_LOG: "metrics=debug"
264+
# Bump livekit_ffi to debug so handle-table errors (the exact Rust
265+
# message that produces our INVALID_HANDLE → terminate path) land
266+
# in the CI log. Keep livekit at info to avoid drowning out the
267+
# interesting lines.
268+
RUST_LOG: "metrics=debug,livekit_ffi=debug,livekit=info"
265269
run: |
266270
set -euo pipefail
267271
source .token_helpers/set_data_track_test_tokens.bash

src/ffi_client.cpp

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -202,6 +202,10 @@ proto::FfiResponse FfiClient::sendRequest(const proto::FfiRequest& request) cons
202202
const FfiHandleId handle =
203203
livekit_ffi_request(reinterpret_cast<const uint8_t*>(bytes.data()), bytes.size(), &resp_ptr, &resp_len);
204204
if (handle == INVALID_HANDLE) {
205+
LK_LOG_ERROR(
206+
"FfiClient::sendRequest: livekit_ffi_request returned INVALID_HANDLE; "
207+
"request.message_case={}, bytes_len={}",
208+
static_cast<int>(request.message_case()), bytes.size());
205209
throw std::runtime_error("failed to send request, received an invalid handle");
206210
}
207211

src/ffi_handle.cpp

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
#include "livekit/ffi_handle.h"
1818

1919
#include "livekit_ffi.h"
20+
#include "lk_log.h"
2021

2122
namespace livekit {
2223

@@ -35,6 +36,7 @@ FfiHandle& FfiHandle::operator=(FfiHandle&& other) noexcept {
3536

3637
void FfiHandle::reset(uintptr_t new_handle) noexcept {
3738
if (handle_) {
39+
LK_LOG_INFO("FfiHandle::reset: dropping handle {} (replacement={})", handle_, new_handle);
3840
livekit_ffi_drop_handle(handle_);
3941
}
4042
handle_ = new_handle;

src/local_participant.cpp

Lines changed: 12 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -370,13 +370,17 @@ void LocalParticipant::shutdown() {
370370
void LocalParticipant::handleRpcMethodInvocation(uint64_t invocation_id, const std::string& method,
371371
const std::string& request_id, const std::string& caller_identity,
372372
const std::string& payload, double response_timeout_sec) {
373+
LK_LOG_DEBUG("LocalParticipant::handleRpcMethodInvocation: entry (handle={}, invocation_id={}, method={})",
374+
ffiHandleId(), invocation_id, method);
373375
// Capture shared state so it outlives LocalParticipant if needed
374376
auto state = rpc_state_;
375377

376378
// Track this invocation and check if we're shutting down
377379
{
378380
const std::scoped_lock<std::mutex> lock(state->mutex);
379381
if (state->shutting_down) {
382+
LK_LOG_DEBUG("LocalParticipant::handleRpcMethodInvocation: shutting_down, skipping (invocation_id={})",
383+
invocation_id);
380384
// Already shutting down, don't process new invocations
381385
return;
382386
}
@@ -423,14 +427,19 @@ void LocalParticipant::handleRpcMethodInvocation(uint64_t invocation_id, const s
423427
{
424428
const std::scoped_lock<std::mutex> lock(state->mutex);
425429
if (state->shutting_down) {
430+
LK_LOG_DEBUG(
431+
"LocalParticipant::handleRpcMethodInvocation: shutdown during handler, skipping response "
432+
"(invocation_id={})",
433+
invocation_id);
426434
// Shutdown started, don't send response - handle may be invalid
427435
return;
428436
}
429437
}
430438

431439
FfiRequest req;
432440
auto* msg = req.mutable_rpc_method_invocation_response();
433-
msg->set_local_participant_handle(ffiHandleId());
441+
const auto handle_id_at_send = ffiHandleId();
442+
msg->set_local_participant_handle(handle_id_at_send);
434443
msg->set_invocation_id(invocation_id);
435444
if (response_error.has_value()) {
436445
auto* err_proto = msg->mutable_error();
@@ -439,6 +448,8 @@ void LocalParticipant::handleRpcMethodInvocation(uint64_t invocation_id, const s
439448
if (response_payload.has_value()) {
440449
msg->set_payload(*response_payload);
441450
}
451+
LK_LOG_DEBUG("LocalParticipant::handleRpcMethodInvocation: sending response (handle={}, invocation_id={})",
452+
handle_id_at_send, invocation_id);
442453
FfiClient::instance().sendRequest(req);
443454
}
444455

src/room.cpp

Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,7 @@ void readyForRoomEvent(std::uint64_t room_handle) {
7676
Room::Room() : subscription_thread_dispatcher_(std::make_unique<SubscriptionThreadDispatcher>()) {}
7777

7878
Room::~Room() {
79+
LK_LOG_INFO("Room::~Room: entry (this={})", static_cast<const void*>(this));
7980
// Issue a graceful disconnect so the server sees us leave instead of
8081
// timing out (RAII expectation; see issue #118). disconnect() does the
8182
// full teardown including subscription threads, listener, and local
@@ -245,6 +246,7 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO
245246

246247
bool Room::disconnect(DisconnectReason reason) {
247248
TRACE_EVENT0("livekit", "Room::disconnect");
249+
LK_LOG_INFO("Room::disconnect: entry (this={}, reason={})", static_cast<const void*>(this), static_cast<int>(reason));
248250

249251
// Hold onto this in case the
250252
auto prev_connection_state = connection_state_;
@@ -254,6 +256,7 @@ bool Room::disconnect(DisconnectReason reason) {
254256
{
255257
const std::scoped_lock<std::mutex> g(lock_);
256258
if (connection_state_ == ConnectionState::Disconnected) {
259+
LK_LOG_INFO("Room::disconnect: already disconnected, returning false (this={})", static_cast<const void*>(this));
257260
return false;
258261
}
259262
handle = room_handle_;
@@ -1228,6 +1231,8 @@ void Room::onEvent(const FfiEvent& event) {
12281231
break;
12291232
}
12301233
case proto::RoomEvent::kDisconnected: {
1234+
LK_LOG_INFO("Room::onFfiEvent: kDisconnected received (this={}, reason={})", static_cast<const void*>(this),
1235+
static_cast<int>(re.disconnected().reason()));
12311236
// If disconnect() was driven from our side, it already flipped state
12321237
// to Disconnected and fired the delegate; skip the duplicate here.
12331238
bool already_disconnected = false;
@@ -1261,6 +1266,7 @@ void Room::onEvent(const FfiEvent& event) {
12611266
break;
12621267
}
12631268
case proto::RoomEvent::kEos: {
1269+
LK_LOG_INFO("Room::onFfiEvent: kEos received (this={})", static_cast<const void*>(this));
12641270
if (subscription_thread_dispatcher_) {
12651271
subscription_thread_dispatcher_->stopAll();
12661272
}
@@ -1293,6 +1299,16 @@ void Room::onEvent(const FfiEvent& event) {
12931299
old_byte_readers = std::move(byte_stream_readers_);
12941300
}
12951301

1302+
// Drain in-flight RPC invocations before destroying the local
1303+
// participant's FFI handle. Mirrors the ordering in disconnect();
1304+
// without this, a listener-thread RPC handler can race with handle
1305+
// disposal and send to a dead handle → INVALID_HANDLE → terminate.
1306+
if (old_local_participant) {
1307+
LK_LOG_INFO("Room::onFfiEvent: kEos shutting down local participant (handle={})",
1308+
old_local_participant->ffiHandleId());
1309+
old_local_participant->shutdown();
1310+
}
1311+
12961312
// Remove listener outside lock
12971313
if (listener_to_remove != 0) {
12981314
FfiClient::instance().removeListener(listener_to_remove);

0 commit comments

Comments
 (0)