diff --git a/libs/internal/include/launchdarkly/serialization/json_fdv2_events.hpp b/libs/internal/include/launchdarkly/serialization/json_fdv2_events.hpp index 50ea455ad..3088b859c 100644 --- a/libs/internal/include/launchdarkly/serialization/json_fdv2_events.hpp +++ b/libs/internal/include/launchdarkly/serialization/json_fdv2_events.hpp @@ -45,6 +45,8 @@ struct PayloadTransferred { struct Goodbye { std::optional reason; + // If set, indicates an FDv1 fallback directive with this TTL in seconds. + std::optional protocol_fallback_ttl; }; struct FDv2Error { diff --git a/libs/internal/src/serialization/json_fdv2_events.cpp b/libs/internal/src/serialization/json_fdv2_events.cpp index 63fba0d38..34dfa99cf 100644 --- a/libs/internal/src/serialization/json_fdv2_events.cpp +++ b/libs/internal/src/serialization/json_fdv2_events.cpp @@ -132,6 +132,8 @@ tl::expected, JsonError> tag_invoke( Goodbye goodbye{}; PARSE_CONDITIONAL_FIELD(goodbye.reason, obj, "reason"); + PARSE_CONDITIONAL_FIELD(goodbye.protocol_fallback_ttl, obj, + "protocolFallbackTTL"); return goodbye; } diff --git a/libs/internal/tests/fdv2_serialization_test.cpp b/libs/internal/tests/fdv2_serialization_test.cpp index 8083fa03b..1df425bc0 100644 --- a/libs/internal/tests/fdv2_serialization_test.cpp +++ b/libs/internal/tests/fdv2_serialization_test.cpp @@ -363,6 +363,16 @@ TEST(GoodbyeTests, DeserializesWithoutReason) { ASSERT_FALSE(result.value()->reason); } +TEST(GoodbyeTests, DeserializesWithProtocolFallbackTtl) { + auto result = + boost::json::value_to, JsonError>>( + boost::json::parse(R"({"protocolFallbackTTL":60})")); + ASSERT_TRUE(result); + ASSERT_TRUE(result.value()); + ASSERT_TRUE(result.value()->protocol_fallback_ttl); + ASSERT_EQ(60, *result.value()->protocol_fallback_ttl); +} + TEST(GoodbyeTests, WrongTypeReturnsSchemaFailure) { auto result = boost::json::value_to, JsonError>>( diff --git a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp index 73329f348..53a0cd442 100644 --- a/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp +++ b/libs/server-sdk/src/data_interfaces/source/fdv2_source_result.hpp @@ -5,12 +5,51 @@ #include #include +#include +#include +#include #include #include +#include +#include #include namespace launchdarkly::server_side::data_interfaces { +/** + * Server-directed instruction to fall back to FDv1, carried alongside any + * FDv2 source result whose underlying transport observed it (e.g. an + * X-LD-FD-Fallback: true response header). + */ +struct FDv1FallbackDirective { + /** Default TTL used when the directive carries no TTL of its own. */ + static constexpr std::chrono::seconds kDefaultTtl = std::chrono::hours(1); + + /** + * Parse the value of an X-LD-FD-Fallback-TTL header (or equivalent + * protocolFallbackTTL field from a goodbye message). Returns + * std::nullopt if the value is malformed. + */ + static std::optional ParseTtl( + std::string_view value) { + std::uint64_t seconds = 0; + auto const* begin = value.data(); + auto const* end = begin + value.size(); + auto const [ptr, ec] = std::from_chars(begin, end, seconds); + if (ec != std::errc{} || ptr != end) { + return std::nullopt; + } + return std::chrono::seconds(seconds); + } + + /** + * How long to stay on FDv1 before attempting to recover to FDv2. + * A value of 0 seconds means the SDK should remain on FDv1 + * indefinitely. + */ + std::chrono::seconds ttl = kDefaultTtl; +}; + /** * Result returned by IFDv2Initializer::Run and IFDv2Synchronizer::Next. */ @@ -56,10 +95,9 @@ struct FDv2SourceResult { Value value; /** - * If true, the server signaled (via the X-LD-FD-Fallback response header) - * that the client should fall back to FDv1. + * Set if the underlying transport observed an FDv1 fallback directive. */ - bool fdv1_fallback = false; + std::optional fdv1_fallback; }; } // namespace launchdarkly::server_side::data_interfaces diff --git a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp index fc3c6421a..ba130e768 100644 --- a/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp +++ b/libs/server-sdk/src/data_interfaces/source/ifdv2_synchronizer_factory.hpp @@ -14,6 +14,8 @@ class IFDv2SynchronizerFactory { public: virtual std::unique_ptr Build() = 0; + [[nodiscard]] virtual bool IsFDv1Fallback() const { return false; } + virtual ~IFDv2SynchronizerFactory() = default; IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete; IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete; diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp index b3caa5aba..16bc7d8a0 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp @@ -1,6 +1,7 @@ #include "fdv2_data_system.hpp" #include +#include #include @@ -43,6 +44,7 @@ FDv2DataSystem::FDv2DataSystem( store_(), change_notifier_(store_, store_), initialize_called_(false), + last_logged_synchronizer_interrupted_(false), closed_(false), selector_(), initializer_index_(0), @@ -58,6 +60,7 @@ FDv2DataSystem::~FDv2DataSystem() { void FDv2DataSystem::Close() { std::lock_guard lock(mutex_); closed_ = true; + fdv1_fallback_retry_cancel_.Cancel(); if (active_initializer_) { active_initializer_->Close(); } @@ -67,7 +70,6 @@ void FDv2DataSystem::Close() { if (active_conditions_) { active_conditions_->Close(); } - status_manager_->SetState(DataSourceStatus::DataSourceState::kOff); } std::shared_ptr FDv2DataSystem::GetFlag( @@ -121,6 +123,9 @@ void FDv2DataSystem::RunNextInitializer() { } else { auto& factory = initializer_factories_[initializer_index_++]; active_initializer_ = factory->Build(); + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": starting initializer " + << active_initializer_->Identity(); active_initializer_->Run().Then( [this](data_interfaces::FDv2SourceResult const& result) -> std::monostate { @@ -144,6 +149,7 @@ void FDv2DataSystem::OnInitializerResult( bool got_basis = false; bool got_shutdown = false; + bool disconnected = false; std::visit( overloaded{ @@ -152,6 +158,8 @@ void FDv2DataSystem::OnInitializerResult( cs.change_set.selector.value.has_value(); ApplyChangeSet(std::move(cs.change_set)); if (has_selector) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": initializer succeeded"; got_basis = true; } }, @@ -185,8 +193,30 @@ void FDv2DataSystem::OnInitializerResult( if (closed_ || got_shutdown) { return; } + if (result.fdv1_fallback) { + source_manager_.SwitchToFDv1Fallback(); + if (source_manager_.AvailableSynchronizerCount() > 0) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": FDv1 fallback engaged"; + got_basis = true; + } else { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": FDv1 fallback directive received; no FDv1 " + "fallback synchronizer configured"; + disconnected = true; + } + ScheduleFDv2RetryLocked(result.fdv1_fallback->ttl); + } } + if (disconnected) { + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + DataSourceStatus::ErrorInfo::ErrorKind::kUnknown, + "FDv1 fallback directive received; no FDv1 fallback configured"); + return; + } if (got_basis) { StartSynchronizers(); } else { @@ -205,6 +235,10 @@ void FDv2DataSystem::StartSynchronizers() { } active_synchronizer_ = source_manager_.NextSynchronizer(); if (active_synchronizer_) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": starting synchronizer " + << active_synchronizer_->Identity(); + last_logged_synchronizer_interrupted_.store(false); active_conditions_ = BuildActiveConditions(); } else { exhausted = true; @@ -313,34 +347,39 @@ void FDv2DataSystem::OnSynchronizerResult( bool got_shutdown = false; bool advance = false; + bool disconnected = false; - std::visit(overloaded{ - [&](Result::ChangeSet& cs) { - ApplyChangeSet(std::move(cs.change_set)); - }, - [&](Result::Shutdown&) { got_shutdown = true; }, - [&](Result::Interrupted const& iv) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() << ": synchronizer interrupted: " - << iv.error.Message(); - status_manager_->SetState( - DataSourceStatus::DataSourceState::kInterrupted, - iv.error.Kind(), iv.error.Message()); - }, - [&](Result::TerminalError const& te) { - LD_LOG(logger_, LogLevel::kWarn) - << Identity() << ": synchronizer terminal error: " - << te.error.Message(); - status_manager_->SetState( - DataSourceStatus::DataSourceState::kInterrupted, - te.error.Kind(), te.error.Message()); - advance = true; - }, - [&](Result::Goodbye const&) { - // The synchronizer handles this internally. - }, - }, - result.value); + std::visit( + overloaded{ + [&](Result::ChangeSet& cs) { + last_logged_synchronizer_interrupted_.store(false); + ApplyChangeSet(std::move(cs.change_set)); + }, + [&](Result::Shutdown&) { got_shutdown = true; }, + [&](Result::Interrupted const& iv) { + if (!last_logged_synchronizer_interrupted_.exchange(true)) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() + << ": synchronizer interrupted: " << iv.error.Message(); + } + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + iv.error.Kind(), iv.error.Message()); + }, + [&](Result::TerminalError const& te) { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": synchronizer terminal error: " << te.error.Message(); + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + te.error.Kind(), te.error.Message()); + advance = true; + }, + [&](Result::Goodbye const&) { + // The synchronizer handles this internally. + }, + }, + result.value); { std::lock_guard lock(mutex_); @@ -349,13 +388,37 @@ void FDv2DataSystem::OnSynchronizerResult( active_conditions_.reset(); return; } - if (advance) { + if (result.fdv1_fallback && + !source_manager_.IsCurrentSynchronizerFDv1Fallback()) { + source_manager_.SwitchToFDv1Fallback(); + active_synchronizer_.reset(); + active_conditions_.reset(); + if (source_manager_.AvailableSynchronizerCount() > 0) { + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": FDv1 fallback engaged"; + advance = true; + } else { + LD_LOG(logger_, LogLevel::kWarn) + << Identity() + << ": FDv1 fallback directive received; no FDv1 " + "fallback synchronizer configured"; + disconnected = true; + } + ScheduleFDv2RetryLocked(result.fdv1_fallback->ttl); + } else if (advance) { source_manager_.BlockCurrentSynchronizer(); active_synchronizer_.reset(); active_conditions_.reset(); } } + if (disconnected) { + status_manager_->SetState( + DataSourceStatus::DataSourceState::kInterrupted, + DataSourceStatus::ErrorInfo::ErrorKind::kUnknown, + "FDv1 fallback directive received; no FDv1 fallback configured"); + return; + } if (advance) { StartSynchronizers(); } else { @@ -363,6 +426,43 @@ void FDv2DataSystem::OnSynchronizerResult( } } +void FDv2DataSystem::ScheduleFDv2RetryLocked(std::chrono::seconds ttl) { + if (ttl == std::chrono::seconds::zero()) { + return; + } + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": FDv2 retry scheduled in " << ttl.count() << "s"; + async::Delay(ioc_, ttl, fdv1_fallback_retry_cancel_.GetToken()) + .Then( + [this](bool fired) -> std::monostate { + if (fired) { + OnFDv1RetryTimer(); + } + return {}; + }, + [ioc = ioc_](async::Continuation work) { + boost::asio::post(ioc, std::move(work)); + }); +} + +void FDv2DataSystem::OnFDv1RetryTimer() { + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + LD_LOG(logger_, LogLevel::kInfo) + << Identity() << ": FDv2 retry timer fired; re-engaging FDv2"; + source_manager_.SwitchBackToFDv2(); + if (active_synchronizer_) { + active_synchronizer_->Close(); + active_synchronizer_.reset(); + } + active_conditions_.reset(); + } + StartSynchronizers(); +} + void FDv2DataSystem::ApplyChangeSet( data_model::ChangeSet change_set) { if (change_set.selector.value.has_value()) { diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp index 7957de429..3c7147ec1 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.hpp @@ -10,12 +10,14 @@ #include "conditions.hpp" #include "source_manager.hpp" +#include #include #include #include #include +#include #include #include #include @@ -51,9 +53,8 @@ namespace launchdarkly::server_side::data_systems { * Destruction protocol: * * The destructor cancels in-flight orchestration (closes the active - * source, emits status kOff), but does NOT block to drain executor - * callbacks that may already be queued. Before destroying, the caller - * must ensure both of: + * source) but does NOT block to drain executor callbacks that may + * already be queued. Before destroying, the caller must ensure both of: * * 1. The executor that orchestration callbacks run on has been stopped * AND any thread running it has been joined. Otherwise a previously- @@ -120,8 +121,6 @@ namespace launchdarkly::server_side::data_systems { * v * [Done; final status preserved] * - * Calling the destructor at any time -> [Closed; status kOff]. - * * Status transitions: * * kInitializing (initial) -> kValid on first successful ChangeSet apply. @@ -130,11 +129,10 @@ namespace launchdarkly::server_side::data_systems { * the initializer phase if not yet Valid). * kOff if all initializers exhaust without data * and no synchronizers are configured. - * kValid -> kInterrupted on errors; kOff in destructor or - * when synchronizers cycle through and exhaust. + * kValid -> kInterrupted on errors; kOff when + * synchronizers cycle through and exhaust. * kInterrupted -> kValid on next successful ChangeSet apply; - * kOff in destructor or on synchronizer - * exhaustion. + * kOff on synchronizer exhaustion. * kOff -> terminal. */ class FDv2DataSystem final : public data_interfaces::IDataSystem { @@ -251,6 +249,14 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { void OnSynchronizerResult(data_interfaces::FDv2SourceResult result); void OnConditionFired(data_interfaces::IFDv2Condition::Type type); + // Schedules an FDv2 recovery attempt after the given TTL. Called with + // mutex_ held. TTL of 0 disables the recovery and is a no-op. + void ScheduleFDv2RetryLocked(std::chrono::seconds ttl); + + // Invoked when the FDv1 fallback TTL expires. Switches the source list + // back to FDv2 and restarts the synchronizer phase. + void OnFDv1RetryTimer(); + // Builds the conditions to apply to the currently active synchronizer. // Must be called with mutex_ held; reads source_manager_ state. std::unique_ptr BuildActiveConditions() const; @@ -283,6 +289,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { // Set by Initialize() to detect repeat or concurrent calls. std::atomic_bool initialize_called_; + // Suppresses consecutive "interrupted" logs from the active synchronizer. + std::atomic_bool last_logged_synchronizer_interrupted_; + // Orchestration state, guarded by mutex_. std::mutex mutex_; bool closed_; @@ -292,6 +301,9 @@ class FDv2DataSystem final : public data_interfaces::IDataSystem { std::unique_ptr active_initializer_; std::unique_ptr active_synchronizer_; std::unique_ptr active_conditions_; + + // Cancelled in Close() to abort any pending FDv1 fallback retry delay. + async::CancellationSource fdv1_fallback_retry_cancel_; }; } // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp index 1178aff46..157eddb99 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv2_polling_impl.cpp @@ -13,6 +13,7 @@ namespace launchdarkly::server_side::data_systems { static char const* const kFDv1FallbackHeader = "X-LD-FD-Fallback"; +static char const* const kFDv1FallbackTtlHeader = "X-LD-FD-Fallback-TTL"; static char const* const kErrorParsingBody = "Could not parse FDv2 polling response"; @@ -34,10 +35,22 @@ static ErrorInfo MakeError(ErrorKind kind, std::chrono::system_clock::now()}; } -static bool ReadFDv1FallbackDirective( - network::HttpResult::HeadersType const& headers) { +static std::optional +ReadFDv1FallbackDirective(network::HttpResult::HeadersType const& headers) { auto const it = headers.find(kFDv1FallbackHeader); - return it != headers.end() && boost::iequals(it->second, "true"); + if (it == headers.end() || !boost::iequals(it->second, "true")) { + return std::nullopt; + } + data_interfaces::FDv1FallbackDirective directive; + auto const ttl_it = headers.find(kFDv1FallbackTtlHeader); + if (ttl_it != headers.end()) { + auto const ttl = + data_interfaces::FDv1FallbackDirective::ParseTtl(ttl_it->second); + if (ttl) { + directive.ttl = *ttl; + } + } + return directive; } network::HttpRequest MakeFDv2PollRequest( @@ -116,7 +129,12 @@ static FDv2SourceResult ParseFDv2PollEvents( FDv2SourceResult::ChangeSet{std::move(*typed)}}; } if (auto* goodbye = std::get_if(&result)) { - return FDv2SourceResult{FDv2SourceResult::Goodbye{goodbye->reason}}; + FDv2SourceResult result{FDv2SourceResult::Goodbye{goodbye->reason}}; + if (goodbye->protocol_fallback_ttl) { + result.fdv1_fallback = data_interfaces::FDv1FallbackDirective{ + std::chrono::seconds(*goodbye->protocol_fallback_ttl)}; + } + return result; } if (auto* error = std::get_if(&result)) { if (error->kind == FDv2ProtocolHandler::Error::Kind::kServerError) { @@ -183,7 +201,7 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( MakeError(ErrorKind::kNetworkError, 0, std::move(error_msg))}}; } - bool const fdv1_fallback = ReadFDv1FallbackDirective(res.Headers()); + auto fdv1_fallback = ReadFDv1FallbackDirective(res.Headers()); if (res.Status() == 304) { return FDv2SourceResult{ @@ -215,7 +233,11 @@ data_interfaces::FDv2SourceResult HandleFDv2PollResponse( << identity << ": " << interrupted->error.Message(); } } - result.fdv1_fallback = fdv1_fallback; + // An explicit directive parsed from the response body (e.g. via a + // goodbye event) takes precedence over the HTTP response header. + if (!result.fdv1_fallback) { + result.fdv1_fallback = fdv1_fallback; + } return result; } diff --git a/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp b/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp index 362f13efd..3f020e404 100644 --- a/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/source_manager.cpp @@ -11,9 +11,11 @@ SourceManager::SourceManager( std::vector> factories) { synchronizers_.reserve(factories.size()); for (auto& factory : factories) { - synchronizers_.push_back( - SynchronizerFactoryWithState{std::move(factory), State::kAvailable, - /*is_fdv1_fallback=*/false}); + bool const is_fdv1_fallback = factory->IsFDv1Fallback(); + synchronizers_.push_back(SynchronizerFactoryWithState{ + std::move(factory), + is_fdv1_fallback ? State::kBlocked : State::kAvailable, + is_fdv1_fallback}); } } @@ -44,6 +46,22 @@ void SourceManager::ResetSourceIndex() { synchronizer_index_ = -1; } +void SourceManager::SwitchToFDv1Fallback() { + for (auto& entry : synchronizers_) { + entry.state = + entry.is_fdv1_fallback ? State::kAvailable : State::kBlocked; + } + synchronizer_index_ = -1; +} + +void SourceManager::SwitchBackToFDv2() { + for (auto& entry : synchronizers_) { + entry.state = + entry.is_fdv1_fallback ? State::kBlocked : State::kAvailable; + } + synchronizer_index_ = -1; +} + bool SourceManager::IsPrimeSynchronizer() const { for (std::size_t i = 0; i < synchronizers_.size(); ++i) { if (synchronizers_[i].state == State::kAvailable) { diff --git a/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp b/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp index 1f310bfed..264dfd494 100644 --- a/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/source_manager.hpp @@ -23,8 +23,7 @@ namespace launchdarkly::server_side::data_systems { * by recovery, which wants to fall back to the most-preferred Available * synchronizer. * - * Each factory also carries an is_fdv1_fallback flag, currently always - * false. TODO: populate when the FDv1 fallback directive is implemented. + * Factories whose IsFDv1Fallback() returns true start in the Blocked state. * * Not thread-safe. The caller is responsible for serializing all calls. */ @@ -54,6 +53,20 @@ class SourceManager { */ void ResetSourceIndex(); + /** + * Blocks every non-FDv1 factory and unblocks the FDv1 fallback factory, + * if one was configured. Resets the iteration cursor so the next call to + * NextSynchronizer returns the FDv1 fallback. If no FDv1 fallback factory + * was configured, every factory is left blocked. + */ + void SwitchToFDv1Fallback(); + + /** + * Returns synchronizer state to the initial configuration, including + * unblocking factories previously blocked by terminal errors. + */ + void SwitchBackToFDv2(); + /** * Returns true if the currently tracked factory is the first Available * factory in the list. Returns false if no factory is currently tracked. @@ -73,9 +86,8 @@ class SourceManager { [[nodiscard]] std::size_t SynchronizerCount() const; /** - * Returns true if the currently tracked factory was configured as the - * FDv1 fallback synchronizer. Always false until the FDv1 fallback - * directive is implemented. + * Returns true if the currently tracked factory is the FDv1 fallback + * synchronizer. */ [[nodiscard]] bool IsCurrentSynchronizerFDv1Fallback() const; diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp index c43483fc3..5e658e319 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.cpp @@ -187,8 +187,21 @@ void FDv2StreamingSynchronizer::State::OnConnect(HttpRequest* req) { void FDv2StreamingSynchronizer::State::OnResponse( HttpResponseHeader const& headers) { auto const it = headers.find("X-LD-FD-Fallback"); - bool const directive = - it != headers.end() && boost::iequals(it->value(), "true"); + if (it == headers.end() || !boost::iequals(it->value(), "true")) { + std::lock_guard lock(mutex_); + latest_fdv1_fallback_.reset(); + return; + } + data_interfaces::FDv1FallbackDirective directive; + auto const ttl_it = headers.find("X-LD-FD-Fallback-TTL"); + if (ttl_it != headers.end()) { + auto const value = ttl_it->value(); + auto const ttl = data_interfaces::FDv1FallbackDirective::ParseTtl( + std::string_view{value.data(), value.size()}); + if (ttl) { + directive.ttl = *ttl; + } + } std::lock_guard lock(mutex_); latest_fdv1_fallback_ = directive; } @@ -240,7 +253,14 @@ void FDv2StreamingSynchronizer::State::OnEvent(sse::Event const& event) { << ": Goodbye was received from the LaunchDarkly " "connection with reason: '" << r.reason.value_or("") << "'."; - Notify(FDv2SourceResult{FDv2SourceResult::Goodbye{r.reason}}); + FDv2SourceResult goodbye_result{ + FDv2SourceResult::Goodbye{r.reason}}; + if (r.protocol_fallback_ttl) { + goodbye_result.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{ + std::chrono::seconds(*r.protocol_fallback_ttl)}; + } + Notify(std::move(goodbye_result)); // Drop the current connection and reconnect; the protocol // handler is reset so the new connection starts in a clean // state. @@ -311,7 +331,12 @@ void FDv2StreamingSynchronizer::State::Notify(FDv2SourceResult result) { std::optional> promise; { std::lock_guard lock(mutex_); - result.fdv1_fallback = latest_fdv1_fallback_; + // An explicit directive on the result (e.g. parsed from a goodbye + // message) takes precedence over the most recent HTTP response + // header. + if (!result.fdv1_fallback) { + result.fdv1_fallback = latest_fdv1_fallback_; + } if (pending_promise_) { promise = std::move(pending_promise_); pending_promise_.reset(); diff --git a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp index 1abed1f64..4e297212c 100644 --- a/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/streaming_synchronizer.hpp @@ -147,7 +147,8 @@ class FDv2StreamingSynchronizer final bool started_ = false; bool closed_ = false; // FDv1 fallback directive from the most recent SSE response. - bool latest_fdv1_fallback_ = false; + std::optional + latest_fdv1_fallback_; data_model::Selector latest_selector_; std::optional base_url_; std::shared_ptr sse_client_; diff --git a/libs/server-sdk/tests/fdv2_data_system_test.cpp b/libs/server-sdk/tests/fdv2_data_system_test.cpp index 48d9b49d1..e7d170ccd 100644 --- a/libs/server-sdk/tests/fdv2_data_system_test.cpp +++ b/libs/server-sdk/tests/fdv2_data_system_test.cpp @@ -145,6 +145,15 @@ class OneShotSynchronizerFactory : public IFDv2SynchronizerFactory { std::unique_ptr source_; }; +class FDv1FallbackOneShotFactory : public OneShotSynchronizerFactory { + public: + explicit FDv1FallbackOneShotFactory( + std::unique_ptr source) + : OneShotSynchronizerFactory(std::move(source)) {} + + bool IsFDv1Fallback() const override { return true; } +}; + // Returns each pre-supplied source in order on successive Build() calls. // Returns nullptr once the supply is exhausted. Used in tests that exercise // wrap-around or recovery, where the same factory is built more than once. @@ -263,25 +272,6 @@ TEST(FDv2DataSystemTest, OfflineMode_NoFactories_StatusValid) { EXPECT_FALSE(ds.Initialized()); } -TEST(FDv2DataSystemTest, Destructor_TransitionsStatusToOff) { - auto logger = MakeNullLogger(); - boost::asio::io_context ioc; - data_components::DataSourceStatusManager status_manager; - - { - FDv2DataSystem ds({}, {}, /*fallback_condition_factory=*/nullptr, - /*recovery_condition_factory=*/nullptr, - ioc.get_executor(), &status_manager, logger); - ds.Initialize(); - ASSERT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kValid); - } - - // After ~FDv2DataSystem, status is Off. - EXPECT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kOff); -} - // ============================================================================ // Initializer phase // ============================================================================ @@ -1090,19 +1080,360 @@ TEST(FDv2DataSystemTest, SingleSynchronizerHasNoFallbackArmed) { status_manager.Status().State()); } +// ============================================================================ +// FDv1 fallback directive +// ============================================================================ + +TEST(FDv2DataSystemTest, SynchronizerFdv1FlagSwitchesToFdv1Adapter) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // FDv2 synchronizer emits a ChangeSet with the directive, then closes. + auto fdv2_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kNone, + {}, + data_model::Selector{}}}}; + r.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{0}}; + return r; + }()}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + + // FDv1 adapter returns Shutdown when reached, ending orchestration. + auto fdv1_sync = + std::make_unique(std::vector{}); + auto fdv1_factory = + std::make_unique(std::move(fdv1_sync)); + auto* fdv1_factory_ptr = fdv1_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + synchronizers.push_back(std::move(fdv1_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(1, fdv1_factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, + SynchronizerFdv1FlagWithoutAdapterDoesNotTransitionToOff) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Directive with TTL=0: indefinite, no automatic FDv2 retry. + auto fdv2_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{ + FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, + /*status_code=*/418, "directive", + std::chrono::system_clock::now()}}}; + r.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{0}}; + return r; + }()}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + EXPECT_NE(DataSourceStatus::DataSourceState::kOff, + status_manager.Status().State()); +} + +TEST(FDv2DataSystemTest, InitializerFdv1FlagSwitchesToFdv1Adapter) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Initializer returns Interrupted with the directive set. + FDv2SourceResult init_result{ + FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, + /*status_code=*/418, "directive", + std::chrono::system_clock::now()}}}; + init_result.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{0}}; + auto initializer = + std::make_unique(std::move(init_result)); + + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(initializer))); + + auto fdv2_sync = + std::make_unique(std::vector{}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + auto* fdv2_factory_ptr = fdv2_factory.get(); + + auto fdv1_sync = + std::make_unique(std::vector{}); + auto fdv1_factory = + std::make_unique(std::move(fdv1_sync)); + auto* fdv1_factory_ptr = fdv1_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + synchronizers.push_back(std::move(fdv1_factory)); + + FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + // FDv2 synchronizer was skipped; FDv1 adapter was built and ran. + EXPECT_EQ(0, fdv2_factory_ptr->build_count_); + EXPECT_EQ(1, fdv1_factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, + InitializerChangeSetWithDirectiveAppliesBasisThenSwitches) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // Initializer returns a Full changeset carrying a flag AND the directive. + // The basis must be applied to the store before the orchestrator + // transitions to the FDv1 fallback. + data_model::Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + FDv2SourceResult init_result = MakeFullChangeSetResult( + ChangeSetData{ + ItemChange{"flagA", data_model::FlagDescriptor(flag_a)}, + }, + MakeSelector(1, "state-1")); + init_result.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{0}}; + + auto initializer = + std::make_unique(std::move(init_result)); + + std::vector> initializers; + initializers.push_back( + std::make_unique(std::move(initializer))); + + auto fdv2_sync = + std::make_unique(std::vector{}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + auto* fdv2_factory_ptr = fdv2_factory.get(); + + auto fdv1_sync = + std::make_unique(std::vector{}); + auto fdv1_factory = + std::make_unique(std::move(fdv1_sync)); + auto* fdv1_factory_ptr = fdv1_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + synchronizers.push_back(std::move(fdv1_factory)); + + FDv2DataSystem ds(std::move(initializers), std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + // Basis applied before the switch. + EXPECT_TRUE(ds.Initialized()); + auto fetched = ds.GetFlag("flagA"); + ASSERT_TRUE(fetched); + EXPECT_EQ(1u, fetched->version); + + // FDv2 synchronizer skipped; FDv1 adapter built and ran. + EXPECT_EQ(0, fdv2_factory_ptr->build_count_); + EXPECT_EQ(1, fdv1_factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, DirectiveTtlElapseRebuildsFDv2) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // First build: source emits the directive with a 1s TTL. Second build + // (after the TTL elapses): source returns no results, so MockSynchronizer + // emits Shutdown and orchestration ends. + auto first_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{ + FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, + /*status_code=*/418, "directive", + std::chrono::system_clock::now()}}}; + r.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{1}}; + return r; + }()}); + auto second_sync = + std::make_unique(std::vector{}); + + std::vector> sources; + sources.push_back(std::move(first_sync)); + sources.push_back(std::move(second_sync)); + auto factory = + std::make_unique(std::move(sources)); + auto* factory_ptr = factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(2, factory_ptr->build_count_); +} + +TEST(FDv2DataSystemTest, FDv2RecoveryAfterTtlAcceptsValidData) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // First build: emits directive with 1s TTL. Second build (recovery): + // emits a valid ChangeSet without the directive. + auto first_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{ + FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, + /*status_code=*/418, "directive", + std::chrono::system_clock::now()}}}; + r.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{1}}; + return r; + }()}); + + data_model::Flag flag_a; + flag_a.key = "flagA"; + flag_a.version = 1; + + auto second_sync = std::make_unique( + std::vector{MakeFullChangeSetResult( + ChangeSetData{ + ItemChange{"flagA", data_model::FlagDescriptor(flag_a)}, + }, + MakeSelector(1, "state-1"))}); + + std::vector> sources; + sources.push_back(std::move(first_sync)); + sources.push_back(std::move(second_sync)); + auto factory = + std::make_unique(std::move(sources)); + auto* factory_ptr = factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + // FDv2 was rebuilt after the TTL elapsed, the new ChangeSet was applied, + // and the data system is in the valid state. + EXPECT_EQ(2, factory_ptr->build_count_); + EXPECT_TRUE(ds.Initialized()); + EXPECT_EQ(DataSourceStatus::DataSourceState::kValid, + status_manager.Status().State()); + auto fetched = ds.GetFlag("flagA"); + ASSERT_TRUE(fetched); +} + +TEST(FDv2DataSystemTest, FDv1SourceSelfDirectiveDoesNotRebuildFDv1) { + auto logger = MakeNullLogger(); + boost::asio::io_context ioc; + data_components::DataSourceStatusManager status_manager; + + // FDv2 source emits a ChangeSet with the directive, switching to the + // FDv1 fallback. + auto fdv2_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kNone, + {}, + data_model::Selector{}}}}; + r.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{0}}; + return r; + }()}); + auto fdv2_factory = + std::make_unique(std::move(fdv2_sync)); + + // FDv1 source then emits a result also carrying the directive. Once + // FDv1 is active, the directive is silently ignored and the source is + // not rebuilt. + auto fdv1_sync = + std::make_unique(std::vector{[]() { + FDv2SourceResult r{ + FDv2SourceResult::Interrupted{FDv2SourceResult::ErrorInfo{ + FDv2SourceResult::ErrorInfo::ErrorKind::kErrorResponse, + /*status_code=*/418, "self-trigger", + std::chrono::system_clock::now()}}}; + r.fdv1_fallback = + data_interfaces::FDv1FallbackDirective{std::chrono::seconds{0}}; + return r; + }()}); + auto fdv1_factory = + std::make_unique(std::move(fdv1_sync)); + auto* fdv1_factory_ptr = fdv1_factory.get(); + + std::vector> synchronizers; + synchronizers.push_back(std::move(fdv2_factory)); + synchronizers.push_back(std::move(fdv1_factory)); + + FDv2DataSystem ds({}, std::move(synchronizers), + /*fallback_condition_factory=*/nullptr, + /*recovery_condition_factory=*/nullptr, + ioc.get_executor(), &status_manager, logger); + ds.Initialize(); + ioc.run(); + + EXPECT_EQ(1, fdv1_factory_ptr->build_count_); +} + // ============================================================================ // Destruction protocol: in-flight orchestration // ============================================================================ // // The destructor contract (fdv2_data_system.hpp) requires the destructor to -// cancel in-flight orchestration (close the active source, transition status -// to kOff) without firing any continuation against the destroyed object. The -// caller's responsibility is to ensure the executor is no longer running by -// the time destruction begins; the orchestrator's responsibility is to leave -// nothing dangling. These two tests pin that contract for both phases. +// cancel in-flight orchestration (close the active source) without firing +// any continuation against the destroyed object. The caller's responsibility +// is to ensure the executor is no longer running by the time destruction +// begins; the orchestrator's responsibility is to leave nothing dangling. +// These two tests pin that contract for both phases. -TEST(FDv2DataSystemTest, - Destructor_WithInFlightInitializer_ClosesSourceAndStatusOff) { +TEST(FDv2DataSystemTest, Destructor_WithInFlightInitializer_ClosesSource) { auto logger = MakeNullLogger(); boost::asio::io_context ioc; data_components::DataSourceStatusManager status_manager; @@ -1129,12 +1460,9 @@ TEST(FDv2DataSystemTest, // ~FDv2DataSystem ran with the initializer's Future still unresolved. EXPECT_TRUE(initializer_closed); - EXPECT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kOff); } -TEST(FDv2DataSystemTest, - Destructor_WithInFlightSynchronizer_ClosesSourceAndStatusOff) { +TEST(FDv2DataSystemTest, Destructor_WithInFlightSynchronizer_ClosesSource) { auto logger = MakeNullLogger(); boost::asio::io_context ioc; data_components::DataSourceStatusManager status_manager; @@ -1160,6 +1488,4 @@ TEST(FDv2DataSystemTest, } EXPECT_TRUE(synchronizer_closed); - EXPECT_EQ(status_manager.Status().State(), - DataSourceStatus::DataSourceState::kOff); } diff --git a/libs/server-sdk/tests/fdv2_polling_impl_test.cpp b/libs/server-sdk/tests/fdv2_polling_impl_test.cpp index c9ed53da5..531a41aa6 100644 --- a/libs/server-sdk/tests/fdv2_polling_impl_test.cpp +++ b/libs/server-sdk/tests/fdv2_polling_impl_test.cpp @@ -98,6 +98,29 @@ TEST(HandleFDv2PollResponseTest, HeaderValueOtherThanTrueDoesNotSetFlag) { EXPECT_FALSE(result.fdv1_fallback); } +TEST(HandleFDv2PollResponseTest, DirectiveWithoutTtlHeaderUsesDefault) { + auto result = + HandleResponse(304, std::nullopt, {{"X-LD-FD-Fallback", "true"}}); + ASSERT_TRUE(result.fdv1_fallback); + EXPECT_EQ(FDv1FallbackDirective::kDefaultTtl, result.fdv1_fallback->ttl); +} + +TEST(HandleFDv2PollResponseTest, DirectiveWithTtlHeaderParsesValue) { + auto result = HandleResponse( + 304, std::nullopt, + {{"X-LD-FD-Fallback", "true"}, {"X-LD-FD-Fallback-TTL", "60"}}); + ASSERT_TRUE(result.fdv1_fallback); + EXPECT_EQ(std::chrono::seconds{60}, result.fdv1_fallback->ttl); +} + +TEST(HandleFDv2PollResponseTest, DirectiveWithMalformedTtlFallsBackToDefault) { + auto result = HandleResponse(304, std::nullopt, + {{"X-LD-FD-Fallback", "true"}, + {"X-LD-FD-Fallback-TTL", "not-a-number"}}); + ASSERT_TRUE(result.fdv1_fallback); + EXPECT_EQ(FDv1FallbackDirective::kDefaultTtl, result.fdv1_fallback->ttl); +} + TEST(HandleFDv2PollResponseTest, NetworkErrorDoesNotSetFlag) { auto logger = MakeNullLogger(); FDv2ProtocolHandler handler; diff --git a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp index 68687adb5..be4a1fe3e 100644 --- a/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp +++ b/libs/server-sdk/tests/fdv2_streaming_synchronizer_test.cpp @@ -854,3 +854,60 @@ TEST(FDv2StreamingSynchronizerTest, ErrorAfterDirectiveCarriesFlag) { std::holds_alternative(result->value)); EXPECT_TRUE(result->fdv1_fallback); } + +TEST(FDv2StreamingSynchronizerTest, DirectiveWithTtlHeaderParsesValue) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // Server sends the directive with an explicit TTL. + boost::beast::http::response_header<> headers; + headers.result(200); + headers.set("X-LD-FD-Fallback", "true"); + headers.set("X-LD-FD-Fallback-TTL", "60"); + FDv2StreamingSynchronizerTestPeer::OnResponse(synchronizer, headers); + + // Drive the source to surface the directive on its next result. + FDv2StreamingSynchronizerTestPeer::OnError( + synchronizer, + sse::Error{sse::errors::ReadTimeout{std::chrono::milliseconds(0)}}); + auto result = synchronizer.Next(data_model::Selector{}).WaitForResult(2s); + + // The parsed TTL is propagated on the result. + ASSERT_TRUE(result.has_value()); + ASSERT_TRUE(result->fdv1_fallback); + EXPECT_EQ(std::chrono::seconds{60}, result->fdv1_fallback->ttl); +} + +TEST(FDv2StreamingSynchronizerTest, DirectiveWithoutTtlHeaderUsesDefault) { + auto logger = MakeNullLogger(); + IoContextRunner runner; + + FDv2StreamingSynchronizer synchronizer( + runner.context().get_executor(), logger, + MakeEndpoints("http://localhost"), MakeHttpProperties(), std::nullopt, + 1s); + FDv2StreamingSynchronizerTestPeer::MarkStarted(synchronizer); + + // Server sends the directive with no TTL header. + boost::beast::http::response_header<> headers; + headers.result(200); + headers.set("X-LD-FD-Fallback", "true"); + FDv2StreamingSynchronizerTestPeer::OnResponse(synchronizer, headers); + + // Drive the source to surface the directive on its next result. + FDv2StreamingSynchronizerTestPeer::OnError( + synchronizer, + sse::Error{sse::errors::ReadTimeout{std::chrono::milliseconds(0)}}); + auto result = synchronizer.Next(data_model::Selector{}).WaitForResult(2s); + + // The result carries the default TTL. + ASSERT_TRUE(result.has_value()); + ASSERT_TRUE(result->fdv1_fallback); + EXPECT_EQ(FDv1FallbackDirective::kDefaultTtl, result->fdv1_fallback->ttl); +} diff --git a/libs/server-sdk/tests/source_manager_test.cpp b/libs/server-sdk/tests/source_manager_test.cpp index 244ca6400..68bfd5cc2 100644 --- a/libs/server-sdk/tests/source_manager_test.cpp +++ b/libs/server-sdk/tests/source_manager_test.cpp @@ -43,6 +43,11 @@ class CountingFactory : public IFDv2SynchronizerFactory { int build_count = 0; }; +class FDv1FallbackFactory : public CountingFactory { + public: + bool IsFDv1Fallback() const override { return true; } +}; + } // namespace TEST(SourceManagerTest, EmptyManagerReportsZeroAvailable) { @@ -176,7 +181,7 @@ TEST(SourceManagerTest, ResetSourceIndexSkipsBlockedFirstFactory) { EXPECT_EQ(1, f1_ptr->build_count); } -TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) { +TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackFalseForFDv2Factory) { auto f0 = std::make_unique(); std::vector> factories; factories.push_back(std::move(f0)); @@ -185,3 +190,109 @@ TEST(SourceManagerTest, IsCurrentSynchronizerFDv1FallbackAlwaysFalse) { mgr.NextSynchronizer(); EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback()); } + +TEST(SourceManagerTest, FDv1FallbackFactoryStartsBlockedAndIsSkipped) { + auto fdv2 = std::make_unique(); + auto fdv1 = std::make_unique(); + auto* fdv1_ptr = fdv1.get(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + factories.push_back(std::move(fdv1)); + SourceManager mgr(std::move(factories)); + + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + mgr.NextSynchronizer(); + EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback()); + EXPECT_EQ(0, fdv1_ptr->build_count); +} + +TEST(SourceManagerTest, SwitchToFDv1FallbackBlocksFDv2AndUnblocksFDv1) { + auto fdv2 = std::make_unique(); + auto fdv1 = std::make_unique(); + auto* fdv1_ptr = fdv1.get(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + factories.push_back(std::move(fdv1)); + SourceManager mgr(std::move(factories)); + + mgr.SwitchToFDv1Fallback(); + + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + auto sync = mgr.NextSynchronizer(); + ASSERT_NE(sync, nullptr); + EXPECT_EQ(1, fdv1_ptr->build_count); + EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback()); +} + +TEST(SourceManagerTest, SwitchToFDv1FallbackWithoutAdapterBlocksEverything) { + auto fdv2 = std::make_unique(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + SourceManager mgr(std::move(factories)); + + mgr.SwitchToFDv1Fallback(); + + EXPECT_EQ(0u, mgr.AvailableSynchronizerCount()); + EXPECT_EQ(nullptr, mgr.NextSynchronizer()); +} + +TEST(SourceManagerTest, SwitchToFDv1FallbackUnblocksPreviouslyBlockedFDv2) { + auto fdv2 = std::make_unique(); + auto fdv1 = std::make_unique(); + auto* fdv1_ptr = fdv1.get(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + factories.push_back(std::move(fdv1)); + SourceManager mgr(std::move(factories)); + + mgr.NextSynchronizer(); + mgr.BlockCurrentSynchronizer(); + mgr.SwitchToFDv1Fallback(); + + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + auto sync = mgr.NextSynchronizer(); + ASSERT_NE(sync, nullptr); + EXPECT_EQ(1, fdv1_ptr->build_count); + EXPECT_TRUE(mgr.IsCurrentSynchronizerFDv1Fallback()); +} + +TEST(SourceManagerTest, SwitchBackToFDv2UnblocksFDv2AndBlocksFDv1) { + auto fdv2 = std::make_unique(); + auto* fdv2_ptr = fdv2.get(); + auto fdv1 = std::make_unique(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + factories.push_back(std::move(fdv1)); + SourceManager mgr(std::move(factories)); + + // Switch to FDv1 first, then back to FDv2. + mgr.SwitchToFDv1Fallback(); + mgr.SwitchBackToFDv2(); + + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + auto sync = mgr.NextSynchronizer(); + ASSERT_NE(sync, nullptr); + EXPECT_EQ(1, fdv2_ptr->build_count); + EXPECT_FALSE(mgr.IsCurrentSynchronizerFDv1Fallback()); +} + +TEST(SourceManagerTest, SwitchBackToFDv2UnblocksTerminallyFailedFDv2Factory) { + auto fdv2 = std::make_unique(); + auto* fdv2_ptr = fdv2.get(); + std::vector> factories; + factories.push_back(std::move(fdv2)); + SourceManager mgr(std::move(factories)); + + // Simulate a terminal error blocking the FDv2 factory. + mgr.NextSynchronizer(); + mgr.BlockCurrentSynchronizer(); + EXPECT_EQ(0u, mgr.AvailableSynchronizerCount()); + + mgr.SwitchBackToFDv2(); + + // Previously-blocked FDv2 factory is now available again. + EXPECT_EQ(1u, mgr.AvailableSynchronizerCount()); + auto sync = mgr.NextSynchronizer(); + ASSERT_NE(sync, nullptr); + EXPECT_EQ(2, fdv2_ptr->build_count); +}