Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
341 changes: 248 additions & 93 deletions cua/modules/debug_env_controller.py

Large diffs are not rendered by default.

75 changes: 52 additions & 23 deletions cua/modules/module_data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -12,8 +12,6 @@
from pathlib import Path
from typing import Optional, Dict, Any, Tuple

import ipdb

from modules.actors.debug_uitars_actor import UITarsActor
from modules.debug_planner import Planner
from modules.debug_env_controller import EnvController
Expand Down Expand Up @@ -42,6 +40,13 @@ def __init__(self, args: Namespace):
self.vm_image_path = args.vm_image_path
self.os_type = 'linux' if 'Ubuntu' in self.vm_image_path else 'windows'

# Runtime type: "singularity" (local KVM) or "nvcf" (NVCF via OSWorld DesktopEnv)
self.runtime_type = getattr(args, 'runtime', 'singularity')

# NVCF credentials (passed via env vars to OSWorld's NVCFProvider)
self.nvcf_api_key = getattr(args, 'nvcf_api_key', None)
self.nvcf_org = getattr(args, 'nvcf_org', None)

self.max_steps_per_trajectory = args.max_steps_per_trajectory
self.max_steps_per_goal = args.max_steps_per_goal

Expand Down Expand Up @@ -117,10 +122,15 @@ def save_trajectory(trajectory: Dict, trajectory_save_dir: Path):

logger.debug(f"✓ [save_trajectory] Saved to {str(trajectory_save_dir / 'trajectory.json')}")

async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple:
async def init_runtime_for_job(self, trajectory_idx: int,
nvcf_function_id: str = None,
nvcf_version_id: str = None) -> Tuple:
"""
Stage 1: Initialize the VM and OSWorld setup.
Returns: (runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup)
Returns: (env, trajectory, trajectory_save_dir, trajectory_id, osworld_setup)

Uses OSWorld's DesktopEnv which handles NVCF deploy, local proxy,
and environment setup internally.
"""
# Create unique IDs
job_id = f"job_{trajectory_idx:04d}"
Expand All @@ -140,13 +150,35 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple:
else:
osworld_setup_ready = True

