Skip to content

Commit cafcca1

Browse files
committed
chore: Drop streaming builder shim and dedupe initializer check
Two follow-up cleanups from review of the FDv1 Fallback Directive work: - Drop the test-only shim in StreamingDataSource.sync() that accepted either a bare SSEClient or a (client, pool) tuple from the injected builder. The SseClientBuilder type alias already declares the tuple return; annotating the field as SseClientBuilder lets pyright align, and the lone test fixture (list_sse_client) now returns (client, None) like the production create_sse_client does. - Hoist basis.change_set.selector.is_defined() into a local in FDv2._run_initializers() so it isn't computed twice on the success-with-fallback path. Also fixes a typo in the adjacent comment ("if an only if" -> "if and only if").
1 parent b301213 commit cafcca1

3 files changed

Lines changed: 9 additions & 15 deletions

File tree

ldclient/impl/datasourcev2/streaming.py

Lines changed: 2 additions & 8 deletions
Original file line numberDiff line numberDiff line change
@@ -163,7 +163,7 @@ def __init__(self,
163163
self.__http_options = http_options
164164
self.__initial_reconnect_delay = initial_reconnect_delay
165165

166-
self._sse_client_builder = create_sse_client
166+
self._sse_client_builder: SseClientBuilder = create_sse_client
167167
self._config = config
168168
self._sse: Optional[SSEClient] = None
169169
self._sse_pool: Optional[urllib3.PoolManager] = None
@@ -187,19 +187,13 @@ def sync(self, ss: SelectorStore) -> Generator[Update, None, None]:
187187
Update objects until the connection is closed or an unrecoverable error
188188
occurs.
189189
"""
190-
builder_result = self._sse_client_builder(
190+
self._sse, self._sse_pool = self._sse_client_builder(
191191
self.__uri,
192192
self.__http_options,
193193
self.__initial_reconnect_delay,
194194
self._config,
195195
ss
196196
)
197-
# Tests may inject a builder that returns either an SSEClient directly
198-
# or a (client, pool) tuple. Accept both.
199-
if isinstance(builder_result, tuple):
200-
self._sse, self._sse_pool = builder_result
201-
else:
202-
self._sse, self._sse_pool = builder_result, None
203197

204198
if self._sse is None:
205199
log.error("Failed to create SSE client for streaming updates.")

ldclient/impl/datasystem/fdv2.py

Lines changed: 4 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -486,8 +486,9 @@ def _run_initializers(self, set_on_ready: Event) -> bool:
486486
# Apply the basis to the store
487487
self._store.apply(basis.change_set, basis.persist)
488488

489-
# Set ready event if an only if a selector is defined for the changeset
490-
if basis.change_set.selector.is_defined():
489+
# Set ready event if and only if a selector is defined for the changeset
490+
selector_defined = basis.change_set.selector.is_defined()
491+
if selector_defined:
491492
set_on_ready.set()
492493

493494
if basis.fallback_to_fdv1:
@@ -497,7 +498,7 @@ def _run_initializers(self, set_on_ready: Event) -> bool:
497498
)
498499
return True
499500

500-
if basis.change_set.selector.is_defined():
501+
if selector_defined:
501502
return False
502503
except Exception as e:
503504
log.error("Initializer failed with exception: %s", e)

ldclient/testing/impl/datasourcev2/test_streaming_synchronizer.py

Lines changed: 3 additions & 4 deletions
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,6 @@
1313
from ldclient.config import Config, HTTPConfig
1414
from ldclient.impl.datasourcev2.streaming import (
1515
STREAMING_ENDPOINT,
16-
SSEClient,
1716
SseClientBuilder,
1817
StreamingDataSource
1918
)
@@ -48,8 +47,8 @@ def builder(
4847
initial_reconnect_delay: float,
4948
config: Config, # pylint: disable=unused-argument
5049
ss: SelectorStore # pylint: disable=unused-argument
51-
) -> SSEClient:
52-
return ListBasedSseClient(events)
50+
):
51+
return ListBasedSseClient(events), None
5352

5453
return builder
5554

@@ -743,7 +742,7 @@ def builder(*_args, **_kwargs):
743742
]), tracking_pool
744743

745744
synchronizer = make_streaming_data_source()
746-
synchronizer._sse_client_builder = builder
745+
synchronizer._sse_client_builder = builder # type: ignore[assignment]
747746
updates = list(synchronizer.sync(MockSelectorStore(Selector.no_selector())))
748747

749748
assert len(updates) == 1

0 commit comments

Comments
 (0)