Skip to content

Commit b301213

Browse files
committed
fix: preserve FDv1 fallback signal across streaming error paths
When a streaming Start action latched the FDv1 Fallback Directive (via the X-LD-FD-Fallback: true header), a subsequent Fault, JSONDecodeError, or generic-exception path could still yield an Update with fallback_to_fdv1=False. Each error path independently checked the failure's own headers for the directive instead of consulting the state latched by the earlier Start. The most visible symptom was an HTTPStatusError 401 arriving mid-stream after the directive was set: the SDK treated it as an ordinary unrecoverable failure (REMOVE) instead of honoring the directive (FDV1).

This change funnels every error-path Update through a small _with_fallback_signal helper that propagates the latched signal. It also breaks out of the SSE read loop unconditionally once the directive is set, so we don't keep retrying the FDv2 endpoint -- the directive is one-way and terminal.
1 parent f37695b commit b301213

2 files changed

Lines changed: 99 additions & 16 deletions

File tree

ldclient/impl/datasourcev2/streaming.py

Lines changed: 32 additions & 16 deletions
Original file line numberDiff line numberDiff line change
@@ -213,8 +213,25 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
213213
# fallback_requested is set when a Start action carries
214214
# X-LD-FD-Fallback: true. We finish applying the current payload
215215
# before halting, so consumers can serve the server-provided data
216-
# while FDv1 takes over.
216+
# while FDv1 takes over. The latch is one-way and terminal: once
217+
# set, any subsequent payload-completing event or error must carry
218+
# the signal forward and halt the stream, even if the failure path
219+
# itself doesn't see the directive header.
217220
fallback_requested = False
221+
222+
def _with_fallback_signal(update: Update) -> Update:
223+
"""Return ``update`` decorated with ``fallback_to_fdv1=True`` when
224+
the directive has been latched. Idempotent if already set."""
225+
if not fallback_requested or update.fallback_to_fdv1:
226+
return update
227+
return Update(
228+
state=update.state,
229+
change_set=update.change_set,
230+
error=update.error,
231+
fallback_to_fdv1=True,
232+
environment_id=update.environment_id,
233+
)
234+
218235
for action in self._sse.all:
219236
if isinstance(action, Fault):
220237
# If the SSE client detects the stream has closed, then it will
@@ -228,9 +245,12 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
228245

229246
(update, should_continue) = self._handle_error(action.error, envid)
230247
if update is not None:
231-
yield update
248+
yield _with_fallback_signal(update)
232249

233-
if not should_continue:
250+
# The FDv1 Fallback Directive is one-way and terminal: if it
251+
# was latched on a prior Start, we must not keep retrying the
252+
# FDv2 endpoint even when the failure itself looks recoverable.
253+
if fallback_requested or not should_continue:
234254
break
235255
continue
236256

@@ -248,16 +268,10 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
248268
self._record_stream_init(False)
249269
self._connection_attempt_start_time = None
250270
if fallback_requested:
251-
# Decorate the completed update with the fallback signal,
271+
# The completed update is the natural moment to honor
272+
# the latched directive: yield once with the signal,
252273
# then halt — the consumer will switch to FDv1.
253-
update = Update(
254-
state=update.state,
255-
change_set=update.change_set,
256-
error=update.error,
257-
fallback_to_fdv1=True,
258-
environment_id=update.environment_id,
259-
)
260-
yield update
274+
yield _with_fallback_signal(update)
261275
break
262276
yield update
263277
except json.decoder.JSONDecodeError as e:
@@ -268,23 +282,25 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
268282

269283
(update, should_continue) = self._handle_error(e, envid)
270284
if update is not None:
271-
yield update
272-
if not should_continue:
285+
yield _with_fallback_signal(update)
286+
if fallback_requested or not should_continue:
273287
break
274288
except Exception as e: # pylint: disable=broad-except
275289
log.info(
276290
"Error while handling stream event; will restart stream: %s", e
277291
)
278292
self._sse.interrupt()
279293

