From 9d5157474df8d3447fa20b62085c3afc84631f8d Mon Sep 17 00:00:00 2001 From: Brandon Cui Date: Tue, 10 Mar 2026 14:45:12 -0700 Subject: [PATCH 1/4] Add NVCF/DesktopEnv backend for CUA data collection Cherry-picked from bcui/init_nvcf_data: - debug_env_controller.py: rewritten to use OSWorld DesktopEnv - module_data_collector.py: DesktopEnv integration + NVCF pre-download - parallel_collect_trajectories.py: runtime->env, NVCF support - cleanup_nvcf.py: utility to list/cleanup NVCF functions --- cua/cleanup_nvcf.py | 112 +++++++++++ cua/modules/debug_env_controller.py | 266 +++++++++++++++++---------- cua/modules/module_data_collector.py | 75 +++++--- cua/parallel_collect_trajectories.py | 53 +++--- 4 files changed, 367 insertions(+), 139 deletions(-) create mode 100644 cua/cleanup_nvcf.py diff --git a/cua/cleanup_nvcf.py b/cua/cleanup_nvcf.py new file mode 100644 index 000000000..447298c9a --- /dev/null +++ b/cua/cleanup_nvcf.py @@ -0,0 +1,112 @@ +#!/usr/bin/env python3 +"""List and clean up NVCF functions. + +Usage: + # List all functions in the org: + python cleanup_nvcf.py --list + + # Undeploy and delete only YOUR pool functions (nvcf-pool-*): + python cleanup_nvcf.py --cleanup + + # Undeploy and delete ALL functions (careful — includes other users'): + python cleanup_nvcf.py --cleanup --all + + # Undeploy and delete a specific function: + python cleanup_nvcf.py --delete FUNCTION_ID VERSION_ID +""" +import argparse +import os +import sys + +sys.path.insert(0, "/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server") + +from openhands.nvidia.os_world.nvcf import OSWorldDeployer + +POOL_NAME_PREFIX = "nvcf-pool-" + + +def main(): + parser = argparse.ArgumentParser(description="NVCF function cleanup utility") + parser.add_argument("--list", action="store_true", help="List all functions in the org") + parser.add_argument("--cleanup", action="store_true", + help="Undeploy and delete pool functions (nvcf-pool-* only, unless --all)") + parser.add_argument("--all", action="store_true", + help="With --cleanup: delete ALL functions, not just pool ones") + parser.add_argument("--delete", nargs=2, metavar=("FUNC_ID", "VER_ID"), + help="Delete a specific function") + args = parser.parse_args() + + api_key = os.environ.get("NGC_API_KEY") + org = os.environ.get("NGC_ORG") + if not api_key or not org: + print("ERROR: Set NGC_API_KEY and NGC_ORG environment variables") + sys.exit(1) + + deployer = OSWorldDeployer(api_key=api_key, org_name=org) + + if args.list or (not args.cleanup and not args.delete): + print("Listing all private NVCF functions in org...\n") + result = deployer.list_functions() + functions = result.get("functions", []) + if not functions: + print("No functions found.") + return + for fn in functions: + fn_id = fn.get("id", "?") + name = fn.get("name", "?") + status = fn.get("status", "?") + ver_id = fn.get("versionId", "?") + mine = " <-- pool" if name.startswith(POOL_NAME_PREFIX) else "" + print(f" {name:40s} status={status:10s} fn={fn_id} ver={ver_id}{mine}") + print(f"\nTotal: {len(functions)} functions") + + if args.delete: + fn_id, ver_id = args.delete + print(f"Undeploying {fn_id}...") + try: + deployer.undeploy(fn_id, ver_id, graceful=True) + print("Undeployed. Deleting...") + except Exception as e: + print(f"Undeploy failed (may already be undeployed): {e}") + try: + deployer.delete_function(fn_id, ver_id) + print("Deleted.") + except Exception as e: + print(f"Delete failed: {e}") + + if args.cleanup: + result = deployer.list_functions() + functions = result.get("functions", []) + + if not args.all: + # Only clean up pool functions + functions = [f for f in functions if f.get("name", "").startswith(POOL_NAME_PREFIX)] + print(f"Cleaning up {len(functions)} pool functions (nvcf-pool-*)...\n") + else: + print(f"Cleaning up ALL {len(functions)} functions...\n") + + if not functions: + print("Nothing to clean up.") + return + + for fn in functions: + fn_id = fn.get("id", "?") + ver_id = fn.get("versionId", "?") + name = fn.get("name", "?") + status = fn.get("status", "?") + print(f" Cleaning up: {name} ({fn_id}) status={status}") + try: + deployer.undeploy(fn_id, ver_id, graceful=True) + print(f" Undeployed") + except Exception as e: + print(f" Undeploy skipped: {e}") + try: + deployer.delete_function(fn_id, ver_id) + print(f" Deleted") + except Exception as e: + print(f" Delete failed: {e}") + print(f"\nDone. Cleaned up {len(functions)} functions.") + + +if __name__ == "__main__": + main() diff --git a/cua/modules/debug_env_controller.py b/cua/modules/debug_env_controller.py index 71c113fb4..5bf9ab85e 100644 --- a/cua/modules/debug_env_controller.py +++ b/cua/modules/debug_env_controller.py @@ -1,123 +1,203 @@ import logging +import os +import sys import re -from typing import Dict, Tuple - -import ipdb - -from examples.setup import SetupController -from openhands.core.config import OpenHandsConfig -from openhands.events import EventStream -from openhands.events.action.os import OSWorldInteractiveAction -from openhands.events.observation import ErrorObservation -from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime -from openhands.storage import get_file_store +import uuid +import time +import threading +import requests +from typing import Dict, List, Optional, Tuple + +# Ensure OSWorld is importable +_osworld_path = "/lustre/fsw/portfolios/nvr/users/bcui/OSWorld" +if _osworld_path not in sys.path: + sys.path.insert(0, _osworld_path) + +from desktop_env.desktop_env import DesktopEnv from openhands.core.logger import openhands_logger # Create a child logger logger = openhands_logger.getChild('env_controller') logger.setLevel(logging.DEBUG) +# Semaphore to limit concurrent downloads (shared with setup.py via env var) +_DOWNLOAD_SEMAPHORE = threading.Semaphore(int(os.environ.get('OSWORLD_MAX_CONCURRENT_DOWNLOADS', '3'))) + class EnvController: """ - Static Wrapper class that interfaces with OSWorldSingularityRuntime. + Static wrapper class that interfaces with OSWorld's DesktopEnv. + Replaces the previous OpenHands runtime-based approach with OSWorld's + native DesktopEnv + NVCFProvider for NVCF deployments. """ + @staticmethod - async def initialize_runtime(job_id: str, vm_image_path: str, os_type: str, - osworld_setup: Dict) -> OSWorldSingularityRuntime: + def pre_download_setup_files(osworld_setup: Dict, cache_dir: str = "/tmp/osworld_cache") -> bool: """ - Initialize OSWorldSingularityRuntime. - Used by DataCollector._init_worker to boot up the VM. + Pre-download all setup files to local cache BEFORE deploying NVCF. + This avoids wasting NVCF resources if downloads fail (e.g., HF 429 errors). + + Returns True if all downloads succeeded, False otherwise. """ - config = OpenHandsConfig() - config.runtime = "osworld" - config.sandbox.base_container_image = "ubuntu:24.04" - config.sandbox.run_as_fakeroot = False - config.sandbox.runtime_container_image = None # Trigger auto-build - - # Unique event stream per trajectory - file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') - event_stream = EventStream(sid=job_id, file_store=file_store) - - logger.debug(f"[initialize_runtime] Creating runtime for {job_id}") - logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") - logger.debug(f"[initialize_runtime] Base image: {config.sandbox.base_container_image}") - - runtime = OSWorldSingularityRuntime( - config=config, - event_stream=event_stream, - sid=job_id, - os_type=os_type, - vm_image_path=vm_image_path, - attach_to_existing=False, - ) + config_list = osworld_setup.get("config", []) + if not config_list: + return True + + os.makedirs(cache_dir, exist_ok=True) + + # Build headers with HF token if available + dl_headers = {} + hf_token = os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN') + if hf_token: + dl_headers['Authorization'] = f'Bearer {hf_token}' + + for cfg in config_list: + if cfg.get("type") != "download": + continue + + files = cfg.get("parameters", {}).get("files", []) + for f in files: + url = f.get("url", "") + path = f.get("path", "") + if not url or not path: + continue + + cache_path = os.path.join(cache_dir, "{:}_{:}".format( + uuid.uuid5(uuid.NAMESPACE_URL, url), + os.path.basename(path))) + + if os.path.exists(cache_path): + logger.info(f"[pre_download] Cache hit: {cache_path}") + continue + + logger.info(f"[pre_download] Downloading {url} to cache...") + max_retries = 8 + downloaded = False + last_error = None + + with _DOWNLOAD_SEMAPHORE: + for i in range(max_retries): + try: + backoff = min(2 ** i + 1, 60) + if i > 0: + logger.info(f"[pre_download] Waiting {backoff}s before retry {i+1}/{max_retries}") + time.sleep(backoff) + + response = requests.get(url, stream=True, timeout=300, headers=dl_headers) + response.raise_for_status() + + downloaded_size = 0 + with open(cache_path, 'wb') as fh: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + fh.write(chunk) + downloaded_size += len(chunk) + + logger.info(f"[pre_download] Downloaded {downloaded_size / (1024*1024):.2f} MB to {cache_path}") + downloaded = True + break + + except requests.RequestException as e: + last_error = e + logger.warning(f"[pre_download] Failed {url}: {e} ({max_retries - i - 1} retries left)") + if os.path.exists(cache_path): + os.remove(cache_path) + + if not downloaded: + logger.error(f"[pre_download] All retries exhausted for {url}. Last error: {last_error}") + return False + + return True - logger.debug(f"[initialize_runtime] Runtime object created, connecting to VM...") - - await runtime.connect() - logger.debug(f"[initialize_runtime] ✓ Runtime initialized and connected for {job_id}") - logger.debug(f"[initialize_runtime] VM URL: {runtime.osworld_vm_url if hasattr(runtime, 'osworld_vm_url') else 'N/A'}") - - if osworld_setup and os_type == "linux": - logger.debug(f"[initialize_runtime] Setting up OSWorld...") - logger.debug(f"[initialize_runtime OSWorld Setup: {osworld_setup}") - setup_controller = SetupController( - vm_ip="127.0.0.1", - server_port=runtime._vm_server_port, - chromium_port=runtime._chromium_port, - cache_dir="/tmp/osworld_example", # might need to be changed to a unique directory for each job - client_password="password", - runtime=runtime - ) - await setup_controller.setup(osworld_setup['config']) - logger.debug(f"[initialize_runtime] ✓ OSWorld setup completed") - else: - logger.debug(f"[initialize_runtime] No OSWorld setup provided") + @staticmethod + async def initialize_runtime( + job_id: str, + vm_image_path: str, + os_type: str, + osworld_setup: Dict, + runtime_type: str = "singularity", + nvcf_function_id: Optional[str] = None, + nvcf_version_id: Optional[str] = None, + nvcf_api_key: Optional[str] = None, + nvcf_org: Optional[str] = None, + ): + """ + Initialize runtime using OSWorld's DesktopEnv. - return runtime + For NVCF runtime: creates DesktopEnv(provider_name='nvcf') which + auto-deploys an NVCF function and starts a local proxy. - @staticmethod - def execute_pyautogui_command(runtime: OSWorldSingularityRuntime, pyautogui_command: str): - pyautogui_action = OSWorldInteractiveAction( - method="execute_python_command", - params={ - "command": pyautogui_command, - } + For singularity runtime: creates DesktopEnv(provider_name='singularity') + which uses the local KVM-based approach. + """ + logger.debug(f"[initialize_runtime] Creating {runtime_type} DesktopEnv for {job_id}") + + if runtime_type == "nvcf": + # Set env vars that OSWorld's NVCFProvider reads + if nvcf_api_key: + os.environ.setdefault("NGC_API_KEY", nvcf_api_key) + if nvcf_org: + os.environ.setdefault("NGC_ORG", nvcf_org) + if nvcf_function_id: + os.environ["NVCF_FUNCTION_ID"] = nvcf_function_id + if nvcf_version_id: + os.environ["NVCF_VERSION_ID"] = nvcf_version_id + + provider_name = "nvcf" + else: + provider_name = "singularity" + + env = DesktopEnv( + provider_name=provider_name, + path_to_vm=vm_image_path if runtime_type != "nvcf" else "", + action_space="pyautogui", + headless=True, + os_type="Ubuntu" if os_type == "linux" else os_type, + require_a11y_tree=False, ) - result = runtime.run_action(pyautogui_action) - if not isinstance(result, ErrorObservation): - logger.debug("[execute_pyautogui_command] ✓ Action complete") - else: - logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") + logger.debug(f"[initialize_runtime] DesktopEnv created, resetting with OSWorld setup...") - @staticmethod - def get_screen_size(runtime: OSWorldSingularityRuntime) -> Tuple[int, int]: - observation = runtime.run_action(OSWorldInteractiveAction( - method="get_vm_screen_size", - params={}, - thought="" - )) + # DesktopEnv.reset() handles: start emulator, NVCF deploy, proxy, snapshot revert, setup + env.reset(task_config=osworld_setup) - assert hasattr(observation, "content"), "get_screen_size failed." + logger.debug(f"[initialize_runtime] DesktopEnv reset complete for {job_id}") - match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) - width, height = int(match.group(1)), int(match.group(2)) + return env + + @staticmethod + def execute_pyautogui_command(env, pyautogui_command: str): + """Execute a pyautogui command on the remote VM via OSWorld's PythonController.""" + try: + env.controller.execute_python_command(pyautogui_command) + logger.debug("[execute_pyautogui_command] Action complete") + except Exception as e: + logger.debug(f"[execute_pyautogui_command] Error in Action: {e}") - return width, height + @staticmethod + def get_screen_size(env) -> Tuple[int, int]: + """Get the screen size of the remote VM.""" + try: + size = env.controller.get_vm_screen_size() + if isinstance(size, tuple) and len(size) == 2: + return size + # Fallback: parse from string if needed + if isinstance(size, str): + match = re.search(r"(\d+)\D+(\d+)", size) + if match: + return int(match.group(1)), int(match.group(2)) + except Exception as e: + logger.warning(f"[get_screen_size] Failed: {e}, using defaults") + + return env.screen_width, env.screen_height @staticmethod - def get_screenshot(runtime: OSWorldSingularityRuntime) -> bytes: + def get_screenshot(env) -> bytes: """ - Returns the current screenshot from the runtime, in base64 format. - If screenshot_path is set, save the screenshot as png. + Returns the current screenshot from the DesktopEnv as bytes. """ - screenshot = runtime.get_vm_screenshot() + screenshot = env.controller.get_screenshot() if not screenshot: - logger.debug("✗ Failed to get screenshot from runtime.") - raise RuntimeError("Failed to get screenshot from runtime.") - + logger.debug("Failed to get screenshot from DesktopEnv.") + raise RuntimeError("Failed to get screenshot from DesktopEnv.") return screenshot - - - diff --git a/cua/modules/module_data_collector.py b/cua/modules/module_data_collector.py index eb9233493..2c3d75772 100644 --- a/cua/modules/module_data_collector.py +++ b/cua/modules/module_data_collector.py @@ -12,8 +12,6 @@ from pathlib import Path from typing import Optional, Dict, Any, Tuple -import ipdb - from modules.actors.debug_uitars_actor import UITarsActor from modules.debug_planner import Planner from modules.debug_env_controller import EnvController @@ -42,6 +40,13 @@ def __init__(self, args: Namespace): self.vm_image_path = args.vm_image_path self.os_type = 'linux' if 'Ubuntu' in self.vm_image_path else 'windows' + # Runtime type: "singularity" (local KVM) or "nvcf" (NVCF via OSWorld DesktopEnv) + self.runtime_type = getattr(args, 'runtime', 'singularity') + + # NVCF credentials (passed via env vars to OSWorld's NVCFProvider) + self.nvcf_api_key = getattr(args, 'nvcf_api_key', None) + self.nvcf_org = getattr(args, 'nvcf_org', None) + self.max_steps_per_trajectory = args.max_steps_per_trajectory self.max_steps_per_goal = args.max_steps_per_goal @@ -117,10 +122,15 @@ def save_trajectory(trajectory: Dict, trajectory_save_dir: Path): logger.debug(f"✓ [save_trajectory] Saved to {str(trajectory_save_dir / 'trajectory.json')}") - async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: + async def init_runtime_for_job(self, trajectory_idx: int, + nvcf_function_id: str = None, + nvcf_version_id: str = None) -> Tuple: """ Stage 1: Initialize the VM and OSWorld setup. - Returns: (runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup) + Returns: (env, trajectory, trajectory_save_dir, trajectory_id, osworld_setup) + + Uses OSWorld's DesktopEnv which handles NVCF deploy, local proxy, + and environment setup internally. """ # Create unique IDs job_id = f"job_{trajectory_idx:04d}" @@ -140,13 +150,35 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: else: osworld_setup_ready = True - # Initialize Runtime (Async) - runtime = await EnvController.initialize_runtime( - job_id, self.vm_image_path, self.os_type, osworld_setup + logger.info(f"[job {trajectory_idx:04d}] Sampled OSWorld config: id={osworld_setup.get('id', 'unknown')}, " + f"snapshot={osworld_setup.get('snapshot', 'unknown')}, " + f"apps={osworld_setup.get('related_apps', [])}, " + f"instruction={osworld_setup.get('instruction', '')[:80]}") + + # Pre-download setup files to local cache BEFORE deploying NVCF. + # This avoids wasting expensive NVCF GPU resources if downloads fail. + if self.runtime_type == "nvcf": + logger.info(f"[job {trajectory_idx:04d}] Pre-downloading setup files before NVCF deploy...") + download_ok = EnvController.pre_download_setup_files(osworld_setup) + if not download_ok: + raise RuntimeError( + f"[job {trajectory_idx:04d}] Setup file pre-download failed. " + f"Skipping NVCF deploy to avoid wasting resources." + ) + logger.info(f"[job {trajectory_idx:04d}] Pre-download complete, proceeding with NVCF deploy.") + + # Initialize DesktopEnv (handles NVCF deploy + proxy + setup internally) + env = await EnvController.initialize_runtime( + job_id, self.vm_image_path, self.os_type, osworld_setup, + runtime_type=self.runtime_type, + nvcf_function_id=nvcf_function_id, + nvcf_version_id=nvcf_version_id, + nvcf_api_key=self.nvcf_api_key, + nvcf_org=self.nvcf_org, ) # Get screen size - width, height = EnvController.get_screen_size(runtime) + width, height = EnvController.get_screen_size(env) # Prepare Metadata trajectory = { @@ -160,25 +192,24 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: 'steps': [], } - return runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup + return env, trajectory, trajectory_save_dir, trajectory_id, osworld_setup - async def collect_trajectory(self, runtime, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict): + async def collect_trajectory(self, env, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict): """ Stage 2: Run the Agent Loop (Goal Generation -> Action Execution). + `env` is an OSWorld DesktopEnv instance. """ # Wait for UI initialization time.sleep(3.0) # Initial Screenshot - screenshot_bytes = EnvController.get_screenshot(runtime) + screenshot_bytes = EnvController.get_screenshot(env) image_filename = trajectory_save_dir / f"0-0.png" save_image(screenshot_bytes, image_filename, logger) # --- 1. Generate High Level Goal --- # - # todo implement the verification mechanism for goal achievability using requirements - # generate goal in a separate loop - prev_requirements = [] # will be a list of tuple [("condition 1", "verdict 1"), ...] - example_goals = random.sample(self.example_instructions, 1) # for now, we sample 1 example goal + prev_requirements = [] + example_goals = random.sample(self.example_instructions, 1) goal, requirements = self.planner.generate_goal_with_long_horizon( screenshot_bytes, osworld_setup["config"], example_goals, prev_requirements, ) @@ -231,21 +262,19 @@ async def collect_trajectory(self, runtime, trajectory: Dict, trajectory_save_di ) if action_result is None: - # UI-TARS action generation failed (failed to meet the requirement) - # in this case, save only up to the current trajectory break pyautogui_command = action_result["pyautogui_command"] action_generation = action_result["action_generation"] # Execute - EnvController.execute_pyautogui_command(runtime, pyautogui_command) + EnvController.execute_pyautogui_command(env, pyautogui_command) # Wait & Observe time.sleep(3.0) # Capture new state - screenshot_bytes = EnvController.get_screenshot(runtime) + screenshot_bytes = EnvController.get_screenshot(env) # Save step info action_idx = len(step_for_this_subgoal['actions']) @@ -276,11 +305,11 @@ async def single_trajectory_job(self, trajectory_idx: int): Simply chains the two stages sequentially in the main thread. """ # 1. Init - runtime, trajectory_data, save_dir, t_id, setup = await self.init_runtime_for_job(trajectory_idx) + env, trajectory_data, save_dir, t_id, setup = await self.init_runtime_for_job(trajectory_idx) try: # 2. Collect - await self.collect_trajectory(runtime, trajectory_data, save_dir, setup) + await self.collect_trajectory(env, trajectory_data, save_dir, setup) finally: - # Cleanup for debug mode - runtime.close() + # Cleanup + env.close() diff --git a/cua/parallel_collect_trajectories.py b/cua/parallel_collect_trajectories.py index 53f91753f..da5a0de87 100644 --- a/cua/parallel_collect_trajectories.py +++ b/cua/parallel_collect_trajectories.py @@ -11,7 +11,7 @@ from openhands.core.logger import openhands_logger # Configure logging -openhands_logger.setLevel(logging.WARNING) +openhands_logger.setLevel(logging.DEBUG) logger = openhands_logger.getChild('parallel_collector') logger.setLevel(logging.INFO) @@ -35,7 +35,7 @@ def __init__(self, index: int): self.error: Optional[str] = None # Runtime objects (populated during execution) - self.runtime: Any = None + self.env: Any = None # OSWorld DesktopEnv instance self.trajectory_data: Optional[Dict] = None self.save_dir: Any = None self.osworld_setup: Any = None @@ -46,6 +46,7 @@ def __init__(self, args, data_collector: DataCollector): self.data_collector = data_collector self.max_parallel = args.max_parallel self.max_trajectories = args.max_trajectories + self.runtime_type = getattr(args, 'runtime', 'singularity') # Queues self.init_queue: queue.Queue = queue.Queue() @@ -65,9 +66,10 @@ def __init__(self, args, data_collector: DataCollector): self._server_running = False # For sequential VM start-ups to mitigate boot storm + # NVCF uses pre-deployed VMs so no boot storm delay needed self._launch_lock = threading.Lock() self._last_launch_time = 0 - self._launch_delay_seconds = 15.0 # Wait 15s between starts + self._launch_delay_seconds = 0.0 if self.runtime_type == "nvcf" else 15.0 def start_workers(self): self._server_running = True @@ -112,23 +114,16 @@ async def _init_worker(self, worker_id: int): job_details = self.jobs[job_idx] # Wait for available runtime slot - # logger.debug(f"[init-{worker_id}] Waiting for slot for job {job_idx}") await asyncio.to_thread(self._runtime_semaphore.acquire) # Rate Limit Logic: Prevent Boot Storm wait_time = 0.0 with self._launch_lock: now = time.time() - # The earliest this worker can start is either NOW, - # or 15s after the last scheduled launch. target_start_time = max(now, self._last_launch_time + self._launch_delay_seconds) - wait_time = target_start_time - now - - # Reserve this slot by updating the global timestamp immediately self._last_launch_time = target_start_time - # Perform the wait asynchronously (outside the lock) if wait_time > 0: if wait_time > 1.0: logger.info(f"[init-{worker_id}] Delayed boot-up: waiting {wait_time:.1f}s...") @@ -141,14 +136,14 @@ async def _init_worker(self, worker_id: int): try: # --- call init_runtime_for_job --- # - # This creates the runtime and runs setup - runtime, traj_data, save_dir, traj_id, setup = \ + # This creates the DesktopEnv, deploys NVCF (if needed), and runs setup + env, traj_data, save_dir, traj_id, setup = \ await self.data_collector.init_runtime_for_job(job_idx) # Store details in the pre-allocated object - job_details.job_id = traj_id # Using traj_id as primary ID + job_details.job_id = traj_id job_details.trajectory_id = traj_id - job_details.runtime = runtime + job_details.env = env job_details.trajectory_data = traj_data job_details.save_dir = save_dir job_details.osworld_setup = setup @@ -159,8 +154,15 @@ async def _init_worker(self, worker_id: int): except Exception as e: logger.error(f"[init-{worker_id}] Failed setup for job {job_idx}: {e}") job_details.error = str(e) - job_details.completed = False # Failed - job_details.event.set() # Signal main thread we are done (failed) + job_details.completed = False + job_details.event.set() + + # Close DesktopEnv if it was created + if job_details.env: + try: + job_details.env.close() + except Exception: + pass # Release semaphore immediately on failure self._runtime_semaphore.release() @@ -184,7 +186,7 @@ async def _collect_worker(self, worker_id: int): # --- call collect_trajectory --- # This runs the Planner/Actor loop await self.data_collector.collect_trajectory( - job_details.runtime, + job_details.env, job_details.trajectory_data, job_details.save_dir, job_details.osworld_setup @@ -196,10 +198,9 @@ async def _collect_worker(self, worker_id: int): logger.error(f"[collect-{worker_id}] Error in {traj_id}: {e}") job_details.error = str(e) finally: - # Cleanup Runtime - if job_details.runtime: - # Run close in background thread to not block loop - threading.Thread(target=job_details.runtime.close, daemon=True).start() + # Cleanup DesktopEnv (closes NVCF proxy, undeploys function) + if job_details.env: + threading.Thread(target=job_details.env.close, daemon=True).start() # Release semaphore (allows new Init worker to proceed) self._runtime_semaphore.release() @@ -269,6 +270,10 @@ def parse_args(): parser.add_argument("--planner_node", type=str, required=True) parser.add_argument("--actor_node", type=str, required=True) + # Runtime selection + parser.add_argument("--runtime", type=str, choices=["singularity", "nvcf"], default="singularity", + help="Runtime backend: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv)") + # Environment & Setup parser.add_argument("--vm_image_path", type=str, default="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server/OS_images/Ubuntu.qcow2") @@ -295,8 +300,7 @@ def parse_args(): # Parallel specific args parser.add_argument("--max_parallel", type=int, default=24, help="Max concurrent VMs") parser.add_argument( - "--max_trajectories", type=int, default=10000, help="Total trajectories to generate" - ) + "--max_trajectories", type=int, default=10000, help="Total trajectories to generate") return parser.parse_args() @@ -309,6 +313,9 @@ async def main(): logger.info("DataCollector initialized (datasets loaded)") # 2. Start Parallel Generator + # Each worker's DesktopEnv manages its own NVCF function lifecycle + # (deploy, local proxy, health monitoring, undeploy on close) + # No centralized NVCFPool needed - OSWorld's NVCFProvider handles everything. generator = ParallelTrajectoryGenerator(args, data_collector) await generator.run() From 9dad613d8f2f466845b65cf563727b4e0c8f1764 Mon Sep 17 00:00:00 2001 From: Brandon Cui Date: Wed, 11 Mar 2026 09:19:42 -0700 Subject: [PATCH 2/4] Add dual-backend (singularity+NVCF) support for Kimi data collection - Rewrite env controllers to support both singularity and NVCF backends via duck-type dispatch (_is_desktop_env) - Add --runtime flag to parallel_collect_kimi.py - Add NVCF launcher scripts (run_parallel_kimi_nvcf.sh, run_collector_kimi_nvcf.sh) - Add collect_trajectories_sbatch.sh from bcui/init_nvcf_data - Set OSWORLD_SETUP_CACHE_DIR and NVCF_FUNCTION_NAME_PREFIX env vars - Install all required Python deps (openai, Pillow, jsonlines, pandas, requests) with --break-system-packages for PEP 668 containers - Remove ipdb import from data_collector.py Co-Authored-By: Claude Opus 4.6 --- cua/cleanup_nvcf.py | 2 +- cua/modules/debug_env_controller.py | 209 ++++++++---- cua/modules_kimi/data_collector.py | 28 +- cua/modules_kimi/env_controller.py | 333 +++++++++++++++----- cua/parallel_collect_kimi.py | 13 +- cua/scripts/collect_trajectories_sbatch.sh | 253 +++++++++++++++ cua/scripts/kimi/run_collector_kimi_nvcf.sh | 168 ++++++++++ cua/scripts/kimi/run_parallel_kimi_nvcf.sh | 210 ++++++++++++ 8 files changed, 1057 insertions(+), 159 deletions(-) create mode 100755 cua/scripts/collect_trajectories_sbatch.sh create mode 100755 cua/scripts/kimi/run_collector_kimi_nvcf.sh create mode 100755 cua/scripts/kimi/run_parallel_kimi_nvcf.sh diff --git a/cua/cleanup_nvcf.py b/cua/cleanup_nvcf.py index 447298c9a..9af03afec 100644 --- a/cua/cleanup_nvcf.py +++ b/cua/cleanup_nvcf.py @@ -22,7 +22,7 @@ from openhands.nvidia.os_world.nvcf import OSWorldDeployer -POOL_NAME_PREFIX = "nvcf-pool-" +POOL_NAME_PREFIX = "data-collection-" def main(): diff --git a/cua/modules/debug_env_controller.py b/cua/modules/debug_env_controller.py index 5bf9ab85e..e37f320a2 100644 --- a/cua/modules/debug_env_controller.py +++ b/cua/modules/debug_env_controller.py @@ -8,12 +8,6 @@ import requests from typing import Dict, List, Optional, Tuple -# Ensure OSWorld is importable -_osworld_path = "/lustre/fsw/portfolios/nvr/users/bcui/OSWorld" -if _osworld_path not in sys.path: - sys.path.insert(0, _osworld_path) - -from desktop_env.desktop_env import DesktopEnv from openhands.core.logger import openhands_logger # Create a child logger @@ -24,11 +18,16 @@ _DOWNLOAD_SEMAPHORE = threading.Semaphore(int(os.environ.get('OSWORLD_MAX_CONCURRENT_DOWNLOADS', '3'))) +def _is_desktop_env(env_or_runtime) -> bool: + """Check if the object is a DesktopEnv instance (duck-type check to avoid hard import).""" + return hasattr(env_or_runtime, 'controller') and hasattr(env_or_runtime, 'reset') + + class EnvController: """ - Static wrapper class that interfaces with OSWorld's DesktopEnv. - Replaces the previous OpenHands runtime-based approach with OSWorld's - native DesktopEnv + NVCFProvider for NVCF deployments. + Static wrapper class that interfaces with either: + - OSWorldSingularityRuntime (runtime_type='singularity') + - OSWorld DesktopEnv (runtime_type='nvcf') """ @staticmethod @@ -122,82 +121,158 @@ async def initialize_runtime( nvcf_org: Optional[str] = None, ): """ - Initialize runtime using OSWorld's DesktopEnv. - - For NVCF runtime: creates DesktopEnv(provider_name='nvcf') which - auto-deploys an NVCF function and starts a local proxy. + Initialize runtime. - For singularity runtime: creates DesktopEnv(provider_name='singularity') - which uses the local KVM-based approach. + runtime_type='singularity': uses OSWorldSingularityRuntime (local KVM). + runtime_type='nvcf': uses OSWorld DesktopEnv with NVCFProvider. """ - logger.debug(f"[initialize_runtime] Creating {runtime_type} DesktopEnv for {job_id}") + logger.debug(f"[initialize_runtime] Creating {runtime_type} runtime for {job_id}") if runtime_type == "nvcf": - # Set env vars that OSWorld's NVCFProvider reads + # Lazy import DesktopEnv only when needed + _osworld_path = "/lustre/fsw/portfolios/nvr/users/bcui/OSWorld" + if _osworld_path not in sys.path: + sys.path.insert(0, _osworld_path) + from desktop_env.desktop_env import DesktopEnv + if nvcf_api_key: os.environ.setdefault("NGC_API_KEY", nvcf_api_key) if nvcf_org: os.environ.setdefault("NGC_ORG", nvcf_org) if nvcf_function_id: os.environ["NVCF_FUNCTION_ID"] = nvcf_function_id + # Set function name prefix for NVCF deployments + os.environ.setdefault("OSWORLD_SETUP_CACHE_DIR", "/tmp/osworld_cache") + fn_prefix = os.environ.get("NVCF_FUNCTION_NAME_PREFIX", "data-collection") + os.environ.setdefault("NVCF_FUNCTION_NAME_PREFIX", fn_prefix) + if nvcf_version_id: os.environ["NVCF_VERSION_ID"] = nvcf_version_id - provider_name = "nvcf" - else: - provider_name = "singularity" - - env = DesktopEnv( - provider_name=provider_name, - path_to_vm=vm_image_path if runtime_type != "nvcf" else "", - action_space="pyautogui", - headless=True, - os_type="Ubuntu" if os_type == "linux" else os_type, - require_a11y_tree=False, - ) + env = DesktopEnv( + provider_name="nvcf", + path_to_vm="", + action_space="pyautogui", + headless=True, + os_type="Ubuntu" if os_type == "linux" else os_type, + require_a11y_tree=False, + ) - logger.debug(f"[initialize_runtime] DesktopEnv created, resetting with OSWorld setup...") + logger.debug(f"[initialize_runtime] DesktopEnv created, resetting with OSWorld setup...") + env.reset(task_config=osworld_setup) + logger.debug(f"[initialize_runtime] DesktopEnv reset complete for {job_id}") + return env - # DesktopEnv.reset() handles: start emulator, NVCF deploy, proxy, snapshot revert, setup - env.reset(task_config=osworld_setup) - - logger.debug(f"[initialize_runtime] DesktopEnv reset complete for {job_id}") - - return env + else: + # Singularity backend + from examples.setup import SetupController + from openhands.core.config import OpenHandsConfig + from openhands.events import EventStream + from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime + from openhands.storage import get_file_store + + config = OpenHandsConfig() + config.runtime = "osworld" + config.sandbox.base_container_image = "ubuntu:24.04" + config.sandbox.run_as_fakeroot = False + config.sandbox.runtime_container_image = None + + file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') + event_stream = EventStream(sid=job_id, file_store=file_store) + + logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") + + runtime = OSWorldSingularityRuntime( + config=config, + event_stream=event_stream, + sid=job_id, + os_type=os_type, + vm_image_path=vm_image_path, + attach_to_existing=False, + ) + + await runtime.connect() + logger.debug(f"[initialize_runtime] Runtime initialized and connected for {job_id}") + + if osworld_setup and os_type == "linux" and osworld_setup.get('config'): + logger.info(f"[initialize_runtime] [{job_id}] Setting up OSWorld with {len(osworld_setup['config'])} config step(s)") + setup_controller = SetupController( + vm_ip="127.0.0.1", + server_port=runtime._vm_server_port, + chromium_port=runtime._chromium_port, + cache_dir="/tmp/osworld_example", + client_password="password", + runtime=runtime + ) + try: + await setup_controller.setup(osworld_setup['config']) + logger.info(f"[initialize_runtime] [{job_id}] OSWorld setup completed successfully") + except Exception as e: + logger.error(f"[initialize_runtime] [{job_id}] OSWorld setup FAILED: {e}") + raise + else: + logger.debug(f"[initialize_runtime] No OSWorld setup provided") + + return runtime @staticmethod - def execute_pyautogui_command(env, pyautogui_command: str): - """Execute a pyautogui command on the remote VM via OSWorld's PythonController.""" - try: - env.controller.execute_python_command(pyautogui_command) - logger.debug("[execute_pyautogui_command] Action complete") - except Exception as e: - logger.debug(f"[execute_pyautogui_command] Error in Action: {e}") + def execute_pyautogui_command(env_or_runtime, pyautogui_command: str): + """Execute a pyautogui command. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + env_or_runtime.controller.execute_python_command(pyautogui_command) + logger.debug("[execute_pyautogui_command] Action complete") + except Exception as e: + logger.debug(f"[execute_pyautogui_command] Error in Action: {e}") + else: + from openhands.events.action.os import OSWorldInteractiveAction + from openhands.events.observation import ErrorObservation + action = OSWorldInteractiveAction( + method="execute_python_command", + params={"command": pyautogui_command}, + ) + result = env_or_runtime.run_action(action) + if not isinstance(result, ErrorObservation): + logger.debug("[execute_pyautogui_command] Action complete") + else: + logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") @staticmethod - def get_screen_size(env) -> Tuple[int, int]: - """Get the screen size of the remote VM.""" - try: - size = env.controller.get_vm_screen_size() - if isinstance(size, tuple) and len(size) == 2: - return size - # Fallback: parse from string if needed - if isinstance(size, str): - match = re.search(r"(\d+)\D+(\d+)", size) - if match: - return int(match.group(1)), int(match.group(2)) - except Exception as e: - logger.warning(f"[get_screen_size] Failed: {e}, using defaults") - - return env.screen_width, env.screen_height + def get_screen_size(env_or_runtime) -> Tuple[int, int]: + """Get screen size. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + size = env_or_runtime.controller.get_vm_screen_size() + if isinstance(size, tuple) and len(size) == 2: + return size + if isinstance(size, str): + match = re.search(r"(\d+)\D+(\d+)", size) + if match: + return int(match.group(1)), int(match.group(2)) + except Exception as e: + logger.warning(f"[get_screen_size] Failed: {e}, using defaults") + return env_or_runtime.screen_width, env_or_runtime.screen_height + else: + from openhands.events.action.os import OSWorldInteractiveAction + observation = env_or_runtime.run_action(OSWorldInteractiveAction( + method="get_vm_screen_size", + params={}, + thought="" + )) + assert hasattr(observation, "content"), "get_screen_size failed." + match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) + return int(match.group(1)), int(match.group(2)) @staticmethod - def get_screenshot(env) -> bytes: - """ - Returns the current screenshot from the DesktopEnv as bytes. - """ - screenshot = env.controller.get_screenshot() - if not screenshot: - logger.debug("Failed to get screenshot from DesktopEnv.") - raise RuntimeError("Failed to get screenshot from DesktopEnv.") - return screenshot + def get_screenshot(env_or_runtime) -> bytes: + """Get screenshot. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + screenshot = env_or_runtime.controller.get_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from DesktopEnv.") + return screenshot + else: + screenshot = env_or_runtime.get_vm_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from runtime.") + return screenshot diff --git a/cua/modules_kimi/data_collector.py b/cua/modules_kimi/data_collector.py index d75800bf4..0db547468 100644 --- a/cua/modules_kimi/data_collector.py +++ b/cua/modules_kimi/data_collector.py @@ -16,7 +16,6 @@ from pathlib import Path from typing import Optional, Dict, Any, Tuple -import ipdb from modules_kimi.kimi_actor import KimiActor from modules_kimi.env_controller import EnvController @@ -44,6 +43,9 @@ def __init__(self, args: Namespace): self.vm_image_path = args.vm_image_path self.os_type = 'linux' if 'Ubuntu' in self.vm_image_path else 'windows' + # Runtime type: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv) + self.runtime_type = getattr(args, 'runtime', 'singularity') + self.max_steps_per_trajectory = args.max_steps_per_trajectory # Output directory: cua/trajectories/kimi/-- @@ -85,7 +87,7 @@ def save_trajectory(trajectory: Dict, trajectory_save_dir: Path): logger.debug(f"Saved trajectory to {trajectory_save_dir / 'trajectory.json'}") - async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: + async def init_runtime_for_job(self, trajectory_idx: int, nvcf_function_id: str = None, nvcf_version_id: str = None) -> Tuple: """ Stage 1: Initialize the VM and OSWorld setup. Returns: (runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup) @@ -109,13 +111,27 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: # else: # osworld_setup_ready = True + # Pre-download setup files before NVCF deploy to avoid wasting GPU resources + if self.runtime_type == 'nvcf': + logger.info(f'[job {trajectory_idx:04d}] Pre-downloading setup files before NVCF deploy...') + download_ok = EnvController.pre_download_setup_files(osworld_setup) + if not download_ok: + raise RuntimeError( + f'[job {trajectory_idx:04d}] Setup file pre-download failed. ' + f'Skipping NVCF deploy to avoid wasting resources.' + ) + logger.info(f'[job {trajectory_idx:04d}] Pre-download complete.') + # Initialize runtime - runtime = await EnvController.initialize_runtime( - job_id, self.vm_image_path, self.os_type, osworld_setup + env_or_runtime = await EnvController.initialize_runtime( + job_id, self.vm_image_path, self.os_type, osworld_setup, + runtime_type=self.runtime_type, + nvcf_function_id=nvcf_function_id, + nvcf_version_id=nvcf_version_id, ) # Get screen size - width, height = EnvController.get_screen_size(runtime) + width, height = EnvController.get_screen_size(env_or_runtime) trajectory = { "trajectory_id": trajectory_id, @@ -129,7 +145,7 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: "steps": [], } - return runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup + return env_or_runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup async def collect_trajectory( self, runtime, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict diff --git a/cua/modules_kimi/env_controller.py b/cua/modules_kimi/env_controller.py index fbd8d0594..e83ca9cea 100644 --- a/cua/modules_kimi/env_controller.py +++ b/cua/modules_kimi/env_controller.py @@ -1,108 +1,273 @@ import logging +import os +import sys import re -from typing import Dict, Tuple - -from examples.setup import SetupController -from openhands.core.config import OpenHandsConfig -from openhands.events import EventStream -from openhands.events.action.os import OSWorldInteractiveAction -from openhands.events.observation import ErrorObservation -from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime -from openhands.storage import get_file_store +import uuid +import time +import threading +import requests +from typing import Dict, Optional, Tuple + from openhands.core.logger import openhands_logger logger = openhands_logger.getChild('kimi_env_controller') logger.setLevel(logging.DEBUG) +# Semaphore to limit concurrent downloads +_DOWNLOAD_SEMAPHORE = threading.Semaphore(int(os.environ.get('OSWORLD_MAX_CONCURRENT_DOWNLOADS', '3'))) + + +def _is_desktop_env(env_or_runtime) -> bool: + """Check if the object is a DesktopEnv instance (duck-type check to avoid hard import).""" + return hasattr(env_or_runtime, 'controller') and hasattr(env_or_runtime, 'reset') + class EnvController: """ - Static wrapper class that interfaces with OSWorldSingularityRuntime. - Copied from modules/debug_env_controller.py for self-containment. + Static wrapper class that interfaces with either: + - OSWorldSingularityRuntime (runtime_type='singularity') + - OSWorld DesktopEnv (runtime_type='nvcf') """ @staticmethod - async def initialize_runtime(job_id: str, vm_image_path: str, os_type: str, - osworld_setup: Dict) -> OSWorldSingularityRuntime: - config = OpenHandsConfig() - config.runtime = "osworld" - config.sandbox.base_container_image = "ubuntu:24.04" - config.sandbox.run_as_fakeroot = False - config.sandbox.runtime_container_image = None - - file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') - event_stream = EventStream(sid=job_id, file_store=file_store) - - logger.debug(f"[initialize_runtime] Creating runtime for {job_id}") - logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") - - runtime = OSWorldSingularityRuntime( - config=config, - event_stream=event_stream, - sid=job_id, - os_type=os_type, - vm_image_path=vm_image_path, - attach_to_existing=False, - ) - - await runtime.connect() - logger.debug(f"[initialize_runtime] Runtime initialized and connected for {job_id}") - - if osworld_setup and os_type == "linux" and osworld_setup.get('config'): - logger.info(f"[initialize_runtime] [{job_id}] Setting up OSWorld with {len(osworld_setup['config'])} config step(s)") - setup_controller = SetupController( - vm_ip="127.0.0.1", - server_port=runtime._vm_server_port, - chromium_port=runtime._chromium_port, - cache_dir="/tmp/osworld_example", - client_password="password", - runtime=runtime - ) - try: - await setup_controller.setup(osworld_setup['config']) - logger.info(f"[initialize_runtime] [{job_id}] OSWorld setup completed successfully") - except Exception as e: - logger.error(f"[initialize_runtime] [{job_id}] OSWorld setup FAILED: {e}") - logger.error(f"[initialize_runtime] [{job_id}] Failed config: {osworld_setup.get('config', [])}") - raise - else: - logger.debug(f"[initialize_runtime] No OSWorld setup provided") + def pre_download_setup_files(osworld_setup: Dict, cache_dir: str = "/tmp/osworld_cache") -> bool: + """ + Pre-download all setup files to local cache BEFORE deploying NVCF. + Returns True if all downloads succeeded, False otherwise. + """ + config_list = osworld_setup.get("config", []) + if not config_list: + return True + + os.makedirs(cache_dir, exist_ok=True) + + dl_headers = {} + hf_token = os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN') + if hf_token: + dl_headers['Authorization'] = f'Bearer {hf_token}' + + for cfg in config_list: + if cfg.get("type") != "download": + continue + + files = cfg.get("parameters", {}).get("files", []) + for f in files: + url = f.get("url", "") + path = f.get("path", "") + if not url or not path: + continue - return runtime + cache_path = os.path.join(cache_dir, "{:}_{:}".format( + uuid.uuid5(uuid.NAMESPACE_URL, url), + os.path.basename(path))) + + if os.path.exists(cache_path): + logger.info(f"[pre_download] Cache hit: {cache_path}") + continue + + logger.info(f"[pre_download] Downloading {url} to cache...") + max_retries = 8 + downloaded = False + last_error = None + + with _DOWNLOAD_SEMAPHORE: + for i in range(max_retries): + try: + backoff = min(2 ** i + 1, 60) + if i > 0: + logger.info(f"[pre_download] Waiting {backoff}s before retry {i+1}/{max_retries}") + time.sleep(backoff) + + response = requests.get(url, stream=True, timeout=300, headers=dl_headers) + response.raise_for_status() + + downloaded_size = 0 + with open(cache_path, 'wb') as fh: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + fh.write(chunk) + downloaded_size += len(chunk) + + logger.info(f"[pre_download] Downloaded {downloaded_size / (1024*1024):.2f} MB to {cache_path}") + downloaded = True + break + + except requests.RequestException as e: + last_error = e + logger.warning(f"[pre_download] Failed {url}: {e} ({max_retries - i - 1} retries left)") + if os.path.exists(cache_path): + os.remove(cache_path) + + if not downloaded: + logger.error(f"[pre_download] All retries exhausted for {url}. Last error: {last_error}") + return False + + return True @staticmethod - def execute_pyautogui_command(runtime: OSWorldSingularityRuntime, pyautogui_command: str): - pyautogui_action = OSWorldInteractiveAction( - method="execute_python_command", - params={"command": pyautogui_command}, - ) - result = runtime.run_action(pyautogui_action) - - if not isinstance(result, ErrorObservation): - logger.debug("[execute_pyautogui_command] Action complete") + async def initialize_runtime( + job_id: str, + vm_image_path: str, + os_type: str, + osworld_setup: Dict, + runtime_type: str = "singularity", + nvcf_function_id: Optional[str] = None, + nvcf_version_id: Optional[str] = None, + nvcf_api_key: Optional[str] = None, + nvcf_org: Optional[str] = None, + ): + """ + Initialize runtime. + + runtime_type='singularity': uses OSWorldSingularityRuntime (local KVM). + runtime_type='nvcf': uses OSWorld DesktopEnv with NVCFProvider. + """ + logger.debug(f"[initialize_runtime] Creating {runtime_type} runtime for {job_id}") + + if runtime_type == "nvcf": + _osworld_path = "/lustre/fsw/portfolios/nvr/users/bcui/OSWorld" + if _osworld_path not in sys.path: + sys.path.insert(0, _osworld_path) + from desktop_env.desktop_env import DesktopEnv + + if nvcf_api_key: + os.environ.setdefault("NGC_API_KEY", nvcf_api_key) + if nvcf_org: + os.environ.setdefault("NGC_ORG", nvcf_org) + if nvcf_function_id: + os.environ["NVCF_FUNCTION_ID"] = nvcf_function_id + # Set function name prefix for NVCF deployments + os.environ.setdefault("OSWORLD_SETUP_CACHE_DIR", "/tmp/osworld_cache") + fn_prefix = os.environ.get("NVCF_FUNCTION_NAME_PREFIX", "data-collection") + os.environ.setdefault("NVCF_FUNCTION_NAME_PREFIX", fn_prefix) + + if nvcf_version_id: + os.environ["NVCF_VERSION_ID"] = nvcf_version_id + + env = DesktopEnv( + provider_name="nvcf", + path_to_vm="", + action_space="pyautogui", + headless=True, + os_type="Ubuntu" if os_type == "linux" else os_type, + require_a11y_tree=False, + ) + + logger.debug(f"[initialize_runtime] DesktopEnv created, resetting with OSWorld setup...") + env.reset(task_config=osworld_setup) + logger.debug(f"[initialize_runtime] DesktopEnv reset complete for {job_id}") + return env + else: - logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") + # Singularity backend + from examples.setup import SetupController + from openhands.core.config import OpenHandsConfig + from openhands.events import EventStream + from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime + from openhands.storage import get_file_store - @staticmethod - def get_screen_size(runtime: OSWorldSingularityRuntime) -> Tuple[int, int]: - observation = runtime.run_action(OSWorldInteractiveAction( - method="get_vm_screen_size", - params={}, - thought="" - )) + config = OpenHandsConfig() + config.runtime = "osworld" + config.sandbox.base_container_image = "ubuntu:24.04" + config.sandbox.run_as_fakeroot = False + config.sandbox.runtime_container_image = None + + file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') + event_stream = EventStream(sid=job_id, file_store=file_store) + + logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") + + runtime = OSWorldSingularityRuntime( + config=config, + event_stream=event_stream, + sid=job_id, + os_type=os_type, + vm_image_path=vm_image_path, + attach_to_existing=False, + ) - assert hasattr(observation, "content"), "get_screen_size failed." + await runtime.connect() + logger.debug(f"[initialize_runtime] Runtime initialized and connected for {job_id}") - match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) - width, height = int(match.group(1)), int(match.group(2)) + if osworld_setup and os_type == "linux" and osworld_setup.get('config'): + logger.info(f"[initialize_runtime] [{job_id}] Setting up OSWorld with {len(osworld_setup['config'])} config step(s)") + setup_controller = SetupController( + vm_ip="127.0.0.1", + server_port=runtime._vm_server_port, + chromium_port=runtime._chromium_port, + cache_dir="/tmp/osworld_example", + client_password="password", + runtime=runtime + ) + try: + await setup_controller.setup(osworld_setup['config']) + logger.info(f"[initialize_runtime] [{job_id}] OSWorld setup completed successfully") + except Exception as e: + logger.error(f"[initialize_runtime] [{job_id}] OSWorld setup FAILED: {e}") + raise + else: + logger.debug(f"[initialize_runtime] No OSWorld setup provided") - return width, height + return runtime @staticmethod - def get_screenshot(runtime: OSWorldSingularityRuntime) -> bytes: - screenshot = runtime.get_vm_screenshot() - if not screenshot: - logger.debug("Failed to get screenshot from runtime.") - raise RuntimeError("Failed to get screenshot from runtime.") + def execute_pyautogui_command(env_or_runtime, pyautogui_command: str): + """Execute a pyautogui command. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + env_or_runtime.controller.execute_python_command(pyautogui_command) + logger.debug("[execute_pyautogui_command] Action complete") + except Exception as e: + logger.debug(f"[execute_pyautogui_command] Error in Action: {e}") + else: + from openhands.events.action.os import OSWorldInteractiveAction + from openhands.events.observation import ErrorObservation + action = OSWorldInteractiveAction( + method="execute_python_command", + params={"command": pyautogui_command}, + ) + result = env_or_runtime.run_action(action) + if not isinstance(result, ErrorObservation): + logger.debug("[execute_pyautogui_command] Action complete") + else: + logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") - return screenshot + @staticmethod + def get_screen_size(env_or_runtime) -> Tuple[int, int]: + """Get screen size. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + size = env_or_runtime.controller.get_vm_screen_size() + if isinstance(size, tuple) and len(size) == 2: + return size + if isinstance(size, str): + match = re.search(r"(\d+)\D+(\d+)", size) + if match: + return int(match.group(1)), int(match.group(2)) + except Exception as e: + logger.warning(f"[get_screen_size] Failed: {e}, using defaults") + return env_or_runtime.screen_width, env_or_runtime.screen_height + else: + from openhands.events.action.os import OSWorldInteractiveAction + observation = env_or_runtime.run_action(OSWorldInteractiveAction( + method="get_vm_screen_size", + params={}, + thought="" + )) + assert hasattr(observation, "content"), "get_screen_size failed." + match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) + return int(match.group(1)), int(match.group(2)) + + @staticmethod + def get_screenshot(env_or_runtime) -> bytes: + """Get screenshot. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + screenshot = env_or_runtime.controller.get_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from DesktopEnv.") + return screenshot + else: + screenshot = env_or_runtime.get_vm_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from runtime.") + return screenshot diff --git a/cua/parallel_collect_kimi.py b/cua/parallel_collect_kimi.py index b1ec1c29a..a41fe9b24 100644 --- a/cua/parallel_collect_kimi.py +++ b/cua/parallel_collect_kimi.py @@ -59,6 +59,7 @@ def __init__(self, args, data_collector: DataCollector): self.data_collector = data_collector self.max_parallel = args.max_parallel self.max_trajectories = args.max_trajectories + self.runtime_type = getattr(args, 'runtime', 'singularity') # Queues self.init_queue: queue.Queue = queue.Queue() @@ -80,7 +81,7 @@ def __init__(self, args, data_collector: DataCollector): # For sequential VM start-ups to mitigate boot storm self._launch_lock = threading.Lock() self._last_launch_time = 0 - self._launch_delay_seconds = 15.0 # Wait 15s between starts + self._launch_delay_seconds = 0.0 if self.runtime_type == "nvcf" else 15.0 def start_workers(self): self._server_running = True @@ -164,6 +165,12 @@ async def _init_worker(self, worker_id: int): job_details.completed = False job_details.event.set() + if job_details.runtime: + try: + job_details.runtime.close() + except Exception: + pass + self._runtime_semaphore.release() with self._active_runtime_lock: self._active_runtime_count -= 1 @@ -261,6 +268,10 @@ def parse_args(): parser.add_argument("--kimi_model_name", type=str, default="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/models/Kimi-K2.5") + # Runtime selection + parser.add_argument("--runtime", type=str, choices=["singularity", "nvcf"], default="singularity", + help="Runtime backend: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv)") + # Environment & Setup parser.add_argument("--vm_image_path", type=str, default="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server/OS_images/Ubuntu.qcow2") diff --git a/cua/scripts/collect_trajectories_sbatch.sh b/cua/scripts/collect_trajectories_sbatch.sh new file mode 100755 index 000000000..208a33768 --- /dev/null +++ b/cua/scripts/collect_trajectories_sbatch.sh @@ -0,0 +1,253 @@ +#!/bin/bash +# ============================================================================ +# 2-Node Batch Trajectory Collection +# ============================================================================ +# Node 0 (Planner): Runs Qwen3-VL-235B vLLM server (tp=8, all 8 GPUs) +# Node 1 (Actor): 2x UI-TARS-1.5-7B vLLM servers (tp=4 each, GPUs 0-3 + 4-7) +# + round-robin load balancer on port 8000 +# + trajectory collection +# +# Usage: sbatch collect_trajectories_sbatch.sh +# ============================================================================ + +#SBATCH --job-name=traj_collect +#SBATCH --account=llmservice_fm_vision +#SBATCH --reservation=sla_res_osworld_agent_vlm +#SBATCH --partition=batch_block1 +#SBATCH --gpus-per-node=8 +#SBATCH --nodes=2 +#SBATCH --ntasks-per-node=1 +#SBATCH --mem=0 +#SBATCH --time=04:00:00 +#SBATCH --exclusive +#SBATCH --output=/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/scripts/logs-refactor/traj_collect_%j.out +#SBATCH --error=/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/scripts/logs-refactor/traj_collect_%j.err + +set -euo pipefail + +# ============================================================================ +# Configuration +# ============================================================================ +IMAGE="/lustre/fsw/portfolios/nvr/users/bcui/images/cua-vllm-0.13.0.sqsh" +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" +PROJECT_DIR="$PROJECT_ROOT/cua" + +PLANNER_MODEL="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/models/Qwen3-VL-235B-A22B-Thinking" +ACTOR_MODEL="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/huggingface_models/UI-TARS-1.5-7B" + +PLANNER_PORT=8000 +ACTOR_PORT=8000 # Load balancer port (what the code talks to) +ACTOR_PORT_1=8001 # Actor replica 1 (GPUs 0-3) +ACTOR_PORT_2=8002 # Actor replica 2 (GPUs 4-7) + +# Trajectory collection settings +MAX_PARALLEL=16 +MAX_TRAJECTORIES=1024 + +# Log file name (timestamped) +TIMESTAMP=$(date +%m-%d-%H%M) +LOG_FILE="$PROJECT_DIR/${TIMESTAMP}-logs.log" + +# ============================================================================ +# Resolve node assignments +# ============================================================================ +ALL_NODES=$(scontrol show hostnames "$SLURM_JOB_NODELIST") +PLANNER_NODE=$(echo "$ALL_NODES" | head -n 1) +ACTOR_NODE=$(echo "$ALL_NODES" | tail -n 1) + +echo "[sbatch] Job ID: $SLURM_JOB_ID" +echo "[sbatch] Planner Node: $PLANNER_NODE" +echo "[sbatch] Actor Node: $ACTOR_NODE" +echo "[sbatch] Log file: $LOG_FILE" + +mkdir -p "$PROJECT_DIR/scripts/logs" + +# ============================================================================ +# Launch Planner vLLM server on Node 0 +# ============================================================================ +echo "[sbatch] Starting Planner vLLM server on $PLANNER_NODE..." +srun --nodes=1 --ntasks=1 --nodelist="$PLANNER_NODE" \ + --container-image="$IMAGE" \ + --container-mounts=/lustre:/lustre \ + --container-writable \ + bash -c " + vllm serve $PLANNER_MODEL \ + --api-key gen \ + --tensor-parallel-size 8 \ + --enable-expert-parallel \ + --limit-mm-per-prompt.video 0 \ + --limit-mm-per-prompt.image 3 \ + --async-scheduling \ + --max-model-len 65536 \ + --gpu-memory-utilization 0.9 \ + > $PROJECT_DIR/scripts/logs/planner_${SLURM_JOB_ID}.log 2>&1 + " & +PLANNER_SRUN_PID=$! + +# ============================================================================ +# Launch Actor vLLM server + trajectory collection on Node 1 +# ============================================================================ +echo "[sbatch] Starting 2x Actor vLLM servers + collection on $ACTOR_NODE..." +srun --nodes=1 --ntasks=1 --nodelist="$ACTOR_NODE" \ + --container-image="$IMAGE" \ + --container-mounts=/lustre:/lustre \ + --container-writable \ + bash -c " + # --- Start Actor vLLM replica 1 (GPUs 0-3) --- + CUDA_VISIBLE_DEVICES=0,1,2,3 vllm serve $ACTOR_MODEL \ + --served-model-name ByteDance-Seed/UI-TARS-1.5-7B \ + --api-key gen \ + --port $ACTOR_PORT_1 \ + --tensor-parallel-size 4 \ + --limit-mm-per-prompt.image 5 \ + --limit-mm-per-prompt.video 0 \ + --max-model-len 65536 \ + --disable-log-requests \ + --disable-log-stats \ + > $PROJECT_DIR/scripts/logs/actor1_${SLURM_JOB_ID}.log 2>&1 & + ACTOR1_PID=\$! + + # --- Start Actor vLLM replica 2 (GPUs 4-7) --- + CUDA_VISIBLE_DEVICES=4,5,6,7 vllm serve $ACTOR_MODEL \ + --served-model-name ByteDance-Seed/UI-TARS-1.5-7B \ + --api-key gen \ + --port $ACTOR_PORT_2 \ + --tensor-parallel-size 4 \ + --limit-mm-per-prompt.image 5 \ + --limit-mm-per-prompt.video 0 \ + --max-model-len 65536 \ + --disable-log-requests \ + --disable-log-stats \ + > $PROJECT_DIR/scripts/logs/actor2_${SLURM_JOB_ID}.log 2>&1 & + ACTOR2_PID=\$! + + # --- Start round-robin load balancer on port $ACTOR_PORT --- + python3 -c ' +import http.server, http.client, threading, sys, io + +backends = [(\"localhost\", $ACTOR_PORT_1), (\"localhost\", $ACTOR_PORT_2)] +counter = 0 +lock = threading.Lock() + +class LBHandler(http.server.BaseHTTPRequestHandler): + def do_ANY(self, method): + global counter + with lock: + host, port = backends[counter % len(backends)] + counter += 1 + + content_length = int(self.headers.get(\"Content-Length\", 0)) + body = self.rfile.read(content_length) if content_length > 0 else None + + try: + conn = http.client.HTTPConnection(host, port, timeout=300) + conn.request(method, self.path, body=body, headers=dict(self.headers)) + resp = conn.getresponse() + resp_body = resp.read() + + self.send_response(resp.status) + for k, v in resp.getheaders(): + if k.lower() not in (\"transfer-encoding\",): + self.send_header(k, v) + self.end_headers() + self.wfile.write(resp_body) + conn.close() + except Exception as e: + self.send_response(502) + self.end_headers() + self.wfile.write(f\"LB error: {e}\".encode()) + + def do_GET(self): self.do_ANY(\"GET\") + def do_POST(self): self.do_ANY(\"POST\") + def do_PUT(self): self.do_ANY(\"PUT\") + def do_DELETE(self): self.do_ANY(\"DELETE\") + def log_message(self, format, *args): pass # silence logs + +server = http.server.ThreadingHTTPServer((\"0.0.0.0\", $ACTOR_PORT), LBHandler) +print(f\"[LB] Round-robin load balancer on port $ACTOR_PORT -> {backends}\", flush=True) +server.serve_forever() +' > $PROJECT_DIR/scripts/logs/actor_lb_${SLURM_JOB_ID}.log 2>&1 & + LB_PID=\$! + + # --- Wait for all servers to be healthy --- + echo '[actor-node] Waiting for vLLM servers to become healthy...' + + wait_for_server() { + local host=\$1 + local port=\$2 + local name=\$3 + local max_wait=600 + local elapsed=0 + + while [ \$elapsed -lt \$max_wait ]; do + if curl -sf http://\${host}:\${port}/health > /dev/null 2>&1; then + echo \"[actor-node] \$name server healthy (\${elapsed}s)\" + return 0 + fi + sleep 10 + elapsed=\$((elapsed + 10)) + if [ \$((elapsed % 60)) -eq 0 ]; then + echo \"[actor-node] Still waiting for \$name (\${elapsed}s)...\" + fi + done + echo \"[actor-node] ERROR: \$name server did not start within \${max_wait}s\" + return 1 + } + + # Wait for both actor replicas and planner + wait_for_server localhost $ACTOR_PORT_1 'Actor-1 (GPU 0-3)' + ACTOR1_OK=\$? + + wait_for_server localhost $ACTOR_PORT_2 'Actor-2 (GPU 4-7)' + ACTOR2_OK=\$? + + wait_for_server $PLANNER_NODE $PLANNER_PORT Planner + PLANNER_OK=\$? + + if [ \$ACTOR1_OK -ne 0 ] || [ \$ACTOR2_OK -ne 0 ] || [ \$PLANNER_OK -ne 0 ]; then + echo '[actor-node] ERROR: One or more servers failed to start.' + echo 'Planner log:' && tail -20 $PROJECT_DIR/scripts/logs/planner_${SLURM_JOB_ID}.log 2>/dev/null + echo 'Actor-1 log:' && tail -20 $PROJECT_DIR/scripts/logs/actor1_${SLURM_JOB_ID}.log 2>/dev/null + echo 'Actor-2 log:' && tail -20 $PROJECT_DIR/scripts/logs/actor2_${SLURM_JOB_ID}.log 2>/dev/null + kill \$ACTOR1_PID \$ACTOR2_PID \$LB_PID 2>/dev/null + exit 1 + fi + + echo '[actor-node] All servers healthy. Starting trajectory collection...' + + # --- Run trajectory collection --- + cd $PROJECT_DIR + source cua_env_reqs/bin/activate + export PYTHONPATH=$PROJECT_ROOT:\$PYTHONPATH + export OSWORLD_SETUP_CACHE_DIR=/tmp/osworld_cache + + python parallel_collect_trajectories.py \ + --planner_node $PLANNER_NODE \ + --actor_node $ACTOR_NODE \ + --runtime nvcf \ + --max_parallel $MAX_PARALLEL \ + --max_trajectories $MAX_TRAJECTORIES \ + 2>&1 | tee $LOG_FILE + + COLLECT_EXIT=\$? + + # Cleanup + kill \$ACTOR1_PID \$ACTOR2_PID \$LB_PID 2>/dev/null + echo \"[actor-node] Collection finished (exit code: \$COLLECT_EXIT)\" + exit \$COLLECT_EXIT + " & +ACTOR_SRUN_PID=$! + +# ============================================================================ +# Wait for completion +# ============================================================================ +# Wait for the actor srun (which runs collection). When it finishes, kill planner. +wait $ACTOR_SRUN_PID +COLLECT_EXIT=$? + +echo "[sbatch] Actor node finished (exit: $COLLECT_EXIT). Stopping planner..." +kill $PLANNER_SRUN_PID 2>/dev/null +wait $PLANNER_SRUN_PID 2>/dev/null + +echo "[sbatch] Done. Log: $LOG_FILE" +exit $COLLECT_EXIT diff --git a/cua/scripts/kimi/run_collector_kimi_nvcf.sh b/cua/scripts/kimi/run_collector_kimi_nvcf.sh new file mode 100755 index 000000000..b4d9d961d --- /dev/null +++ b/cua/scripts/kimi/run_collector_kimi_nvcf.sh @@ -0,0 +1,168 @@ +#!/bin/bash +# ============================================================================ +# Kimi NVCF Collector Launcher (SSH+Enroot Pattern) +# ============================================================================ +# Same as run_collector_kimi.sh but passes --runtime nvcf. +# No /dev/kvm reservation needed — VMs are remote NVCF instances. +# +# Required env vars: +# MODEL_NODE - hostname of the Kimi vLLM server head node +# NGC_API_KEY - NVCF API key +# NGC_ORG - NVCF organization +# +# Optional env vars: +# MAX_PARALLEL - parallel VMs per collector (default: 10) +# MAX_TRAJECTORIES - trajectories to collect (default: 10000) +# TRAJECTORY_SAVE_DIR - output directory +# +# Optional arg: +# $1 = collector index (for log naming, default: 0) +# +# Usage: +# MODEL_NODE=pool0-03161 NGC_API_KEY=nvapi-xxx NGC_ORG=my-org \ +# bash run_collector_kimi_nvcf.sh 1 +# ============================================================================ + +COLLECTOR_IDX="${1:-0}" +LOG_DIR="${LOG_DIR:-./logs}" + +# Configs +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" +PROJECT_DIR="$PROJECT_ROOT/cua" +# COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua_cpu.sqsh" +COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/images/cua_cpu.sqsh" + +MAX_PARALLEL=${MAX_PARALLEL:-10} +MAX_TRAJECTORIES=${MAX_TRAJECTORIES:-10000} +TRAJECTORY_SAVE_DIR="${TRAJECTORY_SAVE_DIR:-$PROJECT_DIR/trajectories/kimi-nvcf}" + +KIMI_PORT=8000 +NVCF_FUNCTION_NAME_PREFIX="${NVCF_FUNCTION_NAME_PREFIX:-data-collection}" + +# Validate +if [ -z "$MODEL_NODE" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: MODEL_NODE not set." + exit 1 +fi +if [ -z "$NGC_API_KEY" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: NGC_API_KEY not set." + exit 1 +fi +if [ -z "$NGC_ORG" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: NGC_ORG not set." + exit 1 +fi + +mkdir -p "$LOG_DIR" + +echo "[Collector $COLLECTOR_IDX] MODEL_NODE=$MODEL_NODE" +echo "[Collector $COLLECTOR_IDX] MAX_PARALLEL=$MAX_PARALLEL MAX_TRAJECTORIES=$MAX_TRAJECTORIES" +echo "[Collector $COLLECTOR_IDX] Runtime: nvcf" + +# --- 1. Submit holder job on CPU node (no /dev/kvm needed) --- +echo "[Collector $COLLECTOR_IDX] Submitting holder job..." +COLLECTOR_JOB_ID=$(sbatch --parsable \ + --job-name="kimi_nvcf_collector_${COLLECTOR_IDX}" \ + --account=nvr_lpr_agentic \ + --partition=cpu_short \ + --mem=0 \ + --time=01:30:00 \ + --exclusive \ + --output="/dev/null" \ + --error="/dev/null" \ + --wrap="srun --container-image=$COLLECTOR_IMAGE --container-mounts=/lustre:/lustre sleep infinity") + +if [ -z "$COLLECTOR_JOB_ID" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: Job submission failed." + exit 1 +fi +echo "[Collector $COLLECTOR_IDX] Holder job submitted: $COLLECTOR_JOB_ID" + +# --- 2. Cleanup trap --- +cleanup() { + echo "" + echo "[Collector $COLLECTOR_IDX] Cleaning up... Cancelling holder job $COLLECTOR_JOB_ID" + scancel "$COLLECTOR_JOB_ID" 2>/dev/null +} +trap cleanup EXIT + +# --- 3. Wait for job to start --- +echo "[Collector $COLLECTOR_IDX] Waiting for holder job to start..." +COLLECTOR_NODE="" +while [ -z "$COLLECTOR_NODE" ]; do + JOB_STATE=$(squeue -j "$COLLECTOR_JOB_ID" -h -o %T 2>/dev/null) + + if [ "$JOB_STATE" == "RUNNING" ]; then + COLLECTOR_NODE=$(squeue -j "$COLLECTOR_JOB_ID" -h -o %N) + elif [ -z "$JOB_STATE" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: Job $COLLECTOR_JOB_ID disappeared from queue!" + exit 1 + fi + sleep 2 +done +echo "[Collector $COLLECTOR_IDX] Job RUNNING on node: $COLLECTOR_NODE" + +# --- 4. Wait for container readiness --- +echo "[Collector $COLLECTOR_IDX] Polling for container readiness on $COLLECTOR_NODE..." +CONTAINER_PID="" +while [ -z "$CONTAINER_PID" ]; do + sleep 2 + CONTAINER_PID=$(ssh -q -o StrictHostKeyChecking=no "$COLLECTOR_NODE" \ + "enroot list -f | grep 'pyxis' | grep 'sleep' | awk '{print \$2}' | head -n 1" 2>/dev/null) + + if [ -z "$CONTAINER_PID" ]; then + printf "." + fi +done +echo "" +echo "[Collector $COLLECTOR_IDX] Container ready, PID: $CONTAINER_PID" + +# --- 5. SSH+enroot exec: run data collection with NVCF backend --- +ssh -t -q -o StrictHostKeyChecking=no "$COLLECTOR_NODE" \ + "enroot exec $CONTAINER_PID /bin/bash -c ' + set -e + export PYTHONUNBUFFERED=1 + export NGC_API_KEY=$NGC_API_KEY + export NGC_ORG=$NGC_ORG + export NVCF_FUNCTION_NAME_PREFIX=$NVCF_FUNCTION_NAME_PREFIX + export OSWORLD_SETUP_CACHE_DIR=/tmp/osworld_cache + + # Wait for Kimi vLLM to be healthy + echo \"[Collector $COLLECTOR_IDX] Waiting for Kimi vLLM at $MODEL_NODE:$KIMI_PORT...\" + ELAPSED=0 + MAX_WAIT=7200 + while [ \$ELAPSED -lt \$MAX_WAIT ]; do + if curl -sf http://$MODEL_NODE:$KIMI_PORT/health > /dev/null 2>&1; then + echo \"[Collector $COLLECTOR_IDX] Kimi vLLM is healthy!\" + break + fi + sleep 10 + ELAPSED=\$((ELAPSED + 10)) + if [ \$((ELAPSED % 60)) -eq 0 ]; then + echo \"[Collector $COLLECTOR_IDX] Still waiting for Kimi vLLM (\${ELAPSED}s)...\" + fi + done + + if [ \$ELAPSED -ge \$MAX_WAIT ]; then + echo \"[Collector $COLLECTOR_IDX] ERROR: Kimi vLLM did not become healthy within \${MAX_WAIT}s\" + exit 1 + fi + + # Run data collection with NVCF backend + # Install missing Python dependencies (--break-system-packages for PEP 668 containers) + python -m pip install --break-system-packages openai Pillow jsonlines pandas requests 2>&1 || \ + pip3 install --break-system-packages openai Pillow jsonlines pandas requests 2>&1 || \ + echo "[WARNING] Failed to install Python dependencies" + echo \"[Collector $COLLECTOR_IDX] Starting parallel data collection (NVCF backend)...\" + cd $PROJECT_DIR + python parallel_collect_kimi.py \ + --model_node $MODEL_NODE \ + --runtime nvcf \ + --max_parallel $MAX_PARALLEL \ + --max_trajectories $MAX_TRAJECTORIES \ + --trajectory_save_dir $TRAJECTORY_SAVE_DIR + + COLLECT_EXIT=\$? + echo \"[Collector $COLLECTOR_IDX] Data collection finished with exit code \$COLLECT_EXIT\" + exit \$COLLECT_EXIT + '" diff --git a/cua/scripts/kimi/run_parallel_kimi_nvcf.sh b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh new file mode 100755 index 000000000..3a94465d6 --- /dev/null +++ b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh @@ -0,0 +1,210 @@ +#!/bin/bash +# ============================================================================ +# Kimi-K2.5 Data Collection with NVCF Backend - Multi-Collector Launcher +# ============================================================================ +# Same architecture as run_parallel_kimi.sh but uses NVCF for VMs instead of +# local KVM. Collector nodes don't need /dev/kvm or GPU — they just orchestrate +# remote NVCF VMs. +# +# 1. Submits Kimi vLLM sbatch job (2 GPU nodes, Ray cluster) +# 2. Waits for Kimi server to become healthy +# 3. Launches NUM_COLLECTORS instances of run_collector_kimi_nvcf.sh in parallel +# 4. Each collector submits a CPU-only holder job (no /dev/kvm needed) +# 5. Waits for all collectors to finish, then cancels Kimi server +# +# Required env vars: +# NGC_API_KEY - NVCF API key +# NGC_ORG - NVCF organization +# +# Usage: +# NGC_API_KEY=nvapi-xxx NGC_ORG=my-org NUM_COLLECTORS=2 bash run_parallel_kimi_nvcf.sh +# +# ============================================================================ + +export LOG_DIR="${LOG_DIR:-./logs}" + +# Configurable parameters +NUM_COLLECTORS="${NUM_COLLECTORS:-2}" +MAX_PARALLEL="${MAX_PARALLEL:-16}" +MAX_TRAJECTORIES="${MAX_TRAJECTORIES:-10000}" +# TRAJECTORY_SAVE_DIR="${TRAJECTORY_SAVE_DIR:-/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/mingjiel/workspace/data/jaehun/cua/trajectories/kimi-nvcf/}" +TRAJECTORY_SAVE_DIR="${TRAJECTORY_SAVE_DIR:-/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/ProRL-Agent-Server/cua/trajectories/kimi-nvcf/}" +NVCF_FUNCTION_NAME_PREFIX="${NVCF_FUNCTION_NAME_PREFIX:-data-collection}" + +# Validate NVCF credentials +if [ -z "$NGC_API_KEY" ]; then + echo "[ERROR] NGC_API_KEY not set. Required for NVCF backend." + exit 1 +fi +if [ -z "$NGC_ORG" ]; then + echo "[ERROR] NGC_ORG not set. Required for NVCF backend." + exit 1 +fi + +# Create logs directory +mkdir -p "$LOG_DIR" + +KIMI_JOB_ID="" +COLLECTOR_PIDS=() +KIMI_PORT=8000 + +echo "============================================" +echo "Kimi-K2.5 Data Collection (NVCF Backend)" +echo "============================================" +echo "NUM_COLLECTORS: $NUM_COLLECTORS" +echo "MAX_PARALLEL: $MAX_PARALLEL (per collector)" +echo "MAX_TRAJECTORIES: $MAX_TRAJECTORIES (per collector)" +echo "NGC_ORG: $NGC_ORG" +echo "NVCF_PREFIX: $NVCF_FUNCTION_NAME_PREFIX" +echo "" + + +# --- Cleanup: cancel Kimi server on exit --- +cleanup() { + echo "" + echo "[run_parallel_kimi_nvcf.sh] Cleaning up..." + + # 1. Kill Collector Launcher Scripts + if [ ${#COLLECTOR_PIDS[@]} -gt 0 ]; then + echo "[run_parallel_kimi_nvcf.sh] Killing ${#COLLECTOR_PIDS[@]} collector launcher scripts..." + for pid in "${COLLECTOR_PIDS[@]}"; do + if kill -0 "$pid" 2>/dev/null; then + kill "$pid" 2>/dev/null + fi + done + fi + + # 2. Cancel Kimi vLLM server + if [ -n "$KIMI_JOB_ID" ]; then + echo "[run_parallel_kimi_nvcf.sh] Cancelling Kimi vLLM job $KIMI_JOB_ID" + scancel "$KIMI_JOB_ID" 2>/dev/null + fi + + # 3. Remove head node file + rm -f "$LOG_DIR/head_node_${KIMI_JOB_ID}" +} +trap cleanup EXIT + + +# --- 1. Submit Kimi vLLM server --- +echo "[run_parallel_kimi_nvcf.sh] Submitting Kimi vLLM sbatch job..." +KIMI_JOB_ID=$(sbatch \ + --account=nvr_lacr_llm \ + --partition=batch_short \ + --time=02:00:00 \ + --output="$LOG_DIR/slurm-%j-server.out" \ + --error="$LOG_DIR/slurm-%j-server.out" \ + --parsable \ + "./run_kimi.sbatch") + +if [ -z "$KIMI_JOB_ID" ]; then + echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi sbatch submission failed." + exit 1 +fi +echo "[run_parallel_kimi_nvcf.sh] Kimi vLLM job submitted: $KIMI_JOB_ID" + +# Wait for the job to start and discover the head node +HEAD_NODE_FILE="$LOG_DIR/head_node_${KIMI_JOB_ID}" +echo "[run_parallel_kimi_nvcf.sh] Waiting for head node file: $HEAD_NODE_FILE" +MODEL_NODE="" +ELAPSED=0 +MAX_WAIT=43200 # 12 hours + +while [ $ELAPSED -lt $MAX_WAIT ]; do + JOB_STATE=$(squeue -j "$KIMI_JOB_ID" -h -o %T 2>/dev/null) + if [ -z "$JOB_STATE" ]; then + echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi job $KIMI_JOB_ID disappeared from queue!" + exit 1 + fi + + if [ -f "$HEAD_NODE_FILE" ]; then + MODEL_NODE=$(cat "$HEAD_NODE_FILE") + if [ -n "$MODEL_NODE" ]; then + echo "[run_parallel_kimi_nvcf.sh] Kimi vLLM job running. Head node: $MODEL_NODE" + break + fi + fi + + sleep 10 + ELAPSED=$((ELAPSED + 10)) + if [ $((ELAPSED % 60)) -eq 0 ]; then + echo "[run_parallel_kimi_nvcf.sh] Still waiting for Kimi job to start (${ELAPSED}s)..." + fi +done + +if [ -z "$MODEL_NODE" ]; then + echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi vLLM did not start within ${MAX_WAIT}s." + exit 1 +fi + +# --- 2. Wait for Kimi vLLM health --- +echo "[run_parallel_kimi_nvcf.sh] Waiting for Kimi vLLM health at $MODEL_NODE:$KIMI_PORT..." +ELAPSED=0 +MAX_HEALTH_WAIT=7200 + +while [ $ELAPSED -lt $MAX_HEALTH_WAIT ]; do + if curl -sf "http://$MODEL_NODE:$KIMI_PORT/health" > /dev/null 2>&1; then + echo "[run_parallel_kimi_nvcf.sh] Kimi vLLM is healthy!" + break + fi + sleep 10 + ELAPSED=$((ELAPSED + 10)) + if [ $((ELAPSED % 60)) -eq 0 ]; then + echo "[run_parallel_kimi_nvcf.sh] Still waiting for Kimi health (${ELAPSED}s)..." + fi +done + +if [ $ELAPSED -ge $MAX_HEALTH_WAIT ]; then + echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi vLLM did not become healthy within ${MAX_HEALTH_WAIT}s." + exit 1 +fi + +# --- 3. Launch N Collector Instances --- +echo "[run_parallel_kimi_nvcf.sh] Launching $NUM_COLLECTORS collector(s)..." +export MODEL_NODE +COLLECTOR_PIDS=() + +for i in $(seq 1 "$NUM_COLLECTORS"); do + echo "[run_parallel_kimi_nvcf.sh] Starting collector $i..." + CURRENT_LOG="$LOG_DIR/slurm-${KIMI_JOB_ID}-collector-nvcf-${i}.out" + + MODEL_NODE="$MODEL_NODE" \ + NVCF_FUNCTION_NAME_PREFIX="$NVCF_FUNCTION_NAME_PREFIX" \ + NGC_API_KEY="$NGC_API_KEY" \ + NGC_ORG="$NGC_ORG" \ + MAX_PARALLEL="$MAX_PARALLEL" \ + MAX_TRAJECTORIES="$MAX_TRAJECTORIES" \ + TRAJECTORY_SAVE_DIR="$TRAJECTORY_SAVE_DIR" \ + bash "./run_collector_kimi_nvcf.sh" "$i" &> "$CURRENT_LOG" & + + COLLECTOR_PIDS+=($!) + echo "[run_parallel_kimi_nvcf.sh] Collector $i launched (PID ${COLLECTOR_PIDS[-1]})" + echo " Log: $CURRENT_LOG" +done + +# --- 4. Wait for all collectors --- +echo "" +echo "[run_parallel_kimi_nvcf.sh] All collectors launched. Waiting for completion..." +echo "" + +FAILED=0 +for i in "${!COLLECTOR_PIDS[@]}"; do + COLLECTOR_NUM=$((i + 1)) + wait "${COLLECTOR_PIDS[$i]}" 2>/dev/null + EXIT_CODE=$? + if [ $EXIT_CODE -eq 0 ]; then + echo "[run_parallel_kimi_nvcf.sh] Collector $COLLECTOR_NUM finished successfully." + else + echo "[run_parallel_kimi_nvcf.sh] Collector $COLLECTOR_NUM failed (exit code $EXIT_CODE)." + FAILED=$((FAILED + 1)) + fi +done + +echo "" +echo "============================================" +echo "[run_parallel_kimi_nvcf.sh] All collectors finished. $FAILED/$NUM_COLLECTORS failed." +echo "============================================" + +if [ $FAILED -gt 0 ]; then + exit 1 +fi From d100f6ce6d457f6b90a0279b02ad1f55bbde933f Mon Sep 17 00:00:00 2001 From: Brandon Cui Date: Mon, 16 Mar 2026 11:30:10 -0700 Subject: [PATCH 3/4] Updating scripts --- cua/scripts/kimi/run_collector_kimi.sh | 3 +- cua/scripts/kimi/run_collector_kimi_nvcf.sh | 13 +- .../kimi/run_parallel_kimi_colocated.sh | 6 +- cua/scripts/kimi/run_parallel_kimi_nvcf.sh | 163 ++++++++++++------ 4 files changed, 122 insertions(+), 63 deletions(-) diff --git a/cua/scripts/kimi/run_collector_kimi.sh b/cua/scripts/kimi/run_collector_kimi.sh index 1d8a32d3b..a8d2238eb 100644 --- a/cua/scripts/kimi/run_collector_kimi.sh +++ b/cua/scripts/kimi/run_collector_kimi.sh @@ -24,7 +24,8 @@ COLLECTOR_IDX="${1:-0}" LOG_DIR="${LOG_DIR:-./logs}" # Configs -PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +# PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" PROJECT_DIR="$PROJECT_ROOT/cua" COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua_cpu.sqsh" diff --git a/cua/scripts/kimi/run_collector_kimi_nvcf.sh b/cua/scripts/kimi/run_collector_kimi_nvcf.sh index b4d9d961d..adf627a76 100755 --- a/cua/scripts/kimi/run_collector_kimi_nvcf.sh +++ b/cua/scripts/kimi/run_collector_kimi_nvcf.sh @@ -30,7 +30,8 @@ LOG_DIR="${LOG_DIR:-./logs}" PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" PROJECT_DIR="$PROJECT_ROOT/cua" # COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua_cpu.sqsh" -COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/images/cua_cpu.sqsh" +# COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/images/cua_cpu.sqsh" +COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua-vllm-0.16.0.sqsh" MAX_PARALLEL=${MAX_PARALLEL:-10} MAX_TRAJECTORIES=${MAX_TRAJECTORIES:-10000} @@ -68,8 +69,8 @@ COLLECTOR_JOB_ID=$(sbatch --parsable \ --mem=0 \ --time=01:30:00 \ --exclusive \ - --output="/dev/null" \ - --error="/dev/null" \ + --output="$LOG_DIR/slurm-holder-${COLLECTOR_IDX}.out" \ + --error="$LOG_DIR/slurm-holder-${COLLECTOR_IDX}.out" \ --wrap="srun --container-image=$COLLECTOR_IMAGE --container-mounts=/lustre:/lustre sleep infinity") if [ -z "$COLLECTOR_JOB_ID" ]; then @@ -149,10 +150,8 @@ ssh -t -q -o StrictHostKeyChecking=no "$COLLECTOR_NODE" \ fi # Run data collection with NVCF backend - # Install missing Python dependencies (--break-system-packages for PEP 668 containers) - python -m pip install --break-system-packages openai Pillow jsonlines pandas requests 2>&1 || \ - pip3 install --break-system-packages openai Pillow jsonlines pandas requests 2>&1 || \ - echo "[WARNING] Failed to install Python dependencies" + # Activate Python venv with required dependencies + echo \"[Collector $COLLECTOR_IDX] Starting parallel data collection (NVCF backend)...\" cd $PROJECT_DIR python parallel_collect_kimi.py \ diff --git a/cua/scripts/kimi/run_parallel_kimi_colocated.sh b/cua/scripts/kimi/run_parallel_kimi_colocated.sh index 13379b46e..b6336c8df 100644 --- a/cua/scripts/kimi/run_parallel_kimi_colocated.sh +++ b/cua/scripts/kimi/run_parallel_kimi_colocated.sh @@ -27,7 +27,8 @@ export LOG_DIR="${LOG_DIR:-./logs}" # Configurable parameters MAX_PARALLEL="${MAX_PARALLEL:-16}" MAX_TRAJECTORIES="${MAX_TRAJECTORIES:-10000}" -TRAJECTORY_SAVE_DIR=/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/mingjiel/workspace/data/jaehun/cua/trajectories/kimi +# TRAJECTORY_SAVE_DIR=/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/mingjiel/workspace/data/jaehun/cua/trajectories/kimi +TRAJECTORY_SAVE_DIR=/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/ProRL-Agent-Server/cua/trajectories/kimi-debug/ # Create logs directory mkdir -p "$LOG_DIR" @@ -36,7 +37,8 @@ KIMI_JOB_ID="" COLLECTOR_PIDS=() KIMI_PORT=8000 -PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +# PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" PROJECT_DIR="$PROJECT_ROOT/cua" echo "============================================" diff --git a/cua/scripts/kimi/run_parallel_kimi_nvcf.sh b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh index 3a94465d6..5c4b76ad9 100755 --- a/cua/scripts/kimi/run_parallel_kimi_nvcf.sh +++ b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh @@ -1,36 +1,36 @@ #!/bin/bash # ============================================================================ -# Kimi-K2.5 Data Collection with NVCF Backend - Multi-Collector Launcher +# Kimi-K2.5 Data Collection with NVCF Backend (Colocated on GPU Nodes) # ============================================================================ -# Same architecture as run_parallel_kimi.sh but uses NVCF for VMs instead of -# local KVM. Collector nodes don't need /dev/kvm or GPU — they just orchestrate -# remote NVCF VMs. +# Runs data collection directly on the GPU server nodes (colocated with vLLM). +# Since NVCF VMs are remote, no /dev/kvm or extra CPU nodes are needed — +# collectors just orchestrate remote NVCF VMs from inside the vLLM container. # # 1. Submits Kimi vLLM sbatch job (2 GPU nodes, Ray cluster) # 2. Waits for Kimi server to become healthy -# 3. Launches NUM_COLLECTORS instances of run_collector_kimi_nvcf.sh in parallel -# 4. Each collector submits a CPU-only holder job (no /dev/kvm needed) -# 5. Waits for all collectors to finish, then cancels Kimi server +# 3. SSH+enroot execs into each GPU node's container to run data collection +# 4. Waits for all collectors to finish, then cancels Kimi server # # Required env vars: # NGC_API_KEY - NVCF API key # NGC_ORG - NVCF organization # # Usage: -# NGC_API_KEY=nvapi-xxx NGC_ORG=my-org NUM_COLLECTORS=2 bash run_parallel_kimi_nvcf.sh +# NGC_API_KEY=nvapi-xxx NGC_ORG=my-org bash run_parallel_kimi_nvcf.sh # # ============================================================================ export LOG_DIR="${LOG_DIR:-./logs}" # Configurable parameters -NUM_COLLECTORS="${NUM_COLLECTORS:-2}" MAX_PARALLEL="${MAX_PARALLEL:-16}" MAX_TRAJECTORIES="${MAX_TRAJECTORIES:-10000}" -# TRAJECTORY_SAVE_DIR="${TRAJECTORY_SAVE_DIR:-/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/mingjiel/workspace/data/jaehun/cua/trajectories/kimi-nvcf/}" TRAJECTORY_SAVE_DIR="${TRAJECTORY_SAVE_DIR:-/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/ProRL-Agent-Server/cua/trajectories/kimi-nvcf/}" NVCF_FUNCTION_NAME_PREFIX="${NVCF_FUNCTION_NAME_PREFIX:-data-collection}" +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" +PROJECT_DIR="$PROJECT_ROOT/cua" + # Validate NVCF credentials if [ -z "$NGC_API_KEY" ]; then echo "[ERROR] NGC_API_KEY not set. Required for NVCF backend." @@ -49,24 +49,33 @@ COLLECTOR_PIDS=() KIMI_PORT=8000 echo "============================================" -echo "Kimi-K2.5 Data Collection (NVCF Backend)" +echo "Kimi-K2.5 Data Collection (NVCF Colocated)" echo "============================================" -echo "NUM_COLLECTORS: $NUM_COLLECTORS" -echo "MAX_PARALLEL: $MAX_PARALLEL (per collector)" -echo "MAX_TRAJECTORIES: $MAX_TRAJECTORIES (per collector)" +echo "MAX_PARALLEL: $MAX_PARALLEL (per node)" +echo "MAX_TRAJECTORIES: $MAX_TRAJECTORIES (per node)" echo "NGC_ORG: $NGC_ORG" echo "NVCF_PREFIX: $NVCF_FUNCTION_NAME_PREFIX" echo "" -# --- Cleanup: cancel Kimi server on exit --- +# --- Helper: run NVCF cleanup --- +nvcf_cleanup() { + echo "[nvcf] Cleaning up NVCF functions with prefix '$NVCF_FUNCTION_NAME_PREFIX'..." + NVCF_FUNCTION_NAME_PREFIX="$NVCF_FUNCTION_NAME_PREFIX" \ + NGC_API_KEY="$NGC_API_KEY" \ + NGC_ORG="$NGC_ORG" \ + python "$PROJECT_DIR/cleanup_nvcf.py" --cleanup 2>&1 || \ + echo "[nvcf] WARNING: NVCF cleanup failed (non-fatal)" +} + +# --- Cleanup: cancel Kimi server + NVCF functions on exit --- cleanup() { echo "" - echo "[run_parallel_kimi_nvcf.sh] Cleaning up..." + echo "[nvcf] Cleaning up..." - # 1. Kill Collector Launcher Scripts + # 1. Kill collector SSH sessions if [ ${#COLLECTOR_PIDS[@]} -gt 0 ]; then - echo "[run_parallel_kimi_nvcf.sh] Killing ${#COLLECTOR_PIDS[@]} collector launcher scripts..." + echo "[nvcf] Killing ${#COLLECTOR_PIDS[@]} collector(s)..." for pid in "${COLLECTOR_PIDS[@]}"; do if kill -0 "$pid" 2>/dev/null; then kill "$pid" 2>/dev/null @@ -76,20 +85,26 @@ cleanup() { # 2. Cancel Kimi vLLM server if [ -n "$KIMI_JOB_ID" ]; then - echo "[run_parallel_kimi_nvcf.sh] Cancelling Kimi vLLM job $KIMI_JOB_ID" + echo "[nvcf] Cancelling Kimi vLLM job $KIMI_JOB_ID" scancel "$KIMI_JOB_ID" 2>/dev/null fi # 3. Remove head node file rm -f "$LOG_DIR/head_node_${KIMI_JOB_ID}" + + # 4. Clean up any remaining NVCF functions + nvcf_cleanup } trap cleanup EXIT +# --- 0. Clean up stale NVCF functions from previous runs --- +nvcf_cleanup + # --- 1. Submit Kimi vLLM server --- -echo "[run_parallel_kimi_nvcf.sh] Submitting Kimi vLLM sbatch job..." +echo "[nvcf] Submitting Kimi vLLM sbatch job..." KIMI_JOB_ID=$(sbatch \ - --account=nvr_lacr_llm \ + --account=llmservice_fm_vision \ --partition=batch_short \ --time=02:00:00 \ --output="$LOG_DIR/slurm-%j-server.out" \ @@ -98,29 +113,31 @@ KIMI_JOB_ID=$(sbatch \ "./run_kimi.sbatch") if [ -z "$KIMI_JOB_ID" ]; then - echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi sbatch submission failed." + echo "[nvcf] ERROR: Kimi sbatch submission failed." exit 1 fi -echo "[run_parallel_kimi_nvcf.sh] Kimi vLLM job submitted: $KIMI_JOB_ID" +echo "[nvcf] Kimi vLLM job submitted: $KIMI_JOB_ID" -# Wait for the job to start and discover the head node +# Wait for the job to start and discover nodes HEAD_NODE_FILE="$LOG_DIR/head_node_${KIMI_JOB_ID}" -echo "[run_parallel_kimi_nvcf.sh] Waiting for head node file: $HEAD_NODE_FILE" +echo "[nvcf] Waiting for head node file: $HEAD_NODE_FILE" MODEL_NODE="" +ALL_NODES="" ELAPSED=0 MAX_WAIT=43200 # 12 hours while [ $ELAPSED -lt $MAX_WAIT ]; do JOB_STATE=$(squeue -j "$KIMI_JOB_ID" -h -o %T 2>/dev/null) if [ -z "$JOB_STATE" ]; then - echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi job $KIMI_JOB_ID disappeared from queue!" + echo "[nvcf] ERROR: Kimi job $KIMI_JOB_ID disappeared from queue!" exit 1 fi if [ -f "$HEAD_NODE_FILE" ]; then MODEL_NODE=$(cat "$HEAD_NODE_FILE") if [ -n "$MODEL_NODE" ]; then - echo "[run_parallel_kimi_nvcf.sh] Kimi vLLM job running. Head node: $MODEL_NODE" + ALL_NODES=$(scontrol show hostnames "$(squeue -j "$KIMI_JOB_ID" -h -o %N)") + echo "[nvcf] Kimi vLLM job running on: $(echo $ALL_NODES | tr '\n' ' ')" break fi fi @@ -128,63 +145,103 @@ while [ $ELAPSED -lt $MAX_WAIT ]; do sleep 10 ELAPSED=$((ELAPSED + 10)) if [ $((ELAPSED % 60)) -eq 0 ]; then - echo "[run_parallel_kimi_nvcf.sh] Still waiting for Kimi job to start (${ELAPSED}s)..." + echo "[nvcf] Still waiting for Kimi job to start (${ELAPSED}s)..." fi done if [ -z "$MODEL_NODE" ]; then - echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi vLLM did not start within ${MAX_WAIT}s." + echo "[nvcf] ERROR: Kimi vLLM did not start within ${MAX_WAIT}s." exit 1 fi +mapfile -t NODES_ARRAY <<< "$ALL_NODES" +echo "[nvcf] Head node (vLLM API): $MODEL_NODE" +echo "[nvcf] All nodes: ${NODES_ARRAY[*]}" + # --- 2. Wait for Kimi vLLM health --- -echo "[run_parallel_kimi_nvcf.sh] Waiting for Kimi vLLM health at $MODEL_NODE:$KIMI_PORT..." +echo "[nvcf] Waiting for Kimi vLLM health at $MODEL_NODE:$KIMI_PORT..." ELAPSED=0 MAX_HEALTH_WAIT=7200 while [ $ELAPSED -lt $MAX_HEALTH_WAIT ]; do if curl -sf "http://$MODEL_NODE:$KIMI_PORT/health" > /dev/null 2>&1; then - echo "[run_parallel_kimi_nvcf.sh] Kimi vLLM is healthy!" + echo "[nvcf] Kimi vLLM is healthy!" break fi sleep 10 ELAPSED=$((ELAPSED + 10)) if [ $((ELAPSED % 60)) -eq 0 ]; then - echo "[run_parallel_kimi_nvcf.sh] Still waiting for Kimi health (${ELAPSED}s)..." + echo "[nvcf] Still waiting for Kimi health (${ELAPSED}s)..." fi done if [ $ELAPSED -ge $MAX_HEALTH_WAIT ]; then - echo "[run_parallel_kimi_nvcf.sh] ERROR: Kimi vLLM did not become healthy within ${MAX_HEALTH_WAIT}s." + echo "[nvcf] ERROR: Kimi vLLM did not become healthy within ${MAX_HEALTH_WAIT}s." exit 1 fi -# --- 3. Launch N Collector Instances --- -echo "[run_parallel_kimi_nvcf.sh] Launching $NUM_COLLECTORS collector(s)..." -export MODEL_NODE +# --- 3. Launch data collection on each GPU node via SSH+enroot --- +echo "[nvcf] Launching data collection on ${#NODES_ARRAY[@]} node(s)..." COLLECTOR_PIDS=() -for i in $(seq 1 "$NUM_COLLECTORS"); do - echo "[run_parallel_kimi_nvcf.sh] Starting collector $i..." - CURRENT_LOG="$LOG_DIR/slurm-${KIMI_JOB_ID}-collector-nvcf-${i}.out" +for i in "${!NODES_ARRAY[@]}"; do + node=${NODES_ARRAY[$i]} + COLLECTOR_IDX=$((i + 1)) + CURRENT_LOG="$LOG_DIR/slurm-${KIMI_JOB_ID}-collector-nvcf-${COLLECTOR_IDX}.out" - MODEL_NODE="$MODEL_NODE" \ - NVCF_FUNCTION_NAME_PREFIX="$NVCF_FUNCTION_NAME_PREFIX" \ - NGC_API_KEY="$NGC_API_KEY" \ - NGC_ORG="$NGC_ORG" \ - MAX_PARALLEL="$MAX_PARALLEL" \ - MAX_TRAJECTORIES="$MAX_TRAJECTORIES" \ - TRAJECTORY_SAVE_DIR="$TRAJECTORY_SAVE_DIR" \ - bash "./run_collector_kimi_nvcf.sh" "$i" &> "$CURRENT_LOG" & + echo "[nvcf] Finding container on $node..." + CONTAINER_PID="" + while [ -z "$CONTAINER_PID" ]; do + sleep 2 + CONTAINER_PID=$(ssh -q -o StrictHostKeyChecking=no "$node" \ + "enroot list -f | grep 'pyxis' | head -n 1 | awk '{print \$2}'" 2>/dev/null) + done + echo "[nvcf] Container on $node ready, PID: $CONTAINER_PID" + + ssh -t -q -o StrictHostKeyChecking=no "$node" \ + "enroot exec $CONTAINER_PID /bin/bash -c ' + set -e + export PYTHONUNBUFFERED=1 + export NGC_API_KEY=$NGC_API_KEY + export NGC_ORG=$NGC_ORG + export NVCF_FUNCTION_NAME_PREFIX=$NVCF_FUNCTION_NAME_PREFIX + export OSWORLD_SETUP_CACHE_DIR=/tmp/osworld_cache + + # Activate Python venv with required dependencies + source /lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/cua_env_reqs/bin/activate + + # Fix TLS certs: copy venv cacert.pem to container path if missing + VENV_CACERT=/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/cua_env_reqs/lib/python3.12/site-packages/certifi/cacert.pem + SYS_CACERT=/usr/local/lib/python3.12/dist-packages/certifi/cacert.pem + if [ ! -f \$SYS_CACERT ] && [ -f \$VENV_CACERT ]; then + mkdir -p /usr/local/lib/python3.12/dist-packages/certifi + cp \$VENV_CACERT \$SYS_CACERT 2>/dev/null || true + fi + export REQUESTS_CA_BUNDLE=\$VENV_CACERT + export SSL_CERT_FILE=\$VENV_CACERT + export CURL_CA_BUNDLE=\$VENV_CACERT + + echo \"[Collector $COLLECTOR_IDX] Starting data collection on $node (NVCF backend)...\" + cd $PROJECT_DIR + python parallel_collect_kimi.py \ + --model_node $MODEL_NODE \ + --runtime nvcf \ + --max_parallel $MAX_PARALLEL \ + --max_trajectories $MAX_TRAJECTORIES \ + --trajectory_save_dir $TRAJECTORY_SAVE_DIR + COLLECT_EXIT=\$? + echo \"[Collector $COLLECTOR_IDX] Done (exit code \$COLLECT_EXIT)\" + exit \$COLLECT_EXIT + '" &> "$CURRENT_LOG" & COLLECTOR_PIDS+=($!) - echo "[run_parallel_kimi_nvcf.sh] Collector $i launched (PID ${COLLECTOR_PIDS[-1]})" - echo " Log: $CURRENT_LOG" + echo "[nvcf] Collector $COLLECTOR_IDX launched on $node (PID ${COLLECTOR_PIDS[-1]})" + echo " Log: $CURRENT_LOG" done # --- 4. Wait for all collectors --- echo "" -echo "[run_parallel_kimi_nvcf.sh] All collectors launched. Waiting for completion..." +echo "[nvcf] All collectors launched. Waiting for completion..." echo "" FAILED=0 @@ -193,16 +250,16 @@ for i in "${!COLLECTOR_PIDS[@]}"; do wait "${COLLECTOR_PIDS[$i]}" 2>/dev/null EXIT_CODE=$? if [ $EXIT_CODE -eq 0 ]; then - echo "[run_parallel_kimi_nvcf.sh] Collector $COLLECTOR_NUM finished successfully." + echo "[nvcf] Collector $COLLECTOR_NUM finished successfully." else - echo "[run_parallel_kimi_nvcf.sh] Collector $COLLECTOR_NUM failed (exit code $EXIT_CODE)." + echo "[nvcf] Collector $COLLECTOR_NUM failed (exit code $EXIT_CODE)." FAILED=$((FAILED + 1)) fi done echo "" echo "============================================" -echo "[run_parallel_kimi_nvcf.sh] All collectors finished. $FAILED/$NUM_COLLECTORS failed." +echo "[nvcf] All collectors finished. $FAILED/${#NODES_ARRAY[@]} failed." echo "============================================" if [ $FAILED -gt 0 ]; then From 2ab7208f1fb51fed47af36473831a780a51996b1 Mon Sep 17 00:00:00 2001 From: Brandon Cui Date: Mon, 16 Mar 2026 12:24:36 -0700 Subject: [PATCH 4/4] Cleaning up --- cua/cleanup_nvcf.py | 112 --------- cua/scripts/collect_trajectories_sbatch.sh | 253 --------------------- cua/scripts/kimi/run_kimi.sbatch | 3 +- cua/scripts/kimi/run_parallel_kimi_nvcf.sh | 44 ---- 4 files changed, 2 insertions(+), 410 deletions(-) delete mode 100644 cua/cleanup_nvcf.py delete mode 100755 cua/scripts/collect_trajectories_sbatch.sh diff --git a/cua/cleanup_nvcf.py b/cua/cleanup_nvcf.py deleted file mode 100644 index 9af03afec..000000000 --- a/cua/cleanup_nvcf.py +++ /dev/null @@ -1,112 +0,0 @@ -#!/usr/bin/env python3 -"""List and clean up NVCF functions. - -Usage: - # List all functions in the org: - python cleanup_nvcf.py --list - - # Undeploy and delete only YOUR pool functions (nvcf-pool-*): - python cleanup_nvcf.py --cleanup - - # Undeploy and delete ALL functions (careful — includes other users'): - python cleanup_nvcf.py --cleanup --all - - # Undeploy and delete a specific function: - python cleanup_nvcf.py --delete FUNCTION_ID VERSION_ID -""" -import argparse -import os -import sys - -sys.path.insert(0, "/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server") - -from openhands.nvidia.os_world.nvcf import OSWorldDeployer - -POOL_NAME_PREFIX = "data-collection-" - - -def main(): - parser = argparse.ArgumentParser(description="NVCF function cleanup utility") - parser.add_argument("--list", action="store_true", help="List all functions in the org") - parser.add_argument("--cleanup", action="store_true", - help="Undeploy and delete pool functions (nvcf-pool-* only, unless --all)") - parser.add_argument("--all", action="store_true", - help="With --cleanup: delete ALL functions, not just pool ones") - parser.add_argument("--delete", nargs=2, metavar=("FUNC_ID", "VER_ID"), - help="Delete a specific function") - args = parser.parse_args() - - api_key = os.environ.get("NGC_API_KEY") - org = os.environ.get("NGC_ORG") - if not api_key or not org: - print("ERROR: Set NGC_API_KEY and NGC_ORG environment variables") - sys.exit(1) - - deployer = OSWorldDeployer(api_key=api_key, org_name=org) - - if args.list or (not args.cleanup and not args.delete): - print("Listing all private NVCF functions in org...\n") - result = deployer.list_functions() - functions = result.get("functions", []) - if not functions: - print("No functions found.") - return - for fn in functions: - fn_id = fn.get("id", "?") - name = fn.get("name", "?") - status = fn.get("status", "?") - ver_id = fn.get("versionId", "?") - mine = " <-- pool" if name.startswith(POOL_NAME_PREFIX) else "" - print(f" {name:40s} status={status:10s} fn={fn_id} ver={ver_id}{mine}") - print(f"\nTotal: {len(functions)} functions") - - if args.delete: - fn_id, ver_id = args.delete - print(f"Undeploying {fn_id}...") - try: - deployer.undeploy(fn_id, ver_id, graceful=True) - print("Undeployed. Deleting...") - except Exception as e: - print(f"Undeploy failed (may already be undeployed): {e}") - try: - deployer.delete_function(fn_id, ver_id) - print("Deleted.") - except Exception as e: - print(f"Delete failed: {e}") - - if args.cleanup: - result = deployer.list_functions() - functions = result.get("functions", []) - - if not args.all: - # Only clean up pool functions - functions = [f for f in functions if f.get("name", "").startswith(POOL_NAME_PREFIX)] - print(f"Cleaning up {len(functions)} pool functions (nvcf-pool-*)...\n") - else: - print(f"Cleaning up ALL {len(functions)} functions...\n") - - if not functions: - print("Nothing to clean up.") - return - - for fn in functions: - fn_id = fn.get("id", "?") - ver_id = fn.get("versionId", "?") - name = fn.get("name", "?") - status = fn.get("status", "?") - print(f" Cleaning up: {name} ({fn_id}) status={status}") - try: - deployer.undeploy(fn_id, ver_id, graceful=True) - print(f" Undeployed") - except Exception as e: - print(f" Undeploy skipped: {e}") - try: - deployer.delete_function(fn_id, ver_id) - print(f" Deleted") - except Exception as e: - print(f" Delete failed: {e}") - print(f"\nDone. Cleaned up {len(functions)} functions.") - - -if __name__ == "__main__": - main() diff --git a/cua/scripts/collect_trajectories_sbatch.sh b/cua/scripts/collect_trajectories_sbatch.sh deleted file mode 100755 index 208a33768..000000000 --- a/cua/scripts/collect_trajectories_sbatch.sh +++ /dev/null @@ -1,253 +0,0 @@ -#!/bin/bash -# ============================================================================ -# 2-Node Batch Trajectory Collection -# ============================================================================ -# Node 0 (Planner): Runs Qwen3-VL-235B vLLM server (tp=8, all 8 GPUs) -# Node 1 (Actor): 2x UI-TARS-1.5-7B vLLM servers (tp=4 each, GPUs 0-3 + 4-7) -# + round-robin load balancer on port 8000 -# + trajectory collection -# -# Usage: sbatch collect_trajectories_sbatch.sh -# ============================================================================ - -#SBATCH --job-name=traj_collect -#SBATCH --account=llmservice_fm_vision -#SBATCH --reservation=sla_res_osworld_agent_vlm -#SBATCH --partition=batch_block1 -#SBATCH --gpus-per-node=8 -#SBATCH --nodes=2 -#SBATCH --ntasks-per-node=1 -#SBATCH --mem=0 -#SBATCH --time=04:00:00 -#SBATCH --exclusive -#SBATCH --output=/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/scripts/logs-refactor/traj_collect_%j.out -#SBATCH --error=/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/scripts/logs-refactor/traj_collect_%j.err - -set -euo pipefail - -# ============================================================================ -# Configuration -# ============================================================================ -IMAGE="/lustre/fsw/portfolios/nvr/users/bcui/images/cua-vllm-0.13.0.sqsh" -PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" -PROJECT_DIR="$PROJECT_ROOT/cua" - -PLANNER_MODEL="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/models/Qwen3-VL-235B-A22B-Thinking" -ACTOR_MODEL="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/huggingface_models/UI-TARS-1.5-7B" - -PLANNER_PORT=8000 -ACTOR_PORT=8000 # Load balancer port (what the code talks to) -ACTOR_PORT_1=8001 # Actor replica 1 (GPUs 0-3) -ACTOR_PORT_2=8002 # Actor replica 2 (GPUs 4-7) - -# Trajectory collection settings -MAX_PARALLEL=16 -MAX_TRAJECTORIES=1024 - -# Log file name (timestamped) -TIMESTAMP=$(date +%m-%d-%H%M) -LOG_FILE="$PROJECT_DIR/${TIMESTAMP}-logs.log" - -# ============================================================================ -# Resolve node assignments -# ============================================================================ -ALL_NODES=$(scontrol show hostnames "$SLURM_JOB_NODELIST") -PLANNER_NODE=$(echo "$ALL_NODES" | head -n 1) -ACTOR_NODE=$(echo "$ALL_NODES" | tail -n 1) - -echo "[sbatch] Job ID: $SLURM_JOB_ID" -echo "[sbatch] Planner Node: $PLANNER_NODE" -echo "[sbatch] Actor Node: $ACTOR_NODE" -echo "[sbatch] Log file: $LOG_FILE" - -mkdir -p "$PROJECT_DIR/scripts/logs" - -# ============================================================================ -# Launch Planner vLLM server on Node 0 -# ============================================================================ -echo "[sbatch] Starting Planner vLLM server on $PLANNER_NODE..." -srun --nodes=1 --ntasks=1 --nodelist="$PLANNER_NODE" \ - --container-image="$IMAGE" \ - --container-mounts=/lustre:/lustre \ - --container-writable \ - bash -c " - vllm serve $PLANNER_MODEL \ - --api-key gen \ - --tensor-parallel-size 8 \ - --enable-expert-parallel \ - --limit-mm-per-prompt.video 0 \ - --limit-mm-per-prompt.image 3 \ - --async-scheduling \ - --max-model-len 65536 \ - --gpu-memory-utilization 0.9 \ - > $PROJECT_DIR/scripts/logs/planner_${SLURM_JOB_ID}.log 2>&1 - " & -PLANNER_SRUN_PID=$! - -# ============================================================================ -# Launch Actor vLLM server + trajectory collection on Node 1 -# ============================================================================ -echo "[sbatch] Starting 2x Actor vLLM servers + collection on $ACTOR_NODE..." -srun --nodes=1 --ntasks=1 --nodelist="$ACTOR_NODE" \ - --container-image="$IMAGE" \ - --container-mounts=/lustre:/lustre \ - --container-writable \ - bash -c " - # --- Start Actor vLLM replica 1 (GPUs 0-3) --- - CUDA_VISIBLE_DEVICES=0,1,2,3 vllm serve $ACTOR_MODEL \ - --served-model-name ByteDance-Seed/UI-TARS-1.5-7B \ - --api-key gen \ - --port $ACTOR_PORT_1 \ - --tensor-parallel-size 4 \ - --limit-mm-per-prompt.image 5 \ - --limit-mm-per-prompt.video 0 \ - --max-model-len 65536 \ - --disable-log-requests \ - --disable-log-stats \ - > $PROJECT_DIR/scripts/logs/actor1_${SLURM_JOB_ID}.log 2>&1 & - ACTOR1_PID=\$! - - # --- Start Actor vLLM replica 2 (GPUs 4-7) --- - CUDA_VISIBLE_DEVICES=4,5,6,7 vllm serve $ACTOR_MODEL \ - --served-model-name ByteDance-Seed/UI-TARS-1.5-7B \ - --api-key gen \ - --port $ACTOR_PORT_2 \ - --tensor-parallel-size 4 \ - --limit-mm-per-prompt.image 5 \ - --limit-mm-per-prompt.video 0 \ - --max-model-len 65536 \ - --disable-log-requests \ - --disable-log-stats \ - > $PROJECT_DIR/scripts/logs/actor2_${SLURM_JOB_ID}.log 2>&1 & - ACTOR2_PID=\$! - - # --- Start round-robin load balancer on port $ACTOR_PORT --- - python3 -c ' -import http.server, http.client, threading, sys, io - -backends = [(\"localhost\", $ACTOR_PORT_1), (\"localhost\", $ACTOR_PORT_2)] -counter = 0 -lock = threading.Lock() - -class LBHandler(http.server.BaseHTTPRequestHandler): - def do_ANY(self, method): - global counter - with lock: - host, port = backends[counter % len(backends)] - counter += 1 - - content_length = int(self.headers.get(\"Content-Length\", 0)) - body = self.rfile.read(content_length) if content_length > 0 else None - - try: - conn = http.client.HTTPConnection(host, port, timeout=300) - conn.request(method, self.path, body=body, headers=dict(self.headers)) - resp = conn.getresponse() - resp_body = resp.read() - - self.send_response(resp.status) - for k, v in resp.getheaders(): - if k.lower() not in (\"transfer-encoding\",): - self.send_header(k, v) - self.end_headers() - self.wfile.write(resp_body) - conn.close() - except Exception as e: - self.send_response(502) - self.end_headers() - self.wfile.write(f\"LB error: {e}\".encode()) - - def do_GET(self): self.do_ANY(\"GET\") - def do_POST(self): self.do_ANY(\"POST\") - def do_PUT(self): self.do_ANY(\"PUT\") - def do_DELETE(self): self.do_ANY(\"DELETE\") - def log_message(self, format, *args): pass # silence logs - -server = http.server.ThreadingHTTPServer((\"0.0.0.0\", $ACTOR_PORT), LBHandler) -print(f\"[LB] Round-robin load balancer on port $ACTOR_PORT -> {backends}\", flush=True) -server.serve_forever() -' > $PROJECT_DIR/scripts/logs/actor_lb_${SLURM_JOB_ID}.log 2>&1 & - LB_PID=\$! - - # --- Wait for all servers to be healthy --- - echo '[actor-node] Waiting for vLLM servers to become healthy...' - - wait_for_server() { - local host=\$1 - local port=\$2 - local name=\$3 - local max_wait=600 - local elapsed=0 - - while [ \$elapsed -lt \$max_wait ]; do - if curl -sf http://\${host}:\${port}/health > /dev/null 2>&1; then - echo \"[actor-node] \$name server healthy (\${elapsed}s)\" - return 0 - fi - sleep 10 - elapsed=\$((elapsed + 10)) - if [ \$((elapsed % 60)) -eq 0 ]; then - echo \"[actor-node] Still waiting for \$name (\${elapsed}s)...\" - fi - done - echo \"[actor-node] ERROR: \$name server did not start within \${max_wait}s\" - return 1 - } - - # Wait for both actor replicas and planner - wait_for_server localhost $ACTOR_PORT_1 'Actor-1 (GPU 0-3)' - ACTOR1_OK=\$? - - wait_for_server localhost $ACTOR_PORT_2 'Actor-2 (GPU 4-7)' - ACTOR2_OK=\$? - - wait_for_server $PLANNER_NODE $PLANNER_PORT Planner - PLANNER_OK=\$? - - if [ \$ACTOR1_OK -ne 0 ] || [ \$ACTOR2_OK -ne 0 ] || [ \$PLANNER_OK -ne 0 ]; then - echo '[actor-node] ERROR: One or more servers failed to start.' - echo 'Planner log:' && tail -20 $PROJECT_DIR/scripts/logs/planner_${SLURM_JOB_ID}.log 2>/dev/null - echo 'Actor-1 log:' && tail -20 $PROJECT_DIR/scripts/logs/actor1_${SLURM_JOB_ID}.log 2>/dev/null - echo 'Actor-2 log:' && tail -20 $PROJECT_DIR/scripts/logs/actor2_${SLURM_JOB_ID}.log 2>/dev/null - kill \$ACTOR1_PID \$ACTOR2_PID \$LB_PID 2>/dev/null - exit 1 - fi - - echo '[actor-node] All servers healthy. Starting trajectory collection...' - - # --- Run trajectory collection --- - cd $PROJECT_DIR - source cua_env_reqs/bin/activate - export PYTHONPATH=$PROJECT_ROOT:\$PYTHONPATH - export OSWORLD_SETUP_CACHE_DIR=/tmp/osworld_cache - - python parallel_collect_trajectories.py \ - --planner_node $PLANNER_NODE \ - --actor_node $ACTOR_NODE \ - --runtime nvcf \ - --max_parallel $MAX_PARALLEL \ - --max_trajectories $MAX_TRAJECTORIES \ - 2>&1 | tee $LOG_FILE - - COLLECT_EXIT=\$? - - # Cleanup - kill \$ACTOR1_PID \$ACTOR2_PID \$LB_PID 2>/dev/null - echo \"[actor-node] Collection finished (exit code: \$COLLECT_EXIT)\" - exit \$COLLECT_EXIT - " & -ACTOR_SRUN_PID=$! - -# ============================================================================ -# Wait for completion -# ============================================================================ -# Wait for the actor srun (which runs collection). When it finishes, kill planner. -wait $ACTOR_SRUN_PID -COLLECT_EXIT=$? - -echo "[sbatch] Actor node finished (exit: $COLLECT_EXIT). Stopping planner..." -kill $PLANNER_SRUN_PID 2>/dev/null -wait $PLANNER_SRUN_PID 2>/dev/null - -echo "[sbatch] Done. Log: $LOG_FILE" -exit $COLLECT_EXIT diff --git a/cua/scripts/kimi/run_kimi.sbatch b/cua/scripts/kimi/run_kimi.sbatch index 1658a4064..cf0ff8a7f 100644 --- a/cua/scripts/kimi/run_kimi.sbatch +++ b/cua/scripts/kimi/run_kimi.sbatch @@ -17,7 +17,8 @@ TOTAL_TP=$(( SLURM_JOB_NUM_NODES * 8 )) TOTAL_PP=1 #TOTAL_TP=8 #TOTAL_PP=$SLURM_JOB_NUM_NODES -CONTAINER_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua-vllm-0.16.0.sqsh" +# CONTAINER_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua-vllm-0.16.0.sqsh" +CONTAINER_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/images/cua-vllm-0.16.0.sqsh" CONTAINER_MOUNTS="/lustre:/lustre,/tmp:/tmp" diff --git a/cua/scripts/kimi/run_parallel_kimi_nvcf.sh b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh index 5c4b76ad9..1d6d40b8a 100755 --- a/cua/scripts/kimi/run_parallel_kimi_nvcf.sh +++ b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh @@ -57,50 +57,6 @@ echo "NGC_ORG: $NGC_ORG" echo "NVCF_PREFIX: $NVCF_FUNCTION_NAME_PREFIX" echo "" - -# --- Helper: run NVCF cleanup --- -nvcf_cleanup() { - echo "[nvcf] Cleaning up NVCF functions with prefix '$NVCF_FUNCTION_NAME_PREFIX'..." - NVCF_FUNCTION_NAME_PREFIX="$NVCF_FUNCTION_NAME_PREFIX" \ - NGC_API_KEY="$NGC_API_KEY" \ - NGC_ORG="$NGC_ORG" \ - python "$PROJECT_DIR/cleanup_nvcf.py" --cleanup 2>&1 || \ - echo "[nvcf] WARNING: NVCF cleanup failed (non-fatal)" -} - -# --- Cleanup: cancel Kimi server + NVCF functions on exit --- -cleanup() { - echo "" - echo "[nvcf] Cleaning up..." - - # 1. Kill collector SSH sessions - if [ ${#COLLECTOR_PIDS[@]} -gt 0 ]; then - echo "[nvcf] Killing ${#COLLECTOR_PIDS[@]} collector(s)..." - for pid in "${COLLECTOR_PIDS[@]}"; do - if kill -0 "$pid" 2>/dev/null; then - kill "$pid" 2>/dev/null - fi - done - fi - - # 2. Cancel Kimi vLLM server - if [ -n "$KIMI_JOB_ID" ]; then - echo "[nvcf] Cancelling Kimi vLLM job $KIMI_JOB_ID" - scancel "$KIMI_JOB_ID" 2>/dev/null - fi - - # 3. Remove head node file - rm -f "$LOG_DIR/head_node_${KIMI_JOB_ID}" - - # 4. Clean up any remaining NVCF functions - nvcf_cleanup -} -trap cleanup EXIT - - -# --- 0. Clean up stale NVCF functions from previous runs --- -nvcf_cleanup - # --- 1. Submit Kimi vLLM server --- echo "[nvcf] Submitting Kimi vLLM sbatch job..." KIMI_JOB_ID=$(sbatch \