# Initialize Runtime (Async)
runtime = await EnvController.initialize_runtime(
job_id, self.vm_image_path, self.os_type, osworld_setup
logger.info(f"[job {trajectory_idx:04d}] Sampled OSWorld config: id={osworld_setup.get('id', 'unknown')}, "
f"snapshot={osworld_setup.get('snapshot', 'unknown')}, "
f"apps={osworld_setup.get('related_apps', [])}, "
f"instruction={osworld_setup.get('instruction', '')[:80]}")

# Pre-download setup files to local cache BEFORE deploying NVCF.
# This avoids wasting expensive NVCF GPU resources if downloads fail.
if self.runtime_type == "nvcf":
logger.info(f"[job {trajectory_idx:04d}] Pre-downloading setup files before NVCF deploy...")
download_ok = EnvController.pre_download_setup_files(osworld_setup)
if not download_ok:
raise RuntimeError(
f"[job {trajectory_idx:04d}] Setup file pre-download failed. "
f"Skipping NVCF deploy to avoid wasting resources."
)
logger.info(f"[job {trajectory_idx:04d}] Pre-download complete, proceeding with NVCF deploy.")

# Initialize DesktopEnv (handles NVCF deploy + proxy + setup internally)
env = await EnvController.initialize_runtime(
job_id, self.vm_image_path, self.os_type, osworld_setup,
runtime_type=self.runtime_type,
nvcf_function_id=nvcf_function_id,
nvcf_version_id=nvcf_version_id,
nvcf_api_key=self.nvcf_api_key,
nvcf_org=self.nvcf_org,
)

# Get screen size
width, height = EnvController.get_screen_size(runtime)
width, height = EnvController.get_screen_size(env)

# Prepare Metadata
trajectory = {
Expand All @@ -160,25 +192,24 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple:
'steps': [],
}

return runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup
return env, trajectory, trajectory_save_dir, trajectory_id, osworld_setup

async def collect_trajectory(self, runtime, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict):
async def collect_trajectory(self, env, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict):
"""
Stage 2: Run the Agent Loop (Goal Generation -> Action Execution).
`env` is an OSWorld DesktopEnv instance.
"""
# Wait for UI initialization
time.sleep(3.0)

# Initial Screenshot
screenshot_bytes = EnvController.get_screenshot(runtime)
screenshot_bytes = EnvController.get_screenshot(env)
image_filename = trajectory_save_dir / f"0-0.png"
save_image(screenshot_bytes, image_filename, logger)

# --- 1. Generate High Level Goal --- #
# todo implement the verification mechanism for goal achievability using requirements
# generate goal in a separate loop
prev_requirements = [] # will be a list of tuple [("condition 1", "verdict 1"), ...]
example_goals = random.sample(self.example_instructions, 1) # for now, we sample 1 example goal
prev_requirements = []
example_goals = random.sample(self.example_instructions, 1)
goal, requirements = self.planner.generate_goal_with_long_horizon(
screenshot_bytes, osworld_setup["config"], example_goals, prev_requirements,
)
Expand Down Expand Up @@ -231,21 +262,19 @@ async def collect_trajectory(self, runtime, trajectory: Dict, trajectory_save_di
)

if action_result is None:
# UI-TARS action generation failed (failed to meet the requirement)
# in this case, save only up to the current trajectory
break

pyautogui_command = action_result["pyautogui_command"]
action_generation = action_result["action_generation"]

# Execute
EnvController.execute_pyautogui_command(runtime, pyautogui_command)
EnvController.execute_pyautogui_command(env, pyautogui_command)

# Wait & Observe
time.sleep(3.0)

# Capture new state
screenshot_bytes = EnvController.get_screenshot(runtime)
screenshot_bytes = EnvController.get_screenshot(env)

# Save step info
action_idx = len(step_for_this_subgoal['actions'])
Expand Down Expand Up @@ -276,11 +305,11 @@ async def single_trajectory_job(self, trajectory_idx: int):
Simply chains the two stages sequentially in the main thread.
"""
# 1. Init
runtime, trajectory_data, save_dir, t_id, setup = await self.init_runtime_for_job(trajectory_idx)
env, trajectory_data, save_dir, t_id, setup = await self.init_runtime_for_job(trajectory_idx)

try:
# 2. Collect
await self.collect_trajectory(runtime, trajectory_data, save_dir, setup)
await self.collect_trajectory(env, trajectory_data, save_dir, setup)
finally:
# Cleanup for debug mode
runtime.close()
# Cleanup
env.close()
28 changes: 22 additions & 6 deletions cua/modules_kimi/data_collector.py
Original file line number Diff line number Diff line change
Expand Up @@ -16,7 +16,6 @@
from pathlib import Path
from typing import Optional, Dict, Any, Tuple

import ipdb

from modules_kimi.kimi_actor import KimiActor
from modules_kimi.env_controller import EnvController
Expand Down Expand Up @@ -44,6 +43,9 @@ def __init__(self, args: Namespace):
self.vm_image_path = args.vm_image_path
self.os_type = 'linux' if 'Ubuntu' in self.vm_image_path else 'windows'

# Runtime type: 'singularity' (local KVM) or 'nvcf' (NVCF via OSWorld DesktopEnv)
self.runtime_type = getattr(args, 'runtime', 'singularity')

self.max_steps_per_trajectory = args.max_steps_per_trajectory

# Output directory: cua/trajectories/kimi/<hostname>--<timestamp>
Expand Down Expand Up @@ -85,7 +87,7 @@ def save_trajectory(trajectory: Dict, trajectory_save_dir: Path):

logger.debug(f"Saved trajectory to {trajectory_save_dir / 'trajectory.json'}")

async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple:
async def init_runtime_for_job(self, trajectory_idx: int, nvcf_function_id: str = None, nvcf_version_id: str = None) -> Tuple:
"""
Stage 1: Initialize the VM and OSWorld setup.
Returns: (runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup)
Expand All @@ -109,13 +111,27 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple:
# else:
# osworld_setup_ready = True

# Pre-download setup files before NVCF deploy to avoid wasting GPU resources
if self.runtime_type == 'nvcf':
logger.info(f'[job {trajectory_idx:04d}] Pre-downloading setup files before NVCF deploy...')
download_ok = EnvController.pre_download_setup_files(osworld_setup)
if not download_ok:
raise RuntimeError(
f'[job {trajectory_idx:04d}] Setup file pre-download failed. '
f'Skipping NVCF deploy to avoid wasting resources.'
)
logger.info(f'[job {trajectory_idx:04d}] Pre-download complete.')

# Initialize runtime
runtime = await EnvController.initialize_runtime(
job_id, self.vm_image_path, self.os_type, osworld_setup
env_or_runtime = await EnvController.initialize_runtime(
job_id, self.vm_image_path, self.os_type, osworld_setup,
runtime_type=self.runtime_type,
nvcf_function_id=nvcf_function_id,
nvcf_version_id=nvcf_version_id,
)

# Get screen size
width, height = EnvController.get_screen_size(runtime)
width, height = EnvController.get_screen_size(env_or_runtime)

trajectory = {
"trajectory_id": trajectory_id,
Expand All @@ -129,7 +145,7 @@ async def init_runtime_for_job(self, trajectory_idx: int) -> Tuple:
"steps": [],
}

return runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup
return env_or_runtime, trajectory, trajectory_save_dir, trajectory_id, osworld_setup

async def collect_trajectory(
self, runtime, trajectory: Dict, trajectory_save_dir: Path, osworld_setup: Dict
Expand Down
Loading