280-
yield Update(
294+
yield _with_fallback_signal(Update(
281295
state=DataSourceState.INTERRUPTED,
282296
error=DataSourceErrorInfo(
283297
DataSourceErrorKind.UNKNOWN, 0, time(), str(e)
284298
),
285299
fallback_to_fdv1=False,
286300
environment_id=envid,
287-
)
301+
))
302+
if fallback_requested:
303+
break
288304

289305
self._sse.close()
290306
# Force-close the underlying urllib3 pool. SSEClient.close() only does a

ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py

Lines changed: 67 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -626,6 +626,73 @@ def test_fallback_header_with_payload_emits_valid_with_fallback(events): # pyli
626626
assert len(updates[0].change_set.changes) == 1
627627

628628

629+
def test_fallback_latched_on_start_carries_through_unrecoverable_fault():
630+
"""Once a Start latches the FDv1 directive, an unrecoverable Fault that
631+
follows must propagate the directive even when the error itself does not
632+
carry the header. The directive is one-way and terminal, so the latched
633+
state from the original Start drives the Update emitted on shutdown --
634+
losing it would silently strand the consumer on FDv2 instead of handing
635+
off to the FDv1 Fallback Synchronizer."""
636+
start_action = Start(headers={_LD_FD_FALLBACK_HEADER: 'true'})
637+
# 401 is unrecoverable and the error carries no fallback header itself.
638+
error = HTTPStatusError(401)
639+
fault_action = Fault(error=error)
640+
641+
builder = list_sse_client([start_action, fault_action])
642+
643+
synchronizer = make_streaming_data_source()
644+
synchronizer._sse_client_builder = builder
645+
updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
646+
647+
assert len(updates) == 1
648+
assert updates[0].state == DataSourceState.OFF
649+
assert updates[0].fallback_to_fdv1 is True
650+
assert updates[0].error is not None
651+
assert updates[0].error.status_code == 401
652+
653+
654+
def test_fallback_latched_on_start_carries_through_recoverable_fault():
655+
"""A recoverable Fault arriving after the directive was latched must also
656+
propagate the signal and halt the stream -- the directive overrides the
657+
ordinary retry policy because it is terminal."""
658+
start_action = Start(headers={_LD_FD_FALLBACK_HEADER: 'true'})
659+
# 408 is recoverable; without the latch we would retry transparently.
660+
fault_action = Fault(error=HTTPStatusError(408))
661+
662+
builder = list_sse_client([start_action, fault_action])
663+
664+
synchronizer = make_streaming_data_source()
665+
synchronizer._sse_client_builder = builder
666+
updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
667+
668+
assert len(updates) == 1
669+
assert updates[0].fallback_to_fdv1 is True
670+
# 408 is recoverable so the Update from _handle_error is INTERRUPTED, but
671+
# the latched directive must still drive the consumer to FDv1.
672+
assert updates[0].state == DataSourceState.INTERRUPTED
673+
674+
675+
def test_fallback_latched_on_start_carries_through_malformed_event(events): # pylint: disable=redefined-outer-name
676+
"""A malformed event (JSONDecodeError) after the directive was latched
677+
must propagate the signal on the resulting Interrupted Update."""
678+
bad_event = Event(event=EventName.PUT_OBJECT, data="not valid json")
679+
start_action = Start(headers={_LD_FD_FALLBACK_HEADER: 'true'})
680+
681+
builder = list_sse_client([start_action, events[EventName.SERVER_INTENT], bad_event])
682+
683+
synchronizer = make_streaming_data_source()
684+
synchronizer._sse_client_builder = builder
685+
updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
686+
687+
# The malformed-event update must surface the latched directive so the
688+
# consumer can hand off to FDv1 instead of trying to keep the FDv2 stream.
689+
assert len(updates) == 1
690+
assert updates[0].state == DataSourceState.INTERRUPTED
691+
assert updates[0].fallback_to_fdv1 is True
692+
assert updates[0].error is not None
693+
assert updates[0].error.kind == DataSourceErrorKind.INVALID_DATA
694+
695+
629696
def test_streaming_closes_underlying_pool_on_fallback(events): # pylint: disable=redefined-outer-name
630697
"""When the FDv1 Fallback Directive engages, the underlying urllib3
631698
connection pool must be torn down so the FDv2 streaming TCP connection

0 commit comments

Comments
 (0)