33from urllib .parse import quote
44import asyncio
55import json
6- import requests
76import sys
8- import time
7+ import nest_asyncio
98
109class Subscriber :
1110 def __init__ (self , use_config ):
1211 self ._use_config = use_config
1312 self ._subscribe ()
1413
1514 def _subscribe (self ):
15+ nest_asyncio .apply ()
1616 config = self ._use_config ()
1717 mercure = config ['mercure' ]
18- try :
19- messages = SSEClient (
20- f"{ mercure ['url' ]} ?topic={ quote (mercure ['subscriber' ]['topic' ])} " ,
21- headers = {
22- 'Authorization' : f"Bearer { mercure ['subscriber' ]['token' ]} " ,
23- }
24- )
25- for message in messages :
26- self ._on_event (json .loads (message .data ))
27- except Exception as e :
28- print (e )
18+ messages = SSEClient (
19+ f"{ mercure ['url' ]} ?topic={ quote (mercure ['subscriber' ]['topic' ])} " ,
20+ headers = {
21+ 'Authorization' : f"Bearer { mercure ['subscriber' ]['token' ]} "
22+ }
23+ )
24+ for message in messages :
25+ try :
26+ event = json .loads (message .data )
27+ except json .JSONDecodeError as e :
28+ print (f"Failed to decode JSON: { e } " , file = sys .stderr )
29+ continue
30+ self ._on_event (event )
2931
3032 def _on_event (self , event ):
3133 config = self ._use_config ()
3234 if event ['type' ] == 'ping' :
3335 config ['publisher' ].on_ping ()
3436 return
3537 if event ['type' ] == 'function_call' :
36- for fn in config ['functions' ]:
37- if fn .__name__ != event ['name' ]:
38+ for function in config ['functions' ]:
39+ if function .__name__ != event ['name' ]:
3840 continue
3941 arguments_list = [
4042 event ['args' ].get (name ) or event ['defaultArgs' ].get (name )
41- for name in fn .__code__ .co_varnames
43+ for name in function .__code__ .co_varnames
4244 ]
43- output = fn (* arguments_list )
45+ output = function (* arguments_list )
4446 if asyncio .iscoroutine (output ):
4547 output = asyncio .run (output )
4648 event ['output' ] = output
4749 config ['publisher' ].publish_event (event )
4850 return
49- print (f'Function { fn .__name__ } not found.' , file = sys .stderr )
51+ print (f'Function { function .__name__ } not found.' , file = sys .stderr )
5052 return
5153 if event ['type' ] == 'boot' :
5254 for name , value in event ['parameters' ].items ():
5355 for parameter in config ['parameters' ]:
5456 if parameter .name != name :
5557 continue
56- parameter .set_value (value )
58+ if isinstance (value , str ):
59+ parameter .set_value (value )
60+ else :
61+ parameter .set_value ("" )
5762 ready = True
5863 for parameter in config ['parameters' ]:
5964 if parameter .is_optional :
@@ -69,8 +74,4 @@ def _on_event(self, event):
6974 arguments_list = [event ['args' ].get (name ) for name in callback .__code__ .co_varnames ]
7075 output = callback (* arguments_list )
7176 if asyncio .iscoroutine (output ):
72- output = asyncio .run (output )
73-
74- def _get_event_types (self ):
75- with files ('talkops.data' ).joinpath ('event-types.json' ).open ('r' , encoding = 'utf-8' ) as f :
76- return json .load (f )
77+ asyncio .run (output )
0 commit comments