diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py old mode 100644 new mode 100755 index b8b90b0..cfa16f7 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -40,6 +40,12 @@ def __init__( self._config_etag: Optional[str] = None self._config_lastmodified: Optional[str] = None + # Exponential backoff configuration + self._sse_reconnect_attempts = 0 + self._min_reconnect_interval = 5.0 # Start at 5 seconds + self._max_reconnect_interval = 300.0 # Cap at 5 minutes + self._last_reconnect_attempt_time: Optional[float] = None + self._sse_reconnecting = False self._config_api_client = ConfigAPIClient(self._sdk_key, self._options) self._polling_enabled = True @@ -49,6 +55,48 @@ def __init__( def is_initialized(self) -> bool: return self._config is not None + def _recreate_sse_connection(self): + """Recreate the SSE connection with the current config.""" + if self._config is None or self._options.disable_realtime_updates: + logger.debug( + "DevCycle: Skipping SSE recreation - no config or updates disabled" + ) + return + + # Update timestamp right before attempting connection + self._last_reconnect_attempt_time = time.time() + + try: + # Close existing connection if present + if self._sse_manager is not None and self._sse_manager.client is not None: + self._sse_manager.client.close() + if self._sse_manager.read_thread.is_alive(): + self._sse_manager.read_thread.join(timeout=1.0) + + # Create new SSE manager + self._sse_manager = SSEManager( + self.sse_state, + self.sse_error, + self.sse_message, + ) + self._sse_manager.update(self._config) + + except Exception as e: + logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}") + + def _delayed_sse_reconnect(self, delay_seconds: float): + """Delayed SSE reconnection with configurable backoff.""" + try: + logger.debug( + f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..." + ) + time.sleep(delay_seconds) + self._recreate_sse_connection() + except Exception as e: + logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}") + finally: + self._sse_reconnecting = False + def _get_config(self, last_modified: Optional[float] = None): try: lm_header = self._config_lastmodified @@ -87,12 +135,10 @@ def _get_config(self, last_modified: Optional[float] = None): or self._sse_manager.client is None or not self._sse_manager.read_thread.is_alive() ): - self._sse_manager = SSEManager( - self.sse_state, - self.sse_error, - self.sse_message, + logger.info( + "DevCycle: SSE connection not active, creating new connection" ) - self._sse_manager.update(self._config) + self._recreate_sse_connection() if ( trigger_on_client_initialized @@ -101,7 +147,6 @@ def _get_config(self, last_modified: Optional[float] = None): try: self._options.on_client_initialized() except Exception as e: - # consume any error logger.warning( f"DevCycle: Error received from on_client_initialized callback: {str(e)}" ) @@ -122,7 +167,6 @@ def run(self): self._get_config() except Exception as e: if self._polling_enabled: - # Only log a warning if we're still polling logger.warning( f"DevCycle: Error polling for config changes: {str(e)}" ) @@ -137,6 +181,7 @@ def sse_message(self, message: ld_eventsource.actions.Event): self.sse_state(None) logger.info(f"DevCycle: Received message: {message.data}") sse_message = json.loads(message.data) + dvc_data = json.loads(sse_message.get("data")) if ( dvc_data.get("type") == "refetchConfig" @@ -145,15 +190,60 @@ def sse_message(self, message: ld_eventsource.actions.Event): ): logger.info("DevCycle: Received refetchConfig message - updating config") self._get_config(dvc_data["lastModified"] / 1000.0) + # SSE connection healthy, reconnect attempts reset. + if dvc_data.get("type") == "ping" or dvc_data.get("type") == "refetchConfig": + self._sse_reconnect_attempts = 0 def sse_error(self, error: ld_eventsource.actions.Fault): self._sse_connected = False - logger.debug(f"DevCycle: Received SSE error: {error}") + logger.debug(f"DevCycle: SSE connection error: {error.error}") + current_time = time.time() + + if self._sse_reconnecting: + logger.debug("DevCycle: Reconnection already in progress, skipping") + return + + # Calculate exponential backoff interval (capped at max) + backoff_interval = min( + self._min_reconnect_interval * (2**self._sse_reconnect_attempts), + self._max_reconnect_interval, + ) + + # Check if we need to wait for remaining backoff time + delay_seconds = backoff_interval + if self._last_reconnect_attempt_time is not None: + time_since_last_attempt = current_time - self._last_reconnect_attempt_time + if time_since_last_attempt < backoff_interval: + delay_seconds = backoff_interval - time_since_last_attempt + logger.debug( + f"DevCycle: Within backoff period, scheduling reconnection in {delay_seconds:.1f}s" + ) + + self._sse_reconnecting = True + self._sse_reconnect_attempts += 1 + + logger.debug( + f"DevCycle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " + f"backoff: {delay_seconds:.1f}s)" + ) + + reconnect_thread = threading.Thread( + target=self._delayed_sse_reconnect, args=(delay_seconds,), daemon=True + ) + reconnect_thread.start() def sse_state(self, state: Optional[ld_eventsource.actions.Start]): if not self._sse_connected: self._sse_connected = True logger.info("DevCycle: Connected to SSE stream") + # Clear reconnection state + self._sse_reconnecting = False + self._last_reconnect_attempt_time = None + else: + logger.debug("DevCycle: SSE keepalive received") + def close(self): self._polling_enabled = False + if self._sse_manager is not None and self._sse_manager.client is not None: + self._sse_manager.client.close() diff --git a/devcycle_python_sdk/managers/sse_manager.py b/devcycle_python_sdk/managers/sse_manager.py index 13b97da..180e575 100644 --- a/devcycle_python_sdk/managers/sse_manager.py +++ b/devcycle_python_sdk/managers/sse_manager.py @@ -33,8 +33,9 @@ def read_events( handle_error: Callable[[ld_eventsource.actions.Fault], None], handle_message: Callable[[ld_eventsource.actions.Event], None], ): - self.client.start() try: + self.client.start() + logger.info("DevCycle: SSE connection created successfully") for event in self.client.all: if isinstance(event, ld_eventsource.actions.Start): handle_state(event) @@ -45,7 +46,11 @@ def read_events( elif isinstance(event, ld_eventsource.actions.Comment): handle_state(None) except Exception as e: - logger.debug(f"DevCycle: failed to read SSE message: {e}") + logger.debug(f"DevCycle SSE: Error in read loop: {e}") + fault_event = ld_eventsource.actions.Fault(error=e) + handle_error(fault_event) + finally: + logger.debug("DevCycle SSE: Connection closed") def update(self, config: dict): if self.use_new_config(config["sse"]): @@ -66,6 +71,6 @@ def update(self, config: dict): def use_new_config(self, config: dict) -> bool: new_url = config["hostname"] + config["path"] - if self.url == "" or self.url is None and new_url != "": + if (self.url == "" or self.url is None) and new_url != "": return True return self.url != new_url diff --git a/example/cloud_client_example.py b/example/cloud_client_example.py index 0a72f70..1310f82 100644 --- a/example/cloud_client_example.py +++ b/example/cloud_client_example.py @@ -61,7 +61,7 @@ def main(): client.track(user, event) except Exception as e: - logger.exception(f"Exception when calling Devcycle API: {e}\n") + logger.exception(f"Exception when calling DevCycle API: {e}\n") if __name__ == "__main__": diff --git a/test/managers/test_config_manager.py b/test/managers/test_config_manager.py index e8a2a47..1e2fbaa 100644 --- a/test/managers/test_config_manager.py +++ b/test/managers/test_config_manager.py @@ -8,6 +8,8 @@ from time import mktime from unittest.mock import patch, MagicMock +import ld_eventsource.actions + from devcycle_python_sdk import DevCycleLocalOptions from devcycle_python_sdk.managers.config_manager import EnvironmentConfigManager from test.fixture.data import small_config_json @@ -152,5 +154,243 @@ def test_get_config_unchanged(self, mock_get_config): self.test_local_bucketing.store_config.assert_not_called() +class SSEReconnectionBackoffTest(unittest.TestCase): + """Tests for SSE exponential backoff reconnection behavior""" + + def setUp(self) -> None: + self.sdk_key = "dvc_server_" + str(uuid.uuid4()) + self.test_local_bucketing = MagicMock() + self.test_options = DevCycleLocalOptions( + config_polling_interval_ms=500, disable_realtime_updates=False + ) + self.test_config_json = small_config_json() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_first_sse_error_triggers_immediate_reconnection( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """First error should trigger reconnection with min backoff (5s)""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Simulate first SSE error + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + config_manager.sse_error(error) + + # Verify reconnection attempt counter incremented + self.assertEqual(config_manager._sse_reconnect_attempts, 1) + # Verify reconnecting flag set + self.assertTrue(config_manager._sse_reconnecting) + # Verify thread spawned with min backoff (5.0s) + mock_thread.assert_called_once() + call_args = mock_thread.call_args + self.assertEqual(call_args[1]["args"][0], 5.0) # backoff_interval + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_exponential_backoff_calculation( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """Verify exponential backoff: 5s, 10s, 20s, 40s, etc.""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # First error: 5s backoff (2^0 * 5) + config_manager.sse_error(error) + self.assertEqual(config_manager._sse_reconnect_attempts, 1) + self.assertEqual(mock_thread.call_args[1]["args"][0], 5.0) + + # Simulate reconnect completing + config_manager._sse_reconnecting = False + mock_time.return_value += 10.0 # Advance time beyond backoff + + # Second error: 10s backoff (2^1 * 5) + config_manager.sse_error(error) + self.assertEqual(config_manager._sse_reconnect_attempts, 2) + self.assertEqual(mock_thread.call_args[1]["args"][0], 10.0) + + # Simulate reconnect completing + config_manager._sse_reconnecting = False + mock_time.return_value += 15.0 + + # Third error: 20s backoff (2^2 * 5) + config_manager.sse_error(error) + self.assertEqual(config_manager._sse_reconnect_attempts, 3) + self.assertEqual(mock_thread.call_args[1]["args"][0], 20.0) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_backoff_caps_at_max_interval( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """Verify backoff caps at max interval (300s)""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # Simulate many failures to reach max backoff + # 2^6 * 5 = 320s > 300s (max), so should cap at 300s + config_manager._sse_reconnect_attempts = 6 + config_manager.sse_error(error) + + # Should be capped at 300s + self.assertEqual(mock_thread.call_args[1]["args"][0], 300.0) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + def test_concurrent_errors_only_spawn_one_reconnection( + self, mock_thread, mock_sse_manager, mock_get_config + ): + """Multiple rapid errors should only spawn one reconnection thread""" + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # First error spawns reconnection + config_manager.sse_error(error) + self.assertEqual(mock_thread.call_count, 1) + self.assertTrue(config_manager._sse_reconnecting) + + # Second error while reconnecting should be skipped + config_manager.sse_error(error) + # Still only 1 thread spawned + self.assertEqual(mock_thread.call_count, 1) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + @patch("threading.Thread") + @patch("time.time") + def test_error_within_backoff_uses_remaining_time( + self, mock_time, mock_thread, mock_sse_manager, mock_get_config + ): + """Error within backoff period should schedule reconnect with remaining time""" + mock_time.return_value = 1000.0 + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + error = ld_eventsource.actions.Fault(error=Exception("Connection failed")) + + # First error at t=1000, backoff=5s + config_manager.sse_error(error) + self.assertEqual(mock_thread.call_args[1]["args"][0], 5.0) + + # Simulate reconnect completing and updating timestamp + config_manager._sse_reconnecting = False + config_manager._last_reconnect_attempt_time = ( + 1005.0 # Simulates reconnect at t=1005 + ) + + # Second error at t=1008 (3s after reconnect, within 10s backoff) + mock_time.return_value = 1008.0 + config_manager.sse_error(error) + + # Should use remaining time: 10s backoff - 3s elapsed = 7s + self.assertAlmostEqual(mock_thread.call_args[1]["args"][0], 7.0, places=1) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_successful_connection_resets_attempts( + self, mock_sse_manager, mock_get_config + ): + """Successful SSE connection should reset reconnection attempts""" + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Simulate multiple failures + config_manager._sse_reconnect_attempts = 5 + config_manager._sse_reconnecting = True + config_manager._last_reconnect_attempt_time = 1000.0 + + # Simulate successful ping message (which resets attempts) + message = ld_eventsource.actions.Event( + event="message", + data='{"data": "{\\"type\\": \\"ping\\", \\"lastModified\\": 1234567890000}"}', + ) + config_manager.sse_message(message) + + # Attempts should be reset + self.assertEqual(config_manager._sse_reconnect_attempts, 0) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_successful_state_resets_reconnection_flags( + self, mock_sse_manager, mock_get_config + ): + """Successful SSE state should clear reconnection flags""" + mock_get_config.return_value = (self.test_config_json, "etag", None) + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Simulate reconnection in progress + config_manager._sse_reconnecting = True + config_manager._last_reconnect_attempt_time = 1000.0 + config_manager._sse_connected = False + + # Simulate successful connection + state = ld_eventsource.actions.Start() + config_manager.sse_state(state) + + # Should clear reconnection state + self.assertFalse(config_manager._sse_reconnecting) + self.assertIsNone(config_manager._last_reconnect_attempt_time) + self.assertTrue(config_manager._sse_connected) + + config_manager.close() + + if __name__ == "__main__": unittest.main()