Skip to content

Commit 636ee42

Browse files
committed
update
1 parent d475006 commit 636ee42

File tree

4 files changed

+30
-43
lines changed

4 files changed

+30
-43
lines changed

setup.py

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -2,7 +2,7 @@
22

33
setup(
44
name='talkops',
5-
version='1.3.1',
5+
version='1.3.2',
66
author='PicoUX',
77
description="TalkOps SDK",
88
long_description=open('README.md',encoding='utf-8').read(),
@@ -16,7 +16,7 @@
1616
'Issues': 'https://github.com/talkops/sdk-python/issues',
1717
'Source': 'https://github.com/talkops/sdk-python',
1818
},
19-
install_requires=['aiohttp-sse-client', 'jinja2', 'nest_asyncio'],
19+
install_requires=['aiofiles', 'aiohttp-sse-client', 'jinja2', 'nest_asyncio'],
2020
keywords=['sdk'],
2121
classifiers=[
2222
"Programming Language :: Python :: 3",

talkops/extension.py

Lines changed: 1 addition & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -29,7 +29,6 @@ def __init__(self, token=None):
2929
self._publisher = None
3030
self._software_version = None
3131
self._started = False
32-
self._subscriber = None
3332
self._token = token or os.environ.get('TALKOPS_TOKEN')
3433
self._website = None
3534

@@ -77,7 +76,7 @@ async def _setup(self):
7776
'functionSchemas': self._function_schemas,
7877
}
7978
)
80-
self._subscriber = Subscriber(
79+
Subscriber(
8180
lambda: {
8281
'callbacks': self._callbacks,
8382
'extension': self,
@@ -87,10 +86,6 @@ async def _setup(self):
8786
'publisher': self._publisher,
8887
}
8988
)
90-
await asyncio.gather(
91-
self._publisher.start(),
92-
self._subscriber.start()
93-
)
9489

9590
def start(self):
9691
if self._started:

talkops/publisher.py

Lines changed: 16 additions & 34 deletions
Original file line numberDiff line numberDiff line change
@@ -1,8 +1,8 @@
11
from urllib.parse import urlencode
2+
import aiofiles.os
23
import aiohttp
34
import asyncio
45
import json
5-
import sys
66
import time
77

88
class Publisher:
@@ -11,38 +11,20 @@ def __init__(self, use_config, use_state):
1111
self._use_state = use_state
1212
self._last_event_state = None
1313
self._last_ping_at = None
14-
15-
async def start(self):
16-
self._original_stdout_write = sys.stdout.write
17-
self._original_stderr_write = sys.stderr.write
18-
def stdout_wrapper(chunk):
19-
data = chunk.strip()
20-
if(data != ""):
21-
asyncio.create_task(self.publish_event({
22-
'type': 'stdout',
23-
'data': data,
24-
'time': time.time()
25-
}))
26-
return self._original_stdout_write(chunk)
27-
def stderr_wrapper(chunk):
28-
if "KeyboardInterrupt" not in chunk:
29-
data = chunk.strip()
30-
if(data != ""):
31-
asyncio.create_task(self.publish_event({
32-
'type': 'stderr',
33-
'data': data,
34-
'time': time.time()
35-
}))
36-
return self._original_stderr_write(chunk)
37-
sys.stdout.write = stdout_wrapper
38-
sys.stderr.write = stderr_wrapper
39-
4014
asyncio.create_task(self._publish_data({'type': 'init'}))
41-
await asyncio.sleep(0.5)
42-
asyncio.create_task(self._periodic_publish_state())
15+
asyncio.create_task(self._publish_state_periodically())
16+
17+
async def _generate_event_state(self):
18+
state = self._use_state()
19+
try:
20+
stat_result = await aiofiles.os.stat("error.log")
21+
state["hasErrors"] = stat_result.st_size > 0
22+
except FileNotFoundError:
23+
state["hasErrors"] = False
24+
return {"type": "state", "state": state}
4325

4426
async def publish_state(self):
45-
event = {'type': 'state', 'state': self._use_state()}
27+
event = await self._generate_event_state()
4628
self._last_event_state = json.dumps(event)
4729
await self.publish_event(event)
4830

@@ -76,11 +58,11 @@ async def _publish_data(self, data):
7658
raise Exception(f"Failed to publish event: {response.status}")
7759
return await response.text()
7860

79-
async def _periodic_publish_state(self):
61+
async def _publish_state_periodically(self):
8062
while True:
81-
event = {'type': 'state', 'state': self._use_state()}
63+
await asyncio.sleep(0.5)
64+
event = await self._generate_event_state()
8265
last_event_state = json.dumps(event)
8366
if self._last_event_state != last_event_state:
84-
await self.publish_event(event)
8567
self._last_event_state = last_event_state
86-
await asyncio.sleep(1.0)
68+
asyncio.create_task(self.publish_event(event))

talkops/subscriber.py

Lines changed: 11 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,4 +1,5 @@
11
from aiohttp_sse_client.client import EventSource
2+
from pathlib import Path
23
from urllib.parse import quote
34
import asyncio
45
import json
@@ -7,8 +8,9 @@
78
class Subscriber:
89
def __init__(self, use_config):
910
self._use_config = use_config
11+
asyncio.create_task(self._subscribe())
1012

11-
async def start(self):
13+
async def _subscribe(self):
1214
config = self._use_config()
1315
mercure = config['mercure']
1416
while True:
@@ -31,6 +33,14 @@ async def _on_event(self, event):
3133
if event['type'] == 'ping':
3234
asyncio.create_task(config['publisher'].on_ping())
3335
return
36+
if event['type'] == 'debug':
37+
file_path = 'error.log'
38+
file = Path(file_path)
39+
event['data'] = file.read_text(encoding='utf-8')
40+
if event['data']:
41+
asyncio.create_task(config['publisher'].publish_event(event))
42+
file.write_text('', encoding='utf-8')
43+
return
3444
if event['type'] == 'function_call':
3545
for function in config['functions']:
3646
if function.__name__ != event['name']:

0 commit comments

Comments
 (0)