From 32a9ead0ad8d9d0c305b257ac4aa54787f433f5e Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 11:25:16 -0400 Subject: [PATCH 1/9] fix: attempt to reconnect to sse server when response ends prematurely. --- .../managers/config_manager.py | 94 +++++++++++++++++-- devcycle_python_sdk/managers/sse_manager.py | 10 +- 2 files changed, 92 insertions(+), 12 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index b8b90b0..caa0dd6 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -39,6 +39,9 @@ def __init__( self._config: Optional[dict] = None self._config_etag: Optional[str] = None self._config_lastmodified: Optional[str] = None + self._sse_reconnecting = False + self._last_reconnect_attempt_time: Optional[float] = None + self._sse_reconnect_lock = threading.Lock() self._config_api_client = ConfigAPIClient(self._sdk_key, self._options) @@ -49,6 +52,46 @@ def __init__( def is_initialized(self) -> bool: return self._config is not None + def _recreate_sse_connection(self): + """Recreate the SSE connection with the current config.""" + with self._sse_reconnect_lock: + if self._config is None or self._options.disable_realtime_updates: + logger.debug("Skipping SSE recreation - no config or updates disabled") + return + + try: + # Close existing connection if present + if self._sse_manager is not None and self._sse_manager.client is not None: + logger.debug("Closing existing SSE connection before recreating") + self._sse_manager.client.close() + if self._sse_manager.read_thread.is_alive(): + self._sse_manager.read_thread.join(timeout=1.0) + + # Create new SSE manager + self._sse_manager = SSEManager( + self.sse_state, + self.sse_error, + self.sse_message, + ) + self._sse_manager.update(self._config) + logger.info("SSE connection recreated successfully") + except Exception as e: + logger.error(f"Devcycle: Failed to recreate SSE connection: {e}") + + def _delayed_sse_reconnect(self): + """Delayed SSE reconnection to allow error state to settle.""" + try: + logger.debug("Waiting 2 seconds before reconnecting SSE...") + time.sleep(2.0) + logger.debug("Attempting to recreate SSE connection") + self._recreate_sse_connection() + except Exception as e: + logger.error(f"Devcycle: Error during delayed SSE reconnection: {e}") + finally: + # Always clear the reconnecting flag when done (success or failure) + with self._sse_reconnect_lock: + self._sse_reconnecting = False + def _get_config(self, last_modified: Optional[float] = None): try: lm_header = self._config_lastmodified @@ -87,12 +130,8 @@ def _get_config(self, last_modified: Optional[float] = None): or self._sse_manager.client is None or not self._sse_manager.read_thread.is_alive() ): - self._sse_manager = SSEManager( - self.sse_state, - self.sse_error, - self.sse_message, - ) - self._sse_manager.update(self._config) + logger.info("DevCycle: SSE connection not active, creating new connection") + self._recreate_sse_connection() if ( trigger_on_client_initialized @@ -101,7 +140,6 @@ def _get_config(self, last_modified: Optional[float] = None): try: self._options.on_client_initialized() except Exception as e: - # consume any error logger.warning( f"DevCycle: Error received from on_client_initialized callback: {str(e)}" ) @@ -122,7 +160,6 @@ def run(self): self._get_config() except Exception as e: if self._polling_enabled: - # Only log a warning if we're still polling logger.warning( f"DevCycle: Error polling for config changes: {str(e)}" ) @@ -148,12 +185,51 @@ def sse_message(self, message: ld_eventsource.actions.Event): def sse_error(self, error: ld_eventsource.actions.Fault): self._sse_connected = False - logger.debug(f"DevCycle: Received SSE error: {error}") + logger.debug(f"SSE connection error: {error.error}") + + current_time = time.time() + min_reconnect_interval = 5.0 + + with self._sse_reconnect_lock: + # Check if we're already reconnecting + if self._sse_reconnecting: + logger.debug("Reconnection already in progress, skipping") + return + + # Check if we need to wait for backoff + if (self._last_reconnect_attempt_time is not None and + current_time - self._last_reconnect_attempt_time < min_reconnect_interval): + logger.debug( + f"Skipping reconnection, waiting for backoff period ({min_reconnect_interval}s)" + ) + return + + # Mark that we're now reconnecting + self._sse_reconnecting = True + self._last_reconnect_attempt_time = current_time + + logger.info("Attempting SSE reconnection...") + + # Schedule reconnection in a separate thread + reconnect_thread = threading.Thread( + target=self._delayed_sse_reconnect, + daemon=True + ) + reconnect_thread.start() def sse_state(self, state: Optional[ld_eventsource.actions.Start]): if not self._sse_connected: self._sse_connected = True logger.info("DevCycle: Connected to SSE stream") + + # Clear reconnection state on successful connection + with self._sse_reconnect_lock: + self._sse_reconnecting = False + self._last_reconnect_attempt_time = None + else: + logger.debug("SSE keepalive received") def close(self): self._polling_enabled = False + if self._sse_manager is not None and self._sse_manager.client is not None: + self._sse_manager.client.close() \ No newline at end of file diff --git a/devcycle_python_sdk/managers/sse_manager.py b/devcycle_python_sdk/managers/sse_manager.py index 13b97da..d6f97ea 100644 --- a/devcycle_python_sdk/managers/sse_manager.py +++ b/devcycle_python_sdk/managers/sse_manager.py @@ -45,7 +45,11 @@ def read_events( elif isinstance(event, ld_eventsource.actions.Comment): handle_state(None) except Exception as e: - logger.debug(f"DevCycle: failed to read SSE message: {e}") + logger.error(f"DevCycle SSE: Error in read loop: {e}") + fault_event = ld_eventsource.actions.Fault(error=e) + handle_error(fault_event) + finally: + logger.info("DevCycle SSE: Connection closed") def update(self, config: dict): if self.use_new_config(config["sse"]): @@ -66,6 +70,6 @@ def update(self, config: dict): def use_new_config(self, config: dict) -> bool: new_url = config["hostname"] + config["path"] - if self.url == "" or self.url is None and new_url != "": + if (self.url == "" or self.url is None) and new_url != "": return True - return self.url != new_url + return self.url != new_url \ No newline at end of file From 71687c6e8be22c7c7ff246c9186110cdd44612d7 Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 15:04:52 -0400 Subject: [PATCH 2/9] feat: add reconnection with a exponentially growing backoff timer to prevent instances where we try to reconnect forever and spam the server. --- .../managers/config_manager.py | 61 ++++++++++++++----- 1 file changed, 45 insertions(+), 16 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index caa0dd6..11ad658 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -39,10 +39,14 @@ def __init__( self._config: Optional[dict] = None self._config_etag: Optional[str] = None self._config_lastmodified: Optional[str] = None - self._sse_reconnecting = False + + # Exponential backoff configuration + self._sse_reconnect_attempts = 0 + self._min_reconnect_interval = 5.0 # Start at 5 seconds + self._max_reconnect_interval = 300.0 # Cap at 5 minutes self._last_reconnect_attempt_time: Optional[float] = None + self._sse_reconnecting = False self._sse_reconnect_lock = threading.Lock() - self._config_api_client = ConfigAPIClient(self._sdk_key, self._options) self._polling_enabled = True @@ -78,19 +82,19 @@ def _recreate_sse_connection(self): except Exception as e: logger.error(f"Devcycle: Failed to recreate SSE connection: {e}") - def _delayed_sse_reconnect(self): - """Delayed SSE reconnection to allow error state to settle.""" + def _delayed_sse_reconnect(self, delay_seconds: float): + """Delayed SSE reconnection with configurable backoff.""" try: - logger.debug("Waiting 2 seconds before reconnecting SSE...") - time.sleep(2.0) - logger.debug("Attempting to recreate SSE connection") + logger.debug(f"Waiting {delay_seconds}s before reconnecting SSE...") + time.sleep(delay_seconds) + logger.debug("Delay complete, attempting to recreate SSE connection") self._recreate_sse_connection() except Exception as e: - logger.error(f"Devcycle: Error during delayed SSE reconnection: {e}") + logger.error(f"Error during delayed SSE reconnection: {e}") finally: - # Always clear the reconnecting flag when done (success or failure) with self._sse_reconnect_lock: self._sse_reconnecting = False + logger.debug("Reconnection attempt completed") def _get_config(self, last_modified: Optional[float] = None): try: @@ -174,6 +178,8 @@ def sse_message(self, message: ld_eventsource.actions.Event): self.sse_state(None) logger.info(f"DevCycle: Received message: {message.data}") sse_message = json.loads(message.data) + + dvc_data = json.loads(sse_message.get("data")) if ( dvc_data.get("type") == "refetchConfig" @@ -182,13 +188,30 @@ def sse_message(self, message: ld_eventsource.actions.Event): ): logger.info("DevCycle: Received refetchConfig message - updating config") self._get_config(dvc_data["lastModified"] / 1000.0) + # Succesfully maintained connection and received ping, reset our connect attempts. + if(dvc_data.get("type") == 'ping'): + self._sse_reconnect_attempts = 0 def sse_error(self, error: ld_eventsource.actions.Fault): + """ + Handle SSE connection errors with exponential backoff reconnection. + + Switches to polling mode (10s intervals) and attempts reconnection with backoff: + 5s → 10s → 20s → 40s → 80s → 160s → 300s (capped at 5 min). + Backoff resets on successful reconnection. + + Thread-safe with _sse_reconnect_lock to prevent concurrent reconnection attempts. + """ + """Handle SSE connection errors with exponential backoff.""" self._sse_connected = False logger.debug(f"SSE connection error: {error.error}") - - current_time = time.time() - min_reconnect_interval = 5.0 + current_time = time.time() + + # Calculate exponential backoff interval (capped at max) + backoff_interval = min( + self._min_reconnect_interval * (2 ** self._sse_reconnect_attempts), + self._max_reconnect_interval + ) with self._sse_reconnect_lock: # Check if we're already reconnecting @@ -198,21 +221,27 @@ def sse_error(self, error: ld_eventsource.actions.Fault): # Check if we need to wait for backoff if (self._last_reconnect_attempt_time is not None and - current_time - self._last_reconnect_attempt_time < min_reconnect_interval): + current_time - self._last_reconnect_attempt_time < backoff_interval): + time_remaining = backoff_interval - (current_time - self._last_reconnect_attempt_time) logger.debug( - f"Skipping reconnection, waiting for backoff period ({min_reconnect_interval}s)" + f"Skipping reconnection attempt, waiting for backoff period " + f"({time_remaining:.1f}s remaining of {backoff_interval:.1f}s)" ) return # Mark that we're now reconnecting self._sse_reconnecting = True self._last_reconnect_attempt_time = current_time + self._sse_reconnect_attempts += 1 - logger.info("Attempting SSE reconnection...") + logger.info( + f"Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " + f"next backoff: {backoff_interval:.1f}s)" + ) - # Schedule reconnection in a separate thread reconnect_thread = threading.Thread( target=self._delayed_sse_reconnect, + args=(backoff_interval,), daemon=True ) reconnect_thread.start() From 801e9454afb16352e74b9aa86646af45cad556e2 Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 16:15:46 -0400 Subject: [PATCH 3/9] chore: make sse error logs debug level. fix: bug where client crashes when http error code received. chore: lint errors --- .../managers/config_manager.py | 93 +++++++++---------- devcycle_python_sdk/managers/sse_manager.py | 8 +- 2 files changed, 46 insertions(+), 55 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index 11ad658..01e381f 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -40,7 +40,7 @@ def __init__( self._config_etag: Optional[str] = None self._config_lastmodified: Optional[str] = None - # Exponential backoff configuration + # Exponential backoff configuration self._sse_reconnect_attempts = 0 self._min_reconnect_interval = 5.0 # Start at 5 seconds self._max_reconnect_interval = 300.0 # Cap at 5 minutes @@ -60,13 +60,15 @@ def _recreate_sse_connection(self): """Recreate the SSE connection with the current config.""" with self._sse_reconnect_lock: if self._config is None or self._options.disable_realtime_updates: - logger.debug("Skipping SSE recreation - no config or updates disabled") + logger.debug("Devcycle: Skipping SSE recreation - no config or updates disabled") return try: # Close existing connection if present - if self._sse_manager is not None and self._sse_manager.client is not None: - logger.debug("Closing existing SSE connection before recreating") + if ( + self._sse_manager is not None + and self._sse_manager.client is not None + ): self._sse_manager.client.close() if self._sse_manager.read_thread.is_alive(): self._sse_manager.read_thread.join(timeout=1.0) @@ -78,23 +80,21 @@ def _recreate_sse_connection(self): self.sse_message, ) self._sse_manager.update(self._config) - logger.info("SSE connection recreated successfully") + logger.info("Devcyle: SSE connection created successfully") except Exception as e: - logger.error(f"Devcycle: Failed to recreate SSE connection: {e}") + logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}") def _delayed_sse_reconnect(self, delay_seconds: float): """Delayed SSE reconnection with configurable backoff.""" try: - logger.debug(f"Waiting {delay_seconds}s before reconnecting SSE...") - time.sleep(delay_seconds) - logger.debug("Delay complete, attempting to recreate SSE connection") + logger.debug(f"Devcycle: Waiting {delay_seconds}s before reconnecting SSE...") + time.sleep(delay_seconds) self._recreate_sse_connection() except Exception as e: - logger.error(f"Error during delayed SSE reconnection: {e}") + logger.error(f"Devcycle: Error during delayed SSE reconnection: {e}") finally: with self._sse_reconnect_lock: self._sse_reconnecting = False - logger.debug("Reconnection attempt completed") def _get_config(self, last_modified: Optional[float] = None): try: @@ -134,7 +134,9 @@ def _get_config(self, last_modified: Optional[float] = None): or self._sse_manager.client is None or not self._sse_manager.read_thread.is_alive() ): - logger.info("DevCycle: SSE connection not active, creating new connection") + logger.info( + "DevCycle: SSE connection not active, creating new connection" + ) self._recreate_sse_connection() if ( @@ -178,8 +180,7 @@ def sse_message(self, message: ld_eventsource.actions.Event): self.sse_state(None) logger.info(f"DevCycle: Received message: {message.data}") sse_message = json.loads(message.data) - - + dvc_data = json.loads(sse_message.get("data")) if ( dvc_data.get("type") == "refetchConfig" @@ -188,61 +189,51 @@ def sse_message(self, message: ld_eventsource.actions.Event): ): logger.info("DevCycle: Received refetchConfig message - updating config") self._get_config(dvc_data["lastModified"] / 1000.0) - # Succesfully maintained connection and received ping, reset our connect attempts. - if(dvc_data.get("type") == 'ping'): + # SSE connection healthy, reconnect attempts reset. + if dvc_data.get("type") == "ping" or dvc_data.get("type") == "refetchConfig" : self._sse_reconnect_attempts = 0 def sse_error(self, error: ld_eventsource.actions.Fault): - """ - Handle SSE connection errors with exponential backoff reconnection. - - Switches to polling mode (10s intervals) and attempts reconnection with backoff: - 5s → 10s → 20s → 40s → 80s → 160s → 300s (capped at 5 min). - Backoff resets on successful reconnection. - - Thread-safe with _sse_reconnect_lock to prevent concurrent reconnection attempts. - """ - """Handle SSE connection errors with exponential backoff.""" self._sse_connected = False - logger.debug(f"SSE connection error: {error.error}") - current_time = time.time() - + logger.debug(f"Devcyle: SSE connection error: {error.error}") + current_time = time.time() + # Calculate exponential backoff interval (capped at max) backoff_interval = min( - self._min_reconnect_interval * (2 ** self._sse_reconnect_attempts), - self._max_reconnect_interval + self._min_reconnect_interval * (2**self._sse_reconnect_attempts), + self._max_reconnect_interval, ) with self._sse_reconnect_lock: - # Check if we're already reconnecting if self._sse_reconnecting: - logger.debug("Reconnection already in progress, skipping") + logger.debug("Devcyle: Reconnection already in progress, skipping") return - + # Check if we need to wait for backoff - if (self._last_reconnect_attempt_time is not None and - current_time - self._last_reconnect_attempt_time < backoff_interval): - time_remaining = backoff_interval - (current_time - self._last_reconnect_attempt_time) + if ( + self._last_reconnect_attempt_time is not None + and current_time - self._last_reconnect_attempt_time < backoff_interval + ): + time_remaining = backoff_interval - ( + current_time - self._last_reconnect_attempt_time + ) logger.debug( - f"Skipping reconnection attempt, waiting for backoff period " + f"Devcyle: Skipping reconnection attempt, waiting for backoff period " f"({time_remaining:.1f}s remaining of {backoff_interval:.1f}s)" ) return - - # Mark that we're now reconnecting + self._sse_reconnecting = True self._last_reconnect_attempt_time = current_time self._sse_reconnect_attempts += 1 - - logger.info( - f"Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " + + logger.debug( + f"Devcyle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " f"next backoff: {backoff_interval:.1f}s)" ) - + reconnect_thread = threading.Thread( - target=self._delayed_sse_reconnect, - args=(backoff_interval,), - daemon=True + target=self._delayed_sse_reconnect, args=(backoff_interval,), daemon=True ) reconnect_thread.start() @@ -250,15 +241,15 @@ def sse_state(self, state: Optional[ld_eventsource.actions.Start]): if not self._sse_connected: self._sse_connected = True logger.info("DevCycle: Connected to SSE stream") - - # Clear reconnection state on successful connection + + # Clear reconnection state with self._sse_reconnect_lock: self._sse_reconnecting = False self._last_reconnect_attempt_time = None else: - logger.debug("SSE keepalive received") + logger.debug("Devcyle: SSE keepalive received") def close(self): self._polling_enabled = False if self._sse_manager is not None and self._sse_manager.client is not None: - self._sse_manager.client.close() \ No newline at end of file + self._sse_manager.client.close() diff --git a/devcycle_python_sdk/managers/sse_manager.py b/devcycle_python_sdk/managers/sse_manager.py index d6f97ea..f90880b 100644 --- a/devcycle_python_sdk/managers/sse_manager.py +++ b/devcycle_python_sdk/managers/sse_manager.py @@ -33,8 +33,8 @@ def read_events( handle_error: Callable[[ld_eventsource.actions.Fault], None], handle_message: Callable[[ld_eventsource.actions.Event], None], ): - self.client.start() try: + self.client.start() for event in self.client.all: if isinstance(event, ld_eventsource.actions.Start): handle_state(event) @@ -45,11 +45,11 @@ def read_events( elif isinstance(event, ld_eventsource.actions.Comment): handle_state(None) except Exception as e: - logger.error(f"DevCycle SSE: Error in read loop: {e}") + logger.debug(f"DevCycle SSE: Error in read loop: {e}") fault_event = ld_eventsource.actions.Fault(error=e) handle_error(fault_event) finally: - logger.info("DevCycle SSE: Connection closed") + logger.debug("DevCycle SSE: Connection closed") def update(self, config: dict): if self.use_new_config(config["sse"]): @@ -72,4 +72,4 @@ def use_new_config(self, config: dict) -> bool: new_url = config["hostname"] + config["path"] if (self.url == "" or self.url is None) and new_url != "": return True - return self.url != new_url \ No newline at end of file + return self.url != new_url From 8c27a71eb73331dd9303889ee42be87cada21cae Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 16:17:03 -0400 Subject: [PATCH 4/9] lint error --- devcycle_python_sdk/managers/config_manager.py | 10 +++++++--- 1 file changed, 7 insertions(+), 3 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index 01e381f..d29d39a 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -60,7 +60,9 @@ def _recreate_sse_connection(self): """Recreate the SSE connection with the current config.""" with self._sse_reconnect_lock: if self._config is None or self._options.disable_realtime_updates: - logger.debug("Devcycle: Skipping SSE recreation - no config or updates disabled") + logger.debug( + "Devcycle: Skipping SSE recreation - no config or updates disabled" + ) return try: @@ -87,7 +89,9 @@ def _recreate_sse_connection(self): def _delayed_sse_reconnect(self, delay_seconds: float): """Delayed SSE reconnection with configurable backoff.""" try: - logger.debug(f"Devcycle: Waiting {delay_seconds}s before reconnecting SSE...") + logger.debug( + f"Devcycle: Waiting {delay_seconds}s before reconnecting SSE..." + ) time.sleep(delay_seconds) self._recreate_sse_connection() except Exception as e: @@ -190,7 +194,7 @@ def sse_message(self, message: ld_eventsource.actions.Event): logger.info("DevCycle: Received refetchConfig message - updating config") self._get_config(dvc_data["lastModified"] / 1000.0) # SSE connection healthy, reconnect attempts reset. - if dvc_data.get("type") == "ping" or dvc_data.get("type") == "refetchConfig" : + if dvc_data.get("type") == "ping" or dvc_data.get("type") == "refetchConfig": self._sse_reconnect_attempts = 0 def sse_error(self, error: ld_eventsource.actions.Fault): From fc0f8d782a35e5f11fcf68d804f7028085f93d00 Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 17:50:52 -0400 Subject: [PATCH 5/9] chore: remove locking, fix typos --- .../managers/config_manager.py | 106 ++++++++---------- devcycle_python_sdk/managers/sse_manager.py | 1 + example/cloud_client_example.py | 2 +- 3 files changed, 50 insertions(+), 59 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index d29d39a..c34a428 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -46,7 +46,6 @@ def __init__( self._max_reconnect_interval = 300.0 # Cap at 5 minutes self._last_reconnect_attempt_time: Optional[float] = None self._sse_reconnecting = False - self._sse_reconnect_lock = threading.Lock() self._config_api_client = ConfigAPIClient(self._sdk_key, self._options) self._polling_enabled = True @@ -58,47 +57,45 @@ def is_initialized(self) -> bool: def _recreate_sse_connection(self): """Recreate the SSE connection with the current config.""" - with self._sse_reconnect_lock: - if self._config is None or self._options.disable_realtime_updates: - logger.debug( - "Devcycle: Skipping SSE recreation - no config or updates disabled" - ) - return + if self._config is None or self._options.disable_realtime_updates: + logger.debug( + "DevCycle: Skipping SSE recreation - no config or updates disabled" + ) + return - try: - # Close existing connection if present - if ( - self._sse_manager is not None - and self._sse_manager.client is not None - ): - self._sse_manager.client.close() - if self._sse_manager.read_thread.is_alive(): - self._sse_manager.read_thread.join(timeout=1.0) - - # Create new SSE manager - self._sse_manager = SSEManager( - self.sse_state, - self.sse_error, - self.sse_message, - ) - self._sse_manager.update(self._config) - logger.info("Devcyle: SSE connection created successfully") - except Exception as e: - logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}") + # Update timestamp right before attempting connection + self._last_reconnect_attempt_time = time.time() + + try: + # Close existing connection if present + if self._sse_manager is not None and self._sse_manager.client is not None: + self._sse_manager.client.close() + if self._sse_manager.read_thread.is_alive(): + self._sse_manager.read_thread.join(timeout=1.0) + + # Create new SSE manager + self._sse_manager = SSEManager( + self.sse_state, + self.sse_error, + self.sse_message, + ) + self._sse_manager.update(self._config) + + except Exception as e: + logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}") def _delayed_sse_reconnect(self, delay_seconds: float): """Delayed SSE reconnection with configurable backoff.""" try: logger.debug( - f"Devcycle: Waiting {delay_seconds}s before reconnecting SSE..." + f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..." ) time.sleep(delay_seconds) self._recreate_sse_connection() except Exception as e: - logger.error(f"Devcycle: Error during delayed SSE reconnection: {e}") + logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}") finally: - with self._sse_reconnect_lock: - self._sse_reconnecting = False + self._sse_reconnecting = False def _get_config(self, last_modified: Optional[float] = None): try: @@ -199,45 +196,39 @@ def sse_message(self, message: ld_eventsource.actions.Event): def sse_error(self, error: ld_eventsource.actions.Fault): self._sse_connected = False - logger.debug(f"Devcyle: SSE connection error: {error.error}") + logger.debug(f"DevCyle: SSE connection error: {error.error}") current_time = time.time() + if self._sse_reconnecting: + logger.debug("DevCyle: Reconnection already in progress, skipping") + return + # Calculate exponential backoff interval (capped at max) backoff_interval = min( self._min_reconnect_interval * (2**self._sse_reconnect_attempts), self._max_reconnect_interval, ) - with self._sse_reconnect_lock: - if self._sse_reconnecting: - logger.debug("Devcyle: Reconnection already in progress, skipping") - return - - # Check if we need to wait for backoff - if ( - self._last_reconnect_attempt_time is not None - and current_time - self._last_reconnect_attempt_time < backoff_interval - ): - time_remaining = backoff_interval - ( - current_time - self._last_reconnect_attempt_time - ) + # Check if we need to wait for remaining backoff time + delay_seconds = backoff_interval + if self._last_reconnect_attempt_time is not None: + time_since_last_attempt = current_time - self._last_reconnect_attempt_time + if time_since_last_attempt < backoff_interval: + delay_seconds = backoff_interval - time_since_last_attempt logger.debug( - f"Devcyle: Skipping reconnection attempt, waiting for backoff period " - f"({time_remaining:.1f}s remaining of {backoff_interval:.1f}s)" + f"DevCyle: Within backoff period, scheduling reconnection in {delay_seconds:.1f}s" ) - return - self._sse_reconnecting = True - self._last_reconnect_attempt_time = current_time - self._sse_reconnect_attempts += 1 + self._sse_reconnecting = True + self._sse_reconnect_attempts += 1 logger.debug( - f"Devcyle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " - f"next backoff: {backoff_interval:.1f}s)" + f"DevCyle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " + f"backoff: {delay_seconds:.1f}s)" ) reconnect_thread = threading.Thread( - target=self._delayed_sse_reconnect, args=(backoff_interval,), daemon=True + target=self._delayed_sse_reconnect, args=(delay_seconds,), daemon=True ) reconnect_thread.start() @@ -247,11 +238,10 @@ def sse_state(self, state: Optional[ld_eventsource.actions.Start]): logger.info("DevCycle: Connected to SSE stream") # Clear reconnection state - with self._sse_reconnect_lock: - self._sse_reconnecting = False - self._last_reconnect_attempt_time = None + self._sse_reconnecting = False + self._last_reconnect_attempt_time = None else: - logger.debug("Devcyle: SSE keepalive received") + logger.debug("DevCyle: SSE keepalive received") def close(self): self._polling_enabled = False diff --git a/devcycle_python_sdk/managers/sse_manager.py b/devcycle_python_sdk/managers/sse_manager.py index f90880b..180e575 100644 --- a/devcycle_python_sdk/managers/sse_manager.py +++ b/devcycle_python_sdk/managers/sse_manager.py @@ -35,6 +35,7 @@ def read_events( ): try: self.client.start() + logger.info("DevCycle: SSE connection created successfully") for event in self.client.all: if isinstance(event, ld_eventsource.actions.Start): handle_state(event) diff --git a/example/cloud_client_example.py b/example/cloud_client_example.py index 0a72f70..1310f82 100644 --- a/example/cloud_client_example.py +++ b/example/cloud_client_example.py @@ -61,7 +61,7 @@ def main(): client.track(user, event) except Exception as e: - logger.exception(f"Exception when calling Devcycle API: {e}\n") + logger.exception(f"Exception when calling DevCycle API: {e}\n") if __name__ == "__main__": From 7b0e1acfe1c20e483919328607ed7525a33dfca2 Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 18:02:53 -0400 Subject: [PATCH 6/9] chore: typo and tests Update test_config_manager.py --- .../managers/config_manager.py | 6 +- test/managers/test_config_manager.py | 241 +++++++++++++++++- 2 files changed, 243 insertions(+), 4 deletions(-) mode change 100644 => 100755 devcycle_python_sdk/managers/config_manager.py diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py old mode 100644 new mode 100755 index c34a428..1b41c2e --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -216,14 +216,14 @@ def sse_error(self, error: ld_eventsource.actions.Fault): if time_since_last_attempt < backoff_interval: delay_seconds = backoff_interval - time_since_last_attempt logger.debug( - f"DevCyle: Within backoff period, scheduling reconnection in {delay_seconds:.1f}s" + f"DevCycle: Within backoff period, scheduling reconnection in {delay_seconds:.1f}s" ) self._sse_reconnecting = True self._sse_reconnect_attempts += 1 logger.debug( - f"DevCyle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " + f"DevCycle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " f"backoff: {delay_seconds:.1f}s)" ) @@ -241,7 +241,7 @@ def sse_state(self, state: Optional[ld_eventsource.actions.Start]): self._sse_reconnecting = False self._last_reconnect_attempt_time = None else: - logger.debug("DevCyle: SSE keepalive received") + logger.debug("DevCycle: SSE keepalive received") def close(self): self._polling_enabled = False diff --git a/test/managers/test_config_manager.py b/test/managers/test_config_manager.py index e8a2a47..6fd7fa7 100644 --- a/test/managers/test_config_manager.py +++ b/test/managers/test_config_manager.py @@ -6,7 +6,9 @@ from datetime import datetime from email.utils import formatdate from time import mktime -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, Mock + +import ld_eventsource.actions from devcycle_python_sdk import DevCycleLocalOptions from devcycle_python_sdk.managers.config_manager import EnvironmentConfigManager @@ -152,5 +154,242 @@ def test_get_config_unchanged(self, mock_get_config): self.test_local_bucketing.store_config.assert_not_called() +class SSEReconnectionBackoffTest(unittest.TestCase): + """Tests for SSE exponential backoff reconnection behavior""" + + def setUp(self) -> None: + self.sdk_key = "dvc_server_" + str(uuid.uuid4()) + self.test_local_bucketing = MagicMock() + self.test_options = DevCycleLocalOptions( + config_polling_interval_ms=500, disable_realtime_updates=False + ) + self.test_config_json = small_config_json() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_first_sse_error_triggers_immediate_reconnection( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """First error should trigger reconnection with min backoff (5s)""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Simulate first SSE error + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + config_manager.sse_error(error) + + # Verify reconnection attempt counter incremented + self.assertEqual(config_manager._sse_reconnect_attempts, 1) + # Verify reconnecting flag set + self.assertTrue(config_manager._sse_reconnecting) + # Verify thread spawned with min backoff (5.0s) + mock_thread.assert_called_once() + call_args = mock_thread.call_args + self.assertEqual(call_args[1]["args"][0], 5.0) # backoff_interval + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_exponential_backoff_calculation( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """Verify exponential backoff: 5s, 10s, 20s, 40s, etc.""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # First error: 5s backoff (2^0 * 5) + config_manager.sse_error(error) + self.assertEqual(config_manager._sse_reconnect_attempts, 1) + self.assertEqual(mock_thread.call_args[1]["args"][0], 5.0) + + # Simulate reconnect completing + config_manager._sse_reconnecting = False + mock_time.return_value += 10.0 # Advance time beyond backoff + + # Second error: 10s backoff (2^1 * 5) + config_manager.sse_error(error) + self.assertEqual(config_manager._sse_reconnect_attempts, 2) + self.assertEqual(mock_thread.call_args[1]["args"][0], 10.0) + + # Simulate reconnect completing + config_manager._sse_reconnecting = False + mock_time.return_value += 15.0 + + # Third error: 20s backoff (2^2 * 5) + config_manager.sse_error(error) + self.assertEqual(config_manager._sse_reconnect_attempts, 3) + self.assertEqual(mock_thread.call_args[1]["args"][0], 20.0) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_backoff_caps_at_max_interval( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """Verify backoff caps at max interval (300s)""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # Simulate many failures to reach max backoff + # 2^6 * 5 = 320s > 300s (max), so should cap at 300s + config_manager._sse_reconnect_attempts = 6 + config_manager.sse_error(error) + + # Should be capped at 300s + self.assertEqual(mock_thread.call_args[1]["args"][0], 300.0) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + def test_concurrent_errors_only_spawn_one_reconnection( + self, mock_thread, mock_sse_manager, mock_get_config + ): + """Multiple rapid errors should only spawn one reconnection thread""" + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # First error spawns reconnection + config_manager.sse_error(error) + self.assertEqual(mock_thread.call_count, 1) + self.assertTrue(config_manager._sse_reconnecting) + + # Second error while reconnecting should be skipped + config_manager.sse_error(error) + # Still only 1 thread spawned + self.assertEqual(mock_thread.call_count, 1) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_error_within_backoff_uses_remaining_time( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """Error within backoff period should schedule reconnect with remaining time""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # First error at t=1000, backoff=5s + config_manager.sse_error(error) + self.assertEqual(mock_thread.call_args[1]["args"][0], 5.0) + + # Simulate reconnect completing and updating timestamp + config_manager._sse_reconnecting = False + config_manager._last_reconnect_attempt_time = 1005.0 # Simulates reconnect at t=1005 + + # Second error at t=1008 (3s after reconnect, within 10s backoff) + mock_time.return_value = 1008.0 + config_manager.sse_error(error) + + # Should use remaining time: 10s backoff - 3s elapsed = 7s + self.assertAlmostEqual(mock_thread.call_args[1]["args"][0], 7.0, places=1) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_successful_connection_resets_attempts( + self, mock_sse_manager, mock_get_config + ): + """Successful SSE connection should reset reconnection attempts""" + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Simulate multiple failures + config_manager._sse_reconnect_attempts = 5 + config_manager._sse_reconnecting = True + config_manager._last_reconnect_attempt_time = 1000.0 + + # Simulate successful ping message (which resets attempts) + message = ld_eventsource.actions.Event( + event="message", + data='{"data": "{\\"type\\": \\"ping\\", \\"lastModified\\": 1234567890000}"}', + ) + config_manager.sse_message(message) + + # Attempts should be reset + self.assertEqual(config_manager._sse_reconnect_attempts, 0) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_successful_state_resets_reconnection_flags( + self, mock_sse_manager, mock_get_config + ): + """Successful SSE state should clear reconnection flags""" + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Simulate reconnection in progress + config_manager._sse_reconnecting = True + config_manager._last_reconnect_attempt_time = 1000.0 + config_manager._sse_connected = False + + # Simulate successful connection + state = ld_eventsource.actions.Start() + config_manager.sse_state(state) + + # Should clear reconnection state + self.assertFalse(config_manager._sse_reconnecting) + self.assertIsNone(config_manager._last_reconnect_attempt_time) + self.assertTrue(config_manager._sse_connected) + + config_manager.close() + + + if __name__ == "__main__": unittest.main() From 4baf61eef9c4e5710179105327b5dab13329df36 Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 18:04:16 -0400 Subject: [PATCH 7/9] lint --- test/managers/test_config_manager.py | 7 ++++--- 1 file changed, 4 insertions(+), 3 deletions(-) diff --git a/test/managers/test_config_manager.py b/test/managers/test_config_manager.py index 6fd7fa7..1e2fbaa 100644 --- a/test/managers/test_config_manager.py +++ b/test/managers/test_config_manager.py @@ -6,7 +6,7 @@ from datetime import datetime from email.utils import formatdate from time import mktime -from unittest.mock import patch, MagicMock, Mock +from unittest.mock import patch, MagicMock import ld_eventsource.actions @@ -319,7 +319,9 @@ def test_error_within_backoff_uses_remaining_time( # Simulate reconnect completing and updating timestamp config_manager._sse_reconnecting = False - config_manager._last_reconnect_attempt_time = 1005.0 # Simulates reconnect at t=1005 + config_manager._last_reconnect_attempt_time = ( + 1005.0 # Simulates reconnect at t=1005 + ) # Second error at t=1008 (3s after reconnect, within 10s backoff) mock_time.return_value = 1008.0 @@ -390,6 +392,5 @@ def test_successful_state_resets_reconnection_flags( config_manager.close() - if __name__ == "__main__": unittest.main() From c3f4c273030118d8665d2283751bf9a0b3ce45e9 Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 18:15:08 -0400 Subject: [PATCH 8/9] typos --- devcycle_python_sdk/managers/config_manager.py | 4 ++-- 1 file changed, 2 insertions(+), 2 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index 1b41c2e..f09b639 100755 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -196,11 +196,11 @@ def sse_message(self, message: ld_eventsource.actions.Event): def sse_error(self, error: ld_eventsource.actions.Fault): self._sse_connected = False - logger.debug(f"DevCyle: SSE connection error: {error.error}") + logger.debug(f"DevCycle: SSE connection error: {error.error}") current_time = time.time() if self._sse_reconnecting: - logger.debug("DevCyle: Reconnection already in progress, skipping") + logger.debug("DevCylce: Reconnection already in progress, skipping") return # Calculate exponential backoff interval (capped at max) From 371cf7d64df93560eacbcaf359e478d4804c4e6d Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 18:17:59 -0400 Subject: [PATCH 9/9] Update config_manager.py --- devcycle_python_sdk/managers/config_manager.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index f09b639..cfa16f7 100755 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -200,7 +200,7 @@ def sse_error(self, error: ld_eventsource.actions.Fault): current_time = time.time() if self._sse_reconnecting: - logger.debug("DevCylce: Reconnection already in progress, skipping") + logger.debug("DevCycle: Reconnection already in progress, skipping") return # Calculate exponential backoff interval (capped at max)