diff --git a/cua/modules/debug_env_controller.py b/cua/modules/debug_env_controller.py index 71c113fb4..e37f320a2 100644 --- a/cua/modules/debug_env_controller.py +++ b/cua/modules/debug_env_controller.py @@ -1,123 +1,278 @@ import logging +import os +import sys import re -from typing import Dict, Tuple +import uuid +import time +import threading +import requests +from typing import Dict, List, Optional, Tuple -import ipdb - -from examples.setup import SetupController -from openhands.core.config import OpenHandsConfig -from openhands.events import EventStream -from openhands.events.action.os import OSWorldInteractiveAction -from openhands.events.observation import ErrorObservation -from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime -from openhands.storage import get_file_store from openhands.core.logger import openhands_logger # Create a child logger logger = openhands_logger.getChild('env_controller') logger.setLevel(logging.DEBUG) +# Semaphore to limit concurrent downloads (shared with setup.py via env var) +_DOWNLOAD_SEMAPHORE = threading.Semaphore(int(os.environ.get('OSWORLD_MAX_CONCURRENT_DOWNLOADS', '3'))) + + +def _is_desktop_env(env_or_runtime) -> bool: + """Check if the object is a DesktopEnv instance (duck-type check to avoid hard import).""" + return hasattr(env_or_runtime, 'controller') and hasattr(env_or_runtime, 'reset') + class EnvController: """ - Static Wrapper class that interfaces with OSWorldSingularityRuntime. + Static wrapper class that interfaces with either: + - OSWorldSingularityRuntime (runtime_type='singularity') + - OSWorld DesktopEnv (runtime_type='nvcf') """ + @staticmethod - async def initialize_runtime(job_id: str, vm_image_path: str, os_type: str, - osworld_setup: Dict) -> OSWorldSingularityRuntime: + def pre_download_setup_files(osworld_setup: Dict, cache_dir: str = "/tmp/osworld_cache") -> bool: """ - Initialize OSWorldSingularityRuntime. - Used by DataCollector._init_worker to boot up the VM. + Pre-download all setup files to local cache BEFORE deploying NVCF. + This avoids wasting NVCF resources if downloads fail (e.g., HF 429 errors). + + Returns True if all downloads succeeded, False otherwise. """ - config = OpenHandsConfig() - config.runtime = "osworld" - config.sandbox.base_container_image = "ubuntu:24.04" - config.sandbox.run_as_fakeroot = False - config.sandbox.runtime_container_image = None # Trigger auto-build - - # Unique event stream per trajectory - file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') - event_stream = EventStream(sid=job_id, file_store=file_store) - - logger.debug(f"[initialize_runtime] Creating runtime for {job_id}") - logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") - logger.debug(f"[initialize_runtime] Base image: {config.sandbox.base_container_image}") - - runtime = OSWorldSingularityRuntime( - config=config, - event_stream=event_stream, - sid=job_id, - os_type=os_type, - vm_image_path=vm_image_path, - attach_to_existing=False, - ) - - logger.debug(f"[initialize_runtime] Runtime object created, connecting to VM...") - - await runtime.connect() - logger.debug(f"[initialize_runtime] ✓ Runtime initialized and connected for {job_id}") - logger.debug(f"[initialize_runtime] VM URL: {runtime.osworld_vm_url if hasattr(runtime, 'osworld_vm_url') else 'N/A'}") - - if osworld_setup and os_type == "linux": - logger.debug(f"[initialize_runtime] Setting up OSWorld...") - logger.debug(f"[initialize_runtime OSWorld Setup: {osworld_setup}") - setup_controller = SetupController( - vm_ip="127.0.0.1", - server_port=runtime._vm_server_port, - chromium_port=runtime._chromium_port, - cache_dir="/tmp/osworld_example", # might need to be changed to a unique directory for each job - client_password="password", - runtime=runtime - ) - await setup_controller.setup(osworld_setup['config']) - logger.debug(f"[initialize_runtime] ✓ OSWorld setup completed") - else: - logger.debug(f"[initialize_runtime] No OSWorld setup provided") + config_list = osworld_setup.get("config", []) + if not config_list: + return True - return runtime + os.makedirs(cache_dir, exist_ok=True) - @staticmethod - def execute_pyautogui_command(runtime: OSWorldSingularityRuntime, pyautogui_command: str): - pyautogui_action = OSWorldInteractiveAction( - method="execute_python_command", - params={ - "command": pyautogui_command, - } - ) - result = runtime.run_action(pyautogui_action) - - if not isinstance(result, ErrorObservation): - logger.debug("[execute_pyautogui_command] ✓ Action complete") - else: - logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") + # Build headers with HF token if available + dl_headers = {} + hf_token = os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN') + if hf_token: + dl_headers['Authorization'] = f'Bearer {hf_token}' - @staticmethod - def get_screen_size(runtime: OSWorldSingularityRuntime) -> Tuple[int, int]: - observation = runtime.run_action(OSWorldInteractiveAction( - method="get_vm_screen_size", - params={}, - thought="" - )) + for cfg in config_list: + if cfg.get("type") != "download": + continue - assert hasattr(observation, "content"), "get_screen_size failed." + files = cfg.get("parameters", {}).get("files", []) + for f in files: + url = f.get("url", "") + path = f.get("path", "") + if not url or not path: + continue - match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) - width, height = int(match.group(1)), int(match.group(2)) + cache_path = os.path.join(cache_dir, "{:}_{:}".format( + uuid.uuid5(uuid.NAMESPACE_URL, url), + os.path.basename(path))) - return width, height + if os.path.exists(cache_path): + logger.info(f"[pre_download] Cache hit: {cache_path}") + continue + + logger.info(f"[pre_download] Downloading {url} to cache...") + max_retries = 8 + downloaded = False + last_error = None + + with _DOWNLOAD_SEMAPHORE: + for i in range(max_retries): + try: + backoff = min(2 ** i + 1, 60) + if i > 0: + logger.info(f"[pre_download] Waiting {backoff}s before retry {i+1}/{max_retries}") + time.sleep(backoff) + + response = requests.get(url, stream=True, timeout=300, headers=dl_headers) + response.raise_for_status() + + downloaded_size = 0 + with open(cache_path, 'wb') as fh: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + fh.write(chunk) + downloaded_size += len(chunk) + + logger.info(f"[pre_download] Downloaded {downloaded_size / (1024*1024):.2f} MB to {cache_path}") + downloaded = True + break + + except requests.RequestException as e: + last_error = e + logger.warning(f"[pre_download] Failed {url}: {e} ({max_retries - i - 1} retries left)") + if os.path.exists(cache_path): + os.remove(cache_path) + + if not downloaded: + logger.error(f"[pre_download] All retries exhausted for {url}. Last error: {last_error}") + return False + + return True @staticmethod - def get_screenshot(runtime: OSWorldSingularityRuntime) -> bytes: + async def initialize_runtime( + job_id: str, + vm_image_path: str, + os_type: str, + osworld_setup: Dict, + runtime_type: str = "singularity", + nvcf_function_id: Optional[str] = None, + nvcf_version_id: Optional[str] = None, + nvcf_api_key: Optional[str] = None, + nvcf_org: Optional[str] = None, + ): """ - Returns the current screenshot from the runtime, in base64 format. - If screenshot_path is set, save the screenshot as png. + Initialize runtime. + + runtime_type='singularity': uses OSWorldSingularityRuntime (local KVM). + runtime_type='nvcf': uses OSWorld DesktopEnv with NVCFProvider. """ - screenshot = runtime.get_vm_screenshot() - if not screenshot: - logger.debug("✗ Failed to get screenshot from runtime.") - raise RuntimeError("Failed to get screenshot from runtime.") + logger.debug(f"[initialize_runtime] Creating {runtime_type} runtime for {job_id}") + + if runtime_type == "nvcf": + # Lazy import DesktopEnv only when needed + _osworld_path = "/lustre/fsw/portfolios/nvr/users/bcui/OSWorld" + if _osworld_path not in sys.path: + sys.path.insert(0, _osworld_path) + from desktop_env.desktop_env import DesktopEnv - return screenshot + if nvcf_api_key: + os.environ.setdefault("NGC_API_KEY", nvcf_api_key) + if nvcf_org: + os.environ.setdefault("NGC_ORG", nvcf_org) + if nvcf_function_id: + os.environ["NVCF_FUNCTION_ID"] = nvcf_function_id + # Set function name prefix for NVCF deployments + os.environ.setdefault("OSWORLD_SETUP_CACHE_DIR", "/tmp/osworld_cache") + fn_prefix = os.environ.get("NVCF_FUNCTION_NAME_PREFIX", "data-collection") + os.environ.setdefault("NVCF_FUNCTION_NAME_PREFIX", fn_prefix) + if nvcf_version_id: + os.environ["NVCF_VERSION_ID"] = nvcf_version_id + env = DesktopEnv( + provider_name="nvcf", + path_to_vm="", + action_space="pyautogui", + headless=True, + os_type="Ubuntu" if os_type == "linux" else os_type, + require_a11y_tree=False, + ) + + logger.debug(f"[initialize_runtime] DesktopEnv created, resetting with OSWorld setup...") + env.reset(task_config=osworld_setup) + logger.debug(f"[initialize_runtime] DesktopEnv reset complete for {job_id}") + return env + + else: + # Singularity backend + from examples.setup import SetupController + from openhands.core.config import OpenHandsConfig + from openhands.events import EventStream + from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime + from openhands.storage import get_file_store + + config = OpenHandsConfig() + config.runtime = "osworld" + config.sandbox.base_container_image = "ubuntu:24.04" + config.sandbox.run_as_fakeroot = False + config.sandbox.runtime_container_image = None + + file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') + event_stream = EventStream(sid=job_id, file_store=file_store) + + logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") + + runtime = OSWorldSingularityRuntime( + config=config, + event_stream=event_stream, + sid=job_id, + os_type=os_type, + vm_image_path=vm_image_path, + attach_to_existing=False, + ) + + await runtime.connect() + logger.debug(f"[initialize_runtime] Runtime initialized and connected for {job_id}") + + if osworld_setup and os_type == "linux" and osworld_setup.get('config'): + logger.info(f"[initialize_runtime] [{job_id}] Setting up OSWorld with {len(osworld_setup['config'])} config step(s)") + setup_controller = SetupController( + vm_ip="127.0.0.1", + server_port=runtime._vm_server_port, + chromium_port=runtime._chromium_port, + cache_dir="/tmp/osworld_example", + client_password="password", + runtime=runtime + ) + try: + await setup_controller.setup(osworld_setup['config']) + logger.info(f"[initialize_runtime] [{job_id}] OSWorld setup completed successfully") + except Exception as e: + logger.error(f"[initialize_runtime] [{job_id}] OSWorld setup FAILED: {e}") + raise + else: + logger.debug(f"[initialize_runtime] No OSWorld setup provided") + return runtime + + @staticmethod + def execute_pyautogui_command(env_or_runtime, pyautogui_command: str): + """Execute a pyautogui command. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + env_or_runtime.controller.execute_python_command(pyautogui_command) + logger.debug("[execute_pyautogui_command] Action complete") + except Exception as e: + logger.debug(f"[execute_pyautogui_command] Error in Action: {e}") + else: + from openhands.events.action.os import OSWorldInteractiveAction + from openhands.events.observation import ErrorObservation + action = OSWorldInteractiveAction( + method="execute_python_command", + params={"command": pyautogui_command}, + ) + result = env_or_runtime.run_action(action) + if not isinstance(result, ErrorObservation): + logger.debug("[execute_pyautogui_command] Action complete") + else: + logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") + + @staticmethod + def get_screen_size(env_or_runtime) -> Tuple[int, int]: + """Get screen size. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + size = env_or_runtime.controller.get_vm_screen_size() + if isinstance(size, tuple) and len(size) == 2: + return size + if isinstance(size, str): + match = re.search(r"(\d+)\D+(\d+)", size) + if match: + return int(match.group(1)), int(match.group(2)) + except Exception as e: + logger.warning(f"[get_screen_size] Failed: {e}, using defaults") + return env_or_runtime.screen_width, env_or_runtime.screen_height + else: + from openhands.events.action.os import OSWorldInteractiveAction + observation = env_or_runtime.run_action(OSWorldInteractiveAction( + method="get_vm_screen_size", + params={}, + thought="" + )) + assert hasattr(observation, "content"), "get_screen_size failed." + match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) + return int(match.group(1)), int(match.group(2)) + + @staticmethod + def get_screenshot(env_or_runtime) -> bytes: + """Get screenshot. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + screenshot = env_or_runtime.controller.get_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from DesktopEnv.") + return screenshot + else: + screenshot = env_or_runtime.get_vm_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from runtime.") + return screenshot diff --git a/cua/modules/module_data_collector.py b/cua/modules/module_data_collector.py index eb9233493..2c3d75772 100644 --- a/cua/modules/module_data_collector.py +++ b/cua/modules/module_data_collector.py @@ -12,8 +12,6 @@ from pathlib import Path from typing import Optional, Dict, Any, Tuple -import ipdb - from modules.actors.debug_uitars_actor import UITarsActor from modules.debug_planner import Planner from modules.debug_env_controller import EnvController @@ -42,6 +40,13 @@ def __init__(self, args: Namespace): self.vm_image_path = args.vm_image_path self.os_type = 'linux' if 'Ubuntu' in self.vm_image_path else 'windows' + # Runtime type: "singularity" (local KVM) or "nvcf" (NVCF via OSWorld DesktopEnv) + self.runtime_type = getattr(args, 'runtime', 'singularity') + + # NVCF credentials (passed via env vars to OSWorld's NVCFProvider) + self.nvcf_api_key = getattr(args, 'nvcf_api_key', None) + self.nvcf_org = getattr(args, 'nvcf_org', None) + self.max_steps_per_trajectory = args.max_steps_per_trajectory self.max_steps_per_goal = args.max_steps_per_goal @@ -117,10 +122,15 @@ def save_trajectory(trajectory: Dict, trajectory_save_dir: Path): logger.debug(f"✓ [save_trajectory] Saved to {str(trajectory_save_dir / 'trajectory.json')}") - async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: + async def init_runtime_for_job(self, trajectory_idx: int, + nvcf_function_id: str = None, + nvcf_version_id: str = None) -> Tuple: """ Stage 1: Initialize the VM and OSWorld setup. - Returns: (runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup) + Returns: (env, trajectory, trajectory_save_dir, trajectory_id, osworld_setup) + + Uses OSWorld's DesktopEnv which handles NVCF deploy, local proxy, + and environment setup internally. """ # Create unique IDs job_id = f"job_{trajectory_idx:04d}" @@ -140,13 +150,35 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: else: osworld_setup_ready = True - # Initialize Runtime (Async) - runtime = await EnvController.initialize_runtime( - job_id, self.vm_image_path, self.os_type, osworld_setup + logger.info(f"[job {trajectory_idx:04d}] Sampled OSWorld config: id={osworld_setup.get('id', 'unknown')}, " + f"snapshot={osworld_setup.get('snapshot', 'unknown')}, " + f"apps={osworld_setup.get('related_apps', [])}, " + f"instruction={osworld_setup.get('instruction', '')[:80]}") + + # Pre-download setup files to local cache BEFORE deploying NVCF. + # This avoids wasting expensive NVCF GPU resources if downloads fail. + if self.runtime_type == "nvcf": + logger.info(f"[job {trajectory_idx:04d}] Pre-downloading setup files before NVCF deploy...") + download_ok = EnvController.pre_download_setup_files(osworld_setup) + if not download_ok: + raise RuntimeError( + f"[job {trajectory_idx:04d}] Setup file pre-download failed. " + f"Skipping NVCF deploy to avoid wasting resources." + ) + logger.info(f"[job {trajectory_idx:04d}] Pre-download complete, proceeding with NVCF deploy.") + + # Initialize DesktopEnv (handles NVCF deploy + proxy + setup internally) + env = await EnvController.initialize_runtime( + job_id, self.vm_image_path, self.os_type, osworld_setup, + runtime_type=self.runtime_type, + nvcf_function_id=nvcf_function_id, + nvcf_version_id=nvcf_version_id, + nvcf_api_key=self.nvcf_api_key, + nvcf_org=self.nvcf_org, ) # Get screen size - width, height = EnvController.get_screen_size(runtime) + width, height = EnvController.get_screen_size(env) # Prepare Metadata trajectory = { @@ -160,25 +192,24 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: 'steps': [], } - return runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup + return env, trajectory, trajectory_save_dir, trajectory_id, osworld_setup - async def collect_trajectory(self, runtime, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict): + async def collect_trajectory(self, env, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict): """ Stage 2: Run the Agent Loop (Goal Generation -> Action Execution). + `env` is an OSWorld DesktopEnv instance. """ # Wait for UI initialization time.sleep(3.0) # Initial Screenshot - screenshot_bytes = EnvController.get_screenshot(runtime) + screenshot_bytes = EnvController.get_screenshot(env) image_filename = trajectory_save_dir / f"0-0.png" save_image(screenshot_bytes, image_filename, logger) # --- 1. Generate High Level Goal --- # - # todo implement the verification mechanism for goal achievability using requirements - # generate goal in a separate loop - prev_requirements = [] # will be a list of tuple [("condition 1", "verdict 1"), ...] - example_goals = random.sample(self.example_instructions, 1) # for now, we sample 1 example goal + prev_requirements = [] + example_goals = random.sample(self.example_instructions, 1) goal, requirements = self.planner.generate_goal_with_long_horizon( screenshot_bytes, osworld_setup["config"], example_goals, prev_requirements, ) @@ -231,21 +262,19 @@ async def collect_trajectory(self, runtime, trajectory: Dict, trajectory_save_di ) if action_result is None: - # UI-TARS action generation failed (failed to meet the requirement) - # in this case, save only up to the current trajectory break pyautogui_command = action_result["pyautogui_command"] action_generation = action_result["action_generation"] # Execute - EnvController.execute_pyautogui_command(runtime, pyautogui_command) + EnvController.execute_pyautogui_command(env, pyautogui_command) # Wait & Observe time.sleep(3.0) # Capture new state - screenshot_bytes = EnvController.get_screenshot(runtime) + screenshot_bytes = EnvController.get_screenshot(env) # Save step info action_idx = len(step_for_this_subgoal['actions']) @@ -276,11 +305,11 @@ async def single_trajectory_job(self, trajectory_idx: int): Simply chains the two stages sequentially in the main thread. """ # 1. Init - runtime, trajectory_data, save_dir, t_id, setup = await self.init_runtime_for_job(trajectory_idx) + env, trajectory_data, save_dir, t_id, setup = await self.init_runtime_for_job(trajectory_idx) try: # 2. Collect - await self.collect_trajectory(runtime, trajectory_data, save_dir, setup) + await self.collect_trajectory(env, trajectory_data, save_dir, setup) finally: - # Cleanup for debug mode - runtime.close() + # Cleanup + env.close() diff --git a/cua/modules_kimi/data_collector.py b/cua/modules_kimi/data_collector.py index d75800bf4..0db547468 100644 --- a/cua/modules_kimi/data_collector.py +++ b/cua/modules_kimi/data_collector.py @@ -16,7 +16,6 @@ from pathlib import Path from typing import Optional, Dict, Any, Tuple -import ipdb from modules_kimi.kimi_actor import KimiActor from modules_kimi.env_controller import EnvController @@ -44,6 +43,9 @@ def __init__(self, args: Namespace): self.vm_image_path = args.vm_image_path self.os_type = 'linux' if 'Ubuntu' in self.vm_image_path else 'windows' + # Runtime type: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv) + self.runtime_type = getattr(args, 'runtime', 'singularity') + self.max_steps_per_trajectory = args.max_steps_per_trajectory # Output directory: cua/trajectories/kimi/-- @@ -85,7 +87,7 @@ def save_trajectory(trajectory: Dict, trajectory_save_dir: Path): logger.debug(f"Saved trajectory to {trajectory_save_dir / 'trajectory.json'}") - async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: + async def init_runtime_for_job(self, trajectory_idx: int, nvcf_function_id: str = None, nvcf_version_id: str = None) -> Tuple: """ Stage 1: Initialize the VM and OSWorld setup. Returns: (runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup) @@ -109,13 +111,27 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: # else: # osworld_setup_ready = True + # Pre-download setup files before NVCF deploy to avoid wasting GPU resources + if self.runtime_type == 'nvcf': + logger.info(f'[job {trajectory_idx:04d}] Pre-downloading setup files before NVCF deploy...') + download_ok = EnvController.pre_download_setup_files(osworld_setup) + if not download_ok: + raise RuntimeError( + f'[job {trajectory_idx:04d}] Setup file pre-download failed. ' + f'Skipping NVCF deploy to avoid wasting resources.' + ) + logger.info(f'[job {trajectory_idx:04d}] Pre-download complete.') + # Initialize runtime - runtime = await EnvController.initialize_runtime( - job_id, self.vm_image_path, self.os_type, osworld_setup + env_or_runtime = await EnvController.initialize_runtime( + job_id, self.vm_image_path, self.os_type, osworld_setup, + runtime_type=self.runtime_type, + nvcf_function_id=nvcf_function_id, + nvcf_version_id=nvcf_version_id, ) # Get screen size - width, height = EnvController.get_screen_size(runtime) + width, height = EnvController.get_screen_size(env_or_runtime) trajectory = { "trajectory_id": trajectory_id, @@ -129,7 +145,7 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple: "steps": [], } - return runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup + return env_or_runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup async def collect_trajectory( self, runtime, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict diff --git a/cua/modules_kimi/env_controller.py b/cua/modules_kimi/env_controller.py index fbd8d0594..e83ca9cea 100644 --- a/cua/modules_kimi/env_controller.py +++ b/cua/modules_kimi/env_controller.py @@ -1,108 +1,273 @@ import logging +import os +import sys import re -from typing import Dict, Tuple - -from examples.setup import SetupController -from openhands.core.config import OpenHandsConfig -from openhands.events import EventStream -from openhands.events.action.os import OSWorldInteractiveAction -from openhands.events.observation import ErrorObservation -from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime -from openhands.storage import get_file_store +import uuid +import time +import threading +import requests +from typing import Dict, Optional, Tuple + from openhands.core.logger import openhands_logger logger = openhands_logger.getChild('kimi_env_controller') logger.setLevel(logging.DEBUG) +# Semaphore to limit concurrent downloads +_DOWNLOAD_SEMAPHORE = threading.Semaphore(int(os.environ.get('OSWORLD_MAX_CONCURRENT_DOWNLOADS', '3'))) + + +def _is_desktop_env(env_or_runtime) -> bool: + """Check if the object is a DesktopEnv instance (duck-type check to avoid hard import).""" + return hasattr(env_or_runtime, 'controller') and hasattr(env_or_runtime, 'reset') + class EnvController: """ - Static wrapper class that interfaces with OSWorldSingularityRuntime. - Copied from modules/debug_env_controller.py for self-containment. + Static wrapper class that interfaces with either: + - OSWorldSingularityRuntime (runtime_type='singularity') + - OSWorld DesktopEnv (runtime_type='nvcf') """ @staticmethod - async def initialize_runtime(job_id: str, vm_image_path: str, os_type: str, - osworld_setup: Dict) -> OSWorldSingularityRuntime: - config = OpenHandsConfig() - config.runtime = "osworld" - config.sandbox.base_container_image = "ubuntu:24.04" - config.sandbox.run_as_fakeroot = False - config.sandbox.runtime_container_image = None - - file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') - event_stream = EventStream(sid=job_id, file_store=file_store) - - logger.debug(f"[initialize_runtime] Creating runtime for {job_id}") - logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") - - runtime = OSWorldSingularityRuntime( - config=config, - event_stream=event_stream, - sid=job_id, - os_type=os_type, - vm_image_path=vm_image_path, - attach_to_existing=False, - ) - - await runtime.connect() - logger.debug(f"[initialize_runtime] Runtime initialized and connected for {job_id}") - - if osworld_setup and os_type == "linux" and osworld_setup.get('config'): - logger.info(f"[initialize_runtime] [{job_id}] Setting up OSWorld with {len(osworld_setup['config'])} config step(s)") - setup_controller = SetupController( - vm_ip="127.0.0.1", - server_port=runtime._vm_server_port, - chromium_port=runtime._chromium_port, - cache_dir="/tmp/osworld_example", - client_password="password", - runtime=runtime - ) - try: - await setup_controller.setup(osworld_setup['config']) - logger.info(f"[initialize_runtime] [{job_id}] OSWorld setup completed successfully") - except Exception as e: - logger.error(f"[initialize_runtime] [{job_id}] OSWorld setup FAILED: {e}") - logger.error(f"[initialize_runtime] [{job_id}] Failed config: {osworld_setup.get('config', [])}") - raise - else: - logger.debug(f"[initialize_runtime] No OSWorld setup provided") + def pre_download_setup_files(osworld_setup: Dict, cache_dir: str = "/tmp/osworld_cache") -> bool: + """ + Pre-download all setup files to local cache BEFORE deploying NVCF. + Returns True if all downloads succeeded, False otherwise. + """ + config_list = osworld_setup.get("config", []) + if not config_list: + return True + + os.makedirs(cache_dir, exist_ok=True) + + dl_headers = {} + hf_token = os.environ.get('HF_TOKEN') or os.environ.get('HUGGING_FACE_HUB_TOKEN') + if hf_token: + dl_headers['Authorization'] = f'Bearer {hf_token}' + + for cfg in config_list: + if cfg.get("type") != "download": + continue + + files = cfg.get("parameters", {}).get("files", []) + for f in files: + url = f.get("url", "") + path = f.get("path", "") + if not url or not path: + continue - return runtime + cache_path = os.path.join(cache_dir, "{:}_{:}".format( + uuid.uuid5(uuid.NAMESPACE_URL, url), + os.path.basename(path))) + + if os.path.exists(cache_path): + logger.info(f"[pre_download] Cache hit: {cache_path}") + continue + + logger.info(f"[pre_download] Downloading {url} to cache...") + max_retries = 8 + downloaded = False + last_error = None + + with _DOWNLOAD_SEMAPHORE: + for i in range(max_retries): + try: + backoff = min(2 ** i + 1, 60) + if i > 0: + logger.info(f"[pre_download] Waiting {backoff}s before retry {i+1}/{max_retries}") + time.sleep(backoff) + + response = requests.get(url, stream=True, timeout=300, headers=dl_headers) + response.raise_for_status() + + downloaded_size = 0 + with open(cache_path, 'wb') as fh: + for chunk in response.iter_content(chunk_size=8192): + if chunk: + fh.write(chunk) + downloaded_size += len(chunk) + + logger.info(f"[pre_download] Downloaded {downloaded_size / (1024*1024):.2f} MB to {cache_path}") + downloaded = True + break + + except requests.RequestException as e: + last_error = e + logger.warning(f"[pre_download] Failed {url}: {e} ({max_retries - i - 1} retries left)") + if os.path.exists(cache_path): + os.remove(cache_path) + + if not downloaded: + logger.error(f"[pre_download] All retries exhausted for {url}. Last error: {last_error}") + return False + + return True @staticmethod - def execute_pyautogui_command(runtime: OSWorldSingularityRuntime, pyautogui_command: str): - pyautogui_action = OSWorldInteractiveAction( - method="execute_python_command", - params={"command": pyautogui_command}, - ) - result = runtime.run_action(pyautogui_action) - - if not isinstance(result, ErrorObservation): - logger.debug("[execute_pyautogui_command] Action complete") + async def initialize_runtime( + job_id: str, + vm_image_path: str, + os_type: str, + osworld_setup: Dict, + runtime_type: str = "singularity", + nvcf_function_id: Optional[str] = None, + nvcf_version_id: Optional[str] = None, + nvcf_api_key: Optional[str] = None, + nvcf_org: Optional[str] = None, + ): + """ + Initialize runtime. + + runtime_type='singularity': uses OSWorldSingularityRuntime (local KVM). + runtime_type='nvcf': uses OSWorld DesktopEnv with NVCFProvider. + """ + logger.debug(f"[initialize_runtime] Creating {runtime_type} runtime for {job_id}") + + if runtime_type == "nvcf": + _osworld_path = "/lustre/fsw/portfolios/nvr/users/bcui/OSWorld" + if _osworld_path not in sys.path: + sys.path.insert(0, _osworld_path) + from desktop_env.desktop_env import DesktopEnv + + if nvcf_api_key: + os.environ.setdefault("NGC_API_KEY", nvcf_api_key) + if nvcf_org: + os.environ.setdefault("NGC_ORG", nvcf_org) + if nvcf_function_id: + os.environ["NVCF_FUNCTION_ID"] = nvcf_function_id + # Set function name prefix for NVCF deployments + os.environ.setdefault("OSWORLD_SETUP_CACHE_DIR", "/tmp/osworld_cache") + fn_prefix = os.environ.get("NVCF_FUNCTION_NAME_PREFIX", "data-collection") + os.environ.setdefault("NVCF_FUNCTION_NAME_PREFIX", fn_prefix) + + if nvcf_version_id: + os.environ["NVCF_VERSION_ID"] = nvcf_version_id + + env = DesktopEnv( + provider_name="nvcf", + path_to_vm="", + action_space="pyautogui", + headless=True, + os_type="Ubuntu" if os_type == "linux" else os_type, + require_a11y_tree=False, + ) + + logger.debug(f"[initialize_runtime] DesktopEnv created, resetting with OSWorld setup...") + env.reset(task_config=osworld_setup) + logger.debug(f"[initialize_runtime] DesktopEnv reset complete for {job_id}") + return env + else: - logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") + # Singularity backend + from examples.setup import SetupController + from openhands.core.config import OpenHandsConfig + from openhands.events import EventStream + from openhands.runtime.impl.singularity.osworld_singularity_runtime import OSWorldSingularityRuntime + from openhands.storage import get_file_store - @staticmethod - def get_screen_size(runtime: OSWorldSingularityRuntime) -> Tuple[int, int]: - observation = runtime.run_action(OSWorldInteractiveAction( - method="get_vm_screen_size", - params={}, - thought="" - )) + config = OpenHandsConfig() + config.runtime = "osworld" + config.sandbox.base_container_image = "ubuntu:24.04" + config.sandbox.run_as_fakeroot = False + config.sandbox.runtime_container_image = None + + file_store = get_file_store('local', f'/tmp/synthetic_data_gen_{job_id}') + event_stream = EventStream(sid=job_id, file_store=file_store) + + logger.debug(f"[initialize_runtime] VM image: {vm_image_path}") + + runtime = OSWorldSingularityRuntime( + config=config, + event_stream=event_stream, + sid=job_id, + os_type=os_type, + vm_image_path=vm_image_path, + attach_to_existing=False, + ) - assert hasattr(observation, "content"), "get_screen_size failed." + await runtime.connect() + logger.debug(f"[initialize_runtime] Runtime initialized and connected for {job_id}") - match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) - width, height = int(match.group(1)), int(match.group(2)) + if osworld_setup and os_type == "linux" and osworld_setup.get('config'): + logger.info(f"[initialize_runtime] [{job_id}] Setting up OSWorld with {len(osworld_setup['config'])} config step(s)") + setup_controller = SetupController( + vm_ip="127.0.0.1", + server_port=runtime._vm_server_port, + chromium_port=runtime._chromium_port, + cache_dir="/tmp/osworld_example", + client_password="password", + runtime=runtime + ) + try: + await setup_controller.setup(osworld_setup['config']) + logger.info(f"[initialize_runtime] [{job_id}] OSWorld setup completed successfully") + except Exception as e: + logger.error(f"[initialize_runtime] [{job_id}] OSWorld setup FAILED: {e}") + raise + else: + logger.debug(f"[initialize_runtime] No OSWorld setup provided") - return width, height + return runtime @staticmethod - def get_screenshot(runtime: OSWorldSingularityRuntime) -> bytes: - screenshot = runtime.get_vm_screenshot() - if not screenshot: - logger.debug("Failed to get screenshot from runtime.") - raise RuntimeError("Failed to get screenshot from runtime.") + def execute_pyautogui_command(env_or_runtime, pyautogui_command: str): + """Execute a pyautogui command. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + env_or_runtime.controller.execute_python_command(pyautogui_command) + logger.debug("[execute_pyautogui_command] Action complete") + except Exception as e: + logger.debug(f"[execute_pyautogui_command] Error in Action: {e}") + else: + from openhands.events.action.os import OSWorldInteractiveAction + from openhands.events.observation import ErrorObservation + action = OSWorldInteractiveAction( + method="execute_python_command", + params={"command": pyautogui_command}, + ) + result = env_or_runtime.run_action(action) + if not isinstance(result, ErrorObservation): + logger.debug("[execute_pyautogui_command] Action complete") + else: + logger.debug(f"[execute_pyautogui_command] Error in Action: {result}") - return screenshot + @staticmethod + def get_screen_size(env_or_runtime) -> Tuple[int, int]: + """Get screen size. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + try: + size = env_or_runtime.controller.get_vm_screen_size() + if isinstance(size, tuple) and len(size) == 2: + return size + if isinstance(size, str): + match = re.search(r"(\d+)\D+(\d+)", size) + if match: + return int(match.group(1)), int(match.group(2)) + except Exception as e: + logger.warning(f"[get_screen_size] Failed: {e}, using defaults") + return env_or_runtime.screen_width, env_or_runtime.screen_height + else: + from openhands.events.action.os import OSWorldInteractiveAction + observation = env_or_runtime.run_action(OSWorldInteractiveAction( + method="get_vm_screen_size", + params={}, + thought="" + )) + assert hasattr(observation, "content"), "get_screen_size failed." + match = re.search(r"Width: (\d+), Height: (\d+)", observation.content) + return int(match.group(1)), int(match.group(2)) + + @staticmethod + def get_screenshot(env_or_runtime) -> bytes: + """Get screenshot. Works with both DesktopEnv and OSWorldSingularityRuntime.""" + if _is_desktop_env(env_or_runtime): + screenshot = env_or_runtime.controller.get_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from DesktopEnv.") + return screenshot + else: + screenshot = env_or_runtime.get_vm_screenshot() + if not screenshot: + raise RuntimeError("Failed to get screenshot from runtime.") + return screenshot diff --git a/cua/parallel_collect_kimi.py b/cua/parallel_collect_kimi.py index b1ec1c29a..a41fe9b24 100644 --- a/cua/parallel_collect_kimi.py +++ b/cua/parallel_collect_kimi.py @@ -59,6 +59,7 @@ def __init__(self, args, data_collector: DataCollector): self.data_collector = data_collector self.max_parallel = args.max_parallel self.max_trajectories = args.max_trajectories + self.runtime_type = getattr(args, 'runtime', 'singularity') # Queues self.init_queue: queue.Queue = queue.Queue() @@ -80,7 +81,7 @@ def __init__(self, args, data_collector: DataCollector): # For sequential VM start-ups to mitigate boot storm self._launch_lock = threading.Lock() self._last_launch_time = 0 - self._launch_delay_seconds = 15.0 # Wait 15s between starts + self._launch_delay_seconds = 0.0 if self.runtime_type == "nvcf" else 15.0 def start_workers(self): self._server_running = True @@ -164,6 +165,12 @@ async def _init_worker(self, worker_id: int): job_details.completed = False job_details.event.set() + if job_details.runtime: + try: + job_details.runtime.close() + except Exception: + pass + self._runtime_semaphore.release() with self._active_runtime_lock: self._active_runtime_count -= 1 @@ -261,6 +268,10 @@ def parse_args(): parser.add_argument("--kimi_model_name", type=str, default="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/models/Kimi-K2.5") + # Runtime selection + parser.add_argument("--runtime", type=str, choices=["singularity", "nvcf"], default="singularity", + help="Runtime backend: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv)") + # Environment & Setup parser.add_argument("--vm_image_path", type=str, default="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server/OS_images/Ubuntu.qcow2") diff --git a/cua/parallel_collect_trajectories.py b/cua/parallel_collect_trajectories.py index 53f91753f..da5a0de87 100644 --- a/cua/parallel_collect_trajectories.py +++ b/cua/parallel_collect_trajectories.py @@ -11,7 +11,7 @@ from openhands.core.logger import openhands_logger # Configure logging -openhands_logger.setLevel(logging.WARNING) +openhands_logger.setLevel(logging.DEBUG) logger = openhands_logger.getChild('parallel_collector') logger.setLevel(logging.INFO) @@ -35,7 +35,7 @@ def __init__(self, index: int): self.error: Optional[str] = None # Runtime objects (populated during execution) - self.runtime: Any = None + self.env: Any = None # OSWorld DesktopEnv instance self.trajectory_data: Optional[Dict] = None self.save_dir: Any = None self.osworld_setup: Any = None @@ -46,6 +46,7 @@ def __init__(self, args, data_collector: DataCollector): self.data_collector = data_collector self.max_parallel = args.max_parallel self.max_trajectories = args.max_trajectories + self.runtime_type = getattr(args, 'runtime', 'singularity') # Queues self.init_queue: queue.Queue = queue.Queue() @@ -65,9 +66,10 @@ def __init__(self, args, data_collector: DataCollector): self._server_running = False # For sequential VM start-ups to mitigate boot storm + # NVCF uses pre-deployed VMs so no boot storm delay needed self._launch_lock = threading.Lock() self._last_launch_time = 0 - self._launch_delay_seconds = 15.0 # Wait 15s between starts + self._launch_delay_seconds = 0.0 if self.runtime_type == "nvcf" else 15.0 def start_workers(self): self._server_running = True @@ -112,23 +114,16 @@ async def _init_worker(self, worker_id: int): job_details = self.jobs[job_idx] # Wait for available runtime slot - # logger.debug(f"[init-{worker_id}] Waiting for slot for job {job_idx}") await asyncio.to_thread(self._runtime_semaphore.acquire) # Rate Limit Logic: Prevent Boot Storm wait_time = 0.0 with self._launch_lock: now = time.time() - # The earliest this worker can start is either NOW, - # or 15s after the last scheduled launch. target_start_time = max(now, self._last_launch_time + self._launch_delay_seconds) - wait_time = target_start_time - now - - # Reserve this slot by updating the global timestamp immediately self._last_launch_time = target_start_time - # Perform the wait asynchronously (outside the lock) if wait_time > 0: if wait_time > 1.0: logger.info(f"[init-{worker_id}] Delayed boot-up: waiting {wait_time:.1f}s...") @@ -141,14 +136,14 @@ async def _init_worker(self, worker_id: int): try: # --- call init_runtime_for_job --- # - # This creates the runtime and runs setup - runtime, traj_data, save_dir, traj_id, setup = \ + # This creates the DesktopEnv, deploys NVCF (if needed), and runs setup + env, traj_data, save_dir, traj_id, setup = \ await self.data_collector.init_runtime_for_job(job_idx) # Store details in the pre-allocated object - job_details.job_id = traj_id # Using traj_id as primary ID + job_details.job_id = traj_id job_details.trajectory_id = traj_id - job_details.runtime = runtime + job_details.env = env job_details.trajectory_data = traj_data job_details.save_dir = save_dir job_details.osworld_setup = setup @@ -159,8 +154,15 @@ async def _init_worker(self, worker_id: int): except Exception as e: logger.error(f"[init-{worker_id}] Failed setup for job {job_idx}: {e}") job_details.error = str(e) - job_details.completed = False # Failed - job_details.event.set() # Signal main thread we are done (failed) + job_details.completed = False + job_details.event.set() + + # Close DesktopEnv if it was created + if job_details.env: + try: + job_details.env.close() + except Exception: + pass # Release semaphore immediately on failure self._runtime_semaphore.release() @@ -184,7 +186,7 @@ async def _collect_worker(self, worker_id: int): # --- call collect_trajectory --- # This runs the Planner/Actor loop await self.data_collector.collect_trajectory( - job_details.runtime, + job_details.env, job_details.trajectory_data, job_details.save_dir, job_details.osworld_setup @@ -196,10 +198,9 @@ async def _collect_worker(self, worker_id: int): logger.error(f"[collect-{worker_id}] Error in {traj_id}: {e}") job_details.error = str(e) finally: - # Cleanup Runtime - if job_details.runtime: - # Run close in background thread to not block loop - threading.Thread(target=job_details.runtime.close, daemon=True).start() + # Cleanup DesktopEnv (closes NVCF proxy, undeploys function) + if job_details.env: + threading.Thread(target=job_details.env.close, daemon=True).start() # Release semaphore (allows new Init worker to proceed) self._runtime_semaphore.release() @@ -269,6 +270,10 @@ def parse_args(): parser.add_argument("--planner_node", type=str, required=True) parser.add_argument("--actor_node", type=str, required=True) + # Runtime selection + parser.add_argument("--runtime", type=str, choices=["singularity", "nvcf"], default="singularity", + help="Runtime backend: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv)") + # Environment & Setup parser.add_argument("--vm_image_path", type=str, default="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server/OS_images/Ubuntu.qcow2") @@ -295,8 +300,7 @@ def parse_args(): # Parallel specific args parser.add_argument("--max_parallel", type=int, default=24, help="Max concurrent VMs") parser.add_argument( - "--max_trajectories", type=int, default=10000, help="Total trajectories to generate" - ) + "--max_trajectories", type=int, default=10000, help="Total trajectories to generate") return parser.parse_args() @@ -309,6 +313,9 @@ async def main(): logger.info("DataCollector initialized (datasets loaded)") # 2. Start Parallel Generator + # Each worker's DesktopEnv manages its own NVCF function lifecycle + # (deploy, local proxy, health monitoring, undeploy on close) + # No centralized NVCFPool needed - OSWorld's NVCFProvider handles everything. generator = ParallelTrajectoryGenerator(args, data_collector) await generator.run() diff --git a/cua/scripts/kimi/run_collector_kimi.sh b/cua/scripts/kimi/run_collector_kimi.sh index 1d8a32d3b..a8d2238eb 100644 --- a/cua/scripts/kimi/run_collector_kimi.sh +++ b/cua/scripts/kimi/run_collector_kimi.sh @@ -24,7 +24,8 @@ COLLECTOR_IDX="${1:-0}" LOG_DIR="${LOG_DIR:-./logs}" # Configs -PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +# PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" PROJECT_DIR="$PROJECT_ROOT/cua" COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua_cpu.sqsh" diff --git a/cua/scripts/kimi/run_collector_kimi_nvcf.sh b/cua/scripts/kimi/run_collector_kimi_nvcf.sh new file mode 100755 index 000000000..adf627a76 --- /dev/null +++ b/cua/scripts/kimi/run_collector_kimi_nvcf.sh @@ -0,0 +1,167 @@ +#!/bin/bash +# ============================================================================ +# Kimi NVCF Collector Launcher (SSH+Enroot Pattern) +# ============================================================================ +# Same as run_collector_kimi.sh but passes --runtime nvcf. +# No /dev/kvm reservation needed — VMs are remote NVCF instances. +# +# Required env vars: +# MODEL_NODE - hostname of the Kimi vLLM server head node +# NGC_API_KEY - NVCF API key +# NGC_ORG - NVCF organization +# +# Optional env vars: +# MAX_PARALLEL - parallel VMs per collector (default: 10) +# MAX_TRAJECTORIES - trajectories to collect (default: 10000) +# TRAJECTORY_SAVE_DIR - output directory +# +# Optional arg: +# $1 = collector index (for log naming, default: 0) +# +# Usage: +# MODEL_NODE=pool0-03161 NGC_API_KEY=nvapi-xxx NGC_ORG=my-org \ +# bash run_collector_kimi_nvcf.sh 1 +# ============================================================================ + +COLLECTOR_IDX="${1:-0}" +LOG_DIR="${LOG_DIR:-./logs}" + +# Configs +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" +PROJECT_DIR="$PROJECT_ROOT/cua" +# COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua_cpu.sqsh" +# COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/images/cua_cpu.sqsh" +COLLECTOR_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua-vllm-0.16.0.sqsh" + +MAX_PARALLEL=${MAX_PARALLEL:-10} +MAX_TRAJECTORIES=${MAX_TRAJECTORIES:-10000} +TRAJECTORY_SAVE_DIR="${TRAJECTORY_SAVE_DIR:-$PROJECT_DIR/trajectories/kimi-nvcf}" + +KIMI_PORT=8000 +NVCF_FUNCTION_NAME_PREFIX="${NVCF_FUNCTION_NAME_PREFIX:-data-collection}" + +# Validate +if [ -z "$MODEL_NODE" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: MODEL_NODE not set." + exit 1 +fi +if [ -z "$NGC_API_KEY" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: NGC_API_KEY not set." + exit 1 +fi +if [ -z "$NGC_ORG" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: NGC_ORG not set." + exit 1 +fi + +mkdir -p "$LOG_DIR" + +echo "[Collector $COLLECTOR_IDX] MODEL_NODE=$MODEL_NODE" +echo "[Collector $COLLECTOR_IDX] MAX_PARALLEL=$MAX_PARALLEL MAX_TRAJECTORIES=$MAX_TRAJECTORIES" +echo "[Collector $COLLECTOR_IDX] Runtime: nvcf" + +# --- 1. Submit holder job on CPU node (no /dev/kvm needed) --- +echo "[Collector $COLLECTOR_IDX] Submitting holder job..." +COLLECTOR_JOB_ID=$(sbatch --parsable \ + --job-name="kimi_nvcf_collector_${COLLECTOR_IDX}" \ + --account=nvr_lpr_agentic \ + --partition=cpu_short \ + --mem=0 \ + --time=01:30:00 \ + --exclusive \ + --output="$LOG_DIR/slurm-holder-${COLLECTOR_IDX}.out" \ + --error="$LOG_DIR/slurm-holder-${COLLECTOR_IDX}.out" \ + --wrap="srun --container-image=$COLLECTOR_IMAGE --container-mounts=/lustre:/lustre sleep infinity") + +if [ -z "$COLLECTOR_JOB_ID" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: Job submission failed." + exit 1 +fi +echo "[Collector $COLLECTOR_IDX] Holder job submitted: $COLLECTOR_JOB_ID" + +# --- 2. Cleanup trap --- +cleanup() { + echo "" + echo "[Collector $COLLECTOR_IDX] Cleaning up... Cancelling holder job $COLLECTOR_JOB_ID" + scancel "$COLLECTOR_JOB_ID" 2>/dev/null +} +trap cleanup EXIT + +# --- 3. Wait for job to start --- +echo "[Collector $COLLECTOR_IDX] Waiting for holder job to start..." +COLLECTOR_NODE="" +while [ -z "$COLLECTOR_NODE" ]; do + JOB_STATE=$(squeue -j "$COLLECTOR_JOB_ID" -h -o %T 2>/dev/null) + + if [ "$JOB_STATE" == "RUNNING" ]; then + COLLECTOR_NODE=$(squeue -j "$COLLECTOR_JOB_ID" -h -o %N) + elif [ -z "$JOB_STATE" ]; then + echo "[Collector $COLLECTOR_IDX] ERROR: Job $COLLECTOR_JOB_ID disappeared from queue!" + exit 1 + fi + sleep 2 +done +echo "[Collector $COLLECTOR_IDX] Job RUNNING on node: $COLLECTOR_NODE" + +# --- 4. Wait for container readiness --- +echo "[Collector $COLLECTOR_IDX] Polling for container readiness on $COLLECTOR_NODE..." +CONTAINER_PID="" +while [ -z "$CONTAINER_PID" ]; do + sleep 2 + CONTAINER_PID=$(ssh -q -o StrictHostKeyChecking=no "$COLLECTOR_NODE" \ + "enroot list -f | grep 'pyxis' | grep 'sleep' | awk '{print \$2}' | head -n 1" 2>/dev/null) + + if [ -z "$CONTAINER_PID" ]; then + printf "." + fi +done +echo "" +echo "[Collector $COLLECTOR_IDX] Container ready, PID: $CONTAINER_PID" + +# --- 5. SSH+enroot exec: run data collection with NVCF backend --- +ssh -t -q -o StrictHostKeyChecking=no "$COLLECTOR_NODE" \ + "enroot exec $CONTAINER_PID /bin/bash -c ' + set -e + export PYTHONUNBUFFERED=1 + export NGC_API_KEY=$NGC_API_KEY + export NGC_ORG=$NGC_ORG + export NVCF_FUNCTION_NAME_PREFIX=$NVCF_FUNCTION_NAME_PREFIX + export OSWORLD_SETUP_CACHE_DIR=/tmp/osworld_cache + + # Wait for Kimi vLLM to be healthy + echo \"[Collector $COLLECTOR_IDX] Waiting for Kimi vLLM at $MODEL_NODE:$KIMI_PORT...\" + ELAPSED=0 + MAX_WAIT=7200 + while [ \$ELAPSED -lt \$MAX_WAIT ]; do + if curl -sf http://$MODEL_NODE:$KIMI_PORT/health > /dev/null 2>&1; then + echo \"[Collector $COLLECTOR_IDX] Kimi vLLM is healthy!\" + break + fi + sleep 10 + ELAPSED=\$((ELAPSED + 10)) + if [ \$((ELAPSED % 60)) -eq 0 ]; then + echo \"[Collector $COLLECTOR_IDX] Still waiting for Kimi vLLM (\${ELAPSED}s)...\" + fi + done + + if [ \$ELAPSED -ge \$MAX_WAIT ]; then + echo \"[Collector $COLLECTOR_IDX] ERROR: Kimi vLLM did not become healthy within \${MAX_WAIT}s\" + exit 1 + fi + + # Run data collection with NVCF backend + # Activate Python venv with required dependencies + + echo \"[Collector $COLLECTOR_IDX] Starting parallel data collection (NVCF backend)...\" + cd $PROJECT_DIR + python parallel_collect_kimi.py \ + --model_node $MODEL_NODE \ + --runtime nvcf \ + --max_parallel $MAX_PARALLEL \ + --max_trajectories $MAX_TRAJECTORIES \ + --trajectory_save_dir $TRAJECTORY_SAVE_DIR + + COLLECT_EXIT=\$? + echo \"[Collector $COLLECTOR_IDX] Data collection finished with exit code \$COLLECT_EXIT\" + exit \$COLLECT_EXIT + '" diff --git a/cua/scripts/kimi/run_kimi.sbatch b/cua/scripts/kimi/run_kimi.sbatch index 1658a4064..cf0ff8a7f 100644 --- a/cua/scripts/kimi/run_kimi.sbatch +++ b/cua/scripts/kimi/run_kimi.sbatch @@ -17,7 +17,8 @@ TOTAL_TP=$(( SLURM_JOB_NUM_NODES * 8 )) TOTAL_PP=1 #TOTAL_TP=8 #TOTAL_PP=$SLURM_JOB_NUM_NODES -CONTAINER_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua-vllm-0.16.0.sqsh" +# CONTAINER_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/images/cua-vllm-0.16.0.sqsh" +CONTAINER_IMAGE="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/images/cua-vllm-0.16.0.sqsh" CONTAINER_MOUNTS="/lustre:/lustre,/tmp:/tmp" diff --git a/cua/scripts/kimi/run_parallel_kimi_colocated.sh b/cua/scripts/kimi/run_parallel_kimi_colocated.sh index 13379b46e..b6336c8df 100644 --- a/cua/scripts/kimi/run_parallel_kimi_colocated.sh +++ b/cua/scripts/kimi/run_parallel_kimi_colocated.sh @@ -27,7 +27,8 @@ export LOG_DIR="${LOG_DIR:-./logs}" # Configurable parameters MAX_PARALLEL="${MAX_PARALLEL:-16}" MAX_TRAJECTORIES="${MAX_TRAJECTORIES:-10000}" -TRAJECTORY_SAVE_DIR=/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/mingjiel/workspace/data/jaehun/cua/trajectories/kimi +# TRAJECTORY_SAVE_DIR=/lustre/fs1/portfolios/nvr/projects/nvr_lpr_agentic/users/mingjiel/workspace/data/jaehun/cua/trajectories/kimi +TRAJECTORY_SAVE_DIR=/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/ProRL-Agent-Server/cua/trajectories/kimi-debug/ # Create logs directory mkdir -p "$LOG_DIR" @@ -36,7 +37,8 @@ KIMI_JOB_ID="" COLLECTOR_PIDS=() KIMI_PORT=8000 -PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +# PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server" +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" PROJECT_DIR="$PROJECT_ROOT/cua" echo "============================================" diff --git a/cua/scripts/kimi/run_parallel_kimi_nvcf.sh b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh new file mode 100755 index 000000000..1d6d40b8a --- /dev/null +++ b/cua/scripts/kimi/run_parallel_kimi_nvcf.sh @@ -0,0 +1,223 @@ +#!/bin/bash +# ============================================================================ +# Kimi-K2.5 Data Collection with NVCF Backend (Colocated on GPU Nodes) +# ============================================================================ +# Runs data collection directly on the GPU server nodes (colocated with vLLM). +# Since NVCF VMs are remote, no /dev/kvm or extra CPU nodes are needed — +# collectors just orchestrate remote NVCF VMs from inside the vLLM container. +# +# 1. Submits Kimi vLLM sbatch job (2 GPU nodes, Ray cluster) +# 2. Waits for Kimi server to become healthy +# 3. SSH+enroot execs into each GPU node's container to run data collection +# 4. Waits for all collectors to finish, then cancels Kimi server +# +# Required env vars: +# NGC_API_KEY - NVCF API key +# NGC_ORG - NVCF organization +# +# Usage: +# NGC_API_KEY=nvapi-xxx NGC_ORG=my-org bash run_parallel_kimi_nvcf.sh +# +# ============================================================================ + +export LOG_DIR="${LOG_DIR:-./logs}" + +# Configurable parameters +MAX_PARALLEL="${MAX_PARALLEL:-16}" +MAX_TRAJECTORIES="${MAX_TRAJECTORIES:-10000}" +TRAJECTORY_SAVE_DIR="${TRAJECTORY_SAVE_DIR:-/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/bcui/ProRL-Agent-Server/cua/trajectories/kimi-nvcf/}" +NVCF_FUNCTION_NAME_PREFIX="${NVCF_FUNCTION_NAME_PREFIX:-data-collection}" + +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" +PROJECT_DIR="$PROJECT_ROOT/cua" + +# Validate NVCF credentials +if [ -z "$NGC_API_KEY" ]; then + echo "[ERROR] NGC_API_KEY not set. Required for NVCF backend." + exit 1 +fi +if [ -z "$NGC_ORG" ]; then + echo "[ERROR] NGC_ORG not set. Required for NVCF backend." + exit 1 +fi + +# Create logs directory +mkdir -p "$LOG_DIR" + +KIMI_JOB_ID="" +COLLECTOR_PIDS=() +KIMI_PORT=8000 + +echo "============================================" +echo "Kimi-K2.5 Data Collection (NVCF Colocated)" +echo "============================================" +echo "MAX_PARALLEL: $MAX_PARALLEL (per node)" +echo "MAX_TRAJECTORIES: $MAX_TRAJECTORIES (per node)" +echo "NGC_ORG: $NGC_ORG" +echo "NVCF_PREFIX: $NVCF_FUNCTION_NAME_PREFIX" +echo "" + +# --- 1. Submit Kimi vLLM server --- +echo "[nvcf] Submitting Kimi vLLM sbatch job..." +KIMI_JOB_ID=$(sbatch \ + --account=llmservice_fm_vision \ + --partition=batch_short \ + --time=02:00:00 \ + --output="$LOG_DIR/slurm-%j-server.out" \ + --error="$LOG_DIR/slurm-%j-server.out" \ + --parsable \ + "./run_kimi.sbatch") + +if [ -z "$KIMI_JOB_ID" ]; then + echo "[nvcf] ERROR: Kimi sbatch submission failed." + exit 1 +fi +echo "[nvcf] Kimi vLLM job submitted: $KIMI_JOB_ID" + +# Wait for the job to start and discover nodes +HEAD_NODE_FILE="$LOG_DIR/head_node_${KIMI_JOB_ID}" +echo "[nvcf] Waiting for head node file: $HEAD_NODE_FILE" +MODEL_NODE="" +ALL_NODES="" +ELAPSED=0 +MAX_WAIT=43200 # 12 hours + +while [ $ELAPSED -lt $MAX_WAIT ]; do + JOB_STATE=$(squeue -j "$KIMI_JOB_ID" -h -o %T 2>/dev/null) + if [ -z "$JOB_STATE" ]; then + echo "[nvcf] ERROR: Kimi job $KIMI_JOB_ID disappeared from queue!" + exit 1 + fi + + if [ -f "$HEAD_NODE_FILE" ]; then + MODEL_NODE=$(cat "$HEAD_NODE_FILE") + if [ -n "$MODEL_NODE" ]; then + ALL_NODES=$(scontrol show hostnames "$(squeue -j "$KIMI_JOB_ID" -h -o %N)") + echo "[nvcf] Kimi vLLM job running on: $(echo $ALL_NODES | tr '\n' ' ')" + break + fi + fi + + sleep 10 + ELAPSED=$((ELAPSED + 10)) + if [ $((ELAPSED % 60)) -eq 0 ]; then + echo "[nvcf] Still waiting for Kimi job to start (${ELAPSED}s)..." + fi +done + +if [ -z "$MODEL_NODE" ]; then + echo "[nvcf] ERROR: Kimi vLLM did not start within ${MAX_WAIT}s." + exit 1 +fi + +mapfile -t NODES_ARRAY <<< "$ALL_NODES" +echo "[nvcf] Head node (vLLM API): $MODEL_NODE" +echo "[nvcf] All nodes: ${NODES_ARRAY[*]}" + +# --- 2. Wait for Kimi vLLM health --- +echo "[nvcf] Waiting for Kimi vLLM health at $MODEL_NODE:$KIMI_PORT..." +ELAPSED=0 +MAX_HEALTH_WAIT=7200 + +while [ $ELAPSED -lt $MAX_HEALTH_WAIT ]; do + if curl -sf "http://$MODEL_NODE:$KIMI_PORT/health" > /dev/null 2>&1; then + echo "[nvcf] Kimi vLLM is healthy!" + break + fi + sleep 10 + ELAPSED=$((ELAPSED + 10)) + if [ $((ELAPSED % 60)) -eq 0 ]; then + echo "[nvcf] Still waiting for Kimi health (${ELAPSED}s)..." + fi +done + +if [ $ELAPSED -ge $MAX_HEALTH_WAIT ]; then + echo "[nvcf] ERROR: Kimi vLLM did not become healthy within ${MAX_HEALTH_WAIT}s." + exit 1 +fi + +# --- 3. Launch data collection on each GPU node via SSH+enroot --- +echo "[nvcf] Launching data collection on ${#NODES_ARRAY[@]} node(s)..." +COLLECTOR_PIDS=() + +for i in "${!NODES_ARRAY[@]}"; do + node=${NODES_ARRAY[$i]} + COLLECTOR_IDX=$((i + 1)) + CURRENT_LOG="$LOG_DIR/slurm-${KIMI_JOB_ID}-collector-nvcf-${COLLECTOR_IDX}.out" + + echo "[nvcf] Finding container on $node..." + CONTAINER_PID="" + while [ -z "$CONTAINER_PID" ]; do + sleep 2 + CONTAINER_PID=$(ssh -q -o StrictHostKeyChecking=no "$node" \ + "enroot list -f | grep 'pyxis' | head -n 1 | awk '{print \$2}'" 2>/dev/null) + done + echo "[nvcf] Container on $node ready, PID: $CONTAINER_PID" + + ssh -t -q -o StrictHostKeyChecking=no "$node" \ + "enroot exec $CONTAINER_PID /bin/bash -c ' + set -e + export PYTHONUNBUFFERED=1 + export NGC_API_KEY=$NGC_API_KEY + export NGC_ORG=$NGC_ORG + export NVCF_FUNCTION_NAME_PREFIX=$NVCF_FUNCTION_NAME_PREFIX + export OSWORLD_SETUP_CACHE_DIR=/tmp/osworld_cache + + # Activate Python venv with required dependencies + source /lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/cua_env_reqs/bin/activate + + # Fix TLS certs: copy venv cacert.pem to container path if missing + VENV_CACERT=/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server/cua/cua_env_reqs/lib/python3.12/site-packages/certifi/cacert.pem + SYS_CACERT=/usr/local/lib/python3.12/dist-packages/certifi/cacert.pem + if [ ! -f \$SYS_CACERT ] && [ -f \$VENV_CACERT ]; then + mkdir -p /usr/local/lib/python3.12/dist-packages/certifi + cp \$VENV_CACERT \$SYS_CACERT 2>/dev/null || true + fi + export REQUESTS_CA_BUNDLE=\$VENV_CACERT + export SSL_CERT_FILE=\$VENV_CACERT + export CURL_CA_BUNDLE=\$VENV_CACERT + + echo \"[Collector $COLLECTOR_IDX] Starting data collection on $node (NVCF backend)...\" + cd $PROJECT_DIR + python parallel_collect_kimi.py \ + --model_node $MODEL_NODE \ + --runtime nvcf \ + --max_parallel $MAX_PARALLEL \ + --max_trajectories $MAX_TRAJECTORIES \ + --trajectory_save_dir $TRAJECTORY_SAVE_DIR + + COLLECT_EXIT=\$? + echo \"[Collector $COLLECTOR_IDX] Done (exit code \$COLLECT_EXIT)\" + exit \$COLLECT_EXIT + '" &> "$CURRENT_LOG" & + COLLECTOR_PIDS+=($!) + echo "[nvcf] Collector $COLLECTOR_IDX launched on $node (PID ${COLLECTOR_PIDS[-1]})" + echo " Log: $CURRENT_LOG" +done + +# --- 4. Wait for all collectors --- +echo "" +echo "[nvcf] All collectors launched. Waiting for completion..." +echo "" + +FAILED=0 +for i in "${!COLLECTOR_PIDS[@]}"; do + COLLECTOR_NUM=$((i + 1)) + wait "${COLLECTOR_PIDS[$i]}" 2>/dev/null + EXIT_CODE=$? + if [ $EXIT_CODE -eq 0 ]; then + echo "[nvcf] Collector $COLLECTOR_NUM finished successfully." + else + echo "[nvcf] Collector $COLLECTOR_NUM failed (exit code $EXIT_CODE)." + FAILED=$((FAILED + 1)) + fi +done + +echo "" +echo "============================================" +echo "[nvcf] All collectors finished. $FAILED/${#NODES_ARRAY[@]} failed." +echo "============================================" + +if [ $FAILED -gt 0 ]; then + exit 1 +fi