|
1 | 1 | from importlib.resources import files |
| 2 | +from sseclient import SSEClient |
2 | 3 | from urllib.parse import quote |
3 | 4 | import asyncio |
4 | 5 | import json |
5 | | -import os |
6 | | -import re |
7 | 6 | import requests |
8 | | -import sseclient |
9 | 7 | import sys |
10 | | -import threading |
11 | 8 | import time |
12 | 9 |
|
13 | 10 | class Subscriber: |
14 | 11 | def __init__(self, use_config): |
15 | 12 | self._use_config = use_config |
16 | | - self._thread = None |
17 | | - self._running = False |
18 | | - self._start() |
19 | | - |
20 | | - def _start(self): |
21 | | - if self._thread is not None: |
22 | | - return |
23 | | - |
24 | | - self._running = True |
25 | | - self._thread = threading.Thread(target=self._subscribe) |
26 | | - self._thread.daemon = True |
27 | | - self._thread.start() |
28 | | - |
29 | | - def stop(self): |
30 | | - if self._thread is None: |
31 | | - return |
32 | | - |
33 | | - self._running = False |
34 | | - self._thread.join() |
35 | | - self._thread = None |
| 13 | + self._subscribe() |
36 | 14 |
|
37 | 15 | def _subscribe(self): |
38 | | - while self._running: |
39 | | - try: |
40 | | - config = self._use_config() |
41 | | - mercure = config['mercure'] |
42 | | - response = requests.get( |
43 | | - f"{mercure['url']}?topic={quote(mercure['subscriber']['topic'])}", |
44 | | - headers={ |
45 | | - 'Authorization': f"Bearer {mercure['subscriber']['token']}", |
46 | | - }, |
47 | | - stream=True, |
48 | | - ) |
49 | | - response.raise_for_status() |
50 | | - |
51 | | - client = sseclient.SSEClient(response) |
52 | | - for event in client.events(): |
53 | | - if not self._running: |
54 | | - break |
55 | | - self._on_event(json.loads(event.data)) |
56 | | - |
57 | | - except Exception as e: |
58 | | - print(f"Error in subscriber: {e}", file=sys.stderr) |
59 | | - if self._running: |
60 | | - time.sleep(5) |
| 16 | + config = self._use_config() |
| 17 | + mercure = config['mercure'] |
| 18 | + try: |
| 19 | + messages = SSEClient( |
| 20 | + f"{mercure['url']}?topic={quote(mercure['subscriber']['topic'])}", |
| 21 | + headers={ |
| 22 | + 'Authorization': f"Bearer {mercure['subscriber']['token']}", |
| 23 | + } |
| 24 | + ) |
| 25 | + for message in messages: |
| 26 | + self._on_event(json.loads(message.data)) |
| 27 | + except Exception as e: |
| 28 | + print(e) |
61 | 29 |
|
62 | 30 | def _on_event(self, event): |
63 | 31 | config = self._use_config() |
@@ -96,7 +64,7 @@ def _on_event(self, event): |
96 | 64 | config['publisher'].publish_state() |
97 | 65 | if not ready: |
98 | 66 | return |
99 | | - if event['type'] in self._get_event_types() and event['type'] in config['callbacks']: |
| 67 | + if event['type'] in config['callbacks']: |
100 | 68 | callback = config['callbacks'][event['type']] |
101 | 69 | arguments_list = [event['args'].get(name) for name in callback.__code__.co_varnames] |
102 | 70 | output = callback(*arguments_list) |
|
0 commit comments