diff --git a/cua/desktop_env/desktop_env.py b/cua/desktop_env/desktop_env.py index bac706b0..098b21b5 100644 --- a/cua/desktop_env/desktop_env.py +++ b/cua/desktop_env/desktop_env.py @@ -161,7 +161,7 @@ def __init__( # Track whether environment has been used (step/setup) to optimize snapshot revert # docker, aws, gcp, azure are always unused as the emulator starts from a clean state # vmware, virtualbox are always used as the emulator starts from a dirty state - if self.provider_name in {"docker", "nvcf_dummy", "nvcf", "aws", "gcp", "azure", "aliyun", "volcengine", "singularity"}: + if self.provider_name in {"docker", "nvcf_dummy", "nvcf", "nvcf_singularity", "aws", "gcp", "azure", "aliyun", "volcengine", "singularity"}: self.is_environment_used = False elif self.provider_name in {"vmware", "virtualbox"}: self.is_environment_used = True @@ -347,7 +347,8 @@ def _set_task_info(self, task_config: Dict[str, Any]): # Link existing downloaded files to avoid re-downloading them setup_cache_dir = os.getenv('OSWORLD_SETUP_CACHE_DIR', None) - setup_cache_dir = os.path.join(setup_cache_dir, self.task_id) + if setup_cache_dir is not None: + setup_cache_dir = os.path.join(setup_cache_dir, self.task_id) if setup_cache_dir is not None and os.path.isdir(setup_cache_dir): logger.info(f"Setup cache directory: {setup_cache_dir}. Files will not need to be downloaded again.") # create symlink of all files in setup cache directory to eval cache directory diff --git a/cua/desktop_env/providers/__init__.py b/cua/desktop_env/providers/__init__.py index 13264cdd..e9a9cf6e 100644 --- a/cua/desktop_env/providers/__init__.py +++ b/cua/desktop_env/providers/__init__.py @@ -51,5 +51,9 @@ def create_vm_manager_and_provider(provider_name: str, region: str, use_proxy: b from desktop_env.providers.singularity.manager import SingularityVMManager from desktop_env.providers.singularity.provider import SingularityProvider return SingularityVMManager(), SingularityProvider(region) + elif provider_name == "nvcf_singularity": + from desktop_env.providers.nvcf_singularity.manager import NVCFSingularityVMManager + from desktop_env.providers.nvcf_singularity.provider import NVCFSingularityProvider + return NVCFSingularityVMManager(), NVCFSingularityProvider(region) else: raise NotImplementedError(f"{provider_name} not implemented!") diff --git a/cua/desktop_env/providers/nvcf_singularity/__init__.py b/cua/desktop_env/providers/nvcf_singularity/__init__.py new file mode 100644 index 00000000..e69de29b diff --git a/cua/desktop_env/providers/nvcf_singularity/manager.py b/cua/desktop_env/providers/nvcf_singularity/manager.py new file mode 100644 index 00000000..0930c299 --- /dev/null +++ b/cua/desktop_env/providers/nvcf_singularity/manager.py @@ -0,0 +1,37 @@ +import logging + +from desktop_env.providers.base import VMManager + +logger = logging.getLogger("desktopenv.providers.nvcf_singularity.NVCFSingularityVMManager") +logger.setLevel(logging.INFO) + + +class NVCFSingularityVMManager(VMManager): + """ + Minimal VM manager for the nvcf_singularity provider. + This provider uses a prebuilt Singularity image directly and does not need VM files. + """ + + def __init__(self, registry_path: str = ""): + self.registry_path = registry_path + + def initialize_registry(self, **kwargs): + pass + + def add_vm(self, vm_path, **kwargs): + pass + + def delete_vm(self, vm_path, region=None, **kwargs): + pass + + def occupy_vm(self, vm_path, pid, region=None, **kwargs): + pass + + def list_free_vms(self, **kwargs): + return [""] + + def check_and_clean(self, **kwargs): + pass + + def get_vm_path(self, os_type: str, region: str = None, screen_size=(1920, 1080), **kwargs) -> str: + return "" diff --git a/cua/desktop_env/providers/nvcf_singularity/provider.py b/cua/desktop_env/providers/nvcf_singularity/provider.py new file mode 100644 index 00000000..06f0bc11 --- /dev/null +++ b/cua/desktop_env/providers/nvcf_singularity/provider.py @@ -0,0 +1,267 @@ +import logging +import os +import random +import signal +import socket +import subprocess +import threading +import time +from pathlib import Path + +import requests + +from desktop_env.providers.base import Provider + +logger = logging.getLogger("desktopenv.providers.nvcf_singularity.NVCFSingularityProvider") +logger.setLevel(logging.INFO) + +RETRY_INTERVAL = 10 +WAIT_TIME = 3 +DEFAULT_SIF_PATH = "/lustre/fsw/portfolios/nvr/users/mingjiel/workspace/nvcf-osworld-eval/osworld-linux.sif" +# Port ranges for parallel instance support (matches singularity provider) +API_PORT_RANGE = (15000, 19999) +VNC_PORT_RANGE = (18000, 22999) +CHROME_PORT_RANGE = (19000, 22999) +VLC_PORT_RANGE = (20000, 22999) + + +class PortAllocationError(Exception): + pass + + +class NVCFSingularityProvider(Provider): + """ + Singularity-based provider that runs a prebuilt NVCF image directly. + Unlike the default singularity provider, it does not use qcow2 VM images. + Equivalent to NVCFDummyProvider but uses Singularity instead of Docker. + + Supports running multiple instances in parallel via dynamic port allocation. + Each instance gets unique ports for API, VNC, Chrome DevTools, and VLC, + passed to the container via environment variables. + """ + + _port_allocation_lock = threading.Lock() + + def __init__(self, region: str = None): + super().__init__(region) + self.process: subprocess.Popen = None + self.process_pid: int = None + self._stdout_fh = None + self._stderr_fh = None + + self.server_port = None + self.chromium_port = None + self.vnc_port = None + self.vlc_port = None + + self._check_singularity_availability() + + @staticmethod + def _check_singularity_availability(): + try: + subprocess.run( + ['singularity', '--version'], + capture_output=True, text=True, check=True, + ) + except (subprocess.CalledProcessError, FileNotFoundError) as e: + raise RuntimeError( + 'Singularity/Apptainer is not available. ' + 'Please install it to use NVCFSingularityProvider.' + ) from e + + @staticmethod + def _check_port_available(port: int) -> bool: + sock = socket.socket(socket.AF_INET, socket.SOCK_STREAM) + try: + sock.bind(('0.0.0.0', port)) + return True + except OSError: + return False + finally: + sock.close() + + def _find_available_port(self, min_port: int, max_port: int, max_attempts: int = 50) -> int: + rng = random.SystemRandom() + ports = list(range(min_port, max_port + 1)) + rng.shuffle(ports) + + for port in ports[:max_attempts]: + if self._check_port_available(port): + return port + raise PortAllocationError(f"No available ports found in range {min_port}-{max_port}") + + def _allocate_ports(self) -> tuple: + """Allocate unique ports for API, VNC, Chrome, and VLC.""" + with NVCFSingularityProvider._port_allocation_lock: + api_port = self._find_available_port(*API_PORT_RANGE) + vnc_port = self._find_available_port(*VNC_PORT_RANGE) + chrome_port = self._find_available_port(*CHROME_PORT_RANGE) + vlc_port = self._find_available_port(*VLC_PORT_RANGE) + return api_port, vnc_port, chrome_port, vlc_port + + def _wait_for_vm_ready(self, timeout: int = 300): + start_time = time.time() + + while time.time() - start_time < timeout: + try: + response = requests.get( + f"http://localhost:{self.server_port}/screenshot", + timeout=(10, 10), + ) + if response.status_code == 200: + return True + except Exception: + pass + + if self.process and self.process.poll() is not None: + self._read_and_raise_error() + + logger.info("Checking if nvcf_singularity container is ready...") + time.sleep(RETRY_INTERVAL) + + raise TimeoutError("nvcf_singularity failed to become ready within timeout period") + + def _read_and_raise_error(self): + self._close_log_handles() + log_dir = Path("/tmp/osworld_nvcf_singularity_logs") + err_files = sorted(log_dir.glob("singularity_*.err"), reverse=True) + error_output = "" + if err_files: + error_output = err_files[0].read_text() + raise RuntimeError( + f"Singularity container exited unexpectedly. " + f"Return code: {self.process.returncode}\nError: {error_output}" + ) + + def start_emulator(self, path_to_vm: str, headless: bool, os_type: str = "Ubuntu"): + del path_to_vm, headless, os_type + + sif_path = os.environ.get("NVCF_SINGULARITY_SIF_PATH", DEFAULT_SIF_PATH) + if not os.path.exists(sif_path): + raise FileNotFoundError( + f"Singularity image not found: {sif_path}. " + f"Please build or download the .sif image first." + ) + + try: + self.server_port, self.vnc_port, self.chromium_port, self.vlc_port = self._allocate_ports() + + logger.info( + "Allocated ports - API: %d, VNC: %d, Chrome: %d, VLC: %d", + self.server_port, self.vnc_port, self.chromium_port, self.vlc_port, + ) + + cmd = [ + 'singularity', 'run', + '--contain', + '--cleanenv', + '--pid', + '--writable-tmpfs', + '--no-mount', 'home,cwd,tmp', + '--home', '/home/user', + '--env', f'API_PORT={self.server_port}', + '--env', f'VNC_PORT={self.vnc_port}', + '--env', f'CHROME_PORT={self.chromium_port}', + '--env', f'VLC_PORT={self.vlc_port}', + sif_path, + ] + + log_dir = Path("/tmp/osworld_nvcf_singularity_logs") + log_dir.mkdir(parents=True, exist_ok=True) + + timestamp = int(time.time()) + stdout_path = log_dir / f"singularity_{timestamp}.out" + stderr_path = log_dir / f"singularity_{timestamp}.err" + + self._stdout_fh = open(stdout_path, 'w') + self._stderr_fh = open(stderr_path, 'w') + + self.process = subprocess.Popen( + cmd, + stdout=self._stdout_fh, + stderr=self._stderr_fh, + text=True, + start_new_session=True, + ) + self.process_pid = self.process.pid + + time.sleep(2) + if self.process.poll() is not None: + self._close_log_handles() + with open(stderr_path, 'r') as f: + error_output = f.read() + raise RuntimeError( + f"Singularity container failed to start. " + f"Return code: {self.process.returncode}\nError: {error_output}" + ) + + logger.info("Singularity process started with PID: %d", self.process_pid) + logger.info("Logs: stdout=%s, stderr=%s", stdout_path, stderr_path) + + logger.info( + "Started nvcf_singularity with image '%s' " + "(api=%d, vnc=%d, chrome=%d, vlc=%d)", + sif_path, + self.server_port, self.vnc_port, self.chromium_port, self.vlc_port, + ) + self._wait_for_vm_ready() + except Exception: + self.stop_emulator("") + raise + + def _close_log_handles(self): + for fh in (self._stdout_fh, self._stderr_fh): + if fh: + try: + fh.close() + except Exception: + pass + self._stdout_fh = None + self._stderr_fh = None + + def get_ip_address(self, path_to_vm: str) -> str: + if not all([self.server_port, self.chromium_port, self.vnc_port, self.vlc_port]): + raise RuntimeError("Container not started - ports not allocated") + return f"localhost:{self.server_port}:{self.chromium_port}:{self.vnc_port}:{self.vlc_port}" + + def save_state(self, path_to_vm: str, snapshot_name: str): + raise NotImplementedError("Snapshots not available for nvcf_singularity provider") + + def revert_to_snapshot(self, path_to_vm: str, snapshot_name: str): + self.stop_emulator(path_to_vm) + + def stop_emulator(self, path_to_vm: str, region=None, *args, **kwargs): + del path_to_vm, region, args, kwargs + + try: + self._close_log_handles() + + # With --containall (which includes --pid), the container runs in + # its own PID namespace. Killing the Singularity CLI process + # tears down the namespace and the kernel reaps every container + # process automatically — no manual tree-walk needed. + if self.process_pid is not None: + logger.info("Stopping Singularity process (PID: %d)", self.process_pid) + try: + os.kill(self.process_pid, signal.SIGTERM) + time.sleep(2) + try: + os.kill(self.process_pid, 0) + os.kill(self.process_pid, signal.SIGKILL) + except ProcessLookupError: + pass + except ProcessLookupError: + pass + except Exception as e: + logger.warning("Failed to kill Singularity process: %s", e) + + time.sleep(WAIT_TIME) + except Exception as e: + logger.error("Error stopping nvcf_singularity container: %s", e) + finally: + self.process = None + self.process_pid = None + self.server_port = None + self.chromium_port = None + self.vnc_port = None + self.vlc_port = None diff --git a/cua/modules_kimi/env_controller.py b/cua/modules_kimi/env_controller.py index 2fdc598a..9b125966 100644 --- a/cua/modules_kimi/env_controller.py +++ b/cua/modules_kimi/env_controller.py @@ -26,7 +26,7 @@ class EnvController: """ Static wrapper class that interfaces with either: - OSWorldSingularityRuntime (runtime_type='singularity') - - OSWorld DesktopEnv (runtime_type='nvcf') + - OSWorld DesktopEnv (runtime_type='nvcf' or 'nvcf_singularity') """ @staticmethod @@ -120,6 +120,7 @@ async def initialize_runtime( Initialize runtime. runtime_type='singularity': uses OSWorldSingularityRuntime (local KVM). + runtime_type='nvcf_singularity': uses OSWorld DesktopEnv with NVCFSingularityProvider (local .sif). runtime_type='nvcf': uses OSWorld DesktopEnv with NVCFProvider. """ logger.debug(f"[initialize_runtime] Creating {runtime_type} runtime for {job_id}") @@ -164,6 +165,32 @@ async def initialize_runtime( pass raise + elif runtime_type == "nvcf_singularity": + from desktop_env.desktop_env import DesktopEnv + + env = None + try: + env = DesktopEnv( + provider_name="nvcf_singularity", + path_to_vm="", + action_space="pyautogui", + headless=True, + os_type="Ubuntu" if os_type == "linux" else os_type, + require_a11y_tree=False, + ) + + logger.debug(f"[initialize_runtime] DesktopEnv (nvcf_singularity) created, resetting with OSWorld setup...") + env.reset(task_config=osworld_setup) + logger.debug(f"[initialize_runtime] DesktopEnv reset complete for {job_id}") + return env + except Exception: + if env is not None: + try: + env.close() + except Exception: + pass + raise + else: # Singularity backend from examples.setup import SetupController diff --git a/cua/parallel_collect_kimi.py b/cua/parallel_collect_kimi.py index 25700667..518c179d 100644 --- a/cua/parallel_collect_kimi.py +++ b/cua/parallel_collect_kimi.py @@ -269,8 +269,8 @@ def parse_args(): default="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/models/Kimi-K2.5") # Runtime selection - parser.add_argument("--runtime", type=str, choices=["singularity", "nvcf"], default="singularity", - help="Runtime backend: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv)") + parser.add_argument("--runtime", type=str, choices=["singularity", "nvcf", "nvcf_singularity"], default="singularity", + help="Runtime backend: 'singularity' (local KVM), 'nvcf' (NVCF cloud), or 'nvcf_singularity' (local .sif)") # Generation mode parser.add_argument("--generation_mode", type=str, default="vanilla", @@ -290,12 +290,14 @@ def parse_args(): parser.add_argument("--max_parallel", type=int, default=24, help="Max concurrent VMs") parser.add_argument("--max_trajectories", type=int, default=10000, help="Total trajectories to generate") + parser.add_argument("--project_dir", type=str, required=True, + help="Project directory (e.g. /path/to/ProRL-Agent-Server/cua)") parser.add_argument("--timeout", type=int, default=14400, help="Global timeout in seconds (default: 14400 = 4 hours)") args = parser.parse_args() - PROJECT_DIR = "/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server-v2/cua" + PROJECT_DIR = args.project_dir if args.generation_mode == "vanilla": args.persona_dataset_path = "/lustre/fsw/portfolios/nvr/users/yidong/data/nemotron_data/data/" diff --git a/cua/scripts/kimi/run_parallel_kimi_colocated.sh b/cua/scripts/kimi/run_parallel_kimi_colocated.sh index 276ebbff..1cce804d 100644 --- a/cua/scripts/kimi/run_parallel_kimi_colocated.sh +++ b/cua/scripts/kimi/run_parallel_kimi_colocated.sh @@ -22,6 +22,7 @@ # bash run_parallel_kimi_colocated.sh # MAX_PARALLEL=12 RUNTIME=nvcf GENERATION_MODE=zenodo bash run_parallel_kimi_colocated.sh # MAX_PARALLEL=16 RUNTIME=singularity GENERATION_MODE=zenodo bash run_parallel_kimi_colocated.sh +# MAX_PARALLEL=16 RUNTIME=nvcf_singularity GENERATION_MODE=zenodo bash run_parallel_kimi_colocated.sh # Log files: # logs/slurm--server.out # logs/slurm--collector-1.out @@ -69,7 +70,10 @@ KIMI_JOB_ID="" COLLECTOR_PIDS=() KIMI_PORT=8000 -PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server-v2" + +# TODO: change to your proejct root dir +# PROJECT_ROOT="/lustre/fs1/portfolios/nvr/projects/nvr_lacr_llm/users/jaehunj/cua/prorl-agent-server-v2" +PROJECT_ROOT="/lustre/fsw/portfolios/nvr/users/bcui/ProRL-Agent-Server" PROJECT_DIR="$PROJECT_ROOT/cua" echo "============================================" @@ -150,10 +154,9 @@ if [ "$RUNTIME" = "nvcf" ]; then else echo "[colocated] Submitting Kimi vLLM sbatch job (KVM runtime, reserved nodes)..." KIMI_JOB_ID=$(sbatch \ - --account=llmservice_fm_vision \ - --reservation=sla_res_osworld_agent_vlm \ - --partition=batch_block1 \ - --time=04:00:00 \ + --account=nvr_lacr_llm \ + --partition=batch_short \ + --time=02:00:00 \ --output="$LOG_DIR/slurm-%j-server.out" \ --error="$LOG_DIR/slurm-%j-server.out" \ --parsable \ @@ -260,6 +263,8 @@ for i in "${!NODES_ARRAY[@]}"; do if [ "$RUNTIME" = "nvcf" ]; then NVCF_EXPORTS="export NGC_API_KEY=$NGC_API_KEY; export NGC_ORG=$NGC_ORG; export NVCF_FUNCTION_NAME_PREFIX=$NVCF_FUNCTION_NAME_PREFIX; export OSWORLD_SETUP_CACHE_DIR=/tmp/osworld_cache;" RUNTIME_ARG="--runtime nvcf" + elif [ "$RUNTIME" = "nvcf_singularity" ]; then + RUNTIME_ARG="--runtime nvcf_singularity" fi ssh -t -q -o StrictHostKeyChecking=no "$node" \ @@ -272,6 +277,7 @@ for i in "${!NODES_ARRAY[@]}"; do cd $PROJECT_DIR python parallel_collect_kimi.py \ --model_node $MODEL_NODE \ + --project_dir $PROJECT_DIR \ --generation_mode $GENERATION_MODE \ $RUNTIME_ARG \ --max_parallel $MAX_PARALLEL \