diff --git a/.gitignore b/.gitignore index 2a54c6ab..e6706760 100644 --- a/.gitignore +++ b/.gitignore @@ -34,3 +34,4 @@ lib/ web/ *trace.json compile_commands.json +user_stories diff --git a/client-sdk-rust b/client-sdk-rust index fd3df873..b26a6acb 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit fd3df87386cd0abd66fcc0e1dcc15f93235e56d2 +Subproject commit b26a6acba2ed0b8fcc778283231777cb3d2fbe89 diff --git a/src/room.cpp b/src/room.cpp index d647b0fc..f809c201 100644 --- a/src/room.cpp +++ b/src/room.cpp @@ -17,6 +17,8 @@ #include "livekit/room.h" #include +#include +#include #include "data_track.pb.h" #include "ffi.pb.h" @@ -112,7 +114,7 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO } connection_state_ = ConnectionState::Reconnecting; } - auto fut = FfiClient::instance().connectAsync(url, token, options); + auto fut = FfiClient::instance().connectAsync(url, token, options); // [1] try { auto connectCb = fut.get(); // fut will throw if it fails to connect to the room @@ -178,13 +180,28 @@ bool Room::Connect(const std::string& url, const std::string& token, const RoomO connection_state_ = ConnectionState::Connected; } - // Install listener (Room is fully initialized) + // Proof that the issue is fixed + std::this_thread::sleep_for(std::chrono::seconds(1)); + LK_LOG_INFO("Room::Connect: sleeping for 1 second"); + + // Install listener (Room is fully initialized) [2] auto listenerId = FfiClient::instance().AddListener([this](const proto::FfiEvent& e) { OnEvent(e); }); { const std::scoped_lock g(lock_); listener_id_ = listenerId; } + // Tell the Rust core it can start forwarding room events. Rust parks the + // connect task after sending the ConnectCallback, so events emitted between + // the callback and the listener registration above would otherwise be + // dropped (PushEvent has no buffering). + { + proto::FfiRequest flush_req; + auto* msg = flush_req.mutable_flush_events(); + msg->set_room_handle(static_cast(owned_room.handle().id())); + (void)FfiClient::instance().sendRequest(flush_req); + } + return true; } catch (const std::exception& e) { // On error, set the connection_state_ to Disconnected diff --git a/src/tests/integration/test_late_join_track_publication.cpp b/src/tests/integration/test_late_join_track_publication.cpp new file mode 100644 index 00000000..9f74d7ca --- /dev/null +++ b/src/tests/integration/test_late_join_track_publication.cpp @@ -0,0 +1,569 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include "../common/audio_utils.h" +#include "../common/test_common.h" +#include "../common/video_utils.h" + +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include +#include + +namespace livekit::test { + +using namespace std::chrono_literals; + +namespace { + +constexpr auto kWaitTimeout = 20s; +constexpr int kAudioTrackCount = 2; +constexpr int kVideoTrackCount = 2; +constexpr int kDataTrackCount = 4; +constexpr int kVideoWidth = 160; +constexpr int kVideoHeight = 90; + +struct ExpectedPublication { + std::string name; + TrackKind kind = TrackKind::KIND_UNKNOWN; +}; + +struct LateJoinPublicationState { + std::mutex mutex; + std::condition_variable cv; + std::map published_media_tracks; + std::map subscribed_media_tracks; + std::map published_data_tracks; +}; + +class LateJoinPublicationDelegate : public RoomDelegate { +public: + explicit LateJoinPublicationDelegate(LateJoinPublicationState &state) + : state_(state) {} + + void onTrackPublished(Room &, const TrackPublishedEvent &event) override { + if (!event.publication) { + return; + } + + std::lock_guard lock(state_.mutex); + state_.published_media_tracks[event.publication->name()] = + event.publication->kind(); + state_.cv.notify_all(); + } + + void onTrackSubscribed(Room &, const TrackSubscribedEvent &event) override { + if (!event.publication) { + return; + } + + std::lock_guard lock(state_.mutex); + state_.subscribed_media_tracks[event.publication->name()] = + event.publication->kind(); + state_.cv.notify_all(); + } + + void onDataTrackPublished(Room &, + const DataTrackPublishedEvent &event) override { + if (!event.track) { + return; + } + + std::lock_guard lock(state_.mutex); + state_.published_data_tracks[event.track->info().name] = + event.track->publisherIdentity(); + state_.cv.notify_all(); + } + +private: + LateJoinPublicationState &state_; +}; + +class MediaLoopGuard { +public: + MediaLoopGuard() = default; + MediaLoopGuard(const MediaLoopGuard &) = delete; + MediaLoopGuard &operator=(const MediaLoopGuard &) = delete; + + ~MediaLoopGuard() { stop(); } + + void addVideoSource(const std::shared_ptr &source, + bool red_mode) { + threads_.emplace_back([this, source, red_mode]() { + runVideoLoop(source, running_, + red_mode ? fillRedWrapper : fillWebcamWrapper, kVideoWidth, + kVideoHeight); + }); + } + + void addAudioSource(const std::shared_ptr &source, + double base_freq_hz, bool siren_mode) { + threads_.emplace_back([this, source, base_freq_hz, siren_mode]() { + runToneLoop(source, running_, base_freq_hz, siren_mode); + }); + } + + void stop() { + running_.store(false, std::memory_order_relaxed); + for (auto &thread : threads_) { + if (thread.joinable()) { + thread.join(); + } + } + } + +private: + std::atomic running_{true}; + std::vector threads_; +}; + +class PublishedTrackGuard { +public: + explicit PublishedTrackGuard(LocalParticipant *participant) + : participant_(participant) {} + + PublishedTrackGuard(const PublishedTrackGuard &) = delete; + PublishedTrackGuard &operator=(const PublishedTrackGuard &) = delete; + + ~PublishedTrackGuard() { unpublishAll(); } + + void addMediaTrack(const std::shared_ptr &track, + const std::string &sid) { + media_tracks_.push_back({track, sid}); + } + + void addDataTrack(const std::shared_ptr &track) { + data_tracks_.push_back(track); + } + + void unpublishAll() { + if (participant_ != nullptr) { + for (const auto &track : media_tracks_) { + if (track.track && !track.sid.empty()) { + participant_->unpublishTrack(track.sid); + } + } + } + + for (const auto &track : data_tracks_) { + if (track && track->isPublished()) { + track->unpublishDataTrack(); + } + } + + media_tracks_.clear(); + data_tracks_.clear(); + } + +private: + struct PublishedMediaTrack { + std::shared_ptr track; + std::string sid; + }; + + LocalParticipant *participant_ = nullptr; + std::vector media_tracks_; + std::vector> data_tracks_; +}; + +bool hasExpectedMediaPublications( + const LateJoinPublicationState &state, + const std::vector &expected_media) { + for (const auto &expected : expected_media) { + const auto it = state.published_media_tracks.find(expected.name); + if (it == state.published_media_tracks.end() || + it->second != expected.kind) { + return false; + } + } + return true; +} + +bool hasExpectedDataPublications(const LateJoinPublicationState &state, + const std::set &expected_data) { + for (const auto &name : expected_data) { + if (state.published_data_tracks.count(name) == 0) { + return false; + } + } + return true; +} + +const char *trackKindName(TrackKind kind) { + switch (kind) { + case TrackKind::KIND_AUDIO: + return "audio"; + case TrackKind::KIND_VIDEO: + return "video"; + case TrackKind::KIND_UNKNOWN: + break; + } + return "unknown"; +} + +std::string +describeMediaTracks(const std::map &tracks) { + std::ostringstream out; + bool first = true; + for (const auto &[name, kind] : tracks) { + if (!first) { + out << ", "; + } + first = false; + out << name << "=" << trackKindName(kind); + } + return tracks.empty() ? "" : out.str(); +} + +std::string +describeDataTracks(const std::map &tracks) { + std::ostringstream out; + bool first = true; + for (const auto &[name, publisher_identity] : tracks) { + if (!first) { + out << ", "; + } + first = false; + out << name << "=" << publisher_identity; + } + return tracks.empty() ? "" : out.str(); +} + +std::string makeTrackName(const std::string &prefix, int index) { + return prefix + "-" + std::to_string(index) + "-" + + std::to_string(getTimestampUs()); +} + +} // namespace + +class LateJoinTrackPublicationIntegrationTest + : public LiveKitTestBase, + public ::testing::WithParamInterface {}; + +TEST_P(LateJoinTrackPublicationIntegrationTest, + ConsumerReceivesAlreadyPublishedAudioTrackEvents) { + skipIfNotConfigured(); + + const bool single_peer_connection = GetParam(); + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = single_peer_connection; + + Room publisher_room; + ASSERT_TRUE(publisher_room.Connect(config_.url, config_.token_a, options)) + << "Publisher failed to connect"; + ASSERT_NE(publisher_room.localParticipant(), nullptr); + + const std::string publisher_identity = + publisher_room.localParticipant()->identity(); + ASSERT_FALSE(publisher_identity.empty()); + + PublishedTrackGuard published_tracks(publisher_room.localParticipant()); + MediaLoopGuard media_loops; + std::vector expected_media; + + for (int i = 0; i < kAudioTrackCount; ++i) { + const std::string track_name = makeTrackName("late-join-audio", i); + auto source = std::make_shared(kDefaultAudioSampleRate, + kDefaultAudioChannels, 0); + auto track = LocalAudioTrack::createLocalAudioTrack(track_name, source); + TrackPublishOptions publish_options; + publish_options.source = TrackSource::SOURCE_MICROPHONE; + + ASSERT_NO_THROW(publisher_room.localParticipant()->publishTrack( + track, publish_options)); + ASSERT_NE(track->publication(), nullptr) + << "Audio track was not locally published"; + + published_tracks.addMediaTrack(track, track->publication()->sid()); + media_loops.addAudioSource(source, 320.0 + static_cast(i) * 60.0, + i % 2 == 1); + expected_media.push_back({track_name, TrackKind::KIND_AUDIO}); + } + + LateJoinPublicationState state; + LateJoinPublicationDelegate delegate(state); + Room consumer_room; + consumer_room.setDelegate(&delegate); + + ASSERT_TRUE(consumer_room.Connect(config_.url, config_.token_b, options)) + << "Consumer failed to connect"; + ASSERT_NE(consumer_room.localParticipant(), nullptr); + ASSERT_TRUE(waitForParticipant(&consumer_room, publisher_identity, 10s)) + << "Publisher not visible to late-joining consumer"; + + { + std::unique_lock lock(state.mutex); + const bool got_expected = state.cv.wait_for(lock, kWaitTimeout, [&]() { + return hasExpectedMediaPublications(state, expected_media); + }); + EXPECT_TRUE(got_expected) + << "Timed out waiting for late-join audio publication events\n" + << "Published media events: " + << describeMediaTracks(state.published_media_tracks) << "\n" + << "Subscribed media events: " + << describeMediaTracks(state.subscribed_media_tracks); + } + + std::map media_snapshot; + { + std::lock_guard lock(state.mutex); + media_snapshot = state.published_media_tracks; + } + + for (const auto &expected : expected_media) { + const auto it = media_snapshot.find(expected.name); + EXPECT_NE(it, media_snapshot.end()) + << "Missing onTrackPublished event for " << expected.name + << "; received: " << describeMediaTracks(media_snapshot); + if (it != media_snapshot.end()) { + EXPECT_EQ(it->second, expected.kind) + << "Track kind mismatch for " << expected.name; + } + } + + auto *publisher_on_consumer = + consumer_room.remoteParticipant(publisher_identity); + ASSERT_NE(publisher_on_consumer, nullptr); + + std::map remote_publications; + for (const auto &[sid, publication] : + publisher_on_consumer->trackPublications()) { + (void)sid; + if (publication) { + remote_publications[publication->name()] = publication->kind(); + } + } + + for (const auto &expected : expected_media) { + const auto it = remote_publications.find(expected.name); + EXPECT_NE(it, remote_publications.end()) + << "Late consumer snapshot missing publication " << expected.name + << "; snapshot: " << describeMediaTracks(remote_publications); + if (it != remote_publications.end()) { + EXPECT_EQ(it->second, expected.kind) + << "Snapshot track kind mismatch for " << expected.name; + } + } + + media_loops.stop(); + published_tracks.unpublishAll(); +} + +TEST_P(LateJoinTrackPublicationIntegrationTest, + ConsumerReceivesAlreadyPublishedVideoTrackEvents) { + skipIfNotConfigured(); + + const bool single_peer_connection = GetParam(); + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = single_peer_connection; + + Room publisher_room; + ASSERT_TRUE(publisher_room.Connect(config_.url, config_.token_a, options)) + << "Publisher failed to connect"; + ASSERT_NE(publisher_room.localParticipant(), nullptr); + + const std::string publisher_identity = + publisher_room.localParticipant()->identity(); + ASSERT_FALSE(publisher_identity.empty()); + + PublishedTrackGuard published_tracks(publisher_room.localParticipant()); + MediaLoopGuard media_loops; + std::vector expected_media; + + for (int i = 0; i < kVideoTrackCount; ++i) { + const std::string track_name = makeTrackName("late-join-video", i); + auto source = std::make_shared(kVideoWidth, kVideoHeight); + auto track = LocalVideoTrack::createLocalVideoTrack(track_name, source); + TrackPublishOptions publish_options; + publish_options.source = TrackSource::SOURCE_CAMERA; + publish_options.simulcast = false; + + ASSERT_NO_THROW(publisher_room.localParticipant()->publishTrack( + track, publish_options)); + ASSERT_NE(track->publication(), nullptr) + << "Video track was not locally published"; + + published_tracks.addMediaTrack(track, track->publication()->sid()); + media_loops.addVideoSource(source, i % 2 == 1); + expected_media.push_back({track_name, TrackKind::KIND_VIDEO}); + } + + LateJoinPublicationState state; + LateJoinPublicationDelegate delegate(state); + Room consumer_room; + consumer_room.setDelegate(&delegate); + + ASSERT_TRUE(consumer_room.Connect(config_.url, config_.token_b, options)) + << "Consumer failed to connect"; + ASSERT_NE(consumer_room.localParticipant(), nullptr); + ASSERT_TRUE(waitForParticipant(&consumer_room, publisher_identity, 10s)) + << "Publisher not visible to late-joining consumer"; + + { + std::unique_lock lock(state.mutex); + const bool got_expected = state.cv.wait_for(lock, kWaitTimeout, [&]() { + return hasExpectedMediaPublications(state, expected_media); + }); + EXPECT_TRUE(got_expected) + << "Timed out waiting for late-join video publication events\n" + << "Published media events: " + << describeMediaTracks(state.published_media_tracks) << "\n" + << "Subscribed media events: " + << describeMediaTracks(state.subscribed_media_tracks); + } + + std::map media_snapshot; + { + std::lock_guard lock(state.mutex); + media_snapshot = state.published_media_tracks; + } + + for (const auto &expected : expected_media) { + const auto it = media_snapshot.find(expected.name); + EXPECT_NE(it, media_snapshot.end()) + << "Missing onTrackPublished event for " << expected.name + << "; received: " << describeMediaTracks(media_snapshot); + if (it != media_snapshot.end()) { + EXPECT_EQ(it->second, expected.kind) + << "Track kind mismatch for " << expected.name; + } + } + + auto *publisher_on_consumer = + consumer_room.remoteParticipant(publisher_identity); + ASSERT_NE(publisher_on_consumer, nullptr); + + std::map remote_publications; + for (const auto &[sid, publication] : + publisher_on_consumer->trackPublications()) { + (void)sid; + if (publication) { + remote_publications[publication->name()] = publication->kind(); + } + } + + for (const auto &expected : expected_media) { + const auto it = remote_publications.find(expected.name); + EXPECT_NE(it, remote_publications.end()) + << "Late consumer snapshot missing publication " << expected.name + << "; snapshot: " << describeMediaTracks(remote_publications); + if (it != remote_publications.end()) { + EXPECT_EQ(it->second, expected.kind) + << "Snapshot track kind mismatch for " << expected.name; + } + } + + media_loops.stop(); + published_tracks.unpublishAll(); +} + +TEST_P(LateJoinTrackPublicationIntegrationTest, + ConsumerReceivesAlreadyPublishedDataTrackEvents) { + skipIfNotConfigured(); + + const bool single_peer_connection = GetParam(); + RoomOptions options; + options.auto_subscribe = true; + options.single_peer_connection = single_peer_connection; + + Room publisher_room; + ASSERT_TRUE(publisher_room.Connect(config_.url, config_.token_a, options)) + << "Publisher failed to connect"; + ASSERT_NE(publisher_room.localParticipant(), nullptr); + + const std::string publisher_identity = + publisher_room.localParticipant()->identity(); + ASSERT_FALSE(publisher_identity.empty()); + + PublishedTrackGuard published_tracks(publisher_room.localParticipant()); + std::set expected_data; + + for (int i = 0; i < kDataTrackCount; ++i) { + const std::string track_name = makeTrackName("late-join-data", i); + auto publish_result = + publisher_room.localParticipant()->publishDataTrack(track_name); + ASSERT_TRUE(publish_result) << "Failed to publish data track " << track_name + << ": " << publish_result.error().message; + + const auto &track = publish_result.value(); + ASSERT_TRUE(track->isPublished()) + << "Data track was not locally published: " << track_name; + + published_tracks.addDataTrack(track); + expected_data.insert(track_name); + } + + LateJoinPublicationState state; + LateJoinPublicationDelegate delegate(state); + Room consumer_room; + consumer_room.setDelegate(&delegate); + + ASSERT_TRUE(consumer_room.Connect(config_.url, config_.token_b, options)) + << "Consumer failed to connect"; + ASSERT_NE(consumer_room.localParticipant(), nullptr); + ASSERT_TRUE(waitForParticipant(&consumer_room, publisher_identity, 10s)) + << "Publisher not visible to late-joining consumer"; + + { + std::unique_lock lock(state.mutex); + const bool got_expected = state.cv.wait_for(lock, kWaitTimeout, [&]() { + return hasExpectedDataPublications(state, expected_data); + }); + EXPECT_TRUE(got_expected) + << "Timed out waiting for late-join data publication events\n" + << "Published data events: " + << describeDataTracks(state.published_data_tracks); + } + + std::map data_snapshot; + { + std::lock_guard lock(state.mutex); + data_snapshot = state.published_data_tracks; + } + + for (const auto &name : expected_data) { + const auto it = data_snapshot.find(name); + EXPECT_NE(it, data_snapshot.end()) + << "Missing onDataTrackPublished event for " << name; + if (it != data_snapshot.end()) { + EXPECT_EQ(it->second, publisher_identity) + << "Publisher identity mismatch for data track " << name; + } + } + + published_tracks.unpublishAll(); +} + +INSTANTIATE_TEST_SUITE_P(PeerConnectionModes, + LateJoinTrackPublicationIntegrationTest, + ::testing::Values(false, true), + [](const ::testing::TestParamInfo &info) { + return info.param ? "SinglePeerConnection" + : "DualPeerConnection"; + }); + +} // namespace livekit::test