2525#include < livekit/e2ee.h>
2626#include < livekit/remote_data_track.h>
2727
28- #include < cmath>
2928#include < condition_variable>
3029#include < exception>
3130#include < future>
32- #include < tuple>
3331
3432#include " ../common/test_common.h"
3533#include " ffi_client.h"
36- #include " lk_log.h"
3734
3835namespace livekit ::test {
3936
@@ -45,9 +42,12 @@ constexpr char kTrackNamePrefix[] = "data_track_e2e";
4542constexpr auto kTrackWaitTimeout = 10s;
4643constexpr auto kPollingInterval = 10ms;
4744constexpr int kResubscribeIterations = 10 ;
45+ constexpr std::size_t kSinglePacketPayloadBytes = 8192 ;
4846constexpr int kPublishManyTrackCount = 256 ;
4947constexpr auto kPublishManyTimeout = 5s;
5048constexpr std::size_t kLargeFramePayloadBytes = 196608 ;
49+ constexpr auto kTransportFrameTimeout = 15s;
50+ constexpr std::uint8_t kTransportPayloadValue = 0xFA ;
5151constexpr char kE2EESharedSecret [] = " password" ;
5252constexpr int kE2EEFrameCount = 5 ;
5353constexpr int kTimestampFrameAttempts = 200 ;
@@ -287,24 +287,15 @@ void runEncryptedDataTrackRoundTrip(KeyDerivationFunction key_derivation_functio
287287
288288class DataTrackE2ETest : public LiveKitTestBase {};
289289
290- class DataTrackTransportTest : public DataTrackE2ETest ,
291- public ::testing::WithParamInterface<std::tuple<double , size_t >> {};
290+ class DataTrackTransportTest : public DataTrackE2ETest , public ::testing::WithParamInterface<std::size_t > {};
292291
293292class DataTrackKeyDerivationTest : public DataTrackE2ETest ,
294293 public ::testing::WithParamInterface<KeyDerivationFunction> {};
295294
296295TEST_P (DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) {
297- const auto publish_fps = std::get<0 >(GetParam ());
298- const auto payload_len = std::get<1 >(GetParam ());
296+ const auto payload_len = GetParam ();
299297 const auto track_name = makeTrackName (" transport" );
300298
301- // How long to publish frames for.
302- constexpr auto PUBLISH_DURATION = 10s;
303-
304- // Percentage of total frames that must be received on the subscriber end in
305- // order for the test to pass.
306- constexpr float MIN_PERCENTAGE = 0 .90f ;
307-
308299 std::vector<TestRoomConnectionOptions> room_configs (2 );
309300 room_configs[0 ].room_options .single_peer_connection = false ;
310301 room_configs[1 ].room_options .single_peer_connection = false ;
@@ -316,104 +307,57 @@ TEST_P(DataTrackTransportTest, PublishesAndReceivesFramesEndToEnd) {
316307 auto & publisher_room = rooms[0 ];
317308 const auto publisher_identity = publisher_room->localParticipant ()->identity ();
318309
319- auto track = requirePublishedTrack (publisher_room->localParticipant (), track_name);
320- std::cerr << " Track published\n " ;
310+ auto local_track = requirePublishedTrack (publisher_room->localParticipant (), track_name);
311+ ASSERT_TRUE (local_track->isPublished ());
312+ EXPECT_FALSE (local_track->info ().uses_e2ee );
313+ EXPECT_EQ (local_track->info ().name , track_name);
321314
322315 auto remote_track = subscriber_delegate.waitForTrack (kTrackWaitTimeout );
323- std::cerr << " Got remote track: " << remote_track->info ().sid << " \n " ;
324-
325316 ASSERT_NE (remote_track, nullptr ) << " Timed out waiting for remote data track" ;
326317 EXPECT_TRUE (remote_track->isPublished ());
327318 EXPECT_FALSE (remote_track->info ().uses_e2ee );
328319 EXPECT_EQ (remote_track->info ().name , track_name);
329320 EXPECT_EQ (remote_track->publisherIdentity (), publisher_identity);
330321
331- const auto frame_count =
332- static_cast <size_t >(std::llround (std::chrono::duration<double >(PUBLISH_DURATION).count () * publish_fps));
333-
334- auto publish = [&]() {
335- if (!track->isPublished ()) {
336- throw std::runtime_error (" Publisher failed to publish data track" );
337- }
338- if (track->info ().uses_e2ee ) {
339- throw std::runtime_error (" Unexpected E2EE on test data track" );
340- }
341- if (track->info ().name != track_name) {
342- throw std::runtime_error (" Published track name mismatch" );
343- }
344-
345- const auto frame_interval = std::chrono::duration_cast<std::chrono::steady_clock::duration>(
346- std::chrono::duration<double >(1.0 / publish_fps));
347- auto next_send = std::chrono::steady_clock::now ();
348-
349- std::cout << " Publishing " << frame_count << " frames with payload length " << payload_len << ' \n ' ;
350- for (size_t index = 0 ; index < frame_count; ++index) {
351- std::vector<std::uint8_t > payload (payload_len, static_cast <std::uint8_t >(index));
352- requirePushSuccess (track->tryPush (std::move (payload)), " Failed to push data frame" );
353-
354- next_send += frame_interval;
355- std::this_thread::sleep_until (next_send);
356- }
357-
358- track->unpublishDataTrack ();
359- };
360-
361322 auto subscribe_result = remote_track->subscribe ();
362323 if (!subscribe_result) {
363324 FAIL () << describeDataTrackError (subscribe_result.error ());
364325 }
365326 auto subscription = subscribe_result.value ();
366327
367- std::promise<size_t > receive_count_promise;
368- auto receive_count_future = receive_count_promise.get_future ();
369-
370- auto subscribe = [&]() {
371- size_t received_count = 0 ;
328+ std::atomic<bool > keep_publishing{true };
329+ auto publisher = std::async (std::launch::async, [&]() {
372330 DataTrackFrame frame;
373- while (subscription->read (frame) && received_count < frame_count) {
374- if (frame.payload .empty ()) {
375- throw std::runtime_error (" Received empty data frame" );
376- }
377-
378- const auto first_byte = frame.payload .front ();
379- if (!std::all_of (frame.payload .begin (), frame.payload .end (),
380- [first_byte](std::uint8_t byte) { return byte == first_byte; })) {
381- throw std::runtime_error (" Received frame with inconsistent payload" );
382- }
383- if (frame.user_timestamp .has_value ()) {
384- throw std::runtime_error (" Received unexpected user timestamp in transport test" );
385- }
386-
387- ++received_count;
331+ frame.payload .assign (payload_len, kTransportPayloadValue );
332+ while (keep_publishing.load ()) {
333+ requirePushSuccess (local_track->tryPush (frame), " Failed to push data frame" );
334+ std::this_thread::sleep_for (50ms);
388335 }
336+ });
389337
390- receive_count_promise.set_value (received_count);
391- };
392-
393- // Launch both publisher and subscriber
394- auto pub_fut = std::async (std::launch::async, publish);
395- auto sub_fut = std::async (std::launch::async, subscribe);
396-
397- // Wait for both, with a combined deadline (the timeout(...) wrapper).
398- const auto deadline = std::chrono::steady_clock::now () + PUBLISH_DURATION + 25s;
399-
400- const bool pub_ok = pub_fut.wait_until (deadline) == std::future_status::ready;
401- const bool sub_ok = sub_fut.wait_until (deadline) == std::future_status::ready;
402-
403- if (!pub_ok || !sub_ok) {
404- ADD_FAILURE () << " Timed out waiting for data frames" ;
338+ DataTrackFrame frame;
339+ std::exception_ptr read_error;
340+ try {
341+ frame = readFrameWithTimeout (subscription, kTransportFrameTimeout );
342+ } catch (...) {
343+ read_error = std::current_exception ();
405344 }
406345
407- // Equivalent of `try_join!`'s ? — re-throws any exception from either task
408- pub_fut.get ();
409- sub_fut.get ();
346+ const bool remote_track_published_after_read = remote_track->isPublished ();
347+ keep_publishing.store (false );
348+ subscription->close ();
349+ local_track->unpublishDataTrack ();
410350
411- const auto received_count = receive_count_future .get ();
412- const auto received_percent = static_cast < float >(received_count) / static_cast < float >(frame_count);
413- std::cout << " Received " << received_count << " / " << frame_count << " frames ( " << received_percent * 100 . 0f << " %) "
414- << ' \n ' ;
351+ publisher .get ();
352+ if (read_error) {
353+ std::rethrow_exception (read_error);
354+ }
415355
416- EXPECT_GE (received_percent, MIN_PERCENTAGE) << " Received " << received_count << " /" << frame_count << " frames" ;
356+ ASSERT_EQ (frame.payload .size (), payload_len);
357+ EXPECT_TRUE (std::all_of (frame.payload .begin (), frame.payload .end (),
358+ [](std::uint8_t byte) { return byte == kTransportPayloadValue ; }));
359+ EXPECT_FALSE (frame.user_timestamp .has_value ());
360+ EXPECT_TRUE (remote_track_published_after_read);
417361}
418362
419363TEST_F (DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) {
@@ -853,20 +797,23 @@ TEST_F(DataTrackE2ETest, PreservesUserTimestampOnEncryptedDataTrack) {
853797 local_track->unpublishDataTrack ();
854798}
855799
856- std::string dataTrackParamName (const ::testing::TestParamInfo<std::tuple<double , size_t >>& info) {
857- if (std::get<0 >(info.param ) > 100.0 ) {
858- return " HighFpsSinglePacket" ;
800+ std::string dataTrackPayloadParamName (const ::testing::TestParamInfo<std::size_t >& info) {
801+ if (info.param == kSinglePacketPayloadBytes ) {
802+ return " SinglePacket" ;
803+ }
804+ if (info.param == kLargeFramePayloadBytes ) {
805+ return " MultiPacket" ;
859806 }
860- return " LowFpsMultiPacket " ;
807+ return " Payload " + std::to_string (info. param ) ;
861808}
862809
863810std::string keyDerivationParamName (const ::testing::TestParamInfo<KeyDerivationFunction>& info) {
864811 return keyDerivationFunctionName (info.param );
865812}
866813
867- INSTANTIATE_TEST_SUITE_P (DataTrackScenarios , DataTrackTransportTest,
868- ::testing::Values (std::make_tuple( 120.0 , size_t { 8192 }), std::make_tuple( 10.0 , size_t { 196608 }) ),
869- dataTrackParamName );
814+ INSTANTIATE_TEST_SUITE_P (DataTrackPayloads , DataTrackTransportTest,
815+ ::testing::Values (kSinglePacketPayloadBytes , kLargeFramePayloadBytes ),
816+ dataTrackPayloadParamName );
870817
871818INSTANTIATE_TEST_SUITE_P (KeyDerivationFunctions, DataTrackKeyDerivationTest,
872819 ::testing::Values (KeyDerivationFunction::PBKDF2, KeyDerivationFunction::HKDF),
0 commit comments