-
Notifications
You must be signed in to change notification settings - Fork 22
Expand file tree
/
Copy pathharness.py
More file actions
390 lines (326 loc) · 15.6 KB
/
harness.py
File metadata and controls
390 lines (326 loc) · 15.6 KB
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
#!/usr/bin/env python3
"""
Harness — profile-driven multi-agent architecture for autonomous task execution.
Reproduces the design from Anthropic's "Harness design for long-running
application development" using a pure Python + OpenAI-compatible API approach.
The core loop (Plan → Build → Evaluate → Iterate) is generic.
Profiles define the scenario-specific behavior (prompts, tools, scoring).
Built-in profiles:
app-builder — Build web apps from a prompt (original Anthropic article scenario)
terminal — Solve terminal/CLI tasks (Terminal-Bench-2 style)
swe-bench — Fix GitHub issues in real repos
reasoning — Knowledge-intensive QA (MMMU-Pro style)
Usage:
python harness.py "Build a DAW in the browser" # default: app-builder
python harness.py --profile terminal "Fix the broken git merge"
python harness.py --profile swe-bench "Fix issue #123"
python harness.py --profile reasoning "Calculate the orbital period of..."
python harness.py --list-profiles
"""
from __future__ import annotations
import logging
import os
import re
import sys
import time
from pathlib import Path
import config
import tools
from agents import Agent
from skills import SkillRegistry
from profiles import get_profile, list_profiles
from profiles.base import BaseProfile
log = logging.getLogger("harness")
class Harness:
"""
Generic orchestration loop driven by a Profile.
The Profile defines:
- System prompts for each agent role
- Which tools each agent gets
- Evaluation criteria and pass threshold
- Whether contract negotiation is enabled
The Harness handles:
- The Plan → Build → Evaluate → Iterate loop
- Context lifecycle (compaction / reset)
- Score tracking and REFINE/PIVOT decisions
- Workspace and git management
"""
def __init__(self, profile: BaseProfile):
self.profile = profile
self.skill_registry = SkillRegistry()
skill_catalog = self.skill_registry.build_catalog_prompt()
# Build agents from profile config
planner_cfg = profile.planner()
builder_cfg = profile.builder()
evaluator_cfg = profile.evaluator()
proposer_cfg = profile.contract_proposer()
reviewer_cfg = profile.contract_reviewer()
self.planner = Agent(
"planner", planner_cfg.system_prompt + skill_catalog,
use_tools=True, extra_tool_schemas=planner_cfg.extra_tool_schemas,
middlewares=planner_cfg.middlewares, time_budget=planner_cfg.time_budget,
) if planner_cfg.enabled else None
self.builder = Agent(
"builder", builder_cfg.system_prompt + skill_catalog,
use_tools=True, extra_tool_schemas=builder_cfg.extra_tool_schemas,
tool_schemas=builder_cfg.tool_schemas,
middlewares=builder_cfg.middlewares, time_budget=builder_cfg.time_budget,
)
self.evaluator = Agent(
"evaluator", evaluator_cfg.system_prompt,
use_tools=True, extra_tool_schemas=evaluator_cfg.extra_tool_schemas,
middlewares=evaluator_cfg.middlewares, time_budget=evaluator_cfg.time_budget,
) if evaluator_cfg.enabled else None
self.contract_proposer = Agent(
"contract_proposer", proposer_cfg.system_prompt, use_tools=True,
middlewares=proposer_cfg.middlewares,
) if proposer_cfg.enabled else None
self.contract_reviewer = Agent(
"contract_reviewer", reviewer_cfg.system_prompt, use_tools=True,
middlewares=reviewer_cfg.middlewares,
) if reviewer_cfg.enabled else None
def run(self, user_prompt: str) -> None:
# Create a unique project subdirectory under workspace
# (skip if HARNESS_FLAT_WORKSPACE is set — used for benchmarks
# where outputs must land directly in the workspace root)
if os.environ.get("HARNESS_FLAT_WORKSPACE"):
Path(config.WORKSPACE).mkdir(parents=True, exist_ok=True)
else:
from datetime import datetime
slug = re.sub(r'[^a-z0-9]+', '-', user_prompt.lower().strip())[:40].strip('-')
timestamp = datetime.now().strftime("%Y%m%d-%H%M%S")
project_name = f"{timestamp}_{slug}"
project_dir = os.path.join(config.WORKSPACE, project_name)
config.WORKSPACE = os.path.abspath(project_dir)
Path(config.WORKSPACE).mkdir(parents=True, exist_ok=True)
log.info(f"Profile: {self.profile.name()}")
log.info(f"Project directory: {config.WORKSPACE}")
# Initialize git
git_dir = Path(config.WORKSPACE) / ".git"
if not git_dir.exists():
os.system(f"cd {config.WORKSPACE} && git init && git add -A 2>/dev/null; git commit -m 'init' --allow-empty 2>/dev/null")
total_start = time.time()
max_rounds = self.profile.max_rounds() or config.MAX_HARNESS_ROUNDS
threshold = self.profile.pass_threshold()
# ---- Resolve dynamic time allocation ----
allocation = self.profile.resolve_time_allocation(user_prompt)
skip_planner = not allocation.get("planner_enabled", True)
skip_evaluator = not allocation.get("evaluator_enabled", True)
log.info(f"Time allocation: planner={allocation['planner']:.0%} "
f"builder={allocation['builder']:.0%} "
f"evaluator={allocation['evaluator']:.0%} "
f"(planner={'skip' if skip_planner else 'on'}, "
f"evaluator={'skip' if skip_evaluator else 'on'})")
# ---- Phase 1: Planning ----
if self.planner and not skip_planner:
log.info("=" * 60)
log.info("PHASE 1: PLANNING")
log.info("=" * 60)
phase_start = time.time()
self.planner.run(
f"Create a plan for the following task:\n\n"
f"{user_prompt}\n\n"
f"Save the plan to spec.md."
)
log.info(f"Planning completed in {time.time() - phase_start:.0f}s")
else:
# No planner — write prompt directly as spec
spec_path = Path(config.WORKSPACE) / config.SPEC_FILE
spec_path.write_text(f"# Task\n\n{user_prompt}\n", encoding="utf-8")
log.info("No planner — wrote prompt directly to spec.md")
# ---- Phase 2: Build → Evaluate loop ----
score_history: list[float] = []
for round_num in range(1, max_rounds + 1):
# ---- Contract negotiation (if enabled) ----
if self.contract_proposer and self.contract_reviewer:
log.info("=" * 60)
log.info(f"ROUND {round_num}/{max_rounds}: CONTRACT NEGOTIATION")
log.info("=" * 60)
contract_start = time.time()
self._negotiate_contract(round_num)
log.info(f"Contract negotiation completed in {time.time() - contract_start:.0f}s")
# ---- Build ----
log.info("=" * 60)
log.info(f"ROUND {round_num}/{max_rounds}: BUILD")
log.info("=" * 60)
build_start = time.time()
# Sync time budget to harness start so builder knows total elapsed time
from middlewares import TimeBudgetMiddleware
for mw in self.builder.middlewares:
if isinstance(mw, TimeBudgetMiddleware):
mw.sync_start_time(total_start)
# Let the profile resolve task-specific timeout
task_timeout = self.profile.resolve_task_timeout(user_prompt)
if task_timeout:
mw.budget_seconds = task_timeout
log.info(f"Time budget set to {task_timeout}s from task metadata")
feedback_path = Path(config.WORKSPACE) / config.FEEDBACK_FILE
prev_feedback = feedback_path.read_text(encoding="utf-8") if feedback_path.exists() else ""
build_task = self.profile.format_build_task(
user_prompt, round_num, prev_feedback, score_history,
)
self.builder.run(build_task)
log.info(f"Build round {round_num} completed in {time.time() - build_start:.0f}s")
# ---- Evaluate (if enabled) ----
if self.evaluator and not skip_evaluator:
log.info("=" * 60)
log.info(f"ROUND {round_num}/{max_rounds}: EVALUATE")
log.info("=" * 60)
eval_start = time.time()
self.evaluator.run(
f"This is evaluation round {round_num}.\n"
f"Read spec.md to understand the task.\n"
f"Examine the work done and test it thoroughly.\n"
f"Score each criterion honestly. Write your evaluation to feedback.md."
)
log.info(f"Evaluation round {round_num} completed in {time.time() - eval_start:.0f}s")
tools.stop_dev_server()
# Check score
feedback_text = ""
if feedback_path.exists():
feedback_text = feedback_path.read_text(encoding="utf-8")
score = self.profile.extract_score(feedback_text)
score_history.append(score)
log.info(f"Round {round_num} average score: {score:.1f} / 10 (threshold: {threshold})")
log.info(f"Score history: {score_history}")
if score >= threshold:
log.info(f"PASSED at round {round_num}.")
break
else:
log.info("No evaluator — single-pass execution.")
break
else:
log.warning(f"Did not pass after {max_rounds} rounds.")
total_duration = time.time() - total_start
log.info("=" * 60)
log.info(f"HARNESS COMPLETE — total time: {total_duration / 60:.1f} minutes")
log.info(f"Output in: {config.WORKSPACE}")
log.info("=" * 60)
def _negotiate_contract(self, round_num: int, max_iterations: int = 3) -> None:
self.contract_proposer.run(
f"This is round {round_num}.\n"
f"Read spec.md. If feedback.md exists, read it too.\n"
f"Propose a sprint contract for this round. Write it to contract.md."
)
for i in range(max_iterations):
log.info(f"[contract] Review iteration {i + 1}/{max_iterations}")
self.contract_reviewer.run(
f"Review the sprint contract in contract.md for round {round_num}.\n"
f"Read spec.md for context. Read feedback.md if it exists.\n"
f"If acceptable, write APPROVED at the top and save to contract.md.\n"
f"If changes needed, write revision requests and save updated contract."
)
contract_path = Path(config.WORKSPACE) / "contract.md"
if contract_path.exists():
contract_text = contract_path.read_text(encoding="utf-8")
if "APPROVED" in contract_text.upper()[:200]:
log.info("[contract] Contract approved.")
return
if i < max_iterations - 1:
log.info("[contract] Contract needs revision...")
self.contract_proposer.run(
f"The reviewer requested changes. Read contract.md and revise."
)
log.warning("[contract] Max iterations reached, proceeding with current contract.")
# ---------------------------------------------------------------------------
# CLI entry point
# ---------------------------------------------------------------------------
def main():
from logger import setup_logging
setup_logging(verbose="--verbose" in sys.argv or "-v" in sys.argv)
# Parse flags
args = [a for a in sys.argv[1:] if a not in ("--verbose", "-v")]
# --list-profiles
if "--list-profiles" in args:
print("Available profiles:\n")
for p in list_profiles():
print(f" {p['name']:15s} {p['description']}")
sys.exit(0)
# --profile <name>
profile_name = "app-builder"
if "--profile" in args:
idx = args.index("--profile")
if idx + 1 < len(args):
profile_name = args[idx + 1]
args = args[:idx] + args[idx + 2:]
else:
print("Error: --profile requires a name")
sys.exit(1)
if not config.API_KEY:
print("Error: Set OPENAI_API_KEY in .env or environment.")
sys.exit(1)
if len(args) < 1:
print("Usage: python harness.py [--profile <name>] \"<task>\" [--verbose]")
print()
print("Profiles:")
for p in list_profiles():
print(f" {p['name']:15s} {p['description']}")
print()
print("Examples:")
print(' python harness.py "Build a DAW in the browser"')
print(' python harness.py --profile terminal "Fix the broken symlinks in /tmp"')
print(' python harness.py --profile swe-bench "Fix the TypeError in parse_config()"')
print(' python harness.py --profile reasoning "What is the escape velocity of Mars?"')
sys.exit(1)
user_prompt = " ".join(args)
try:
profile = get_profile(profile_name)
except ValueError as e:
print(f"Error: {e}")
sys.exit(1)
log.info(f"Prompt: {user_prompt}")
log.info(f"Profile: {profile_name}")
log.info(f"Model: {config.MODEL}")
log.info(f"Base URL: {config.BASE_URL}")
log.info(f"Workspace: {config.WORKSPACE}")
# Preflight — verify API connection with retries for rate limits
# Skip in benchmark mode (HARNESS_FLAT_WORKSPACE) to avoid wasting an API
# call and hitting rate limits when many containers start simultaneously.
if os.environ.get("HARNESS_FLAT_WORKSPACE"):
log.info("Benchmark mode — skipping API preflight check.")
preflight_ok = True
else:
log.info("Verifying API connection...")
from agents import get_client
import random
preflight_ok = False
for attempt in range(8):
try:
resp = get_client().chat.completions.create(
model=config.MODEL,
messages=[{"role": "user", "content": "Say OK"}],
max_tokens=5,
)
log.info(f"API OK — model responded: {resp.choices[0].message.content}")
preflight_ok = True
break
except Exception as e:
err_str = str(e)
if "rate_limit" in err_str or "429" in err_str:
# Exponential backoff with jitter to avoid thundering herd
base_wait = min(2 ** (attempt + 1), 60)
jitter = random.uniform(0, base_wait * 0.5)
wait = base_wait + jitter
log.warning(f"API rate limited (attempt {attempt+1}/8), waiting {wait:.1f}s...")
time.sleep(wait)
else:
log.error(f"API preflight failed: {e}")
break
if not preflight_ok:
print(f"\nCannot connect to API. Check your .env:\n"
f" OPENAI_API_KEY — is it valid?\n"
f" OPENAI_BASE_URL — is {config.BASE_URL} correct?\n"
f" HARNESS_MODEL — does {config.MODEL} exist on this provider?")
sys.exit(1)
harness = Harness(profile)
try:
harness.run(user_prompt)
except KeyboardInterrupt:
log.warning("Interrupted by user.")
sys.exit(130)
except Exception as e:
log.error(f"Harness crashed with unhandled exception: {e}", exc_info=True)
# Exit 1 signals failure to Harbor, but at least we log the cause
sys.exit(1)
if __name__ == "__main__":
main()