Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
Original file line number Diff line number Diff line change
Expand Up @@ -45,6 +45,8 @@ struct PayloadTransferred {

struct Goodbye {
std::optional<std::string> reason;
// If set, indicates an FDv1 fallback directive with this TTL in seconds.
std::optional<std::int64_t> protocol_fallback_ttl;
};

struct FDv2Error {
Expand Down
2 changes: 2 additions & 0 deletions libs/internal/src/serialization/json_fdv2_events.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -132,6 +132,8 @@ tl::expected<std::optional<Goodbye>, JsonError> tag_invoke(
Goodbye goodbye{};

PARSE_CONDITIONAL_FIELD(goodbye.reason, obj, "reason");
PARSE_CONDITIONAL_FIELD(goodbye.protocol_fallback_ttl, obj,
"protocolFallbackTTL");

return goodbye;
}
Expand Down
10 changes: 10 additions & 0 deletions libs/internal/tests/fdv2_serialization_test.cpp
Original file line number Diff line number Diff line change
Expand Up @@ -363,6 +363,16 @@ TEST(GoodbyeTests, DeserializesWithoutReason) {
ASSERT_FALSE(result.value()->reason);
}

TEST(GoodbyeTests, DeserializesWithProtocolFallbackTtl) {
auto result =
boost::json::value_to<tl::expected<std::optional<Goodbye>, JsonError>>(
boost::json::parse(R"({"protocolFallbackTTL":60})"));
ASSERT_TRUE(result);
ASSERT_TRUE(result.value());
ASSERT_TRUE(result.value()->protocol_fallback_ttl);
ASSERT_EQ(60, *result.value()->protocol_fallback_ttl);
}

TEST(GoodbyeTests, WrongTypeReturnsSchemaFailure) {
auto result =
boost::json::value_to<tl::expected<std::optional<Goodbye>, JsonError>>(
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -5,12 +5,51 @@
#include <launchdarkly/data_model/change_set.hpp>
#include <launchdarkly/data_sources/data_source_status_error_info.hpp>

#include <charconv>
#include <chrono>
#include <cstdint>
#include <optional>
#include <string>
#include <string_view>
#include <system_error>
#include <variant>

namespace launchdarkly::server_side::data_interfaces {

/**
* Server-directed instruction to fall back to FDv1, carried alongside any
* FDv2 source result whose underlying transport observed it (e.g. an
* X-LD-FD-Fallback: true response header).
*/
struct FDv1FallbackDirective {
/** Default TTL used when the directive carries no TTL of its own. */
static constexpr std::chrono::seconds kDefaultTtl = std::chrono::hours(1);

/**
* Parse the value of an X-LD-FD-Fallback-TTL header (or equivalent
* protocolFallbackTTL field from a goodbye message). Returns
* std::nullopt if the value is malformed.
*/
static std::optional<std::chrono::seconds> ParseTtl(
std::string_view value) {
std::uint64_t seconds = 0;
auto const* begin = value.data();
auto const* end = begin + value.size();
auto const [ptr, ec] = std::from_chars(begin, end, seconds);
if (ec != std::errc{} || ptr != end) {
return std::nullopt;
}
return std::chrono::seconds(seconds);
}

/**
* How long to stay on FDv1 before attempting to recover to FDv2.
* A value of 0 seconds means the SDK should remain on FDv1
* indefinitely.
*/
std::chrono::seconds ttl = kDefaultTtl;
};

/**
* Result returned by IFDv2Initializer::Run and IFDv2Synchronizer::Next.
*/
Expand Down Expand Up @@ -56,10 +95,9 @@ struct FDv2SourceResult {
Value value;

/**
* If true, the server signaled (via the X-LD-FD-Fallback response header)
* that the client should fall back to FDv1.
* Set if the underlying transport observed an FDv1 fallback directive.
*/
bool fdv1_fallback = false;
std::optional<FDv1FallbackDirective> fdv1_fallback;
};

} // namespace launchdarkly::server_side::data_interfaces
Original file line number Diff line number Diff line change
Expand Up @@ -14,6 +14,8 @@ class IFDv2SynchronizerFactory {
public:
virtual std::unique_ptr<IFDv2Synchronizer> Build() = 0;

[[nodiscard]] virtual bool IsFDv1Fallback() const { return false; }

virtual ~IFDv2SynchronizerFactory() = default;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory const&) = delete;
IFDv2SynchronizerFactory(IFDv2SynchronizerFactory&&) = delete;
Expand Down
158 changes: 129 additions & 29 deletions libs/server-sdk/src/data_systems/fdv2/fdv2_data_system.cpp
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
#include "fdv2_data_system.hpp"

#include <launchdarkly/async/promise.hpp>
#include <launchdarkly/async/timer.hpp>

#include <boost/asio/post.hpp>

Expand Down Expand Up @@ -43,6 +44,7 @@ FDv2DataSystem::FDv2DataSystem(
store_(),
change_notifier_(store_, store_),
initialize_called_(false),
last_logged_synchronizer_interrupted_(false),
closed_(false),
selector_(),
initializer_index_(0),
Expand All @@ -58,6 +60,7 @@ FDv2DataSystem::~FDv2DataSystem() {
void FDv2DataSystem::Close() {
std::lock_guard<std::mutex> lock(mutex_);
closed_ = true;
fdv1_fallback_retry_cancel_.Cancel();
if (active_initializer_) {
active_initializer_->Close();
}
Expand All @@ -67,7 +70,6 @@ void FDv2DataSystem::Close() {
if (active_conditions_) {
active_conditions_->Close();
}
status_manager_->SetState(DataSourceStatus::DataSourceState::kOff);
}

std::shared_ptr<data_model::FlagDescriptor> FDv2DataSystem::GetFlag(
Expand Down Expand Up @@ -121,6 +123,9 @@ void FDv2DataSystem::RunNextInitializer() {
} else {
auto& factory = initializer_factories_[initializer_index_++];
active_initializer_ = factory->Build();
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": starting initializer "
<< active_initializer_->Identity();
active_initializer_->Run().Then(
[this](data_interfaces::FDv2SourceResult const& result)
-> std::monostate {
Expand All @@ -144,6 +149,7 @@ void FDv2DataSystem::OnInitializerResult(

bool got_basis = false;
bool got_shutdown = false;
bool disconnected = false;

std::visit(
overloaded{
Expand All @@ -152,6 +158,8 @@ void FDv2DataSystem::OnInitializerResult(
cs.change_set.selector.value.has_value();
ApplyChangeSet(std::move(cs.change_set));
if (has_selector) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": initializer succeeded";
got_basis = true;
}
},
Expand Down Expand Up @@ -185,8 +193,30 @@ void FDv2DataSystem::OnInitializerResult(
if (closed_ || got_shutdown) {
return;
}
if (result.fdv1_fallback) {
source_manager_.SwitchToFDv1Fallback();
if (source_manager_.AvailableSynchronizerCount() > 0) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv1 fallback engaged";
got_basis = true;
} else {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity()
<< ": FDv1 fallback directive received; no FDv1 "
"fallback synchronizer configured";
disconnected = true;
}
ScheduleFDv2RetryLocked(result.fdv1_fallback->ttl);
}
}

if (disconnected) {
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
DataSourceStatus::ErrorInfo::ErrorKind::kUnknown,
"FDv1 fallback directive received; no FDv1 fallback configured");
return;
}
if (got_basis) {
StartSynchronizers();
} else {
Expand All @@ -205,6 +235,10 @@ void FDv2DataSystem::StartSynchronizers() {
}
active_synchronizer_ = source_manager_.NextSynchronizer();
if (active_synchronizer_) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": starting synchronizer "
<< active_synchronizer_->Identity();
last_logged_synchronizer_interrupted_.store(false);
active_conditions_ = BuildActiveConditions();
} else {
exhausted = true;
Expand Down Expand Up @@ -313,34 +347,39 @@ void FDv2DataSystem::OnSynchronizerResult(

bool got_shutdown = false;
bool advance = false;
bool disconnected = false;

std::visit(overloaded{
[&](Result::ChangeSet& cs) {
ApplyChangeSet(std::move(cs.change_set));
},
[&](Result::Shutdown&) { got_shutdown = true; },
[&](Result::Interrupted const& iv) {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity() << ": synchronizer interrupted: "
<< iv.error.Message();
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
iv.error.Kind(), iv.error.Message());
},
[&](Result::TerminalError const& te) {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity() << ": synchronizer terminal error: "
<< te.error.Message();
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
te.error.Kind(), te.error.Message());
advance = true;
},
[&](Result::Goodbye const&) {
// The synchronizer handles this internally.
},
},
result.value);
std::visit(
overloaded{
[&](Result::ChangeSet& cs) {
last_logged_synchronizer_interrupted_.store(false);
ApplyChangeSet(std::move(cs.change_set));
},
[&](Result::Shutdown&) { got_shutdown = true; },
[&](Result::Interrupted const& iv) {
if (!last_logged_synchronizer_interrupted_.exchange(true)) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity()
<< ": synchronizer interrupted: " << iv.error.Message();
}
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
iv.error.Kind(), iv.error.Message());
},
[&](Result::TerminalError const& te) {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity()
<< ": synchronizer terminal error: " << te.error.Message();
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
te.error.Kind(), te.error.Message());
advance = true;
},
[&](Result::Goodbye const&) {
// The synchronizer handles this internally.
},
},
result.value);

{
std::lock_guard<std::mutex> lock(mutex_);
Expand All @@ -349,20 +388,81 @@ void FDv2DataSystem::OnSynchronizerResult(
active_conditions_.reset();
return;
}
if (advance) {
if (result.fdv1_fallback &&
!source_manager_.IsCurrentSynchronizerFDv1Fallback()) {
source_manager_.SwitchToFDv1Fallback();
active_synchronizer_.reset();
active_conditions_.reset();
if (source_manager_.AvailableSynchronizerCount() > 0) {
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv1 fallback engaged";
advance = true;
} else {
LD_LOG(logger_, LogLevel::kWarn)
<< Identity()
<< ": FDv1 fallback directive received; no FDv1 "
"fallback synchronizer configured";
disconnected = true;
}
ScheduleFDv2RetryLocked(result.fdv1_fallback->ttl);
} else if (advance) {
source_manager_.BlockCurrentSynchronizer();
active_synchronizer_.reset();
active_conditions_.reset();
}
}

if (disconnected) {
status_manager_->SetState(
DataSourceStatus::DataSourceState::kInterrupted,
DataSourceStatus::ErrorInfo::ErrorKind::kUnknown,
"FDv1 fallback directive received; no FDv1 fallback configured");
return;
}
if (advance) {
StartSynchronizers();
} else {
RunSynchronizerNext();
}
}

void FDv2DataSystem::ScheduleFDv2RetryLocked(std::chrono::seconds ttl) {
if (ttl == std::chrono::seconds::zero()) {
return;
}
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv2 retry scheduled in " << ttl.count() << "s";
async::Delay(ioc_, ttl, fdv1_fallback_retry_cancel_.GetToken())
.Then(
[this](bool fired) -> std::monostate {
if (fired) {
OnFDv1RetryTimer();
}
return {};
},
[ioc = ioc_](async::Continuation<void()> work) {
boost::asio::post(ioc, std::move(work));
});
}

void FDv2DataSystem::OnFDv1RetryTimer() {
{
std::lock_guard<std::mutex> lock(mutex_);
if (closed_) {
return;
}
LD_LOG(logger_, LogLevel::kInfo)
<< Identity() << ": FDv2 retry timer fired; re-engaging FDv2";
source_manager_.SwitchBackToFDv2();
if (active_synchronizer_) {
active_synchronizer_->Close();
active_synchronizer_.reset();
}
active_conditions_.reset();
}
StartSynchronizers();
}

void FDv2DataSystem::ApplyChangeSet(
data_model::ChangeSet<data_interfaces::ChangeSetData> change_set) {
if (change_set.selector.value.has_value()) {
Expand Down
Loading
Loading