forked from RosettaTechnologies/AnkiBrain
-
Notifications
You must be signed in to change notification settings - Fork 1
Expand file tree
/
Copy pathExternalScriptManager.py
More file actions
93 lines (77 loc) · 2.99 KB
/
Copy pathExternalScriptManager.py
File metadata and controls
93 lines (77 loc) · 2.99 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
import asyncio
import atexit
import json
import platform
import subprocess
from InterprocessCommand import InterprocessCommand
class ExternalScriptManager:
    """Run an external Python script as a child process and talk JSON lines to it.

    The child is launched as ``python_path script_path``. Every request is one
    JSON document written to the child's stdin followed by a newline, and every
    response is one JSON line read back from its stdout. At startup the child
    must print a single ready line whose ``status`` field is ``'success'``.
    """

    # Longest single stdout line (one JSON response) the stream reader accepts.
    STREAM_LIMIT = 1024 * 1024 * 1024  # 1 GiB
    # Amount of trailing stderr output retained for error messages.
    STDERR_TAIL_BYTES = 8 * 1024
    # Read size used while draining stderr.
    STDERR_CHUNK_BYTES = 64 * 1024
    # Seconds stop() waits for the stderr reader to hit EOF before cancelling it.
    STDERR_DRAIN_TIMEOUT = 1.0

    def __init__(self, python_path, script_path):
        """Remember what to launch; nothing is started until ``start()``.

        Args:
            python_path: Interpreter executable used to run the script.
            script_path: Path of the script passed to that interpreter.
        """
        self.python_path = python_path
        self.script_path = script_path
        self.process = None  # asyncio subprocess handle, None while not running
        self.lock = asyncio.Lock()  # serialises request/response round trips
        self._stderr_task = None  # background task draining the child's stderr
        self._stderr_tail = b''  # last STDERR_TAIL_BYTES of the child's stderr

    async def start(self):
        """Launch the child process and wait for its ready message.

        Raises:
            Exception: If stdout closes, or the first line is not a JSON object
                whose ``status`` is ``'success'``. The child is shut down
                before the exception is raised so it is not leaked.
        """
        creationflags = 0
        if platform.system() == 'Windows':
            # Do not open a console window for the child process.
            creationflags = subprocess.CREATE_NO_WINDOW

        self.process = await asyncio.create_subprocess_exec(
            self.python_path,
            self.script_path,
            stdin=asyncio.subprocess.PIPE,
            stdout=asyncio.subprocess.PIPE,
            stderr=asyncio.subprocess.PIPE,
            creationflags=creationflags,
            limit=self.STREAM_LIMIT,
        )

        # Register the guarded terminator instead of process.terminate: the
        # latter raises ProcessLookupError once the child has already exited.
        atexit.register(self.terminate_sync)

        # stderr is a pipe, so it has to be consumed continuously; otherwise
        # the child blocks as soon as the pipe buffer fills up.
        self._stderr_tail = b''
        self._stderr_task = asyncio.create_task(
            self._drain_stderr(self.process.stderr)
        )

        # Wait for the ready message from external script.
        print('Waiting for ChatAI Ready Message')
        ready_msg = await self.process.stdout.readline()

        failure = None
        if not ready_msg:
            failure = 'stdout closed before a ready message was received'
        else:
            try:
                ready_data = json.loads(ready_msg.decode().strip())
            except ValueError:
                # Covers both undecodable bytes and malformed JSON.
                failure = f'ready message is not valid JSON: {ready_msg!r}'
            else:
                if (not isinstance(ready_data, dict)
                        or ready_data.get('status') != 'success'):
                    failure = f'unexpected ready message: {ready_data!r}'

        if failure is None:
            print('Completed startup of ChatAI module')
            return

        await self.stop()
        raise Exception(
            self._with_stderr(f'Error starting ChatAI module: {failure}')
        )

    async def stop(self):
        """Terminate the child process and wait for it to exit.

        Safe to call when nothing is running or when the child has already
        exited on its own. Afterwards ``self.process`` is ``None``.
        """
        process = self.process
        if process is None:
            return

        self.process = None
        atexit.unregister(self.terminate_sync)

        try:
            process.terminate()
        except ProcessLookupError:
            pass  # The child is already gone; only the wait below is needed.
        await process.wait()

        drain_task, self._stderr_task = self._stderr_task, None
        if drain_task is not None:
            # Let the reader pick up the final stderr output, but never hang
            # if something else still holds the pipe open.
            done, _ = await asyncio.wait(
                [drain_task], timeout=self.STDERR_DRAIN_TIMEOUT
            )
            if not done:
                drain_task.cancel()

    def terminate_sync(self):
        """Terminate the child without awaiting its exit (usable from atexit)."""
        if self.process is None:
            return

        print('Terminating ChatAI subprocess...')
        try:
            self.process.terminate()
        except ProcessLookupError:
            pass  # The child has already exited.

    async def call(self, input_data: dict[str, str]) -> dict[str, str]:
        """Send one request to the child and return its decoded response.

        The lock is held for the whole write-then-read round trip so that
        concurrent callers cannot interleave requests or pick up each other's
        responses.

        Args:
            input_data: JSON-serialisable request object.

        Returns:
            The response object decoded from the child's next stdout line.

        Raises:
            Exception: If the child is not running, closes its output, sends
                malformed data, or reports a ``SUBMODULE_ERROR`` (in which case
                the message is the error text supplied by the child). The
                original exception is attached as ``__cause__``.
        """
        try:
            data_str: str = json.dumps(input_data)

            async with self.lock:
                process = self.process
                if process is None:
                    raise Exception('ChatAI subprocess is not running')
                process.stdin.write(data_str.encode() + b'\n')
                await process.stdin.drain()
                output_str = await process.stdout.readline()

            if not output_str:
                # EOF: the child exited or closed stdout instead of answering.
                raise Exception(self._with_stderr(
                    'ChatAI subprocess closed its output without responding'
                ))

            output_data = json.loads(output_str.decode().strip())

            # Handle module error.
            if output_data['cmd'] == InterprocessCommand.SUBMODULE_ERROR.value:
                error_msg = output_data['data']['error']
                raise Exception(error_msg)

            return output_data
        except Exception as e:
            # Callers receive a plain Exception carrying the message; the
            # underlying error stays reachable through the exception chain.
            raise Exception(str(e)) from e

    async def _drain_stderr(self, stream):
        """Consume the child's stderr until EOF, keeping only a short tail."""
        while True:
            chunk = await stream.read(self.STDERR_CHUNK_BYTES)
            if not chunk:
                break
            self._stderr_tail = (
                self._stderr_tail + chunk
            )[-self.STDERR_TAIL_BYTES:]

    def _with_stderr(self, message):
        """Return ``message`` with the retained stderr tail appended, if any."""
        tail = self._stderr_tail.decode(errors='replace').strip()
        if tail:
            return f'{message}\n{tail}'
        return message