-
Notifications
You must be signed in to change notification settings - Fork 6
fix: sse error handling #107
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
32a9ead
71687c6
801e945
8c27a71
fc0f8d7
7b0e1ac
4baf61e
c3f4c27
371cf7d
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -40,6 +40,12 @@ def __init__( | |
| self._config_etag: Optional[str] = None | ||
| self._config_lastmodified: Optional[str] = None | ||
|
|
||
| # Exponential backoff configuration | ||
| self._sse_reconnect_attempts = 0 | ||
| self._min_reconnect_interval = 5.0 # Start at 5 seconds | ||
| self._max_reconnect_interval = 300.0 # Cap at 5 minutes | ||
| self._last_reconnect_attempt_time: Optional[float] = None | ||
| self._sse_reconnecting = False | ||
| self._config_api_client = ConfigAPIClient(self._sdk_key, self._options) | ||
|
|
||
| self._polling_enabled = True | ||
|
|
@@ -49,6 +55,48 @@ def __init__( | |
| def is_initialized(self) -> bool: | ||
| return self._config is not None | ||
|
|
||
| def _recreate_sse_connection(self): | ||
| """Recreate the SSE connection with the current config.""" | ||
| if self._config is None or self._options.disable_realtime_updates: | ||
| logger.debug( | ||
| "DevCycle: Skipping SSE recreation - no config or updates disabled" | ||
| ) | ||
| return | ||
|
|
||
| # Update timestamp right before attempting connection | ||
| self._last_reconnect_attempt_time = time.time() | ||
|
|
||
| try: | ||
| # Close existing connection if present | ||
| if self._sse_manager is not None and self._sse_manager.client is not None: | ||
| self._sse_manager.client.close() | ||
| if self._sse_manager.read_thread.is_alive(): | ||
| self._sse_manager.read_thread.join(timeout=1.0) | ||
|
|
||
| # Create new SSE manager | ||
| self._sse_manager = SSEManager( | ||
| self.sse_state, | ||
| self.sse_error, | ||
| self.sse_message, | ||
| ) | ||
| self._sse_manager.update(self._config) | ||
|
|
||
| except Exception as e: | ||
| logger.debug(f"DevCycle: Failed to recreate SSE connection: {e}") | ||
|
|
||
| def _delayed_sse_reconnect(self, delay_seconds: float): | ||
| """Delayed SSE reconnection with configurable backoff.""" | ||
| try: | ||
| logger.debug( | ||
| f"DevCycle: Waiting {delay_seconds}s before reconnecting SSE..." | ||
| ) | ||
| time.sleep(delay_seconds) | ||
| self._recreate_sse_connection() | ||
| except Exception as e: | ||
| logger.error(f"DevCycle: Error during delayed SSE reconnection: {e}") | ||
| finally: | ||
| self._sse_reconnecting = False | ||
|
|
||
| def _get_config(self, last_modified: Optional[float] = None): | ||
| try: | ||
| lm_header = self._config_lastmodified | ||
|
|
@@ -87,12 +135,10 @@ def _get_config(self, last_modified: Optional[float] = None): | |
| or self._sse_manager.client is None | ||
| or not self._sse_manager.read_thread.is_alive() | ||
| ): | ||
| self._sse_manager = SSEManager( | ||
| self.sse_state, | ||
| self.sse_error, | ||
| self.sse_message, | ||
| logger.info( | ||
| "DevCycle: SSE connection not active, creating new connection" | ||
| ) | ||
| self._sse_manager.update(self._config) | ||
| self._recreate_sse_connection() | ||
luxscious marked this conversation as resolved.
Show resolved
Hide resolved
|
||
|
|
||
| if ( | ||
| trigger_on_client_initialized | ||
|
|
@@ -101,7 +147,6 @@ def _get_config(self, last_modified: Optional[float] = None): | |
| try: | ||
| self._options.on_client_initialized() | ||
| except Exception as e: | ||
| # consume any error | ||
| logger.warning( | ||
| f"DevCycle: Error received from on_client_initialized callback: {str(e)}" | ||
| ) | ||
|
|
@@ -122,7 +167,6 @@ def run(self): | |
| self._get_config() | ||
| except Exception as e: | ||
| if self._polling_enabled: | ||
| # Only log a warning if we're still polling | ||
| logger.warning( | ||
| f"DevCycle: Error polling for config changes: {str(e)}" | ||
| ) | ||
|
|
@@ -137,6 +181,7 @@ def sse_message(self, message: ld_eventsource.actions.Event): | |
| self.sse_state(None) | ||
| logger.info(f"DevCycle: Received message: {message.data}") | ||
| sse_message = json.loads(message.data) | ||
|
|
||
| dvc_data = json.loads(sse_message.get("data")) | ||
| if ( | ||
| dvc_data.get("type") == "refetchConfig" | ||
|
|
@@ -145,15 +190,60 @@ def sse_message(self, message: ld_eventsource.actions.Event): | |
| ): | ||
| logger.info("DevCycle: Received refetchConfig message - updating config") | ||
| self._get_config(dvc_data["lastModified"] / 1000.0) | ||
| # SSE connection healthy, reconnect attempts reset. | ||
| if dvc_data.get("type") == "ping" or dvc_data.get("type") == "refetchConfig": | ||
| self._sse_reconnect_attempts = 0 | ||
|
|
||
| def sse_error(self, error: ld_eventsource.actions.Fault): | ||
| self._sse_connected = False | ||
| logger.debug(f"DevCycle: Received SSE error: {error}") | ||
| logger.debug(f"DevCycle: SSE connection error: {error.error}") | ||
| current_time = time.time() | ||
|
|
||
| if self._sse_reconnecting: | ||
| logger.debug("DevCycle: Reconnection already in progress, skipping") | ||
| return | ||
|
|
||
| # Calculate exponential backoff interval (capped at max) | ||
| backoff_interval = min( | ||
| self._min_reconnect_interval * (2**self._sse_reconnect_attempts), | ||
| self._max_reconnect_interval, | ||
| ) | ||
|
|
||
| # Check if we need to wait for remaining backoff time | ||
| delay_seconds = backoff_interval | ||
| if self._last_reconnect_attempt_time is not None: | ||
| time_since_last_attempt = current_time - self._last_reconnect_attempt_time | ||
| if time_since_last_attempt < backoff_interval: | ||
| delay_seconds = backoff_interval - time_since_last_attempt | ||
| logger.debug( | ||
| f"DevCycle: Within backoff period, scheduling reconnection in {delay_seconds:.1f}s" | ||
| ) | ||
|
|
||
| self._sse_reconnecting = True | ||
| self._sse_reconnect_attempts += 1 | ||
|
|
||
| logger.debug( | ||
| f"DevCycle: Attempting SSE reconnection (attempt #{self._sse_reconnect_attempts}, " | ||
| f"backoff: {delay_seconds:.1f}s)" | ||
| ) | ||
|
|
||
| reconnect_thread = threading.Thread( | ||
| target=self._delayed_sse_reconnect, args=(delay_seconds,), daemon=True | ||
| ) | ||
| reconnect_thread.start() | ||
|
|
||
| def sse_state(self, state: Optional[ld_eventsource.actions.Start]): | ||
| if not self._sse_connected: | ||
| self._sse_connected = True | ||
| logger.info("DevCycle: Connected to SSE stream") | ||
|
|
||
| # Clear reconnection state | ||
| self._sse_reconnecting = False | ||
| self._last_reconnect_attempt_time = None | ||
| else: | ||
| logger.debug("DevCycle: SSE keepalive received") | ||
|
|
||
| def close(self): | ||
| self._polling_enabled = False | ||
| if self._sse_manager is not None and self._sse_manager.client is not None: | ||
| self._sse_manager.client.close() | ||
luxscious marked this conversation as resolved.
Show resolved
Hide resolved
luxscious marked this conversation as resolved.
Show resolved
Hide resolved
Comment on lines
158
to
+249
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -33,8 +33,9 @@ def read_events( | |
| handle_error: Callable[[ld_eventsource.actions.Fault], None], | ||
| handle_message: Callable[[ld_eventsource.actions.Event], None], | ||
| ): | ||
| self.client.start() | ||
| try: | ||
| self.client.start() | ||
| logger.info("DevCycle: SSE connection created successfully") | ||
| for event in self.client.all: | ||
| if isinstance(event, ld_eventsource.actions.Start): | ||
| handle_state(event) | ||
|
|
@@ -45,7 +46,11 @@ def read_events( | |
| elif isinstance(event, ld_eventsource.actions.Comment): | ||
| handle_state(None) | ||
| except Exception as e: | ||
| logger.debug(f"DevCycle: failed to read SSE message: {e}") | ||
| logger.debug(f"DevCycle SSE: Error in read loop: {e}") | ||
| fault_event = ld_eventsource.actions.Fault(error=e) | ||
| handle_error(fault_event) | ||
| finally: | ||
|
Comment on lines
48
to
+52
|
||
| logger.debug("DevCycle SSE: Connection closed") | ||
|
|
||
| def update(self, config: dict): | ||
| if self.use_new_config(config["sse"]): | ||
|
|
@@ -66,6 +71,6 @@ def update(self, config: dict): | |
|
|
||
| def use_new_config(self, config: dict) -> bool: | ||
| new_url = config["hostname"] + config["path"] | ||
| if self.url == "" or self.url is None and new_url != "": | ||
| if (self.url == "" or self.url is None) and new_url != "": | ||
| return True | ||
| return self.url != new_url | ||
Uh oh!
There was an error while loading. Please reload this page.