Skip to content
Open
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
55 changes: 51 additions & 4 deletions runner/src/runner/live/process/process.py
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,9 @@ def is_alive(self):
async def stop(self):
self.done.set()

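# Save the PID before any cleanup: it is needed below for the process-group kill and
# the /proc diagnostics, and process.pid may not be readable once the process object
# has been joined or closed.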
child_pid = self.process.pid

is_terminating = False
if not self.is_alive():
logging.info("Process already not alive")
Expand All @@ -90,10 +93,43 @@ async def stop(self):
logging.error("Failed to terminate process, killing")
self.process.kill()
if not await self._wait_stop(2):
logging.error(
f"Failed to kill process self_pid={os.getpid()} child_pid={self.process.pid} is_alive={self.process.is_alive()}"
)
raise RuntimeError("Failed to kill process")
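# The process is still alive after SIGKILL to its PID (likely stuck in D-state, or
# grandchildren in its process group are still holding resources). Fall back to
# killing the entire process group so that any grandchild processes spawned by the
# pipeline (e.g. ComfyUI workers) are terminated as well. This relies on
# process_loop() calling os.setsid() on startup, which makes the child the leader of
# its own process group.
# NOTE(review): process_loop() ignores an OSError from os.setsid(); if the child is
# not in its own group, os.getpgid(child_pid) would be the parent's group and
# os.killpg() would also signal this process — confirm the pgid differs from
# os.getpgrp() before killing.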
killed_via_pgid = False
if child_pid is not None:
try:
pgid = os.getpgid(child_pid)
logging.warning(
f"Attempting process group kill: pgid={pgid} child_pid={child_pid}"
)
os.killpg(pgid, signal.SIGKILL)
if await self._wait_stop(3):
logging.info("Process group kill succeeded")
killed_via_pgid = True
else:
logging.error("Process group kill did not terminate process within 3s")
except (ProcessLookupError, PermissionError, OSError) as e:
logging.warning(f"Process group kill failed: {e}")

if not killed_via_pgid:
# Log /proc status to diagnose D-state or zombie
if child_pid is not None and is_linux:
try:
with open(f"/proc/{child_pid}/status") as f:
proc_status = f.read()
logging.error(
f"Process /proc/{child_pid}/status at kill failure:\n{proc_status}"
)
except Exception:
pass

logging.error(
f"Failed to kill process self_pid={os.getpid()} child_pid={child_pid} is_alive={self.process.is_alive()}"
)
raise RuntimeError("Failed to kill process")

logging.info("Pipeline process cleanup complete")

Expand Down Expand Up @@ -179,6 +215,17 @@ def get_recent_logs(self, n=None) -> list[str]:
return logs[-n:] if n is not None else logs # Only limit if n is specified

def process_loop(self):
# Become the leader of a new session and process group so that a subsequent
# os.killpg() from the parent can reach all grandchildren (e.g. ComfyUI worker
# subprocesses). This is called before any subprocess is spawned, so every
# descendant inherits the new PGID unless it moves itself to another group
# (e.g. via setsid() or setpgid()).
if is_unix:
try:
os.setsid()
except OSError:
# Already a process group leader (shouldn't happen in spawn mode, but safe)
pass

_setup_signal_handlers(self.done)
_setup_parent_death_signal()
_start_parent_watchdog(self.done)
Expand Down
Loading