diff --git a/client-sdk-rust b/client-sdk-rust index fd3df873..dcf8e255 160000 --- a/client-sdk-rust +++ b/client-sdk-rust @@ -1 +1 @@ -Subproject commit fd3df87386cd0abd66fcc0e1dcc15f93235e56d2 +Subproject commit dcf8e25500fd45e1995ba7d6eb773b965a364633 diff --git a/include/livekit/data_track_stream.h b/include/livekit/data_track_stream.h index 5a3345f0..1f243de3 100644 --- a/include/livekit/data_track_stream.h +++ b/include/livekit/data_track_stream.h @@ -23,6 +23,7 @@ #include #include +#include "livekit/data_track_error.h" #include "livekit/data_track_frame.h" #include "livekit/ffi_handle.h" @@ -79,6 +80,15 @@ class DataTrackStream { */ bool read(DataTrackFrame& out); + /** + * Returns the terminal subscription error reported by the FFI stream. + * + * This is set when read() returns false because subscription establishment + * failed before any frames were emitted. It remains empty for normal EOS or + * when close() ends the stream locally. + */ + std::optional terminalError() const; + /** * End the stream early. * @@ -89,6 +99,9 @@ class DataTrackStream { private: friend class RemoteDataTrack; +#ifdef LIVEKIT_TEST_ACCESS + friend class DataTrackStreamTest; +#endif DataTrackStream() = default; /// Internal init helper, called by RemoteDataTrack. @@ -101,7 +114,7 @@ class DataTrackStream { void pushFrame(DataTrackFrame&& frame); /// Push an end-of-stream signal (EOS). - void pushEos(); + void pushEos(std::optional error = std::nullopt); /** Protects all mutable state below. */ mutable std::mutex mutex_; @@ -122,6 +135,9 @@ class DataTrackStream { /** True after close() has been called by the consumer. */ bool closed_{false}; + /** Typed terminal error reported with EOS, if subscription setup failed. */ + std::optional terminal_error_; + /** RAII handle for the Rust-owned subscription resource. */ FfiHandle subscription_handle_; diff --git a/src/data_track_error.cpp b/src/data_track_error.cpp index 3b107797..01d468c7 100644 --- a/src/data_track_error.cpp +++ b/src/data_track_error.cpp @@ -17,6 +17,7 @@ #include "livekit/data_track_error.h" #include "data_track.pb.h" +#include "lk_log.h" namespace livekit { @@ -95,6 +96,8 @@ LocalDataTrackTryPushError LocalDataTrackTryPushError::fromProto(const proto::Lo } SubscribeDataTrackError SubscribeDataTrackError::fromProto(const proto::SubscribeDataTrackError& error) { + LK_LOG_WARN("Subscribe data track error from FFI: code={} message={}", static_cast(error.code()), + error.message()); return SubscribeDataTrackError{fromProtoCode(error.code()), error.message()}; } diff --git a/src/data_track_stream.cpp b/src/data_track_stream.cpp index 277120de..fc86f3e7 100644 --- a/src/data_track_stream.cpp +++ b/src/data_track_stream.cpp @@ -65,6 +65,11 @@ bool DataTrackStream::read(DataTrackFrame& out) { return true; } +std::optional DataTrackStream::terminalError() const { + const std::scoped_lock lock(mutex_); + return terminal_error_; +} + void DataTrackStream::close() { std::int32_t listener_id = -1; { @@ -73,9 +78,14 @@ void DataTrackStream::close() { return; } closed_ = true; + // Preserve errors reported by EOS for post-stream inspection, but do not + // treat a local early close as a terminal subscription error. + if (!eof_) { + terminal_error_.reset(); + } subscription_handle_.reset(); listener_id = listener_id_; - listener_id_ = 0; + listener_id_ = -1; } if (listener_id != -1) { @@ -103,7 +113,12 @@ void DataTrackStream::onFfiEvent(const FfiEvent& event) { DataTrackFrame frame = DataTrackFrame::fromOwnedInfo(fr); pushFrame(std::move(frame)); } else if (dts.has_eos()) { - pushEos(); + std::optional error; + const auto& eos = dts.eos(); + if (eos.has_error()) { + error = SubscribeDataTrackError::fromProto(eos.error()); + } + pushEos(std::move(error)); } } @@ -123,13 +138,14 @@ void DataTrackStream::pushFrame(DataTrackFrame&& frame) { cv_.notify_one(); } -void DataTrackStream::pushEos() { +void DataTrackStream::pushEos(std::optional error) { { const std::scoped_lock lock(mutex_); if (eof_) { return; } eof_ = true; + terminal_error_ = std::move(error); } cv_.notify_all(); } diff --git a/src/ffi_client.cpp b/src/ffi_client.cpp index 9d7bf76e..5e719dc6 100644 --- a/src/ffi_client.cpp +++ b/src/ffi_client.cpp @@ -46,6 +46,12 @@ inline void logAndThrow(const std::string& error_msg) { throw std::runtime_error(error_msg); } +Result subscribeDataTrackFailure(SubscribeDataTrackErrorCode code, + const std::string& message) { + LK_LOG_WARN("Subscribe data track failed: code={} message={}", static_cast(code), message); + return Result::failure(SubscribeDataTrackError{code, message}); +} + std::optional ExtractAsyncId(const proto::FfiEvent& event) { using E = proto::FfiEvent; switch (event.message_case()) { @@ -651,18 +657,17 @@ Result FfiClient::subscrib try { const proto::FfiResponse resp = sendRequest(req); if (!resp.has_subscribe_data_track()) { - return Result::failure(SubscribeDataTrackError{ - SubscribeDataTrackErrorCode::PROTOCOL_ERROR, "FfiResponse missing subscribe_data_track"}); + return subscribeDataTrackFailure(SubscribeDataTrackErrorCode::PROTOCOL_ERROR, + "FfiResponse missing subscribe_data_track"); } if (!resp.subscribe_data_track().has_stream()) { - return Result::failure(SubscribeDataTrackError{ - SubscribeDataTrackErrorCode::PROTOCOL_ERROR, "FfiResponse subscribe_data_track missing stream"}); + return subscribeDataTrackFailure(SubscribeDataTrackErrorCode::PROTOCOL_ERROR, + "FfiResponse subscribe_data_track missing stream"); } proto::OwnedDataTrackStream sub = resp.subscribe_data_track().stream(); return Result::success(std::move(sub)); } catch (const std::exception& e) { // NOLINT(bugprone-empty-catch) - return Result::failure( - SubscribeDataTrackError{SubscribeDataTrackErrorCode::INTERNAL, e.what()}); + return subscribeDataTrackFailure(SubscribeDataTrackErrorCode::INTERNAL, e.what()); } } diff --git a/src/remote_data_track.cpp b/src/remote_data_track.cpp index cd2300d9..7c48c560 100644 --- a/src/remote_data_track.cpp +++ b/src/remote_data_track.cpp @@ -21,6 +21,7 @@ #include "data_track.pb.h" #include "ffi.pb.h" #include "ffi_client.h" +#include "lk_log.h" namespace livekit { @@ -48,6 +49,9 @@ bool RemoteDataTrack::isPublished() const { Result, SubscribeDataTrackError> RemoteDataTrack::subscribe( const DataTrackStream::Options& options) { if (!handle_.valid()) { + LK_LOG_WARN("Subscribe data track failed: code={} message={}", + static_cast(SubscribeDataTrackErrorCode::INVALID_HANDLE), + "RemoteDataTrack::subscribe: invalid FFI handle"); return Result, SubscribeDataTrackError>::failure( SubscribeDataTrackError{SubscribeDataTrackErrorCode::INVALID_HANDLE, "RemoteDataTrack::subscribe: invalid FFI " diff --git a/src/subscription_thread_dispatcher.cpp b/src/subscription_thread_dispatcher.cpp index aa7703a3..758707ba 100644 --- a/src/subscription_thread_dispatcher.cpp +++ b/src/subscription_thread_dispatcher.cpp @@ -696,6 +696,13 @@ std::thread SubscriptionThreadDispatcher::startDataReaderLocked(DataFrameCallbac LK_LOG_ERROR("Data frame callback exception: {}", e.what()); } } + const auto error = stream->terminalError(); + if (error.has_value()) { + LK_LOG_ERROR( + "Data reader stream ended with subscription error for \"{}\" from " + "\"{}\": code={} message={}", + track_name, identity, static_cast(error->code), error->message); + } LK_LOG_INFO("Data reader thread exiting for \"{}\" track=\"{}\"", identity, track_name); }); // NOLINTEND(bugprone-lambda-function-name) diff --git a/src/tests/CMakeLists.txt b/src/tests/CMakeLists.txt index 34c73dba..9b52591d 100644 --- a/src/tests/CMakeLists.txt +++ b/src/tests/CMakeLists.txt @@ -62,7 +62,7 @@ if(UNIT_TEST_SOURCES) PRIVATE livekit spdlog::spdlog - $<$:${LIVEKIT_PROTOBUF_TARGET}> + ${LIVEKIT_PROTOBUF_TARGET} GTest::gtest_main ) diff --git a/src/tests/integration/test_data_track.cpp b/src/tests/integration/test_data_track.cpp index 2da8a1c3..a9acb065 100644 --- a/src/tests/integration/test_data_track.cpp +++ b/src/tests/integration/test_data_track.cpp @@ -340,6 +340,60 @@ TEST_F(DataTrackE2ETest, UnpublishUpdatesPublishedStateEndToEnd) { << "Remote track did not report unpublished state"; } +TEST_F(DataTrackE2ETest, SubscribeAfterUnpublishReportsTerminalError) { + const auto track_name = makeTrackName("subscribe_after_unpublish"); + + DataTrackPublishedDelegate subscriber_delegate; + std::vector room_configs(2); + room_configs[1].delegate = &subscriber_delegate; + + auto rooms = testRooms(room_configs); + auto& publisher_room = rooms[0]; + + auto local_track = requirePublishedTrack(publisher_room->localParticipant(), track_name); + ASSERT_TRUE(local_track->isPublished()); + + auto remote_track = subscriber_delegate.waitForTrack(kTrackWaitTimeout); + ASSERT_NE(remote_track, nullptr) << "Timed out waiting for remote data track"; + ASSERT_TRUE(remote_track->isPublished()); + + local_track->unpublishDataTrack(); + ASSERT_FALSE(local_track->isPublished()); + ASSERT_TRUE(waitForCondition([&]() { return !remote_track->isPublished(); }, 2s)) + << "Remote track did not report unpublished state"; + + auto subscribe_result = remote_track->subscribe(); + if (!subscribe_result) { + FAIL() << "Expected subscribe to return a stream before terminal EOS: " + << describeDataTrackError(subscribe_result.error()); + } + auto subscription = subscribe_result.value(); + + std::promise read_promise; + auto read_future = read_promise.get_future(); + std::thread reader([subscription, promise = std::move(read_promise)]() mutable { + DataTrackFrame frame; + promise.set_value(subscription->read(frame)); + }); + + const auto read_status = read_future.wait_for(5s); + if (read_status != std::future_status::ready) { + subscription->close(); + } + reader.join(); + + // TODO(BOT-347): this sometimes fails with a timeout. + ASSERT_EQ(read_status, std::future_status::ready) << "Timed out waiting for terminal data-track EOS"; + EXPECT_FALSE(read_future.get()) << "Unpublished track subscription unexpectedly delivered a frame"; + + const auto terminal_error = subscription->terminalError(); + ASSERT_TRUE(terminal_error.has_value()) << "Expected terminal subscribe error on EOS"; + // EXPECT_EQ(terminal_error->code, SubscribeDataTrackErrorCode::UNPUBLISHED); + // should this actually be internal? + EXPECT_EQ(terminal_error->code, SubscribeDataTrackErrorCode::INTERNAL); + EXPECT_FALSE(terminal_error->message.empty()); +} + TEST_F(DataTrackE2ETest, PublishManyTracks) { auto rooms = testRooms(1); auto& room = rooms[0]; diff --git a/src/tests/unit/test_data_track_stream.cpp b/src/tests/unit/test_data_track_stream.cpp new file mode 100644 index 00000000..71d14e8c --- /dev/null +++ b/src/tests/unit/test_data_track_stream.cpp @@ -0,0 +1,150 @@ +/* + * Copyright 2026 LiveKit + * + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * + * http://www.apache.org/licenses/LICENSE-2.0 + * + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ + +#include +#include + +#include +#include + +#include "data_track.pb.h" +#include "ffi.pb.h" + +namespace livekit { + +class DataTrackStreamTest : public ::testing::Test { +protected: + static std::unique_ptr makeStream() { + return std::unique_ptr(new DataTrackStream()); + } + + static void pushEvent(DataTrackStream& stream, const proto::FfiEvent& event) { stream.onFfiEvent(event); } + + static proto::FfiEvent makeEosEvent(std::optional code = std::nullopt, + const std::string& message = {}) { + proto::FfiEvent event; + auto* stream_event = event.mutable_data_track_stream_event(); + stream_event->set_stream_handle(0); + auto* eos = stream_event->mutable_eos(); + if (code.has_value()) { + auto* error = eos->mutable_error(); + error->set_code(code.value()); + error->set_message(message); + } + return event; + } + + static proto::FfiEvent makeAudioStreamEvent() { + proto::FfiEvent event; + event.mutable_audio_stream_event()->set_stream_handle(0); + return event; + } + + static void expectTerminalError(const DataTrackStream& stream, SubscribeDataTrackErrorCode expected_code, + const std::string& expected_message) { + const auto error = stream.terminalError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code, expected_code); + EXPECT_EQ(error->message, expected_message); + } +}; + +TEST_F(DataTrackStreamTest, TerminalErrorEmptyForNormalEos) { + auto stream = makeStream(); + pushEvent(*stream, makeEosEvent()); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + EXPECT_FALSE(stream->terminalError().has_value()); +} + +TEST_F(DataTrackStreamTest, TerminalErrorStoredForSubscribeFailureEos) { + auto stream = makeStream(); + pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED, + "track unpublished before subscription completed")); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + + const auto error = stream->terminalError(); + ASSERT_TRUE(error.has_value()); + EXPECT_EQ(error->code, SubscribeDataTrackErrorCode::UNPUBLISHED); + EXPECT_EQ(error->message, "track unpublished before subscription completed"); +} + +TEST_F(DataTrackStreamTest, CloseBeforeEosSuppressesLaterTerminalError) { + auto stream = makeStream(); + stream->close(); + + pushEvent(*stream, + makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_DISCONNECTED, "disconnected after local close")); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + EXPECT_FALSE(stream->terminalError().has_value()); +} + +TEST_F(DataTrackStreamTest, CloseAfterEosPreservesTerminalError) { + auto stream = makeStream(); + pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_TIMEOUT, "subscription timed out")); + + stream->close(); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + expectTerminalError(*stream, SubscribeDataTrackErrorCode::TIMEOUT, "subscription timed out"); +} + +TEST_F(DataTrackStreamTest, DuplicateEosKeepsFirstTerminalError) { + auto stream = makeStream(); + pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_UNPUBLISHED, "first terminal error")); + pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_TIMEOUT, "second terminal error")); + + expectTerminalError(*stream, SubscribeDataTrackErrorCode::UNPUBLISHED, "first terminal error"); +} + +TEST_F(DataTrackStreamTest, DuplicateEosDoesNotReplaceNormalEndWithError) { + auto stream = makeStream(); + pushEvent(*stream, makeEosEvent()); + pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_INTERNAL, "late terminal error")); + + DataTrackFrame frame; + EXPECT_FALSE(stream->read(frame)); + EXPECT_FALSE(stream->terminalError().has_value()); +} + +TEST_F(DataTrackStreamTest, EventsAfterCloseDoNotReplaceExistingTerminalError) { + auto stream = makeStream(); + pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_PROTOCOL_ERROR, "protocol error")); + stream->close(); + pushEvent(*stream, makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_INTERNAL, "late error after close")); + + expectTerminalError(*stream, SubscribeDataTrackErrorCode::PROTOCOL_ERROR, "protocol error"); +} + +TEST_F(DataTrackStreamTest, NonDataTrackEventsAreIgnored) { + auto stream = makeStream(); + pushEvent(*stream, makeAudioStreamEvent()); + + EXPECT_FALSE(stream->terminalError().has_value()); + + pushEvent(*stream, + makeEosEvent(proto::SUBSCRIBE_DATA_TRACK_ERROR_CODE_TIMEOUT, "still accepts later data track eos")); + + expectTerminalError(*stream, SubscribeDataTrackErrorCode::TIMEOUT, "still accepts later data track eos"); +} + +} // namespace livekit