From 86b2e3d7244044142de698fc127fcac5a25aeea4 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Thu, 28 May 2026 11:04:40 -0700 Subject: [PATCH 1/2] feat: add FDv1AdapterSynchronizer wrapping IDataSynchronizer as IFDv2Synchronizer --- libs/server-sdk/src/CMakeLists.txt | 2 + .../fdv2/fdv1_adapter_synchronizer.cpp | 187 +++++++++++++++++ .../fdv2/fdv1_adapter_synchronizer.hpp | 107 ++++++++++ .../tests/fdv1_adapter_synchronizer_test.cpp | 189 ++++++++++++++++++ 4 files changed, 485 insertions(+) create mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp create mode 100644 libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp create mode 100644 libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp diff --git a/libs/server-sdk/src/CMakeLists.txt b/libs/server-sdk/src/CMakeLists.txt index 94e12f113..176c95421 100644 --- a/libs/server-sdk/src/CMakeLists.txt +++ b/libs/server-sdk/src/CMakeLists.txt @@ -76,6 +76,8 @@ target_sources(${LIBNAME} data_systems/fdv2/source_manager.cpp data_systems/fdv2/fdv2_data_system.hpp data_systems/fdv2/fdv2_data_system.cpp + data_systems/fdv2/fdv1_adapter_synchronizer.hpp + data_systems/fdv2/fdv1_adapter_synchronizer.cpp data_systems/background_sync/sources/streaming/streaming_data_source.hpp data_systems/background_sync/sources/streaming/streaming_data_source.cpp data_systems/background_sync/sources/streaming/event_handler.hpp diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp new file mode 100644 index 000000000..2dfd953cb --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp @@ -0,0 +1,187 @@ +#include "fdv1_adapter_synchronizer.hpp" + +#include + +namespace launchdarkly::server_side::data_systems { + +using data_interfaces::FDv2SourceResult; + +// ----- State ----- + +bool FDv1AdapterSynchronizer::State::TryStart() { + std::lock_guard lock(mutex_); + if (started_ || closed_) { + return false; + } + started_ = true; + return true; +} + +bool FDv1AdapterSynchronizer::State::MarkClosed() { + std::lock_guard lock(mutex_); + closed_ = true; + return started_; +} + +async::Future FDv1AdapterSynchronizer::State::GetNext() { + std::lock_guard lock(mutex_); + if (!result_queue_.empty()) { + auto result = std::move(result_queue_.front()); + result_queue_.pop_front(); + return async::MakeFuture(std::move(result)); + } + return pending_promise_.emplace().GetFuture(); +} + +void FDv1AdapterSynchronizer::State::ResolvePendingAsShutdown() { + std::optional> promise; + { + std::lock_guard lock(mutex_); + if (pending_promise_) { + promise = std::move(pending_promise_); + pending_promise_.reset(); + } + } + if (promise) { + promise->Resolve(FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } +} + +void FDv1AdapterSynchronizer::State::Notify(FDv2SourceResult result) { + std::optional> promise; + { + std::lock_guard lock(mutex_); + if (closed_) { + return; + } + if (pending_promise_) { + promise = std::move(pending_promise_); + pending_promise_.reset(); + } else { + result_queue_.push_back(std::move(result)); + return; + } + } + // Resolve outside the lock — Promise::Resolve may invoke inline + // continuations that could call back into Notify or GetNext. + promise->Resolve(std::move(result)); +} + +// ----- ConvertingDestination ----- + +FDv1AdapterSynchronizer::ConvertingDestination::ConvertingDestination( + std::weak_ptr state) + : state_(std::move(state)) {} + +void FDv1AdapterSynchronizer::ConvertingDestination::Init( + data_model::SDKDataSet data_set) { + auto state = state_.lock(); + if (!state) { + return; + } + data_interfaces::ChangeSetData changes; + changes.reserve(data_set.flags.size() + data_set.segments.size()); + for (auto& [key, flag] : data_set.flags) { + changes.push_back({key, std::move(flag)}); + } + for (auto& [key, segment] : data_set.segments) { + changes.push_back({key, std::move(segment)}); + } + state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kFull, std::move(changes), + data_model::Selector{}}}}); +} + +void FDv1AdapterSynchronizer::ConvertingDestination::Upsert( + std::string const& key, + data_model::FlagDescriptor flag) { + auto state = state_.lock(); + if (!state) { + return; + } + data_interfaces::ChangeSetData changes; + changes.push_back({key, std::move(flag)}); + state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kPartial, std::move(changes), + data_model::Selector{}}}}); +} + +void FDv1AdapterSynchronizer::ConvertingDestination::Upsert( + std::string const& key, + data_model::SegmentDescriptor segment) { + auto state = state_.lock(); + if (!state) { + return; + } + data_interfaces::ChangeSetData changes; + changes.push_back({key, std::move(segment)}); + state->Notify(FDv2SourceResult{FDv2SourceResult::ChangeSet{ + data_model::ChangeSet{ + data_model::ChangeSetType::kPartial, std::move(changes), + data_model::Selector{}}}}); +} + +std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity() + const { + static std::string const identity = "FDv1 adapter destination"; + return identity; +} + +// ----- FDv1AdapterSynchronizer ----- + +FDv1AdapterSynchronizer::FDv1AdapterSynchronizer( + std::unique_ptr fdv1_source) + : state_(std::make_shared()), + destination_(std::make_unique(state_)), + fdv1_source_(std::move(fdv1_source)) {} + +FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() { + Close(); +} + +async::Future FDv1AdapterSynchronizer::Next( + data_model::Selector /*selector*/) { + auto closed = close_promise_.GetFuture(); + if (closed.IsFinished()) { + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + if (state_->TryStart()) { + fdv1_source_->StartAsync(destination_.get(), + /*bootstrap_data=*/nullptr); + } + auto result_future = state_->GetNext(); + if (result_future.IsFinished()) { + return result_future; + } + return async::WhenAny(closed, result_future) + .Then( + [state = state_, result_future](std::size_t const& idx) mutable + -> async::Future { + if (idx == 0) { + state->ResolvePendingAsShutdown(); + return async::MakeFuture( + FDv2SourceResult{FDv2SourceResult::Shutdown{}}); + } + return result_future; + }, + async::kInlineExecutor); +} + +void FDv1AdapterSynchronizer::Close() { + if (!close_promise_.Resolve(std::monostate{})) { + return; + } + if (state_->MarkClosed()) { + fdv1_source_->ShutdownAsync([] {}); + } +} + +std::string const& FDv1AdapterSynchronizer::Identity() const { + static std::string const identity = "FDv1 fallback adapter"; + return identity; +} + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp new file mode 100644 index 000000000..7d2d10a3e --- /dev/null +++ b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp @@ -0,0 +1,107 @@ +#pragma once + +#include "../../data_interfaces/destination/idestination.hpp" +#include "../../data_interfaces/source/idata_synchronizer.hpp" +#include "../../data_interfaces/source/ifdv2_synchronizer.hpp" + +#include + +#include +#include +#include +#include +#include +#include + +namespace launchdarkly::server_side::data_systems { + +/** + * Adapts an FDv1 IDataSynchronizer to the IFDv2Synchronizer interface. + * + * FDv1 Init/Upsert callbacks delivered through an internal IDestination are + * translated into FDv2SourceResult::ChangeSet results, with empty selectors + * and fdv1_fallback = false (the directive does not re-fire from FDv1 data). + * + * Threading: Next() and Close() may be called from any thread; only one + * Next() may be outstanding at a time. The adapter blocks in its destructor + * waiting for the FDv1 source's ShutdownAsync completion, so no callbacks + * are in flight when the wrapped source is destroyed. + */ +class FDv1AdapterSynchronizer final + : public data_interfaces::IFDv2Synchronizer { + public: + explicit FDv1AdapterSynchronizer( + std::unique_ptr fdv1_source); + + ~FDv1AdapterSynchronizer() override; + + async::Future Next( + data_model::Selector selector) override; + void Close() override; + [[nodiscard]] std::string const& Identity() const override; + + private: + /** + * Holds the lifecycle, result queue, and pending Next() promise; shared + * with the FDv1 source's IDestination via the inner ConvertingDestination. + * All methods are thread-safe. + */ + class State { + public: + // Returns true if this call transitioned Initial → Started; false if + // already started or already closed. Used to gate the one-time + // StartAsync call on the wrapped FDv1 source. + bool TryStart(); + + // Marks the state closed and returns whether the source was started + // before the transition (so the caller knows whether ShutdownAsync + // needs to be called). + bool MarkClosed(); + + async::Future GetNext(); + + // Resolves any pending Next() promise with Shutdown and clears it. + // Called on the close path so the abandoned promise doesn't leave + // potential continuations dangling. + void ResolvePendingAsShutdown(); + + void Notify(data_interfaces::FDv2SourceResult result); + + private: + // Protected by mutex_. + mutable std::mutex mutex_; + bool started_ = false; + bool closed_ = false; + std::optional> + pending_promise_; + std::deque result_queue_; + }; + + /** + * Translates FDv1 IDestination callbacks into FDv2 results queued on + * State. Thread-safe (delegates to State). + */ + class ConvertingDestination final : public data_interfaces::IDestination { + public: + explicit ConvertingDestination(std::weak_ptr state); + void Init(data_model::SDKDataSet data_set) override; + void Upsert(std::string const& key, + data_model::FlagDescriptor flag) override; + void Upsert(std::string const& key, + data_model::SegmentDescriptor segment) override; + [[nodiscard]] std::string const& Identity() const override; + + private: + std::weak_ptr state_; + }; + + // const after construction. + std::shared_ptr const state_; + std::unique_ptr const destination_; + std::unique_ptr const fdv1_source_; + + // Thread-safe primitive. + async::Promise close_promise_; +}; + +} // namespace launchdarkly::server_side::data_systems diff --git a/libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp b/libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp new file mode 100644 index 000000000..77058f6bc --- /dev/null +++ b/libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp @@ -0,0 +1,189 @@ +#include + +#include + +#include +#include +#include +#include + +using namespace launchdarkly; +using namespace launchdarkly::server_side::data_interfaces; +using namespace launchdarkly::server_side::data_systems; +using namespace std::chrono_literals; + +namespace { + +// Mock FDv1 source: records StartAsync and ShutdownAsync calls and exposes +// the IDestination it was given so the test can drive Init/Upsert. +class MockFDv1Source final : public IDataSynchronizer { + public: + void StartAsync(IDestination* destination, + data_model::SDKDataSet const* bootstrap) override { + ++start_count; + destination_ = destination; + bootstrap_was_null = (bootstrap == nullptr); + } + + void ShutdownAsync(std::function completion) override { + ++shutdown_count; + if (completion) { + completion(); + } + } + + std::string const& Identity() const override { + static std::string const id = "mock fdv1"; + return id; + } + + IDestination* destination_ = nullptr; + int start_count = 0; + int shutdown_count = 0; + bool bootstrap_was_null = false; +}; + +} // namespace + +TEST(FDv1AdapterSynchronizerTest, FirstNextStartsFDv1Source) { + auto source = std::make_unique(); + auto* source_ptr = source.get(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + auto future = adapter.Next(data_model::Selector{}); + + EXPECT_EQ(1, source_ptr->start_count); + EXPECT_TRUE(source_ptr->bootstrap_was_null); + EXPECT_FALSE(future.IsFinished()); +} + +TEST(FDv1AdapterSynchronizerTest, SecondNextDoesNotRestartSource) { + auto source = std::make_unique(); + auto* source_ptr = source.get(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + auto first = adapter.Next(data_model::Selector{}); + source_ptr->destination_->Init(data_model::SDKDataSet{}); + auto result = first.WaitForResult(1s); + ASSERT_TRUE(result.has_value()); + adapter.Next(data_model::Selector{}); + + EXPECT_EQ(1, source_ptr->start_count); +} + +TEST(FDv1AdapterSynchronizerTest, FDv1InitProducesFullChangeSet) { + auto source = std::make_unique(); + auto* source_ptr = source.get(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + auto future = adapter.Next(data_model::Selector{}); + + data_model::SDKDataSet data_set; + data_model::Flag flag; + flag.key = "flagA"; + flag.version = 1; + data_set.flags.emplace("flagA", data_model::FlagDescriptor(flag)); + source_ptr->destination_->Init(std::move(data_set)); + + auto result = future.WaitForResult(1s); + ASSERT_TRUE(result.has_value()); + auto* change_set = std::get_if(&result->value); + ASSERT_NE(change_set, nullptr); + EXPECT_EQ(data_model::ChangeSetType::kFull, change_set->change_set.type); + ASSERT_EQ(1u, change_set->change_set.data.size()); + EXPECT_EQ("flagA", change_set->change_set.data[0].key); + EXPECT_FALSE(change_set->change_set.selector.value.has_value()); + EXPECT_FALSE(result->fdv1_fallback); +} + +TEST(FDv1AdapterSynchronizerTest, FDv1UpsertProducesPartialChangeSet) { + auto source = std::make_unique(); + auto* source_ptr = source.get(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + // Flag upsert. + auto flag_future = adapter.Next(data_model::Selector{}); + data_model::Flag flag; + flag.key = "flagA"; + flag.version = 2; + source_ptr->destination_->Upsert("flagA", data_model::FlagDescriptor(flag)); + + auto flag_result = flag_future.WaitForResult(1s); + ASSERT_TRUE(flag_result.has_value()); + auto* flag_change_set = + std::get_if(&flag_result->value); + ASSERT_NE(flag_change_set, nullptr); + EXPECT_EQ(data_model::ChangeSetType::kPartial, + flag_change_set->change_set.type); + ASSERT_EQ(1u, flag_change_set->change_set.data.size()); + EXPECT_EQ("flagA", flag_change_set->change_set.data[0].key); + + // Segment upsert. + auto seg_future = adapter.Next(data_model::Selector{}); + data_model::Segment seg; + seg.key = "segA"; + seg.version = 3; + source_ptr->destination_->Upsert("segA", + data_model::SegmentDescriptor(seg)); + + auto seg_result = seg_future.WaitForResult(1s); + ASSERT_TRUE(seg_result.has_value()); + auto* seg_change_set = + std::get_if(&seg_result->value); + ASSERT_NE(seg_change_set, nullptr); + EXPECT_EQ(data_model::ChangeSetType::kPartial, + seg_change_set->change_set.type); + ASSERT_EQ(1u, seg_change_set->change_set.data.size()); + EXPECT_EQ("segA", seg_change_set->change_set.data[0].key); +} + +TEST(FDv1AdapterSynchronizerTest, ClosePendingNextReturnsShutdown) { + auto source = std::make_unique(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + auto future = adapter.Next(data_model::Selector{}); + EXPECT_FALSE(future.IsFinished()); + + adapter.Close(); + + auto result = future.WaitForResult(1s); + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} + +TEST(FDv1AdapterSynchronizerTest, CloseShutsDownStartedFDv1Source) { + auto source = std::make_unique(); + auto* source_ptr = source.get(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + adapter.Next(data_model::Selector{}); + adapter.Close(); + + EXPECT_EQ(1, source_ptr->shutdown_count); +} + +TEST(FDv1AdapterSynchronizerTest, CloseWithoutStartDoesNotShutDownFDv1Source) { + auto source = std::make_unique(); + auto* source_ptr = source.get(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + // No Next() call — FDv1 source was never started. + adapter.Close(); + + EXPECT_EQ(0, source_ptr->start_count); + EXPECT_EQ(0, source_ptr->shutdown_count); +} + +TEST(FDv1AdapterSynchronizerTest, NextAfterCloseReturnsShutdown) { + auto source = std::make_unique(); + FDv1AdapterSynchronizer adapter(std::move(source)); + + adapter.Close(); + auto future = adapter.Next(data_model::Selector{}); + + auto result = future.WaitForResult(1s); + ASSERT_TRUE(result.has_value()); + EXPECT_TRUE( + std::holds_alternative(result->value)); +} From b1157e18f6bb0388a32c50379a1fac5528242db6 Mon Sep 17 00:00:00 2001 From: Bee Klimt Date: Tue, 9 Jun 2026 10:07:36 -0700 Subject: [PATCH 2/2] feat: translate FDv1 status changes to FDv2 results in FDv1AdapterSynchronizer --- .../fdv2/fdv1_adapter_synchronizer.cpp | 26 +++- .../fdv2/fdv1_adapter_synchronizer.hpp | 33 ++++- .../tests/fdv1_adapter_synchronizer_test.cpp | 119 +++++++++++++++--- 3 files changed, 155 insertions(+), 23 deletions(-) diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp index 2dfd953cb..0fd6094d3 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.cpp @@ -5,6 +5,7 @@ namespace launchdarkly::server_side::data_systems { using data_interfaces::FDv2SourceResult; +using DataSourceState = DataSourceStatus::DataSourceState; // ----- State ----- @@ -132,9 +133,32 @@ std::string const& FDv1AdapterSynchronizer::ConvertingDestination::Identity() // ----- FDv1AdapterSynchronizer ----- FDv1AdapterSynchronizer::FDv1AdapterSynchronizer( - std::unique_ptr fdv1_source) + std::unique_ptr fdv1_source, + data_components::DataSourceStatusManager* status_manager) : state_(std::make_shared()), destination_(std::make_unique(state_)), + status_manager_(status_manager), + status_subscription_(status_manager_->OnDataSourceStatusChange( + [state = state_](DataSourceStatus status) { + auto error = status.LastError(); + if (!error) { + return; + } + switch (status.State()) { + case DataSourceState::kInterrupted: + state->Notify(FDv2SourceResult{ + FDv2SourceResult::Interrupted{*error}}); + break; + case DataSourceState::kOff: + state->Notify(FDv2SourceResult{ + FDv2SourceResult::TerminalError{*error}}); + break; + case DataSourceState::kInitializing: + case DataSourceState::kValid: + // No FDv2 result for these states. + break; + } + })), fdv1_source_(std::move(fdv1_source)) {} FDv1AdapterSynchronizer::~FDv1AdapterSynchronizer() { diff --git a/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp index 7d2d10a3e..e52b1b321 100644 --- a/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp +++ b/libs/server-sdk/src/data_systems/fdv2/fdv1_adapter_synchronizer.hpp @@ -1,10 +1,12 @@ #pragma once +#include "../../data_components/status_notifications/data_source_status_manager.hpp" #include "../../data_interfaces/destination/idestination.hpp" #include "../../data_interfaces/source/idata_synchronizer.hpp" #include "../../data_interfaces/source/ifdv2_synchronizer.hpp" #include +#include #include #include @@ -23,15 +25,26 @@ namespace launchdarkly::server_side::data_systems { * and fdv1_fallback = false (the directive does not re-fire from FDv1 data). * * Threading: Next() and Close() may be called from any thread; only one - * Next() may be outstanding at a time. The adapter blocks in its destructor - * waiting for the FDv1 source's ShutdownAsync completion, so no callbacks - * are in flight when the wrapped source is destroyed. + * Next() may be outstanding at a time. Member declaration order ensures + * the wrapped FDv1 source destructs before destination_ and state_, so any + * in-flight FDv1 callbacks land on live objects during teardown. This + * relies on the wrapped IDataSynchronizer blocking on its in-flight work + * in its destructor. */ class FDv1AdapterSynchronizer final : public data_interfaces::IFDv2Synchronizer { public: - explicit FDv1AdapterSynchronizer( - std::unique_ptr fdv1_source); + /** + * @param fdv1_source The wrapped source. Must have been constructed + * with status_manager as its status sink so that + * state changes flow back into this adapter. + * @param status_manager Non-owning. The caller retains ownership and + * must keep it alive for the lifetime of the + * wrapped source and this adapter. + */ + FDv1AdapterSynchronizer( + std::unique_ptr fdv1_source, + data_components::DataSourceStatusManager* status_manager); ~FDv1AdapterSynchronizer() override; @@ -95,9 +108,17 @@ class FDv1AdapterSynchronizer final std::weak_ptr state_; }; - // const after construction. + // shared_ptr so async callbacks that may fire after this is destroyed + // can hold their own reference. std::shared_ptr const state_; std::unique_ptr const destination_; + + // Non-owning. The caller must keep this alive for the lifetime of + // the wrapped source, which holds a reference to it for status + // reporting. + data_components::DataSourceStatusManager* const status_manager_; + std::unique_ptr const status_subscription_; + std::unique_ptr const fdv1_source_; // Thread-safe primitive. diff --git a/libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp b/libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp index 77058f6bc..da1408b94 100644 --- a/libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp +++ b/libs/server-sdk/tests/fdv1_adapter_synchronizer_test.cpp @@ -8,6 +8,8 @@ #include using namespace launchdarkly; +using namespace launchdarkly::server_side; +using namespace launchdarkly::server_side::data_components; using namespace launchdarkly::server_side::data_interfaces; using namespace launchdarkly::server_side::data_systems; using namespace std::chrono_literals; @@ -18,6 +20,8 @@ namespace { // the IDestination it was given so the test can drive Init/Upsert. class MockFDv1Source final : public IDataSynchronizer { public: + explicit MockFDv1Source(DataSourceStatusManager& /*status_manager*/) {} + void StartAsync(IDestination* destination, data_model::SDKDataSet const* bootstrap) override { ++start_count; @@ -46,9 +50,10 @@ class MockFDv1Source final : public IDataSynchronizer { } // namespace TEST(FDv1AdapterSynchronizerTest, FirstNextStartsFDv1Source) { - auto source = std::make_unique(); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); auto* source_ptr = source.get(); - FDv1AdapterSynchronizer adapter(std::move(source)); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); auto future = adapter.Next(data_model::Selector{}); @@ -58,9 +63,10 @@ TEST(FDv1AdapterSynchronizerTest, FirstNextStartsFDv1Source) { } TEST(FDv1AdapterSynchronizerTest, SecondNextDoesNotRestartSource) { - auto source = std::make_unique(); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); auto* source_ptr = source.get(); - FDv1AdapterSynchronizer adapter(std::move(source)); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); auto first = adapter.Next(data_model::Selector{}); source_ptr->destination_->Init(data_model::SDKDataSet{}); @@ -72,9 +78,10 @@ TEST(FDv1AdapterSynchronizerTest, SecondNextDoesNotRestartSource) { } TEST(FDv1AdapterSynchronizerTest, FDv1InitProducesFullChangeSet) { - auto source = std::make_unique(); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); auto* source_ptr = source.get(); - FDv1AdapterSynchronizer adapter(std::move(source)); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); auto future = adapter.Next(data_model::Selector{}); @@ -97,9 +104,10 @@ TEST(FDv1AdapterSynchronizerTest, FDv1InitProducesFullChangeSet) { } TEST(FDv1AdapterSynchronizerTest, FDv1UpsertProducesPartialChangeSet) { - auto source = std::make_unique(); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); auto* source_ptr = source.get(); - FDv1AdapterSynchronizer adapter(std::move(source)); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); // Flag upsert. auto flag_future = adapter.Next(data_model::Selector{}); @@ -138,8 +146,9 @@ TEST(FDv1AdapterSynchronizerTest, FDv1UpsertProducesPartialChangeSet) { } TEST(FDv1AdapterSynchronizerTest, ClosePendingNextReturnsShutdown) { - auto source = std::make_unique(); - FDv1AdapterSynchronizer adapter(std::move(source)); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); auto future = adapter.Next(data_model::Selector{}); EXPECT_FALSE(future.IsFinished()); @@ -153,9 +162,10 @@ TEST(FDv1AdapterSynchronizerTest, ClosePendingNextReturnsShutdown) { } TEST(FDv1AdapterSynchronizerTest, CloseShutsDownStartedFDv1Source) { - auto source = std::make_unique(); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); auto* source_ptr = source.get(); - FDv1AdapterSynchronizer adapter(std::move(source)); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); adapter.Next(data_model::Selector{}); adapter.Close(); @@ -164,9 +174,10 @@ TEST(FDv1AdapterSynchronizerTest, CloseShutsDownStartedFDv1Source) { } TEST(FDv1AdapterSynchronizerTest, CloseWithoutStartDoesNotShutDownFDv1Source) { - auto source = std::make_unique(); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); auto* source_ptr = source.get(); - FDv1AdapterSynchronizer adapter(std::move(source)); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); // No Next() call — FDv1 source was never started. adapter.Close(); @@ -175,9 +186,85 @@ TEST(FDv1AdapterSynchronizerTest, CloseWithoutStartDoesNotShutDownFDv1Source) { EXPECT_EQ(0, source_ptr->shutdown_count); } +TEST(FDv1AdapterSynchronizerTest, QueuedResultsDrainInFifoOrder) { + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); + auto* source_ptr = source.get(); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); + + // Start the source by satisfying one Next() with an Init. + auto first = adapter.Next(data_model::Selector{}); + source_ptr->destination_->Init(data_model::SDKDataSet{}); + first.WaitForResult(1s); + + // Two upserts queue with no Next() in flight. + data_model::Flag flag_a; + flag_a.key = "a"; + data_model::Flag flag_b; + flag_b.key = "b"; + source_ptr->destination_->Upsert("a", data_model::FlagDescriptor(flag_a)); + source_ptr->destination_->Upsert("b", data_model::FlagDescriptor(flag_b)); + + // Drain in FIFO order. + auto r1 = adapter.Next(data_model::Selector{}).WaitForResult(1s); + auto r2 = adapter.Next(data_model::Selector{}).WaitForResult(1s); + ASSERT_TRUE(r1.has_value()); + ASSERT_TRUE(r2.has_value()); + auto* cs1 = std::get_if(&r1->value); + auto* cs2 = std::get_if(&r2->value); + ASSERT_NE(cs1, nullptr); + ASSERT_NE(cs2, nullptr); + ASSERT_EQ(1u, cs1->change_set.data.size()); + ASSERT_EQ(1u, cs2->change_set.data.size()); + EXPECT_EQ("a", cs1->change_set.data[0].key); + EXPECT_EQ("b", cs2->change_set.data[0].key); +} + +TEST(FDv1AdapterSynchronizerTest, InterruptedStatusProducesInterruptedResult) { + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); + + // kInterrupted from kInitializing stays kInitializing; drive past first. + status_manager.SetState(DataSourceStatus::DataSourceState::kValid); + + auto future = adapter.Next(data_model::Selector{}); + status_manager.SetState( + DataSourceStatus::DataSourceState::kInterrupted, + DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, "boom"); + + auto result = future.WaitForResult(1s); + ASSERT_TRUE(result.has_value()); + auto* interrupted = + std::get_if(&result->value); + ASSERT_NE(interrupted, nullptr); + EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kNetworkError, + interrupted->error.Kind()); +} + +TEST(FDv1AdapterSynchronizerTest, OffStatusProducesTerminalErrorResult) { + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); + + auto future = adapter.Next(data_model::Selector{}); + status_manager.SetState( + DataSourceStatus::DataSourceState::kOff, + DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse, "401"); + + auto result = future.WaitForResult(1s); + ASSERT_TRUE(result.has_value()); + auto* terminal = + std::get_if(&result->value); + ASSERT_NE(terminal, nullptr); + EXPECT_EQ(DataSourceStatus::ErrorInfo::ErrorKind::kErrorResponse, + terminal->error.Kind()); +} + TEST(FDv1AdapterSynchronizerTest, NextAfterCloseReturnsShutdown) { - auto source = std::make_unique(); - FDv1AdapterSynchronizer adapter(std::move(source)); + DataSourceStatusManager status_manager; + auto source = std::make_unique(status_manager); + FDv1AdapterSynchronizer adapter(std::move(source), &status_manager); adapter.Close(); auto future = adapter.Next(data_model::Selector{});