22
33import asyncio
44import json
5- import logging
6- import os
75import shutil
86import signal
97import sys
1917from centml .sdk .shell .exceptions import NoPodAvailableError , PodNotFoundError
2018from centml .sdk .shell .renderer import pyte_extract_text , render_dirty
2119
22- _log = logging .getLogger ("centml.sdk.shell" )
23-
2420BEGIN_MARKER = "__CENTML_BEGIN_5f3a__"
2521END_MARKER = "__CENTML_END_5f3a__"
2622
3026PRINTF_END = r"\137\137CENTML_END_5f3a\137\137"
3127
3228
33- def setup_debug_log ():
34- """Configure file-based debug logging (stdout unusable in raw mode)."""
35- log_path = os .environ .get ("CENTML_SHELL_DEBUG_LOG" , "/tmp/centml_shell_debug.log" )
36- handler = logging .FileHandler (log_path , mode = "w" )
37- handler .setFormatter (logging .Formatter ("%(asctime)s.%(msecs)03d %(message)s" , datefmt = "%H:%M:%S" ))
38- _log .addHandler (handler )
39- _log .setLevel (logging .DEBUG )
40- _log .debug ("=== shell debug log started (pid=%d) ===" , os .getpid ())
41-
42-
4329def build_ws_url (api_url , deployment_id , pod_name , shell_type = None ):
4430 """Build the WebSocket URL for a terminal connection."""
4531 parsed = urllib .parse .urlparse (api_url )
@@ -84,9 +70,7 @@ def resolve_pod(cclient, deployment_id, pod_name=None) -> Tuple[str, Optional[st
8470
8571 warning = None
8672 if len (running_pods ) > 1 :
87- warning = (
88- f"Multiple running pods found, connecting to { running_pods [0 ]} . " f"Use --pod to specify a different pod."
89- )
73+ warning = f"Multiple running pods found, connecting to { running_pods [0 ]} ."
9074 return running_pods [0 ], warning
9175
9276
@@ -114,38 +98,28 @@ async def forward_io(ws, screen, stream, shutdown):
11498 stdin_closed = asyncio .Event ()
11599
116100 async def _read_ws ():
117- _log .debug ("[read_ws] started" )
118- msg_count = 0
119101 try :
120102 while True :
121103 raw_msg = await ws .recv ()
122- msg_count += 1
123104 msg = json .loads (raw_msg )
124- keys = list (msg .keys ())
125105 data = msg .get ("data" , "" )
126- data_snippet = repr (data [:120 ]) if data else ""
127- _log .debug ("[read_ws] msg#%d keys=%s data=%s" , msg_count , keys , data_snippet )
128106 if data :
129107 stream .feed (data .replace ("\n " , "\r \n " ))
130108 render_dirty (screen , sys .stdout .buffer )
131109 elif msg .get ("error" ):
132- _log .debug ("[read_ws] error: %s" , msg ["error" ])
133110 stream .feed (f"Error: { msg ['error' ]} \r \n " )
134111 render_dirty (screen , sys .stdout .buffer )
135- except websockets .ConnectionClosed as exc :
136- _log .debug ("[read_ws] ConnectionClosed after %d msgs: %s" , msg_count , exc )
112+ except websockets .ConnectionClosed :
137113 return
138114
139115 async def _read_stdin ():
140- _log .debug ("[read_stdin] started" )
141116 read_queue = asyncio .Queue ()
142117
143118 def _on_stdin_ready ():
144119 data = sys .stdin .buffer .read1 (4096 )
145120 if data :
146121 read_queue .put_nowait (data )
147122 else :
148- _log .debug ("[read_stdin] stdin EOF" )
149123 stdin_closed .set ()
150124
151125 loop .add_reader (stdin_fd , _on_stdin_ready )
@@ -155,7 +129,6 @@ def _on_stdin_ready():
155129 data = await asyncio .wait_for (read_queue .get (), timeout = 0.5 )
156130 except asyncio .TimeoutError :
157131 continue
158- _log .debug ("[read_stdin] sending %d bytes: %s" , len (data ), repr (data [:40 ]))
159132 try :
160133 await ws .send (
161134 json .dumps (
@@ -168,29 +141,20 @@ def _on_stdin_ready():
168141 )
169142 )
170143 except websockets .ConnectionClosed :
171- _log .debug ("[read_stdin] ws closed on send" )
172144 return
173- _log .debug (
174- "[read_stdin] loop exited: stdin_closed=%s shutdown=%s" , stdin_closed .is_set (), shutdown .is_set ()
175- )
176145 finally :
177146 loop .remove_reader (stdin_fd )
178147
179148 async def _watch_shutdown ():
180149 while not shutdown .is_set ():
181150 await asyncio .sleep (0.2 )
182151
183- _log .debug ("[forward_io] creating tasks" )
184152 task_ws = asyncio .create_task (_read_ws ())
185153 task_stdin = asyncio .create_task (_read_stdin ())
186154 task_shutdown = asyncio .create_task (_watch_shutdown ())
187155 tasks = [task_ws , task_stdin , task_shutdown ]
188- task_names = {id (task_ws ): "read_ws" , id (task_stdin ): "read_stdin" , id (task_shutdown ): "watch_shutdown" }
189156
190157 done , pending = await asyncio .wait (tasks , return_when = asyncio .FIRST_COMPLETED )
191- done_names = [task_names [id (t )] for t in done ]
192- pending_names = [task_names [id (t )] for t in pending ]
193- _log .debug ("[forward_io] first completed: done=%s pending=%s" , done_names , pending_names )
194158
195159 for t in pending :
196160 t .cancel ()
@@ -201,9 +165,7 @@ async def _watch_shutdown():
201165 pass
202166 for t in done :
203167 if t .exception () is not None :
204- _log .debug ("[forward_io] task exception: %s" , t .exception ())
205168 raise t .exception ()
206- _log .debug ("[forward_io] returning exit_code=0" )
207169 return 0
208170
209171
@@ -234,9 +196,7 @@ async def interactive_session(ws_url, token):
234196 loop .add_signal_handler (signal .SIGHUP , shutdown .set )
235197
236198 headers = {"Authorization" : f"Bearer { token } " }
237- _log .debug ("[session] connecting to %s" , ws_url )
238199 async with websockets .connect (ws_url , additional_headers = headers , close_timeout = 2 ) as ws :
239- _log .debug ("[session] connected, sending resize %dx%d" , cols , rows )
240200
241201 def _send_resize ():
242202 c , r = shutil .get_terminal_size ()
@@ -252,7 +212,6 @@ def _send_resize():
252212 finally :
253213 loop .remove_signal_handler (signal .SIGWINCH )
254214
255- _log .debug ("[session] exiting with code %d" , exit_code )
256215 return exit_code
257216 finally :
258217 loop .remove_signal_handler (signal .SIGTERM )
@@ -297,20 +256,15 @@ async def exec_session(ws_url, token, command):
297256 buffer = ""
298257 is_capturing = False
299258 is_done = False
300- msg_count = 0
301259 try :
302260 async for raw_msg in ws :
303- msg_count += 1
304261 msg = json .loads (raw_msg )
305- keys = list (msg .keys ())
306- _log .debug ("[exec] msg#%d keys=%s data_len=%d" , msg_count , keys , len (msg .get ("data" , "" )))
307262 if msg .get ("data" ):
308263 buffer += msg ["data" ]
309264 while "\n " in buffer :
310265 line , buffer = buffer .split ("\n " , 1 )
311266 clean = pyte_extract_text (line_stream , line_screen , line .rstrip ("\r " ))
312267 if BEGIN_MARKER in clean :
313- _log .debug ("[exec] BEGIN marker found" )
314268 is_capturing = True
315269 continue
316270 if END_MARKER in clean :
@@ -320,20 +274,16 @@ async def exec_session(ws_url, token, command):
320274 exit_code = int (parts [1 ].strip ())
321275 except ValueError :
322276 pass
323- _log .debug ("[exec] END marker, exit_code=%d" , exit_code )
324277 is_done = True
325278 break
326279 if is_capturing :
327280 sys .stdout .write (line + "\n " )
328281 sys .stdout .flush ()
329282 elif msg .get ("error" ):
330- _log .debug ("[exec] error: %s" , msg ["error" ])
331283 sys .stderr .write (f"Error: { msg ['error' ]} \n " )
332284 return 1
333285 if is_done :
334- _log .debug ("[exec] done, breaking" )
335286 break
336- except websockets .ConnectionClosed as exc :
337- _log .debug ("[exec] ConnectionClosed: %s" , exc )
338- _log .debug ("[exec] returning exit_code=%d" , exit_code )
287+ except websockets .ConnectionClosed :
288+ pass
339289 return exit_code
0 commit comments