Skip to content
Merged
73 changes: 71 additions & 2 deletions factory/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -15,6 +15,10 @@
import time
from datetime import datetime
from pathlib import Path
from typing import TYPE_CHECKING

if TYPE_CHECKING:
from factory.messages import Message


def _run(coro): # noqa: ANN001, ANN202
Expand Down Expand Up @@ -314,6 +318,29 @@ def cmd_finalize(args: argparse.Namespace) -> int:
return 0


def cmd_message(args: argparse.Namespace) -> int:
"""Queue a message for the CEO agent."""
from factory.messages import write_message

project_path = Path(args.path)
if not project_path.exists():
print(f"Error: project path does not exist: {project_path}", file=sys.stderr)
return 1
if not (project_path / ".factory").exists():
print(f"Error: not a factory project (no .factory/ directory): {project_path}", file=sys.stderr)
return 1
if not args.text or not args.text.strip():
print("Error: message text must not be empty.", file=sys.stderr)
return 1
try:
msg = write_message(project_path, args.text)
except ValueError as exc:
print(f"Error: {exc}", file=sys.stderr)
return 1
print(f"Message queued (id={msg.id}). The CEO will see it at the start of the next cycle.")
return 0


def cmd_history(args: argparse.Namespace) -> int:
from factory.store import ExperimentStore

Expand Down Expand Up @@ -1373,18 +1400,24 @@ def cmd_ceo(args: argparse.Namespace) -> int:
from factory.study import add_backlog_item
add_backlog_item(project_path, focus)

from factory.messages import mark_read, read_pending

pending = read_pending(project_path)
pending_ids = [m.id for m in pending]

ceo_mode = "build" if mode == "interactive" or research_ideation else mode
task = _build_ceo_task(
project_path, ceo_mode, context, focus=focus, prompt_file=prompt_file,
min_growth=min_growth, max_new=max_new, branch=branch,
discover_only=discover_only, no_github=no_github,
interactive_idea=interactive_idea,
research_ideation=research_ideation,
messages=pending,
)

standup = _run_standup(project_path, ceo_mode, model=model)
if standup:
task += f"\n\n## Sprint Standup\n\n{standup}"
task += f"\n\n## Sprint Standup\n\n{standup}"

if headless:
# Non-interactive pipe mode (for scripting, cron, tmux)
Expand All @@ -1400,6 +1433,9 @@ def cmd_ceo(args: argparse.Namespace) -> int:
timeout=7200.0,
))
print(result)
if code == 0:
if pending_ids:
mark_read(project_path, pending_ids)
if code != 0:
return code
return _chain_modes(
Expand All @@ -1409,7 +1445,16 @@ def cmd_ceo(args: argparse.Namespace) -> int:
model=model, no_github=no_github,
)

