Skip to content

Commit 98d9140

Browse files
committed
Initial working version
1 parent 745221c commit 98d9140

8 files changed

Lines changed: 435 additions & 1 deletion

File tree

.github/workflows/publish.yml

Lines changed: 32 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,32 @@
1+
name: Publish Python 🐍 distributions 📦 to PyPI
2+
3+
on:
4+
release:
5+
types: [created]
6+
7+
jobs:
8+
build-n-publish:
9+
name: Build and publish Python 🐍 distributions 📦 to PyPI
10+
runs-on: ubuntu-latest
11+
steps:
12+
- uses: actions/checkout@v4
13+
14+
- name: Set up Python
15+
uses: actions/setup-python@v3
16+
with:
17+
python-version: 3.11
18+
19+
- name: Install dependencies
20+
run: |
21+
python -m pip install --upgrade pip
22+
pip install setuptools wheel
23+
24+
- name: Build the package
25+
run: |
26+
python setup.py sdist bdist_wheel
27+
28+
- name: Publish distribution 📦 to PyPI
29+
uses: pypa/gh-action-pypi-publish@release/v1
30+
with:
31+
user: __token__
32+
password: ${{ secrets.PYPI_API_TOKEN }}

README.md

Lines changed: 163 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1 +1,163 @@
1-
# python-appsync-ws-client
1+
2+
# AWS AppSync WebSocket Client
3+
4+
A Python client for subscribing to AWS AppSync GraphQL APIs via WebSockets. This package allows you to connect to the AWS AppSync WebSocket API, handle GraphQL subscriptions, and manage reconnections and retries seamlessly.
5+
6+
## Features
7+
8+
- **GraphQL Subscriptions**: Easily subscribe to GraphQL queries over WebSockets.
9+
- **Automatic Reconnection**: Handles reconnection attempts in case of dropped WebSocket connections.
10+
- **Thread-safe**: Manages multiple subscriptions with thread-safe operations.
11+
- **Callback Handling**: Provides a way to specify callback functions that process subscription data.
12+
13+
## Installation
14+
15+
Install the package via pip:
16+
17+
```bash
18+
pip install appsync-ws-client
19+
```
20+
21+
## Usage
22+
23+
### 1. Initialize the Client
24+
25+
To use the client, provide the WebSocket URL and an authentication function that returns the necessary headers.
26+
27+
```python
28+
from appsync_ws_client.client import GraphQLWebSocketClient
29+
30+
def get_auth_headers():
31+
return {
32+
"host": "xxx.appsync-api.<region>.amazonaws.com",
33+
"Authorization": "<ACCESS_TOKEN>",
34+
}
35+
36+
url = "wss://<your-appsync-endpoint>"
37+
client = GraphQLWebSocketClient(url, auth_function=get_auth_headers)
38+
client.connect()
39+
```
40+
41+
### 2. Subscribing to a GraphQL Query
42+
43+
You can subscribe to a GraphQL query using the `subscribe` method. The subscription requires a GraphQL query, variables (if any), and a callback function to handle the subscription data.
44+
45+
```python
46+
query = '''
47+
subscription OnPriceUpdate {
48+
onPriceUpdate {
49+
id
50+
price
51+
timestamp
52+
}
53+
}
54+
'''
55+
56+
def handle_subscription_data(data):
57+
print("Received subscription data:", data)
58+
59+
subscription_id = client.subscribe(query, variables={}, callback=handle_subscription_data)
60+
```
61+
62+
### 3. Unsubscribing
63+
64+
To unsubscribe from a subscription, use the `unsubscribe` method with the `subscription_id` that was returned when you subscribed.
65+
66+
```python
67+
client.unsubscribe(subscription_id)
68+
```
69+
70+
### 4. Closing the Connection
71+
72+
Ensure you close the WebSocket connection properly when done:
73+
74+
```python
75+
client.close()
76+
```
77+
78+
### 5. Handling Reconnection
79+
80+
The client automatically attempts to reconnect when a WebSocket connection drops. You can control the number of retry attempts by passing `max_retries` to the client. For example:
81+
82+
```python
83+
client = GraphQLWebSocketClient(url, auth_function=get_auth_headers, max_retries=10)
84+
client.connect()
85+
```
86+
87+
## Error Handling
88+
89+
The package will raise the following errors:
90+
91+
- **`TimeoutError`**: Raised when the connection acknowledgment times out.
92+
- **`MaxRetriesExceeded`**: Raised when the maximum number of reconnection attempts is exceeded.
93+
94+
You can also handle WebSocket errors using the client’s internal logging.
95+
96+
## Logging
97+
98+
Logging is built in to help monitor the WebSocket connection and subscription process. Make sure to configure logging in your application as necessary:
99+
100+
```python
101+
import logging
102+
103+
logging.basicConfig(level=logging.INFO)
104+
```
105+
106+
## Example
107+
108+
Here is a full example of setting up the client and subscribing to a GraphQL subscription:
109+
110+
```python
111+
import time
112+
import logging
113+
from appsync_ws_client.client import GraphQLWebSocketClient
114+
115+
logging.basicConfig(level=logging.INFO)
116+
117+
def get_auth_headers():
118+
return {
119+
"host": "xxx.appsync-api.<region>.amazonaws.com",
120+
"Authorization": "<ACCESS_TOKEN>",
121+
}
122+
123+
url = "wss://<your-appsync-endpoint>"
124+
client = GraphQLWebSocketClient(url, auth_function=get_auth_headers)
125+
client.connect()
126+
127+
query = '''
128+
subscription OnPriceUpdate {
129+
onPriceUpdate {
130+
id
131+
price
132+
timestamp
133+
}
134+
}
135+
'''
136+
137+
def handle_subscription_data(data):
138+
print("Received subscription data:", data)
139+
140+
subscription_id = client.subscribe(query, variables={}, callback=handle_subscription_data)
141+
142+
try:
143+
while True:
144+
time.sleep(1) # Keeps the main program alive
145+
except KeyboardInterrupt:
146+
print("Closing WebSocket and shutting down...")
147+
client.close()
148+
149+
150+
# Later, if you want to unsubscribe
151+
client.unsubscribe(subscription_id)
152+
153+
# Always remember to close the connection when done
154+
client.close()
155+
```
156+
157+
## License
158+
159+
This package is licensed under the MIT License. See the [LICENSE](LICENSE) file for more details.
160+
161+
## Contributing
162+
163+
Feel free to open an issue or submit a pull request if you want to contribute!

