Skip to content

Commit 59bc2fd

Browse files
Initial attempt at fixing room disconnect issue report
1 parent 069b78d commit 59bc2fd

7 files changed

Lines changed: 296 additions & 9 deletions

File tree

include/livekit/room.h

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -141,6 +141,26 @@ class LIVEKIT_API Room {
141141
bool Connect(const std::string& url, const std::string& token, const RoomOptions& options);
142142
// NOLINTEND(readability-identifier-naming)
143143

144+
/// Gracefully disconnect from the room.
145+
///
146+
/// Sends a `DisconnectRequest` to the server with the given reason, blocks
147+
/// until the FFI acknowledges, and tears down all room state (participants,
148+
/// listener, E2EE manager, subscription threads). The `onDisconnected`
149+
/// delegate is invoked once with the supplied reason.
150+
///
151+
/// Safe to call from any thread, but **must not** be called from inside a
152+
/// `RoomDelegate` callback — doing so will deadlock the event listener.
153+
///
154+
/// @note `~Room()` invokes `disconnect()` automatically if the room is
155+
/// still connected, so explicit calls are optional. Calling this
156+
/// explicitly lets you handle the disconnect outcome and choose a
157+
/// reason other than `ClientInitiated`.
158+
///
159+
/// @param reason Reason reported to the server (default: ClientInitiated).
160+
/// @returns true if a disconnect roundtrip was performed; false if the
161+
/// room was already disconnected.
162+
bool disconnect(DisconnectReason reason = DisconnectReason::ClientInitiated);
163+
144164
// Accessors
145165

146166
/// Retrieve static metadata about the room.

src/ffi_client.cpp

Lines changed: 31 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -380,6 +380,37 @@ std::future<proto::ConnectCallback> FfiClient::connectAsync(const std::string& u
380380
return fut;
381381
}
382382

383+
std::future<void> FfiClient::disconnectAsync(uintptr_t room_handle, DisconnectReason reason) {
384+
const AsyncId async_id = generateAsyncId();
385+
386+
auto fut = registerAsync<void>(
387+
async_id,
388+
// match: this DisconnectCallback's async_id
389+
[async_id](const proto::FfiEvent& event) {
390+
return event.has_disconnect() && event.disconnect().async_id() == async_id;
391+
},
392+
// handler: nothing to extract; the callback is signal-only
393+
[](const proto::FfiEvent& /*event*/, std::promise<void>& pr) { pr.set_value(); });
394+
395+
proto::FfiRequest req;
396+
auto* disconnect = req.mutable_disconnect();
397+
disconnect->set_room_handle(room_handle);
398+
disconnect->set_request_async_id(async_id);
399+
disconnect->set_reason(toProto(reason));
400+
401+
try {
402+
const proto::FfiResponse resp = sendRequest(req);
403+
if (!resp.has_disconnect()) {
404+
logAndThrow("FfiResponse missing disconnect");
405+
}
406+
} catch (...) {
407+
cancelPendingByAsyncId(async_id);
408+
throw;
409+
}
410+
411+
return fut;
412+
}
413+
383414
// Track APIs Implementation
384415
std::future<std::vector<RtcStats>> FfiClient::getTrackStatsAsync(uintptr_t track_handle) {
385416
// Generate client-side async_id first

src/ffi_client.h

Lines changed: 5 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -30,6 +30,7 @@
3030
#include "data_track.pb.h"
3131
#include "livekit/data_track_error.h"
3232
#include "livekit/result.h"
33+
#include "livekit/room_event_types.h"
3334
#include "livekit/stats.h"
3435
#include "livekit/visibility.h"
3536
#include "lk_log.h"
@@ -94,6 +95,10 @@ class LIVEKIT_INTERNAL_API FfiClient {
9495
std::future<proto::ConnectCallback> connectAsync(const std::string& url, const std::string& token,
9596
const RoomOptions& options);
9697

98+
// Send a DisconnectRequest for the given room handle and wait for the
99+
// FFI's DisconnectCallback. Throws std::runtime_error on FFI failure.
100+
std::future<void> disconnectAsync(uintptr_t room_handle, DisconnectReason reason);
101+
97102
// Track APIs
98103
std::future<std::vector<RtcStats>> getTrackStatsAsync(uintptr_t track_handle);
99104

src/room.cpp

Lines changed: 103 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -76,6 +76,21 @@ void readyForRoomEvent(std::uint64_t room_handle) {
7676
Room::Room() : subscription_thread_dispatcher_(std::make_unique<SubscriptionThreadDispatcher>()) {}
7777

7878
Room::~Room() {
79+
// Issue a graceful disconnect so the server sees us leave instead of
80+
// timing out (RAII expectation; see issue #118). disconnect() does the
81+
// full teardown including subscription threads, listener, and local
82+
// participant, so the destructor only needs to handle the
83+
// already-disconnected path.
84+
try {
85+
disconnect();
86+
} catch (const std::exception& e) {
87+
LK_LOG_ERROR("Room::~Room: graceful disconnect failed: {}", e.what());
88+
} catch (...) {
89+
LK_LOG_ERROR("Room::~Room: graceful disconnect failed: unknown exception");
90+
}
91+
92+
// Defensive: if disconnect() bailed early (e.g. never connected), still
93+
// tear down any state that may have leaked.
7994
if (subscription_thread_dispatcher_) {
8095
subscription_thread_dispatcher_->stopAll();
8196
}
@@ -86,22 +101,16 @@ Room::~Room() {
86101
const std::scoped_lock<std::mutex> g(lock_);
87102
listener_to_remove = listener_id_;
88103
listener_id_ = 0;
89-
// Move local participant out for cleanup outside the lock
90104
local_participant_to_cleanup = std::move(local_participant_);
91105
}
92106

93-
// Shutdown local participant (unregisters RPC handlers, etc.) before
94-
// removing the listener. This prevents in-flight RPC responses from
95-
// trying to use destroyed handles.
96107
if (local_participant_to_cleanup) {
97108
local_participant_to_cleanup->shutdown();
98109
}
99110

100111
if (listener_to_remove != 0) {
101112
FfiClient::instance().removeListener(listener_to_remove);
102113
}
103-
104-
// local_participant_to_cleanup is destroyed here after listener is removed
105114
}
106115

107116
void Room::setDelegate(RoomDelegate* delegate) {
@@ -234,6 +243,83 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO
234243
return connect(url, token, options);
235244
}
236245

246+
bool Room::disconnect(DisconnectReason reason) {
247+
TRACE_EVENT0("livekit", "Room::disconnect");
248+
249+
std::shared_ptr<FfiHandle> handle;
250+
RoomDelegate* delegate_snapshot = nullptr;
251+
{
252+
const std::scoped_lock<std::mutex> g(lock_);
253+
if (connection_state_ == ConnectionState::Disconnected) {
254+
return false;
255+
}
256+
handle = room_handle_;
257+
delegate_snapshot = delegate_;
258+
// Flip state immediately so the in-flight Disconnected room-event we'll
259+
// get back doesn't double-fire onDisconnected. Mirrors Python's
260+
// Room.disconnect(), which also flips state before sending the request.
261+
connection_state_ = ConnectionState::Disconnected;
262+
}
263+
264+
// Tell the FFI to close the room and wait for the callback. Catch the
265+
// exception so we still run teardown below; the caller learns about the
266+
// failure via the returned bool / logs.
267+
bool ffi_ok = true;
268+
if (handle) {
269+
try {
270+
FfiClient::instance().disconnectAsync(handle->get(), reason).get();
271+
} catch (const std::exception& e) {
272+
LK_LOG_ERROR("Room::disconnect: FFI disconnect failed: {}", e.what());
273+
ffi_ok = false;
274+
}
275+
}
276+
277+
// Stop dispatcher first so no track callbacks fire mid-teardown.
278+
if (subscription_thread_dispatcher_) {
279+
subscription_thread_dispatcher_->stopAll();
280+
}
281+
282+
int listener_to_remove = 0;
283+
std::unique_ptr<LocalParticipant> local_participant_to_cleanup;
284+
{
285+
const std::scoped_lock<std::mutex> g(lock_);
286+
listener_to_remove = listener_id_;
287+
listener_id_ = 0;
288+
local_participant_to_cleanup = std::move(local_participant_);
289+
remote_participants_.clear();
290+
room_handle_.reset();
291+
e2ee_manager_.reset();
292+
text_stream_readers_.clear();
293+
byte_stream_readers_.clear();
294+
}
295+
296+
// Shut down local participant (unregisters RPC handlers, etc.) before
297+
// removing the listener, so in-flight RPC responses don't reach a
298+
// destroyed handle.
299+
if (local_participant_to_cleanup) {
300+
local_participant_to_cleanup->shutdown();
301+
}
302+
303+
if (listener_to_remove != 0) {
304+
FfiClient::instance().removeListener(listener_to_remove);
305+
}
306+
307+
// Fire onDisconnected exactly once, with the reason the caller passed.
308+
if (delegate_snapshot) {
309+
DisconnectedEvent ev;
310+
ev.reason = reason;
311+
try {
312+
delegate_snapshot->onDisconnected(*this, ev);
313+
} catch (const std::exception& e) {
314+
LK_LOG_ERROR("Room::disconnect: onDisconnected threw: {}", e.what());
315+
} catch (...) {
316+
LK_LOG_ERROR("Room::disconnect: onDisconnected threw: unknown exception");
317+
}
318+
}
319+
320+
return ffi_ok;
321+
}
322+
237323
RoomInfoData Room::roomInfo() const {
238324
const std::scoped_lock<std::mutex> g(lock_);
239325
return room_info_;
@@ -1139,6 +1225,17 @@ void Room::onEvent(const FfiEvent& event) {
11391225
break;
11401226
}
11411227
case proto::RoomEvent::kDisconnected: {
1228+
// If disconnect() was driven from our side, it already flipped state
1229+
// to Disconnected and fired the delegate; skip the duplicate here.
1230+
bool already_disconnected = false;
1231+
{
1232+
const std::scoped_lock<std::mutex> guard(lock_);
1233+
already_disconnected = (connection_state_ == ConnectionState::Disconnected);
1234+
connection_state_ = ConnectionState::Disconnected;
1235+
}
1236+
if (already_disconnected) {
1237+
break;
1238+
}
11421239
DisconnectedEvent ev;
11431240
ev.reason = toDisconnectReason(re.disconnected().reason());
11441241
if (delegate_snapshot) {

src/room_proto_converter.cpp

Lines changed: 74 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -99,9 +99,80 @@ DataPacketKind toDataPacketKind(proto::DataPacketKind in) {
9999
}
100100
}
101101

102-
DisconnectReason toDisconnectReason(proto::DisconnectReason /*in*/) {
103-
// TODO: map each proto::DisconnectReason to your DisconnectReason enum
104-
return DisconnectReason::Unknown;
102+
DisconnectReason toDisconnectReason(proto::DisconnectReason in) {
103+
switch (in) {
104+
case proto::CLIENT_INITIATED:
105+
return DisconnectReason::ClientInitiated;
106+
case proto::DUPLICATE_IDENTITY:
107+
return DisconnectReason::DuplicateIdentity;
108+
case proto::SERVER_SHUTDOWN:
109+
return DisconnectReason::ServerShutdown;
110+
case proto::PARTICIPANT_REMOVED:
111+
return DisconnectReason::ParticipantRemoved;
112+
case proto::ROOM_DELETED:
113+
return DisconnectReason::RoomDeleted;
114+
case proto::STATE_MISMATCH:
115+
return DisconnectReason::StateMismatch;
116+
case proto::JOIN_FAILURE:
117+
return DisconnectReason::JoinFailure;
118+
case proto::MIGRATION:
119+
return DisconnectReason::Migration;
120+
case proto::SIGNAL_CLOSE:
121+
return DisconnectReason::SignalClose;
122+
case proto::ROOM_CLOSED:
123+
return DisconnectReason::RoomClosed;
124+
case proto::USER_UNAVAILABLE:
125+
return DisconnectReason::UserUnavailable;
126+
case proto::USER_REJECTED:
127+
return DisconnectReason::UserRejected;
128+
case proto::SIP_TRUNK_FAILURE:
129+
return DisconnectReason::SipTrunkFailure;
130+
case proto::CONNECTION_TIMEOUT:
131+
return DisconnectReason::ConnectionTimeout;
132+
case proto::MEDIA_FAILURE:
133+
return DisconnectReason::MediaFailure;
134+
case proto::UNKNOWN_REASON:
135+
default:
136+
return DisconnectReason::Unknown;
137+
}
138+
}
139+
140+
proto::DisconnectReason toProto(DisconnectReason in) {
141+
switch (in) {
142+
case DisconnectReason::ClientInitiated:
143+
return proto::CLIENT_INITIATED;
144+
case DisconnectReason::DuplicateIdentity:
145+
return proto::DUPLICATE_IDENTITY;
146+
case DisconnectReason::ServerShutdown:
147+
return proto::SERVER_SHUTDOWN;
148+
case DisconnectReason::ParticipantRemoved:
149+
return proto::PARTICIPANT_REMOVED;
150+
case DisconnectReason::RoomDeleted:
151+
return proto::ROOM_DELETED;
152+
case DisconnectReason::StateMismatch:
153+
return proto::STATE_MISMATCH;
154+
case DisconnectReason::JoinFailure:
155+
return proto::JOIN_FAILURE;
156+
case DisconnectReason::Migration:
157+
return proto::MIGRATION;
158+
case DisconnectReason::SignalClose:
159+
return proto::SIGNAL_CLOSE;
160+
case DisconnectReason::RoomClosed:
161+
return proto::ROOM_CLOSED;
162+
case DisconnectReason::UserUnavailable:
163+
return proto::USER_UNAVAILABLE;
164+
case DisconnectReason::UserRejected:
165+
return proto::USER_REJECTED;
166+
case DisconnectReason::SipTrunkFailure:
167+
return proto::SIP_TRUNK_FAILURE;
168+
case DisconnectReason::ConnectionTimeout:
169+
return proto::CONNECTION_TIMEOUT;
170+
case DisconnectReason::MediaFailure:
171+
return proto::MEDIA_FAILURE;
172+
case DisconnectReason::Unknown:
173+
default:
174+
return proto::UNKNOWN_REASON;
175+
}
105176
}
106177

107178
// --------- basic helper conversions ---------

src/room_proto_converter.h

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -37,6 +37,7 @@ LIVEKIT_INTERNAL_API ConnectionQuality toConnectionQuality(proto::ConnectionQual
3737
LIVEKIT_INTERNAL_API ConnectionState toConnectionState(proto::ConnectionState in);
3838
LIVEKIT_INTERNAL_API DataPacketKind toDataPacketKind(proto::DataPacketKind in);
3939
LIVEKIT_INTERNAL_API DisconnectReason toDisconnectReason(proto::DisconnectReason in);
40+
LIVEKIT_INTERNAL_API proto::DisconnectReason toProto(DisconnectReason in);
4041

4142
LIVEKIT_INTERNAL_API UserPacketData fromProto(const proto::UserPacket& in);
4243
LIVEKIT_INTERNAL_API SipDtmfData fromProto(const proto::SipDTMF& in);

src/tests/integration/test_room.cpp

Lines changed: 62 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -70,4 +70,66 @@ TEST_F(RoomTest, ConnectWithInvalidUrl) {
7070
EXPECT_FALSE(connected) << "Should fail to connect to invalid URL";
7171
}
7272

73+
namespace {
74+
75+
class DisconnectTrackingDelegate : public RoomDelegate {
76+
public:
77+
void onDisconnected(Room&, const DisconnectedEvent& ev) override {
78+
++count;
79+
last_reason = ev.reason;
80+
}
81+
82+
std::atomic<int> count{0};
83+
DisconnectReason last_reason = DisconnectReason::Unknown;
84+
};
85+
86+
} // namespace
87+
88+
// Issue #118: explicit disconnect() sends DisconnectRequest, flips state,
89+
// and fires onDisconnected exactly once.
90+
TEST_F(RoomTest, ExplicitDisconnectFiresDelegateAndClearsState) {
91+
if (!server_available_) {
92+
GTEST_SKIP() << "LIVEKIT_URL / LIVEKIT_TOKEN_A not set";
93+
}
94+
95+
Room room;
96+
DisconnectTrackingDelegate delegate;
97+
room.setDelegate(&delegate);
98+
99+
RoomOptions options;
100+
ASSERT_TRUE(room.connect(server_url_, token_, options)) << "connect failed";
101+
ASSERT_EQ(room.connectionState(), ConnectionState::Connected);
102+
ASSERT_NE(room.localParticipant(), nullptr);
103+
104+
EXPECT_TRUE(room.disconnect()) << "disconnect should report success on a connected room";
105+
EXPECT_EQ(room.connectionState(), ConnectionState::Disconnected);
106+
EXPECT_EQ(room.localParticipant(), nullptr) << "local participant should be cleared after disconnect";
107+
EXPECT_EQ(delegate.count.load(), 1) << "onDisconnected should fire exactly once";
108+
EXPECT_EQ(delegate.last_reason, DisconnectReason::ClientInitiated);
109+
110+
// Idempotent: calling again on an already-disconnected room is a no-op.
111+
EXPECT_FALSE(room.disconnect()) << "second disconnect should report no-op";
112+
EXPECT_EQ(delegate.count.load(), 1) << "delegate must not double-fire";
113+
}
114+
115+
// Issue #118: destruction of a still-connected Room must trigger a graceful
116+
// disconnect (RAII), not silently leak the participant on the server side.
117+
TEST_F(RoomTest, DestructorTriggersGracefulDisconnect) {
118+
if (!server_available_) {
119+
GTEST_SKIP() << "LIVEKIT_URL / LIVEKIT_TOKEN_A not set";
120+
}
121+
122+
DisconnectTrackingDelegate delegate;
123+
{
124+
Room room;
125+
room.setDelegate(&delegate);
126+
RoomOptions options;
127+
ASSERT_TRUE(room.connect(server_url_, token_, options));
128+
ASSERT_EQ(room.connectionState(), ConnectionState::Connected);
129+
// Let `room` go out of scope while still connected.
130+
}
131+
EXPECT_EQ(delegate.count.load(), 1) << "destructor should fire onDisconnected exactly once";
132+
EXPECT_EQ(delegate.last_reason, DisconnectReason::ClientInitiated);
133+
}
134+
73135
} // namespace livekit::test

0 commit comments

Comments
 (0)