# Interactive foreground mode: use runner's interactive_exec
# Interactive foreground mode: use runner's interactive_exec.
# Mark read before exec — interactive_exec replaces the process via os.execvp
# so there's no post-execution hook. If the session fails to launch, messages
# are lost. This is accepted: the user is at the terminal and can re-send.
if pending_ids:
print(
f"Consuming {len(pending_ids)} message(s): {', '.join(pending_ids)}",
file=sys.stderr,
)
mark_read(project_path, pending_ids)
prompt = resolve_prompt("ceo", project_path)
runner = get_runner(runner_name)
runner.interactive_exec(
Expand Down Expand Up @@ -1816,10 +1861,18 @@ def _build_ceo_task(
no_github: bool = False,
interactive_idea: str | None = None,
research_ideation: str | None = None,
messages: list[Message] | None = None,
) -> str:
"""Build the CEO agent task string from mode and optional context."""
task = f"Project: {project_path}\nMode: {mode}"

if messages:
task += "\n\n## User Messages\n"
task += "The user has sent the following directives. Treat these as HIGH PRIORITY:\n\n"
for msg in messages:
ts = msg.timestamp.strftime("%Y-%m-%d %H:%M:%S")
task += f"**[{ts}]** {msg.text}\n\n"

if interactive_idea:
task += (
f"\n\n## Interactive Ideation Mode (Phase 0)\n\n"
Expand Down Expand Up @@ -2006,10 +2059,16 @@ def _run_single_cycle(
from factory.study import add_backlog_item
add_backlog_item(project_path, focus)

from factory.messages import mark_read, read_pending

pending = read_pending(project_path)
pending_ids = [m.id for m in pending]

task = _build_ceo_task(
project_path, mode, context, focus=focus, prompt_file=prompt_file,
min_growth=min_growth, max_new=max_new, branch=branch,
discover_only=discover_only, no_github=no_github,
messages=pending,
)

standup = _run_standup(project_path, mode, model=model)
Expand All @@ -2025,6 +2084,10 @@ def _run_single_cycle(
model=model,
))

if code == 0:
if pending_ids:
mark_read(project_path, pending_ids)

print(result)
return code

Expand Down Expand Up @@ -2390,6 +2453,11 @@ def build_parser() -> argparse.ArgumentParser:
# vault-init
p = sub.add_parser("vault-init", help="Create the factory Obsidian vault")

# message — send a directive to the CEO
p = sub.add_parser("message", help="Send a message to the CEO for the next cycle")
p.add_argument("path", help="Path to the project")
p.add_argument("text", help="Message text")

# self-update
sub.add_parser("self-update", help="Upgrade the factory CLI to the latest version")

Expand Down Expand Up @@ -2605,6 +2673,7 @@ def main(argv: list[str] | None = None) -> int:
"resume": cmd_resume,
"log": cmd_log,
"vault-init": cmd_vault_init,
"message": cmd_message,
"self-update": cmd_self_update,
"install": cmd_install,
"serve-mcp": cmd_serve_mcp,
Expand Down
129 changes: 129 additions & 0 deletions factory/messages.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,129 @@
"""User-to-CEO message channel.

Users queue messages via ``factory message``. The factory loop reads pending
messages and injects them into the CEO's task string each cycle.
"""

from __future__ import annotations

import uuid
from dataclasses import dataclass
from datetime import datetime, timezone
from pathlib import Path

import structlog

log = structlog.get_logger()

MAX_MESSAGE_CHARS = 10_000
MAX_PENDING_MESSAGES = 20
MAX_TOTAL_CHARS = 50_000


@dataclass(frozen=True)
class Message:
id: str
timestamp: datetime
text: str


def _messages_dir(project_path: Path) -> Path:
return project_path / ".factory" / "messages"


def _read_dir(project_path: Path) -> Path:
return _messages_dir(project_path) / "read"


def write_message(project_path: Path, text: str) -> Message:
"""Write a message to the pending queue. Returns the created Message."""
if not text or not text.strip():
raise ValueError("Message text must not be empty or whitespace-only.")
if len(text) > MAX_MESSAGE_CHARS:
raise ValueError(
f"Message text exceeds maximum length of {MAX_MESSAGE_CHARS} characters "
f"(got {len(text)})."
)

msg_dir = _messages_dir(project_path)
msg_dir.mkdir(parents=True, exist_ok=True)

existing = list(msg_dir.glob("*.md"))
if len(existing) >= MAX_PENDING_MESSAGES:
raise ValueError(
f"Too many pending messages ({len(existing)} >= {MAX_PENDING_MESSAGES}). "
"Wait for existing messages to be consumed before sending more."
)

ts = datetime.now(timezone.utc)
msg_id = ts.strftime("%Y%m%dT%H%M%S%f") + "-" + uuid.uuid4().hex[:8]

path = msg_dir / f"{msg_id}.md"
path.write_text(f"timestamp: {ts.isoformat()}\n\n{text}\n")

msg = Message(id=msg_id, timestamp=ts, text=text)
log.info("message_written", id=msg_id, chars=len(text))
return msg


def read_pending(
project_path: Path,
max_messages: int = MAX_PENDING_MESSAGES,
max_chars: int = MAX_TOTAL_CHARS,
) -> list[Message]:
"""Read pending (unread) messages, ordered by timestamp.

Caps at ``max_messages`` messages or ``max_chars`` total characters
to prevent flooding the CEO task string.
"""
msg_dir = _messages_dir(project_path)
if not msg_dir.exists():
return []

all_paths = sorted(msg_dir.glob("*.md"))
if len(all_paths) > max_messages:
log.warning("messages_capped", total=len(all_paths), cap=max_messages)

messages: list[Message] = []
total_chars = 0
for path in all_paths:
if len(messages) >= max_messages:
break
if path.is_symlink():
continue
content = path.read_text()
lines = content.split("\n", 2)
ts = datetime.now(timezone.utc)
if lines and lines[0].startswith("timestamp:"):
try:
ts = datetime.fromisoformat(lines[0].split(":", 1)[1].strip())
except ValueError:
log.warning("message_timestamp_parse_failed", path=str(path))
text = lines[2].strip() if len(lines) > 2 else content.strip()
if total_chars + len(text) > max_chars and messages:
log.warning("messages_chars_capped", total_chars=total_chars + len(text), cap=max_chars)
break
total_chars += len(text)
messages.append(Message(id=path.stem, timestamp=ts, text=text))

return messages


def mark_read(project_path: Path, message_ids: list[str]) -> None:
"""Move messages to the read/ subdirectory."""
msg_dir = _messages_dir(project_path)
read_dir = _read_dir(project_path)
read_dir.mkdir(parents=True, exist_ok=True)

for msg_id in message_ids:
raw = msg_dir / f"{msg_id}.md"
if raw.is_symlink():
log.warning("mark_read_symlink_skipped", id=msg_id)
continue
src = raw.resolve()
if not src.is_relative_to(msg_dir.resolve()):
log.warning("mark_read_path_traversal", id=msg_id)
continue
if src.exists():
src.rename(read_dir / src.name)
log.debug("message_marked_read", id=msg_id)
Loading
Loading