appsync_ws_client/ __init__.py

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1 @@
1+
from .client import GraphQLWebSocketClient # noqa

appsync_ws_client/client.py

Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
import time
2+
import websocket
3+
import json
4+
import base64
5+
import threading
6+
import uuid
7+
import logging
8+
from typing import Callable, Dict, Any
9+
from .exceptions import MaxRetriesExceeded
10+
11+
logger = logging.getLogger(__name__)
12+
13+
14+
class GraphQLWebSocketClient:
15+
def __init__(self, url: str, auth_function: Callable[[], Dict[str, str]], max_retries: int = 5):
16+
self.url = url
17+
self.auth_function = auth_function
18+
self.ws = None
19+
self.subscriptions = {}
20+
self._is_open = False
21+
self._acknowledged_event = threading.Event()
22+
self.max_retries = max_retries
23+
self.retry_count = 0
24+
self.lock = threading.Lock()
25+
26+
def _build_ws_url(self) -> str:
27+
"""
28+
Build the WebSocket URL with authentication headers.
29+
"""
30+
auth_info = self.auth_function()
31+
headers_encoded = base64.b64encode(json.dumps(auth_info).encode()).decode()
32+
all_url = f"{self.url}?header={headers_encoded}&payload={base64.b64encode(json.dumps({}).encode()).decode()}"
33+
logger.debug(f"WebSocket URL: {all_url}")
34+
return all_url
35+
36+
def connect(self):
37+
"""
38+
Establish the WebSocket connection.
39+
"""
40+
try:
41+
ws_url = self._build_ws_url()
42+
self.ws = websocket.WebSocketApp(
43+
ws_url,
44+
subprotocols=["graphql-ws"],
45+
on_open=self._on_open,
46+
on_message=self._on_message,
47+
on_error=self._on_error,
48+
on_close=self._on_close,
49+
)
50+
thread = threading.Thread(target=self.ws.run_forever)
51+
thread.daemon = True
52+
thread.start()
53+
except Exception as e:
54+
logger.error(f"Failed to connect: {e}")
55+
self._attempt_reconnect()
56+
57+
def _on_open(self, ws):
58+
"""
59+
WebSocket open event handler. Sends connection initialization message.
60+
"""
61+
self._is_open = True
62+
self._acknowledged_event.clear()
63+
self._send_message({"type": "connection_init"})
64+
logger.info("WebSocket connection opened.")
65+
self.retry_count = 0
66+
67+
def _on_message(self, ws, message):
68+
"""
69+
WebSocket message event handler. Process incoming messages.
70+
"""
71+
msg = json.loads(message)
72+
message_type = msg.get('type')
73+
74+
if message_type == "connection_ack":
75+
self._acknowledged_event.set()
76+
logger.info("Connection acknowledged.")
77+
elif message_type == "ka":
78+
logger.debug("Keep-alive received.")
79+
elif message_type == "data":
80+
subscription_id = msg.get('id')
81+
if subscription_id and subscription_id in self.subscriptions:
82+
callback = self.subscriptions[subscription_id]['callback']
83+
callback(msg['payload'])
84+
elif message_type == "error":
85+
logger.error(f"Error received: {msg.get('payload')}")
86+
87+
def _on_error(self, ws, error):
88+
"""
89+
WebSocket error event handler.
90+
"""
91+
logger.error(f"WebSocket error occurred: {error}")
92+
if isinstance(error, Exception):
93+
logger.exception("Exception details:", exc_info=error)
94+
self._attempt_reconnect()
95+
96+
def _on_close(self, ws, close_status_code, close_msg):
97+
"""
98+
WebSocket close event handler.
99+
"""
100+
self._is_open = False
101+
self._acknowledged_event.clear()
102+
logger.warning(f"WebSocket closed: {close_status_code}, message: {close_msg}")
103+
self._attempt_reconnect()
104+
105+
def _attempt_reconnect(self):
106+
"""
107+
Attempt to reconnect to the WebSocket.
108+
"""
109+
if self.retry_count < self.max_retries:
110+
self.retry_count += 1
111+
wait_time = 2**self.retry_count + (0.5 * self.retry_count)
112+
logger.info(f"Reconnecting in {wait_time} seconds...")
113+
time.sleep(wait_time)
114+
self.connect()
115+
else:
116+
raise MaxRetriesExceeded("Max retries reached.")
117+
118+
def subscribe(
119+
self, query: str, variables: Dict[str, Any], callback: Callable[[Dict[str, Any]], None], acknowledgment_timeout: int = 10
120+
) -> str:
121+
"""
122+
Subscribe to a GraphQL query via WebSocket.
123+
124+
:param query: GraphQL query as a string.
125+
:param variables: Variables for the GraphQL query.
126+
:param callback: A callback function that handles the incoming data.
127+
:return: The subscription ID (used for unsubscribing later).
128+
"""
129+
# Wait for connection to be acknowledged before subscribing
130+
if not self._acknowledged_event.wait(timeout=acknowledgment_timeout):
131+
logger.error("Connection acknowledgment timeout.")
132+
raise TimeoutError("Connection acknowledgment timeout.")
133+
134+
subscription_id = str(uuid.uuid4())
135+
self.subscriptions[subscription_id] = {'query': query, 'variables': variables, 'callback': callback}
136+
137+
message = {
138+
"id": subscription_id,
139+
"type": "start",
140+
"payload": {
141+
"data": json.dumps({"query": query, "variables": variables}),
142+
"extensions": {"authorization": self.auth_function()},
143+
},
144+
}
145+
self._send_message(message)
146+
logger.info(f"Subscribed with ID: {subscription_id}")
147+
return subscription_id
148+
149+
def unsubscribe(self, subscription_id: str):
150+
"""
151+
Unsubscribe from a subscription.
152+
:param subscription_id: The subscription ID.
153+
"""
154+
with self.lock:
155+
if subscription_id in self.subscriptions:
156+
self._send_message({"id": subscription_id, "type": "stop"})
157+
del self.subscriptions[subscription_id]
158+
logger.info(f"Unsubscribed from: {subscription_id}")
159+
160+
def _send_message(self, message: Dict[str, Any]):
161+
"""
162+
Send a message over the WebSocket.
163+
"""
164+
with self.lock:
165+
if self.ws and self._is_open:
166+
self.ws.send(json.dumps(message))
167+
else:
168+
logger.warning("WebSocket is not open. Cannot send message.")
169+
170+
def isConnectionOpen(self):
171+
"""
172+
Check if the WebSocket connection is open.
173+
"""
174+
return self._is_open
175+
176+
def isAcknowledged(self):
177+
"""
178+
Check if the connection has been acknowledged.
179+
"""
180+
return self._acknowledged_event.is_set()
181+
182+
def close(self):
183+
"""
184+
Close the WebSocket connection.
185+
"""
186+
with self.lock:
187+
if self.ws:
188+
self.ws.close()
189+
self._is_open = False
190+
logger.info("WebSocket connection closed.")
191+
192+
def __del__(self):
193+
self.close()

appsync_ws_client/exceptions.py

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
class MaxRetriesExceeded(Exception):
2+
pass

0 commit comments

Comments
 (0)