fix: stop FDv2DataSource.Conditions from leaking on healthy primary #163
FDv2DataSource's run loop calls CompletableFuture.anyOf(conditions.getFuture(), synchronizer.next()).get() on every iteration. Before this change, getFuture() returned the same shared CompletableFuture<Object> instance to every caller. Each anyOf call attaches an OrRelay Completion node to the shared instance's stack; CompletableFuture has no deregister path for the loser of a race, so the OrRelay stays on the stack until the shared future completes.

The shared future only completes when fallback or recovery fires. On a healthy primary streaming ChangeSets, fallback is never armed and recovery is suppressed (only-available-synchronizer / single-prime configurations). The future never completes; the stack grows monotonically for the synchronizer's full tenure -- effectively the SDK's uptime on a healthy server.

Per-iteration cost ~200 B: an OrRelay Completion plus the anyOf result CompletableFuture plus the chain references back to the inputs. At 10 ChangeSets/sec sustained that is ~150 MB/day per active synchronizer.

The fix: a single permanent whenComplete listener on the underlying aggregate fans out completion to every fresh future handed out by getFuture(). Pending fresh futures are tracked via WeakReference, so a fresh future whose only strong references were the caller's local variables (typical lifetime: one loop iteration) becomes garbage-collectable once that iteration ends. Pending entries whose referent has been collected are pruned opportunistically on each getFuture() call and on close().

Conditions is now package-private rather than private so the new direct unit tests can reach it. Adds a test-only pendingSize() helper.

Verified bug-proving discipline: two of the new tests (getFutureReturnsDistinctInstancesPerCall, getFutureReturnsDistinctInstancesEvenWithNoConditions) fail on the pre-fix shared-instance behavior and pass after the fix. Full server-sdk test suite (1857 tests) is clean.
The pendingListDoesNotGrowUnboundedlyWhenFreshFuturesAreDropped test needed System.gc() + Thread.sleep to encourage reclamation, which is brittle. The two distinctness tests are sufficient bug-provers for the shared-instance behavior they fail on, so drop the soak test and the test-only pendingSize() helper that supported it.
Cursor Bugbot has reviewed your changes using default mode and found 1 potential issue.
Reviewed by Cursor Bugbot for commit 536176e.
Bugbot pointed out that completedValue (volatile Object) used null as a
sentinel for "not yet fired", and the whenComplete listener also stored
null on exceptional completion. After exceptional completion a
subsequent getFuture() entered the lock, saw pending == null (drained
by the listener), and returned CompletableFuture.completedFuture(null)
-- silently converting the exception into a null result. The run loop
would then NPE on res.getClass().
Replace the null-sentinel pattern with an explicit volatile boolean
isFired plus separate firedResult/firedThrowable fields. A new
makeCompletedFuture() helper builds a fresh completed future mirroring
whichever terminal state the aggregate reached.
Adds a bug-proving test
(getFutureFailsExceptionallyWhenAggregateFailsExceptionally) that drives
a manually-controlled condition to
completeExceptionally and asserts both pre- and post-firing getFuture()
results throw ExecutionException with the original cause. Verified the
test fails on the pre-fix null-sentinel behavior ("expected
ExecutionException, got normal completion").
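For readers following along, here is a minimal sketch of what a helper like makeCompletedFuture() could look like. The two-argument signature and the class name TerminalStateSketch are assumptions made for illustration; the commit only names the helper and the firedResult/firedThrowable fields. The point it shows is that a null result and an exceptional completion are kept apart instead of both collapsing to null.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

// Illustrative only: the class name and the helper's signature are assumptions.
final class TerminalStateSketch {
    // Builds a fresh, already-completed future that mirrors a terminal state.
    static CompletableFuture<Object> makeCompletedFuture(Object result, Throwable error) {
        CompletableFuture<Object> f = new CompletableFuture<>();
        if (error != null) {
            // Exceptional completion is propagated, not flattened to a null result.
            f.completeExceptionally(error);
        } else {
            // A null result is a legitimate value here, not a "not yet fired" sentinel.
            f.complete(result);
        }
        return f;
    }

    public static void main(String[] args) throws InterruptedException {
        CompletableFuture<Object> failed =
            makeCompletedFuture(null, new IllegalStateException("condition failed"));
        try {
            failed.get();
            System.out.println("unexpected normal completion");
        } catch (ExecutionException e) {
            // The original cause survives the round trip.
            System.out.println("cause: " + e.getCause().getMessage());
        }
    }
}
```

With this shape, a caller that does res.getClass() on the result never sees a null that was really a swallowed exception; get() throws ExecutionException with the original cause instead.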
beekld left a comment
I really like this solution to the problem, but have a few comments about how it might be simplified / improved.
(Unfortunately, this will be more complicated in C++, since weak references aren't really built into the language.)
 * true. Written under {@code lock}; readable without the lock once
 * {@code isFired} has been observed true (volatile happens-before).
 */
private Object firedResult;
Do we really need firedResult and firedThrowable? It seems like if isFired = true, then we can read aggregate.getNow(null) to get the result/throwable. It's safe to call that without a lock if the future is complete, and it means we don't have to worry about getting the order of setting them correct.
Actually, wouldn't it even be safe for makeCompletedFuture to just return aggregate if isFired == true? It won't grow unbounded, because whatever continuations get added to it will be fired and removed immediately.
Done in a6c7c36. Returning aggregate directly when aggregate.isDone() == true -- continuations registered on an already-completed CompletableFuture fire synchronously at registration and cleanStack removes them immediately, so the original per-iteration accumulation can't re-occur in the post-completion path. Dropped isFired, firedResult, firedThrowable, and makeCompletedFuture().
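The behavior this reply relies on can be checked with a small standalone sketch. It is not code from the PR; the class name CompletedFutureSketch and the helper firesSynchronously are hypothetical. It attaches a continuation to an already-completed future, then races that future through anyOf many times and reads getNumberOfDependents(), which the JDK documents as an estimate of how many dependents are still waiting on the future.

```java
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.atomic.AtomicBoolean;

// Illustrative only: names here are not from the PR.
final class CompletedFutureSketch {
    // True if a continuation attached to an already-completed future
    // has run by the time thenRun() returns.
    static boolean firesSynchronously(CompletableFuture<Object> done) {
        AtomicBoolean ran = new AtomicBoolean(false);
        done.thenRun(() -> ran.set(true));
        return ran.get();
    }

    // Races the given completed future against a never-completing sibling
    // the requested number of times, then reports its dependent count.
    static int dependentsAfterRaces(CompletableFuture<Object> done, int iterations) {
        for (int i = 0; i < iterations; i++) {
            CompletableFuture.anyOf(done, new CompletableFuture<Object>()).join();
        }
        return done.getNumberOfDependents();
    }

    public static void main(String[] args) {
        CompletableFuture<Object> aggregate = CompletableFuture.completedFuture("fired");
        System.out.println("fires synchronously: " + firesSynchronously(aggregate));
        System.out.println("dependents after races: " + dependentsAfterRaces(aggregate, 1000));
    }
}
```

If the dependent count stays at zero after the races, nothing is accumulating on the completed aggregate, which is the property that makes returning it directly safe in the post-completion path.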
 * (and all pending entries have been drained). Mutated only under
 * {@code lock}.
 */
private List<WeakReference<CompletableFuture<Object>>> pending = new ArrayList<>();
I guess it's an improvement to store an empty WeakReference<CompletableFuture<Object>> instead of an unresolved CompletableFuture<Object>, but won't this list still grow unbounded? Maybe we should use a WeakHashMap instead, so the collection itself gets automatically pruned?
I can see if that would work better.
Right now it doesn't grow unbounded because it prunes the old entries when it's adding the new entry.
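// Prune entries whose referent has already been garbage-collected.
Iterator<WeakReference<CompletableFuture<Object>>> it = pending.iterator();
while (it.hasNext()) {
    if (it.next().get() == null) {
        it.remove();
    }
}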
Switched to a WeakHashMap-backed Set in a6c7c36 -- entries auto-prune when GC reclaims their keys, no more manual loop. Listener snapshots new ArrayList<>(pending) under the lock so entries surviving GC to that point get strong refs and are completed.
Two simplifications suggested by @beekld:

1. Return aggregate directly when it has already completed. Continuations registered on an already-completed CompletableFuture fire synchronously at registration time and are removed from the stack immediately by cleanStack, so the original leak (per-iteration anyOf accumulation on a never-completing aggregate) cannot re-occur in the post-completion path. Drops the isFired flag, firedResult/firedThrowable fields, and the makeCompletedFuture helper.
2. Replace the WeakReference list with a WeakHashMap-backed Set. Fresh pending futures get GC'd automatically when the caller's loop iteration drops its strong reference, with no manual prune loop in getFuture().

All five aggregate tests still pass; full server-sdk suite (1857 tests) still passes. Verified bug-proving discipline: temporarily reverting getFuture() to the pre-fix shared-instance behavior makes the two distinctness tests + the exceptional-path test go red, exactly as before.
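Putting the two simplifications together, the resulting shape can be sketched roughly as below. This is a standalone approximation, not the code in the PR: the class name FanOutSketch, the constructor taking the aggregate, and the lock field are assumptions made so the example compiles on its own. It shows the single permanent whenComplete listener, the WeakHashMap-backed Set of pending futures, the snapshot taken under the lock, and the fast path that returns the aggregate once it is done.

```java
import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Set;
import java.util.WeakHashMap;
import java.util.concurrent.CompletableFuture;

// Illustrative only: a simplified stand-in for the Conditions fan-out.
final class FanOutSketch {
    private final Object lock = new Object();
    private final CompletableFuture<Object> aggregate;
    // Weakly held: an entry disappears once the caller drops its fresh future.
    private final Set<CompletableFuture<Object>> pending =
        Collections.newSetFromMap(new WeakHashMap<>());

    FanOutSketch(CompletableFuture<Object> aggregate) {
        this.aggregate = aggregate;
        // The only listener ever attached to the long-lived aggregate.
        aggregate.whenComplete((result, error) -> {
            List<CompletableFuture<Object>> snapshot;
            synchronized (lock) {
                // Copying takes strong references to every entry that survived GC.
                snapshot = new ArrayList<>(pending);
                pending.clear();
            }
            for (CompletableFuture<Object> f : snapshot) {
                if (error != null) {
                    f.completeExceptionally(error);
                } else {
                    f.complete(result);
                }
            }
        });
    }

    CompletableFuture<Object> getFuture() {
        synchronized (lock) {
            if (aggregate.isDone()) {
                // Post-completion path: listeners on a done future do not accumulate.
                return aggregate;
            }
            CompletableFuture<Object> fresh = new CompletableFuture<>();
            pending.add(fresh);
            return fresh;
        }
    }

    public static void main(String[] args) {
        CompletableFuture<Object> aggregate = new CompletableFuture<>();
        FanOutSketch conditions = new FanOutSketch(aggregate);
        CompletableFuture<Object> first = conditions.getFuture();
        CompletableFuture<Object> second = conditions.getFuture();
        System.out.println("distinct: " + (first != second));
        aggregate.complete("fallback");
        System.out.println("fan-out: " + first.join() + ", " + second.join());
        System.out.println("fast path: " + (conditions.getFuture() == aggregate));
    }
}
```

Because each race in the run loop attaches its relay to a short-lived fresh future instead of the aggregate, the relay becomes unreachable together with that future once the iteration ends, and the aggregate itself only ever carries the one fan-out listener.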
## What this adds

Timed conditions that the FDv2 data source orchestrator (a later PR) observes alongside the active synchronizer's results to drive tier transitions:

- **Fallback condition**: starts its timer when the synchronizer reports an interrupted status and cancels it when a change set arrives. If it fires (120 seconds by default), the orchestrator moves to the next available synchronizer. Terminal statuses (shutdown, terminal error, goodbye) do not arm the timer (the orchestrator reacts to those immediately rather than waiting out a fallback period), and repeated interruptions do not extend the deadline; the period counts from the first interruption.
- **Recovery condition**: starts when observed and ignores results. If it fires (300 seconds by default), the orchestrator returns to the primary synchronizer.

`getConditions` selects which conditions apply: none when only one synchronizer is available (nowhere to fall back to), fallback only for the primary, and both for a non-primary synchronizer. `ConditionGroup` merges its members and emits the first to fire. The timeout defaults match the other client-side FDv2 implementations.

## Why streams instead of futures

The Java and C++ implementations model conditions as one-shot futures, and both had the same leak (fixed in java-core by `c27bf26` / launchdarkly/java-core#163): a future's listeners can never be detached, only released by completion, so an orchestration loop that races a long-lived pending future per result accumulates an irremovable listener garland per change set, unbounded on a healthy primary. Dart futures have the identical property (measured: ~559 B retained per race against a pending future), but Dart also has the primitive those platforms lack: cancellable stream subscriptions.

Each condition therefore exposes a single-subscription `Stream<ConditionType>` that emits **at most once** and then closes (closing without emitting if the condition is closed first). Lifetimes are scoped to the subscription: self-starting timers begin when the stream is listened to, and cancelling the subscription closes the condition and releases its timers. One bounded edge remains by design (informing a never-listened group can arm a fallback timer until its timeout elapses or `close()` is called) and is documented on the API.

The orchestrator PR consumes these with one subscription per synchronizer run, closes in a `finally`, and includes a soak test asserting bounded memory across a sustained stream of results.

## Testing

Tests run inside a `package:fake_async` zone, advancing time with `elapse` and asserting timer state through `pendingTimers`, with no timer injection in the production code. They cover timer start/cancel behavior for both condition kinds, terminal statuses not arming the fallback timer, the fallback deadline not extending on repeated interruptions, at-most-once emission including when two member timers contend in the same instant, close semantics, subscription cancellation releasing condition and group timers, group merging and inform broadcast, and the `getConditions` selection rules.

SDK-2186

<!-- CURSOR_SUMMARY -->
---

> [!NOTE]
> **Medium Risk**
> New timing logic will drive synchronizer failover when wired into the orchestrator; behavior is isolated in new modules with thorough tests and no production integration in this PR.
>
> **Overview**
> Adds **FDv2 synchronizer tier-transition conditions** for a future orchestrator: timed signals that emit **fallback** (switch to the next synchronizer after sustained interruption) or **recovery** (return to primary after running on a backup).
>
> **Fallback** arms on `interrupted`, cancels on a change set, ignores terminal statuses, and does not reset the deadline on repeated interruptions (defaults: 120s). **Recovery** starts when the condition is observed and ignores results (default 300s). Conditions expose **single-subscription streams** (at-most-one emit) so subscriptions can be cancelled without the listener-retention issues of racing futures.
>
> `ConditionGroup` merges members (first fire wins), broadcasts `inform` to all members, and `getConditions` picks an empty group, fallback-only for primary, or fallback+recovery for non-primary when multiple synchronizers exist.
>
> Adds **`fake_async`** tests for timer behavior, close/cancel semantics, group contention, and `getConditions` rules.
>
> <sup>Reviewed by [Cursor Bugbot](https://cursor.com/bugbot) for commit 4fc80f6. Bugbot is set up for automated code reviews on this repo.</sup>
<!-- /CURSOR_SUMMARY -->

Summary
`FDv2DataSource.Conditions.getFuture()` returned the same shared `CompletableFuture<Object>` instance to every caller. The run loop does `CompletableFuture.anyOf(getFuture(), synchronizer.next()).get()` on every iteration, which attaches a new `OrRelay` `Completion` to the shared future's `stack` each time. `CompletableFuture` has no deregister path for the loser of an `anyOf` race, so those `Completion` nodes stay on the stack until the shared future itself completes.

On a healthy primary streaming ChangeSets without ever firing fallback/recovery, the shared future never completes; the `stack` grows monotonically for the synchronizer's entire tenure (effectively the SDK's uptime on a stable server).

Per-iteration cost: ~200 B (OrRelay + anyOf result CF + chain references).
At 10 ChangeSets/sec sustained: ~150 MB/day per active synchronizer.
The fix
A single permanent `whenComplete` listener on the underlying aggregate fans out completion to every fresh future handed out by `getFuture()`. Pending fresh futures are tracked via `WeakReference`, so a fresh future whose only strong references were the caller's local variables (typical lifetime: one loop iteration) becomes garbage-collectable once that iteration ends. Pending entries whose referent has been collected are pruned opportunistically on each `getFuture()` call and on `close()`.

`Conditions` is now package-private (was `private`) so direct unit tests can reach it. A test-only `pendingSize()` helper is added.

Test plan
Adds `FDv2DataSourceConditionsAggregateTest` with five tests:

- `getFutureReturnsDistinctInstancesPerCall`: bug-prover. Fails on the pre-fix shared-instance behavior, passes after the fix.
- `getFutureReturnsDistinctInstancesEvenWithNoConditions`: bug-prover. Covers the empty-conditions case (single-synchronizer configuration), which is exactly where per-iteration accumulation would be most damaging.
- `allFreshFuturesCompleteWhenAggregateFires`: verifies fan-out via the single permanent listener actually delivers to multiple fresh futures handed out before the aggregate fires.
- `getFutureAfterAggregateFiresReturnsCompletedFuture`: verifies the fast path: callers arriving after completion get an already-completed future synchronously.
- `pendingListDoesNotGrowUnboundedlyWhenFreshFuturesAreDropped`: 10k-iteration soak test that simulates the run-loop pattern (race a fresh future against a fast-resolving sibling, drop the result) and asserts the pending list stays bounded via GC + opportunistic pruning. The test docstring carries a caveat that `System.gc()` is not guaranteed to collect; if it ever flakes on CI we can migrate to `-XX:+UseSerialGC` or relax the ceiling.

Verified bug-proving discipline: the two distinctness tests fail on the pre-fix shared-instance behavior and pass after the fix. The full server-sdk test suite (1857 tests across 109 classes) is clean.
Context
This was identified during a multi-agent review of the analogous cpp-sdks PR (launchdarkly/cpp-sdks#531), which mirrors this Java implementation's `Conditions` design. The cpp version has the same structural leak; this Java fix shape is what was prototyped there. Filing here first since the runtime impact on a long-running JVM-based server SDK is more pronounced.

Note
Medium Risk
Touches the FDv2 synchronizer condition-aggregation logic used in the main run loop; mistakes could cause missed fallback/recovery signals or incorrect exceptional completion behavior, though changes are localized and covered by new unit tests.
Overview
Prevents a long-lived memory leak in `FDv2DataSource.Conditions` by changing `getFuture()` to return a fresh `CompletableFuture` per call until the underlying condition aggregate completes, rather than returning the same shared pending future each iteration.

Adds a single `whenComplete` fan-out from the aggregate to complete all outstanding per-call futures (and to propagate exceptional completion), tracks pending futures in a `WeakHashMap`-backed set for GC cleanup, and makes `Conditions` package-private to allow direct testing.

Introduces `FDv2DataSourceConditionsAggregateTest` to assert per-call distinctness, correct completion fan-out, and correct behavior on exceptional and post-completion paths.

Reviewed by Cursor Bugbot for commit a6c7c36. Bugbot is set up for automated code reviews on this repo.