From ad86e2c50cf9410e9c8ed68abc448705cc9c9be3 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 27 Jan 2026 20:43:13 +0000 Subject: [PATCH 1/6] Initial plan From a3124e3090092eb3d0b2b573f1af6e55b57ba112 Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 27 Jan 2026 20:46:48 +0000 Subject: [PATCH 2/6] Fix lock inversion in SSE reconnection by moving blocking operations outside lock Co-authored-by: JamieSinn <1538232+JamieSinn@users.noreply.github.com> --- .../managers/config_manager.py | 32 ++++++++++++------- 1 file changed, 20 insertions(+), 12 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index d29d39a..7fb1aa2 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -58,6 +58,7 @@ def is_initialized(self) -> bool: def _recreate_sse_connection(self): """Recreate the SSE connection with the current config.""" + # Acquire lock to check state and save references to old connection with self._sse_reconnect_lock: if self._config is None or self._options.disable_realtime_updates: logger.debug( @@ -65,26 +66,33 @@ def _recreate_sse_connection(self): ) return - try: - # Close existing connection if present - if ( - self._sse_manager is not None - and self._sse_manager.client is not None - ): - self._sse_manager.client.close() - if self._sse_manager.read_thread.is_alive(): - self._sse_manager.read_thread.join(timeout=1.0) + # Save references to old SSE manager and config while holding the lock + old_sse_manager = self._sse_manager + current_config = self._config + + # Perform potentially blocking operations outside the lock to avoid deadlock + # The SSE read thread may call sse_error/sse_state which need the lock + try: + if old_sse_manager is not None and old_sse_manager.client is not None: + old_sse_manager.client.close() + if old_sse_manager.read_thread.is_alive(): + old_sse_manager.read_thread.join(timeout=1.0) + except Exception as e: + logger.debug(f"Devcycle: Error closing old SSE connection: {e}") + # Re-acquire lock to create new connection and update state + try: + with self._sse_reconnect_lock: # Create new SSE manager self._sse_manager = SSEManager( self.sse_state, self.sse_error, self.sse_message, ) - self._sse_manager.update(self._config) + self._sse_manager.update(current_config) logger.info("Devcyle: SSE connection created successfully") - except Exception as e: - logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}") + except Exception as e: + logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}") def _delayed_sse_reconnect(self, delay_seconds: float): """Delayed SSE reconnection with configurable backoff.""" From 09f56d1282724076f4f6e6f32e6b2a698656f86d Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 27 Jan 2026 20:51:04 +0000 Subject: [PATCH 3/6] Address code review feedback: add race condition protection and fix typo Co-authored-by: JamieSinn <1538232+JamieSinn@users.noreply.github.com> --- .../managers/config_manager.py | 24 +++++++++++++++---- 1 file changed, 19 insertions(+), 5 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index 7fb1aa2..1f8bc9f 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -67,6 +67,9 @@ def _recreate_sse_connection(self): return # Save references to old SSE manager and config while holding the lock + # Note: current_config may become stale if another thread updates _config + # between releasing and reacquiring the lock, but this is acceptable as + # the SSE stream will receive updates to sync to the latest config old_sse_manager = self._sse_manager current_config = self._config @@ -90,7 +93,7 @@ def _recreate_sse_connection(self): self.sse_message, ) self._sse_manager.update(current_config) - logger.info("Devcyle: SSE connection created successfully") + logger.info("DevCycle: SSE connection created successfully") except Exception as e: logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}") @@ -146,10 +149,21 @@ def _get_config(self, last_modified: Optional[float] = None): or self._sse_manager.client is None or not self._sse_manager.read_thread.is_alive() ): - logger.info( - "DevCycle: SSE connection not active, creating new connection" - ) - self._recreate_sse_connection() + # Only recreate if not already reconnecting from error handler + with self._sse_reconnect_lock: + if not self._sse_reconnecting: + logger.info( + "DevCycle: SSE connection not active, creating new connection" + ) + should_recreate = True + else: + logger.debug( + "DevCycle: SSE reconnection already scheduled, skipping" + ) + should_recreate = False + + if should_recreate: + self._recreate_sse_connection() if ( trigger_on_client_initialized From eb7844f2074ff479c969cc51012532dbe0c6670b Mon Sep 17 00:00:00 2001 From: Gabriella Gerges Date: Tue, 27 Jan 2026 16:51:56 -0400 Subject: [PATCH 4/6] correct case for DevCycle --- devcycle_python_sdk/managers/config_manager.py | 10 +++++----- example/cloud_client_example.py | 2 +- 2 files changed, 6 insertions(+), 6 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index 7fb1aa2..76bfd73 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -62,7 +62,7 @@ def _recreate_sse_connection(self): with self._sse_reconnect_lock: if self._config is None or self._options.disable_realtime_updates: logger.debug( - "Devcycle: Skipping SSE recreation - no config or updates disabled" + "DevCycle: Skipping SSE recreation - no config or updates disabled" ) return @@ -78,7 +78,7 @@ def _recreate_sse_connection(self): if old_sse_manager.read_thread.is_alive(): old_sse_manager.read_thread.join(timeout=1.0) except Exception as e: - logger.debug(f"Devcycle: Error closing old SSE connection: {e}") + logger.debug(f"DevCycle: Error closing old SSE connection: {e}") # Re-acquire lock to create new connection and update state try: @@ -92,18 +92,18 @@ def _recreate_sse_connection(self): self._sse_manager.update(current_config) logger.info("Devcyle: SSE connection created successfully") except Exception as e: - logger.debug(f"Devcycle: Failed to recreate SSE connection: {e}") + logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}") def _delayed_sse_reconnect(self, delay_seconds: float): """Delayed SSE reconnection with configurable backoff.""" try: logger.debug( - f"Devcycle: Waiting {delay_seconds}s before reconnecting SSE..." + f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..." ) time.sleep(delay_seconds) self._recreate_sse_connection() except Exception as e: - logger.error(f"Devcycle: Error during delayed SSE reconnection: {e}") + logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}") finally: with self._sse_reconnect_lock: self._sse_reconnecting = False diff --git a/example/cloud_client_example.py b/example/cloud_client_example.py index 0a72f70..1310f82 100644 --- a/example/cloud_client_example.py +++ b/example/cloud_client_example.py @@ -61,7 +61,7 @@ def main(): client.track(user, event) except Exception as e: - logger.exception(f"Exception when calling Devcycle API: {e}\n") + logger.exception(f"Exception when calling DevCycle API: {e}\n") if __name__ == "__main__": From 741b526f7ed78c1fbafa0996d25e13f3371bb7fa Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 27 Jan 2026 20:56:04 +0000 Subject: [PATCH 5/6] Fix race conditions: set _sse_manager=None and use latest config Co-authored-by: JamieSinn <1538232+JamieSinn@users.noreply.github.com> --- devcycle_python_sdk/managers/config_manager.py | 18 +++++++++++------- 1 file changed, 11 insertions(+), 7 deletions(-) diff --git a/devcycle_python_sdk/managers/config_manager.py b/devcycle_python_sdk/managers/config_manager.py index ced4a10..f428772 100644 --- a/devcycle_python_sdk/managers/config_manager.py +++ b/devcycle_python_sdk/managers/config_manager.py @@ -66,12 +66,9 @@ def _recreate_sse_connection(self): ) return - # Save references to old SSE manager and config while holding the lock - # Note: current_config may become stale if another thread updates _config - # between releasing and reacquiring the lock, but this is acceptable as - # the SSE stream will receive updates to sync to the latest config + # Save reference to old SSE manager and clear it to prevent concurrent access old_sse_manager = self._sse_manager - current_config = self._config + self._sse_manager = None # Perform potentially blocking operations outside the lock to avoid deadlock # The SSE read thread may call sse_error/sse_state which need the lock @@ -86,13 +83,20 @@ def _recreate_sse_connection(self): # Re-acquire lock to create new connection and update state try: with self._sse_reconnect_lock: - # Create new SSE manager + # Re-read config to ensure we use the latest version + if self._config is None: + logger.debug( + "DevCycle: Config was cleared during SSE reconnection, skipping" + ) + return + + # Create new SSE manager with the current config self._sse_manager = SSEManager( self.sse_state, self.sse_error, self.sse_message, ) - self._sse_manager.update(current_config) + self._sse_manager.update(self._config) logger.info("DevCycle: SSE connection created successfully") except Exception as e: logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}") From 89a846778fa0f98ae8f9fabb9717bd048d8fa47c Mon Sep 17 00:00:00 2001 From: "copilot-swe-agent[bot]" <198982749+Copilot@users.noreply.github.com> Date: Tue, 27 Jan 2026 20:58:39 +0000 Subject: [PATCH 6/6] Add comprehensive tests for concurrent SSE reconnection scenarios Co-authored-by: JamieSinn <1538232+JamieSinn@users.noreply.github.com> --- test/managers/test_config_manager.py | 233 ++++++++++++++++++++++++++- 1 file changed, 232 insertions(+), 1 deletion(-) diff --git a/test/managers/test_config_manager.py b/test/managers/test_config_manager.py index e8a2a47..b3a3c90 100644 --- a/test/managers/test_config_manager.py +++ b/test/managers/test_config_manager.py @@ -3,10 +3,11 @@ import time import unittest import uuid +import threading from datetime import datetime from email.utils import formatdate from time import mktime -from unittest.mock import patch, MagicMock +from unittest.mock import patch, MagicMock, Mock, PropertyMock from devcycle_python_sdk import DevCycleLocalOptions from devcycle_python_sdk.managers.config_manager import EnvironmentConfigManager @@ -151,6 +152,236 @@ def test_get_config_unchanged(self, mock_get_config): self.assertDictEqual(config_manager._config, self.test_config_json) self.test_local_bucketing.store_config.assert_not_called() + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_clears_old_manager(self, mock_sse_manager_class, mock_get_config): + """Test that _recreate_sse_connection sets _sse_manager to None before blocking operations.""" + mock_get_config.return_value = ( + self.test_config_json, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates for this test + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager with a thread + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Track when join is called and what _sse_manager is at that point + manager_during_join = [] + + def track_join(timeout=None): + manager_during_join.append(config_manager._sse_manager) + + mock_thread.join = track_join + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Call _recreate_sse_connection + config_manager._recreate_sse_connection() + + # Verify that _sse_manager was None during the join (blocking operation) + self.assertEqual(len(manager_during_join), 1) + self.assertIsNone(manager_during_join[0]) + + # Verify old manager was closed + mock_old_sse.client.close.assert_called_once() + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_uses_latest_config(self, mock_sse_manager_class, mock_get_config): + """Test that _recreate_sse_connection uses the latest config when re-acquiring lock.""" + initial_config = self.test_config_json.copy() + mock_get_config.return_value = ( + initial_config, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Update config during the blocking operation + updated_config = initial_config.copy() + updated_config["updated"] = True + + def delayed_config_update(timeout=None): + # Simulate config update happening during join + config_manager._config = updated_config + time.sleep(0.05) + + mock_thread.join = delayed_config_update + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Reset the mock after initialization + mock_sse_manager_class.reset_mock() + + # Create a new mock SSE manager for the recreation + mock_new_sse = MagicMock() + mock_sse_manager_class.return_value = mock_new_sse + + # Call _recreate_sse_connection + config_manager._recreate_sse_connection() + + # Verify the new SSE manager was created and updated with the latest config + mock_sse_manager_class.assert_called_once() + mock_new_sse.update.assert_called_once() + + # The config passed to update should be the updated one + call_args = mock_new_sse.update.call_args[0][0] + self.assertIn("updated", call_args) + self.assertTrue(call_args["updated"]) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_concurrent_calls(self, mock_sse_manager_class, mock_get_config): + """Test that concurrent calls to _recreate_sse_connection are handled safely.""" + mock_get_config.return_value = ( + self.test_config_json, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager with slow close/join + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Make join slow to allow concurrent calls + def slow_join(timeout=None): + time.sleep(0.2) + + mock_thread.join = slow_join + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Reset the mock after initialization + mock_sse_manager_class.reset_mock() + + # Mock new SSE managers + mock_new_sse_1 = MagicMock() + mock_new_sse_2 = MagicMock() + mock_sse_manager_class.side_effect = [mock_new_sse_1, mock_new_sse_2] + + # Track completion + results = [] + + def call_recreate(index): + try: + config_manager._recreate_sse_connection() + results.append(f"completed_{index}") + except Exception as e: + results.append(f"error_{index}: {e}") + + # Start two concurrent recreate calls + thread1 = threading.Thread(target=call_recreate, args=(1,)) + thread2 = threading.Thread(target=call_recreate, args=(2,)) + + thread1.start() + time.sleep(0.05) # Small delay to ensure thread1 starts first + thread2.start() + + thread1.join(timeout=2.0) + thread2.join(timeout=2.0) + + # Both should complete without errors + self.assertEqual(len(results), 2) + self.assertIn("completed_1", results) + self.assertIn("completed_2", results) + + # At least one SSE manager should be created + self.assertGreaterEqual(mock_sse_manager_class.call_count, 1) + + config_manager.close() + + @patch("devcycle_python_sdk.api.config_client.ConfigAPIClient.get_config") + @patch("devcycle_python_sdk.managers.config_manager.SSEManager") + def test_recreate_sse_connection_skips_if_config_cleared(self, mock_sse_manager_class, mock_get_config): + """Test that _recreate_sse_connection skips if config is cleared during reconnection.""" + mock_get_config.return_value = ( + self.test_config_json, + self.test_etag, + self.test_lastmodified, + ) + + # Enable realtime updates + self.test_options.disable_realtime_updates = False + + config_manager = EnvironmentConfigManager( + self.sdk_key, self.test_options, self.test_local_bucketing + ) + time.sleep(0.1) + + # Create a mock SSE manager + mock_old_sse = MagicMock() + mock_old_sse.client = MagicMock() + mock_thread = MagicMock() + mock_thread.is_alive.return_value = True + + # Clear config during join + def clear_config_during_join(timeout=None): + config_manager._config = None + time.sleep(0.05) + + mock_thread.join = clear_config_during_join + mock_old_sse.read_thread = mock_thread + + config_manager._sse_manager = mock_old_sse + + # Reset the mock after initialization + mock_sse_manager_class.reset_mock() + + # Call _recreate_sse_connection + config_manager._recreate_sse_connection() + + # Verify old manager was closed + mock_old_sse.client.close.assert_called_once() + + # Verify no new SSE manager was created (because config was None) + mock_sse_manager_class.assert_not_called() + + # _sse_manager should still be None + self.assertIsNone(config_manager._sse_manager) + + config_manager.close() + if __name__ == "__main__": unittest.main()