From 54d6b8467d289df581e07caa4f9b911c3b702639 Mon Sep 17 00:00:00 2001 From: Sam Clusker <9279784+samclusker@users.noreply.github.com> Date: Sun, 7 Dec 2025 11:37:25 +0000 Subject: [PATCH 1/6] Adding support for websockets --- src/main.py | 8 +++++++- 1 file changed, 7 insertions(+), 1 deletion(-) diff --git a/src/main.py b/src/main.py index c7887b7..a29fa25 100644 --- a/src/main.py +++ b/src/main.py @@ -13,6 +13,7 @@ from web3 import AsyncWeb3, Web3 from web3.middleware import ExtraDataToPOAMiddleware, SignAndSendRawMiddlewareBuilder from web3.providers.rpc.async_rpc import AsyncHTTPProvider +from web3.providers.persistent import WebSocketProvider # Shutdown flag for graceful termination (thread-safe for signal handlers) shutdown_event = threading.Event() @@ -185,7 +186,10 @@ async def connect_with_retry(rpc_url: str, max_attempts: int = 3) -> Optional[As attempt = 0 while attempt < max_attempts: try: - w3 = AsyncWeb3[AsyncHTTPProvider](AsyncHTTPProvider(rpc_url)) + if rpc_url.startswith(('ws://', 'wss://')): + w3 = AsyncWeb3[WebSocketProvider](WebSocketProvider(rpc_url)) + else: + w3 = AsyncWeb3[AsyncHTTPProvider](AsyncHTTPProvider(rpc_url)) if await w3.is_connected(): w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) return w3 @@ -446,6 +450,8 @@ async def main(): finally: # Cleanup logger.info("Cleaning up") + if w3 and hasattr(w3.provider, 'close'): + await w3.provider.close() await health_runner.cleanup() logger.info("Shutdown complete") From 314ae393d5022a766e0f75eda4cc67b91e69529c Mon Sep 17 00:00:00 2001 From: Sam Clusker <9279784+samclusker@users.noreply.github.com> Date: Sun, 7 Dec 2025 11:39:15 +0000 Subject: [PATCH 2/6] resolve ci job rules --- .github/workflows/docker-publish.yaml | 2 +- .github/workflows/python-app.yaml | 1 - 2 files changed, 1 insertion(+), 2 deletions(-) diff --git a/.github/workflows/docker-publish.yaml b/.github/workflows/docker-publish.yaml index 3c8e0fc..cb42a7a 100644 --- a/.github/workflows/docker-publish.yaml +++ b/.github/workflows/docker-publish.yaml @@ -2,7 +2,7 @@ name: Docker on: push: - branches: [ main, github-actions ] + branches: [ '*' ] tags: [ 'v*.*.*' ] pull_request: branches: [ main ] diff --git a/.github/workflows/python-app.yaml b/.github/workflows/python-app.yaml index 088da8e..a5278f3 100644 --- a/.github/workflows/python-app.yaml +++ b/.github/workflows/python-app.yaml @@ -2,7 +2,6 @@ name: Python application on: push: - branches: [ main, github-actions ] pull_request: branches: [ main ] From ad1908d366dfb335073c0aa1f84e4022a1889ee9 Mon Sep 17 00:00:00 2001 From: Sam Clusker <9279784+samclusker@users.noreply.github.com> Date: Sun, 7 Dec 2025 12:34:01 +0000 Subject: [PATCH 3/6] attempt to capture actual error --- src/main.py | 73 ++++++++++++++++++++++++++++++++++++++--------------- 1 file changed, 52 insertions(+), 21 deletions(-) diff --git a/src/main.py b/src/main.py index a29fa25..818e13d 100644 --- a/src/main.py +++ b/src/main.py @@ -184,37 +184,68 @@ async def connect_with_retry(rpc_url: str, max_attempts: int = 3) -> Optional[As AsyncWeb3 instance if successful, None otherwise """ attempt = 0 + last_error = None + while attempt < max_attempts: + attempt += 1 + is_final_attempt = attempt >= max_attempts + try: if rpc_url.startswith(('ws://', 'wss://')): w3 = AsyncWeb3[WebSocketProvider](WebSocketProvider(rpc_url)) else: w3 = AsyncWeb3[AsyncHTTPProvider](AsyncHTTPProvider(rpc_url)) - if await w3.is_connected(): + + is_connected = await w3.is_connected() + if is_connected: w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) + logger.info("Successfully connected to RPC node at %s", rpc_url) return w3 - attempt += 1 - wait_time = 2 ** attempt - logger.warning( - "Failed to connect to RPC node (attempt %s/%s), retrying in %s seconds", - attempt, - max_attempts, - wait_time - ) - await asyncio.sleep(wait_time) + + # If not connected, try to get diagnostic info on final attempt + if is_final_attempt: + try: + await w3.eth.chain_id + except Exception as diagnostic_error: + last_error = diagnostic_error + logger.error( + "Connection diagnostic on final attempt: %s", + diagnostic_error + ) + else: + wait_time = 2 ** attempt + logger.warning( + "Failed to connect to RPC node (attempt %s/%s), retrying in %s seconds", + attempt, + max_attempts, + wait_time + ) + await asyncio.sleep(wait_time) + except Exception as e: # pylint: disable=broad-exception-caught - attempt += 1 - wait_time = 2 ** attempt - logger.error( - "Error connecting to RPC node: %s (attempt %s/%s), retrying in %s seconds", - e, - attempt, - max_attempts, - wait_time - ) - await asyncio.sleep(wait_time) + last_error = e # Always capture the error + if is_final_attempt: + logger.error( + "Error connecting to RPC node on final attempt: %s", + e + ) + else: + wait_time = 2 ** attempt + logger.error( + "Error connecting to RPC node: %s (attempt %s/%s), retrying in %s seconds", + e, + attempt, + max_attempts, + wait_time + ) + await asyncio.sleep(wait_time) - logger.error("Unable to connect to RPC node after maximum retry attempts") + logger.error( + "Unable to connect to RPC node at %s after %s retry attempts%s", + rpc_url, + max_attempts, + f": {last_error}" if last_error else "" + ) return None async def liveness_handler(_request): From b78d721bd4526bccacedcef0ef252b1bbceaeb0e Mon Sep 17 00:00:00 2001 From: Sam Clusker <9279784+samclusker@users.noreply.github.com> Date: Sun, 7 Dec 2025 12:47:58 +0000 Subject: [PATCH 4/6] init websocket connection --- src/main.py | 88 +++++++++++++++++++++++++++++------------------------ 1 file changed, 49 insertions(+), 39 deletions(-) diff --git a/src/main.py b/src/main.py index 818e13d..b3ef62a 100644 --- a/src/main.py +++ b/src/main.py @@ -8,8 +8,14 @@ import threading from datetime import datetime, timezone from typing import Optional +from web3.providers.rpc.async_rpc import AsyncHTTPProvider +from web3.providers.persistent.websocket import WebSocketProvider +from web3.providers.rpc.async_rpc import AsyncHTTPProvider +from web3.providers.persistent.websocket import WebSocketProvider +from web3.providers.rpc.async_rpc import AsyncHTTPProvider from aiohttp import web +from eth_utils.address import is_canonical_address from web3 import AsyncWeb3, Web3 from web3.middleware import ExtraDataToPOAMiddleware, SignAndSendRawMiddlewareBuilder from web3.providers.rpc.async_rpc import AsyncHTTPProvider @@ -185,67 +191,71 @@ async def connect_with_retry(rpc_url: str, max_attempts: int = 3) -> Optional[As """ attempt = 0 last_error = None + is_websocket = rpc_url.startswith(('ws://', 'wss://')) while attempt < max_attempts: attempt += 1 is_final_attempt = attempt >= max_attempts try: - if rpc_url.startswith(('ws://', 'wss://')): + # Create provider + if is_websocket: w3 = AsyncWeb3[WebSocketProvider](WebSocketProvider(rpc_url)) else: w3 = AsyncWeb3[AsyncHTTPProvider](AsyncHTTPProvider(rpc_url)) - is_connected = await w3.is_connected() - if is_connected: - w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) - logger.info("Successfully connected to RPC node at %s", rpc_url) - return w3 - - # If not connected, try to get diagnostic info on final attempt - if is_final_attempt: - try: - await w3.eth.chain_id - except Exception as diagnostic_error: - last_error = diagnostic_error - logger.error( - "Connection diagnostic on final attempt: %s", - diagnostic_error + # Check connection + try: + if is_websocket: + await w3.eth.chain_id # Trigger websocket connection + else: + if not await w3.is_connected(): + raise ConnectionError("RPC node not connected") + except Exception as conn_error: + last_error = conn_error + if is_final_attempt: + logger.error("Connection failed on final attempt: %s", conn_error) + break + else: + wait_time = 2 ** attempt + logger.warning( + "Failed to connect (attempt %s/%s), retrying in %s seconds", + attempt, + max_attempts, + wait_time ) - else: - wait_time = 2 ** attempt - logger.warning( - "Failed to connect to RPC node (attempt %s/%s), retrying in %s seconds", - attempt, - max_attempts, - wait_time - ) - await asyncio.sleep(wait_time) + await asyncio.sleep(wait_time) + continue + + # Successfully connected + w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) + logger.info("Successfully connected to RPC node at %s", rpc_url) + return w3 except Exception as e: # pylint: disable=broad-exception-caught - last_error = e # Always capture the error + last_error = e if is_final_attempt: - logger.error( - "Error connecting to RPC node on final attempt: %s", - e - ) + logger.error("Error connecting on final attempt: %s", e) + break else: wait_time = 2 ** attempt logger.error( - "Error connecting to RPC node: %s (attempt %s/%s), retrying in %s seconds", - e, + "Error connecting (attempt %s/%s), retrying in %s seconds: %s", attempt, max_attempts, - wait_time + wait_time, + e ) await asyncio.sleep(wait_time) - logger.error( - "Unable to connect to RPC node at %s after %s retry attempts%s", - rpc_url, - max_attempts, - f": {last_error}" if last_error else "" - ) + # Final error message, if we didn't connect + if last_error: + logger.error( + "Unable to connect to RPC node at %s after %s attempts: %s", + rpc_url, + max_attempts, + last_error + ) return None async def liveness_handler(_request): From 672bcf1518c49d0c12f21c6d352be624d67cc515 Mon Sep 17 00:00:00 2001 From: Sam Clusker <9279784+samclusker@users.noreply.github.com> Date: Sun, 7 Dec 2025 12:58:25 +0000 Subject: [PATCH 5/6] init websocket connection --- src/main.py | 65 ++++++++++++++++++++++++++++++++--------------------- 1 file changed, 39 insertions(+), 26 deletions(-) diff --git a/src/main.py b/src/main.py index b3ef62a..e52601a 100644 --- a/src/main.py +++ b/src/main.py @@ -13,6 +13,8 @@ from web3.providers.rpc.async_rpc import AsyncHTTPProvider from web3.providers.persistent.websocket import WebSocketProvider from web3.providers.rpc.async_rpc import AsyncHTTPProvider +from web3.providers.persistent.websocket import WebSocketProvider +from web3.providers.rpc.async_rpc import AsyncHTTPProvider from aiohttp import web from eth_utils.address import is_canonical_address @@ -198,34 +200,44 @@ async def connect_with_retry(rpc_url: str, max_attempts: int = 3) -> Optional[As is_final_attempt = attempt >= max_attempts try: - # Create provider if is_websocket: + # https://web3py.readthedocs.io/en/stable/providers.html#web3.providers.persistent.WebSocketProvider w3 = AsyncWeb3[WebSocketProvider](WebSocketProvider(rpc_url)) + try: + await w3.provider.connect() + if not await w3.is_connected(): + raise ConnectionError("WebSocket connection not established") + except Exception as conn_error: + last_error = conn_error + if is_final_attempt: + logger.error("WebSocket connection failed on final attempt: %s", conn_error) + break + else: + wait_time = 2 ** attempt + logger.warning( + "Failed to connect (attempt %s/%s), retrying in %s seconds", + attempt, + max_attempts, + wait_time + ) + await asyncio.sleep(wait_time) + continue else: w3 = AsyncWeb3[AsyncHTTPProvider](AsyncHTTPProvider(rpc_url)) - - # Check connection - try: - if is_websocket: - await w3.eth.chain_id # Trigger websocket connection - else: - if not await w3.is_connected(): - raise ConnectionError("RPC node not connected") - except Exception as conn_error: - last_error = conn_error - if is_final_attempt: - logger.error("Connection failed on final attempt: %s", conn_error) - break - else: - wait_time = 2 ** attempt - logger.warning( - "Failed to connect (attempt %s/%s), retrying in %s seconds", - attempt, - max_attempts, - wait_time - ) - await asyncio.sleep(wait_time) - continue + if not await w3.is_connected(): + if is_final_attempt: + logger.error("HTTP connection failed on final attempt") + break + else: + wait_time = 2 ** attempt + logger.warning( + "Failed to connect (attempt %s/%s), retrying in %s seconds", + attempt, + max_attempts, + wait_time + ) + await asyncio.sleep(wait_time) + continue # Successfully connected w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) @@ -491,8 +503,9 @@ async def main(): finally: # Cleanup logger.info("Cleaning up") - if w3 and hasattr(w3.provider, 'close'): - await w3.provider.close() + is_websocket = args.rpc_url.startswith(('ws://', 'wss://')) + if is_websocket: + await w3.provider.disconnect() await health_runner.cleanup() logger.info("Shutdown complete") From 65cf4e67b90221886642d2bfc14b293e44125294 Mon Sep 17 00:00:00 2001 From: Sam Clusker <9279784+samclusker@users.noreply.github.com> Date: Sun, 7 Dec 2025 15:16:34 +0000 Subject: [PATCH 6/6] Final solution --- src/main.py | 91 +++++++++++++++++++++-------------------------------- 1 file changed, 35 insertions(+), 56 deletions(-) diff --git a/src/main.py b/src/main.py index e52601a..88f0c0e 100644 --- a/src/main.py +++ b/src/main.py @@ -8,16 +8,8 @@ import threading from datetime import datetime, timezone from typing import Optional -from web3.providers.rpc.async_rpc import AsyncHTTPProvider -from web3.providers.persistent.websocket import WebSocketProvider -from web3.providers.rpc.async_rpc import AsyncHTTPProvider -from web3.providers.persistent.websocket import WebSocketProvider -from web3.providers.rpc.async_rpc import AsyncHTTPProvider -from web3.providers.persistent.websocket import WebSocketProvider -from web3.providers.rpc.async_rpc import AsyncHTTPProvider from aiohttp import web -from eth_utils.address import is_canonical_address from web3 import AsyncWeb3, Web3 from web3.middleware import ExtraDataToPOAMiddleware, SignAndSendRawMiddlewareBuilder from web3.providers.rpc.async_rpc import AsyncHTTPProvider @@ -192,7 +184,6 @@ async def connect_with_retry(rpc_url: str, max_attempts: int = 3) -> Optional[As AsyncWeb3 instance if successful, None otherwise """ attempt = 0 - last_error = None is_websocket = rpc_url.startswith(('ws://', 'wss://')) while attempt < max_attempts: @@ -205,39 +196,36 @@ async def connect_with_retry(rpc_url: str, max_attempts: int = 3) -> Optional[As w3 = AsyncWeb3[WebSocketProvider](WebSocketProvider(rpc_url)) try: await w3.provider.connect() - if not await w3.is_connected(): - raise ConnectionError("WebSocket connection not established") - except Exception as conn_error: - last_error = conn_error + except Exception as conn_error: # pylint: disable=broad-exception-caught if is_final_attempt: logger.error("WebSocket connection failed on final attempt: %s", conn_error) break - else: - wait_time = 2 ** attempt - logger.warning( - "Failed to connect (attempt %s/%s), retrying in %s seconds", - attempt, - max_attempts, - wait_time - ) - await asyncio.sleep(wait_time) - continue + wait_time = 2 ** attempt + logger.warning( + "Failed to connect (attempt %s/%s), retrying in %s seconds", + attempt, + max_attempts, + wait_time + ) + await asyncio.sleep(wait_time) + continue else: w3 = AsyncWeb3[AsyncHTTPProvider](AsyncHTTPProvider(rpc_url)) - if not await w3.is_connected(): - if is_final_attempt: - logger.error("HTTP connection failed on final attempt") - break - else: - wait_time = 2 ** attempt - logger.warning( - "Failed to connect (attempt %s/%s), retrying in %s seconds", - attempt, - max_attempts, - wait_time - ) - await asyncio.sleep(wait_time) - continue + + # Verify connection + if not await w3.is_connected(): + if is_final_attempt: + logger.error("Connection verification failed on final attempt") + break + wait_time = 2 ** attempt + logger.warning( + "Failed to connect (attempt %s/%s), retrying in %s seconds", + attempt, + max_attempts, + wait_time + ) + await asyncio.sleep(wait_time) + continue # Successfully connected w3.middleware_onion.inject(ExtraDataToPOAMiddleware, layer=0) @@ -245,29 +233,20 @@ async def connect_with_retry(rpc_url: str, max_attempts: int = 3) -> Optional[As return w3 except Exception as e: # pylint: disable=broad-exception-caught - last_error = e if is_final_attempt: logger.error("Error connecting on final attempt: %s", e) break - else: - wait_time = 2 ** attempt - logger.error( - "Error connecting (attempt %s/%s), retrying in %s seconds: %s", - attempt, - max_attempts, - wait_time, - e - ) - await asyncio.sleep(wait_time) - # Final error message, if we didn't connect - if last_error: - logger.error( - "Unable to connect to RPC node at %s after %s attempts: %s", - rpc_url, - max_attempts, - last_error - ) + wait_time = 2 ** attempt + logger.error( + "Error connecting (attempt %s/%s), retrying in %s seconds: %s", + attempt, + max_attempts, + wait_time, + e + ) + await asyncio.sleep(wait_time) + return None async def liveness_handler(_request):