diff --git a/factory/cli.py b/factory/cli.py index 6905356..7208e45 100644 --- a/factory/cli.py +++ b/factory/cli.py @@ -15,6 +15,10 @@ import time from datetime import datetime from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from factory.messages import Message def _run(coro): # noqa: ANN001, ANN202 @@ -314,6 +318,29 @@ def cmd_finalize(args: argparse.Namespace) -> int: return 0 +def cmd_message(args: argparse.Namespace) -> int: + """Queue a message for the CEO agent.""" + from factory.messages import write_message + + project_path = Path(args.path) + if not project_path.exists(): + print(f"Error: project path does not exist: {project_path}", file=sys.stderr) + return 1 + if not (project_path / ".factory").exists(): + print(f"Error: not a factory project (no .factory/ directory): {project_path}", file=sys.stderr) + return 1 + if not args.text or not args.text.strip(): + print("Error: message text must not be empty.", file=sys.stderr) + return 1 + try: + msg = write_message(project_path, args.text) + except ValueError as exc: + print(f"Error: {exc}", file=sys.stderr) + return 1 + print(f"Message queued (id={msg.id}). The CEO will see it at the start of the next cycle.") + return 0 + + def cmd_history(args: argparse.Namespace) -> int: from factory.store import ExperimentStore @@ -1373,6 +1400,11 @@ def cmd_ceo(args: argparse.Namespace) -> int: from factory.study import add_backlog_item add_backlog_item(project_path, focus) + from factory.messages import mark_read, read_pending + + pending = read_pending(project_path) + pending_ids = [m.id for m in pending] + ceo_mode = "build" if mode == "interactive" or research_ideation else mode task = _build_ceo_task( project_path, ceo_mode, context, focus=focus, prompt_file=prompt_file, @@ -1380,11 +1412,12 @@ def cmd_ceo(args: argparse.Namespace) -> int: discover_only=discover_only, no_github=no_github, interactive_idea=interactive_idea, research_ideation=research_ideation, + messages=pending, ) standup = _run_standup(project_path, ceo_mode, model=model) if standup: - task += f"\n\n## Sprint Standup\n\n{standup}" + task += f"\n\n## Sprint Standup\n\n{standup}" if headless: # Non-interactive pipe mode (for scripting, cron, tmux) @@ -1400,6 +1433,9 @@ def cmd_ceo(args: argparse.Namespace) -> int: timeout=7200.0, )) print(result) + if code == 0: + if pending_ids: + mark_read(project_path, pending_ids) if code != 0: return code return _chain_modes( @@ -1409,7 +1445,16 @@ def cmd_ceo(args: argparse.Namespace) -> int: model=model, no_github=no_github, ) - # Interactive foreground mode: use runner's interactive_exec + # Interactive foreground mode: use runner's interactive_exec. + # Mark read before exec — interactive_exec replaces the process via os.execvp + # so there's no post-execution hook. If the session fails to launch, messages + # are lost. This is accepted: the user is at the terminal and can re-send. + if pending_ids: + print( + f"Consuming {len(pending_ids)} message(s): {', '.join(pending_ids)}", + file=sys.stderr, + ) + mark_read(project_path, pending_ids) prompt = resolve_prompt("ceo", project_path) runner = get_runner(runner_name) runner.interactive_exec( @@ -1816,10 +1861,18 @@ def _build_ceo_task( no_github: bool = False, interactive_idea: str | None = None, research_ideation: str | None = None, + messages: list[Message] | None = None, ) -> str: """Build the CEO agent task string from mode and optional context.""" task = f"Project: {project_path}\nMode: {mode}" + if messages: + task += "\n\n## User Messages\n" + task += "The user has sent the following directives. Treat these as HIGH PRIORITY:\n\n" + for msg in messages: + ts = msg.timestamp.strftime("%Y-%m-%d %H:%M:%S") + task += f"**[{ts}]** {msg.text}\n\n" + if interactive_idea: task += ( f"\n\n## Interactive Ideation Mode (Phase 0)\n\n" @@ -2006,10 +2059,16 @@ def _run_single_cycle( from factory.study import add_backlog_item add_backlog_item(project_path, focus) + from factory.messages import mark_read, read_pending + + pending = read_pending(project_path) + pending_ids = [m.id for m in pending] + task = _build_ceo_task( project_path, mode, context, focus=focus, prompt_file=prompt_file, min_growth=min_growth, max_new=max_new, branch=branch, discover_only=discover_only, no_github=no_github, + messages=pending, ) standup = _run_standup(project_path, mode, model=model) @@ -2025,6 +2084,10 @@ def _run_single_cycle( model=model, )) + if code == 0: + if pending_ids: + mark_read(project_path, pending_ids) + print(result) return code @@ -2390,6 +2453,11 @@ def build_parser() -> argparse.ArgumentParser: # vault-init p = sub.add_parser("vault-init", help="Create the factory Obsidian vault") + # message — send a directive to the CEO + p = sub.add_parser("message", help="Send a message to the CEO for the next cycle") + p.add_argument("path", help="Path to the project") + p.add_argument("text", help="Message text") + # self-update sub.add_parser("self-update", help="Upgrade the factory CLI to the latest version") @@ -2605,6 +2673,7 @@ def main(argv: list[str] | None = None) -> int: "resume": cmd_resume, "log": cmd_log, "vault-init": cmd_vault_init, + "message": cmd_message, "self-update": cmd_self_update, "install": cmd_install, "serve-mcp": cmd_serve_mcp, diff --git a/factory/messages.py b/factory/messages.py new file mode 100644 index 0000000..e1b1489 --- /dev/null +++ b/factory/messages.py @@ -0,0 +1,129 @@ +"""User-to-CEO message channel. + +Users queue messages via ``factory message``. The factory loop reads pending +messages and injects them into the CEO's task string each cycle. +""" + +from __future__ import annotations + +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path + +import structlog + +log = structlog.get_logger() + +MAX_MESSAGE_CHARS = 10_000 +MAX_PENDING_MESSAGES = 20 +MAX_TOTAL_CHARS = 50_000 + + +@dataclass(frozen=True) +class Message: + id: str + timestamp: datetime + text: str + + +def _messages_dir(project_path: Path) -> Path: + return project_path / ".factory" / "messages" + + +def _read_dir(project_path: Path) -> Path: + return _messages_dir(project_path) / "read" + + +def write_message(project_path: Path, text: str) -> Message: + """Write a message to the pending queue. Returns the created Message.""" + if not text or not text.strip(): + raise ValueError("Message text must not be empty or whitespace-only.") + if len(text) > MAX_MESSAGE_CHARS: + raise ValueError( + f"Message text exceeds maximum length of {MAX_MESSAGE_CHARS} characters " + f"(got {len(text)})." + ) + + msg_dir = _messages_dir(project_path) + msg_dir.mkdir(parents=True, exist_ok=True) + + existing = list(msg_dir.glob("*.md")) + if len(existing) >= MAX_PENDING_MESSAGES: + raise ValueError( + f"Too many pending messages ({len(existing)} >= {MAX_PENDING_MESSAGES}). " + "Wait for existing messages to be consumed before sending more." + ) + + ts = datetime.now(timezone.utc) + msg_id = ts.strftime("%Y%m%dT%H%M%S%f") + "-" + uuid.uuid4().hex[:8] + + path = msg_dir / f"{msg_id}.md" + path.write_text(f"timestamp: {ts.isoformat()}\n\n{text}\n") + + msg = Message(id=msg_id, timestamp=ts, text=text) + log.info("message_written", id=msg_id, chars=len(text)) + return msg + + +def read_pending( + project_path: Path, + max_messages: int = MAX_PENDING_MESSAGES, + max_chars: int = MAX_TOTAL_CHARS, +) -> list[Message]: + """Read pending (unread) messages, ordered by timestamp. + + Caps at ``max_messages`` messages or ``max_chars`` total characters + to prevent flooding the CEO task string. + """ + msg_dir = _messages_dir(project_path) + if not msg_dir.exists(): + return [] + + all_paths = sorted(msg_dir.glob("*.md")) + if len(all_paths) > max_messages: + log.warning("messages_capped", total=len(all_paths), cap=max_messages) + + messages: list[Message] = [] + total_chars = 0 + for path in all_paths: + if len(messages) >= max_messages: + break + if path.is_symlink(): + continue + content = path.read_text() + lines = content.split("\n", 2) + ts = datetime.now(timezone.utc) + if lines and lines[0].startswith("timestamp:"): + try: + ts = datetime.fromisoformat(lines[0].split(":", 1)[1].strip()) + except ValueError: + log.warning("message_timestamp_parse_failed", path=str(path)) + text = lines[2].strip() if len(lines) > 2 else content.strip() + if total_chars + len(text) > max_chars and messages: + log.warning("messages_chars_capped", total_chars=total_chars + len(text), cap=max_chars) + break + total_chars += len(text) + messages.append(Message(id=path.stem, timestamp=ts, text=text)) + + return messages + + +def mark_read(project_path: Path, message_ids: list[str]) -> None: + """Move messages to the read/ subdirectory.""" + msg_dir = _messages_dir(project_path) + read_dir = _read_dir(project_path) + read_dir.mkdir(parents=True, exist_ok=True) + + for msg_id in message_ids: + raw = msg_dir / f"{msg_id}.md" + if raw.is_symlink(): + log.warning("mark_read_symlink_skipped", id=msg_id) + continue + src = raw.resolve() + if not src.is_relative_to(msg_dir.resolve()): + log.warning("mark_read_path_traversal", id=msg_id) + continue + if src.exists(): + src.rename(read_dir / src.name) + log.debug("message_marked_read", id=msg_id) diff --git a/tests/test_messages.py b/tests/test_messages.py new file mode 100644 index 0000000..9aeaa0c --- /dev/null +++ b/tests/test_messages.py @@ -0,0 +1,325 @@ +"""Tests for factory.messages — user-to-CEO message channel.""" + +from pathlib import Path + +import pytest + +from factory.messages import ( + MAX_MESSAGE_CHARS, + MAX_PENDING_MESSAGES, + MAX_TOTAL_CHARS, + mark_read, + read_pending, + write_message, +) + + +class TestWriteMessage: + def test_creates_message_file(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg = write_message(project, "focus on quality") + assert msg.text == "focus on quality" + assert msg.id + + msg_dir = project / ".factory" / "messages" + assert msg_dir.exists() + files = list(msg_dir.glob("*.md")) + assert len(files) == 1 + assert "focus on quality" in files[0].read_text() + + def test_multiple_messages(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + write_message(project, "msg 1") + write_message(project, "msg 2") + + files = list((project / ".factory" / "messages").glob("*.md")) + assert len(files) == 2 + + +class TestReadPending: + def test_empty_when_no_messages(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + assert read_pending(project) == [] + + def test_reads_written_messages(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + write_message(project, "hello CEO") + pending = read_pending(project) + assert len(pending) == 1 + assert pending[0].text == "hello CEO" + + def test_ordered_by_filename(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + write_message(project, "first") + write_message(project, "second") + pending = read_pending(project) + assert len(pending) == 2 + assert pending[0].text == "first" + assert pending[1].text == "second" + + +class TestMarkRead: + def test_moves_to_read_dir(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg = write_message(project, "done reading") + mark_read(project, [msg.id]) + + assert read_pending(project) == [] + read_dir = project / ".factory" / "messages" / "read" + assert read_dir.exists() + files = list(read_dir.glob("*.md")) + assert len(files) == 1 + + def test_partial_mark_read(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg1 = write_message(project, "msg 1") + write_message(project, "msg 2") + mark_read(project, [msg1.id]) + + pending = read_pending(project) + assert len(pending) == 1 + assert pending[0].text == "msg 2" + + def test_idempotent(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg = write_message(project, "test") + mark_read(project, [msg.id]) + mark_read(project, [msg.id]) + assert read_pending(project) == [] + + +class TestMessageCLI: + def test_cmd_message_writes_file(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + args = Namespace(path=str(project), text="focus on quality gates") + result = cmd_message(args) + assert result == 0 + msg_dir = project / ".factory" / "messages" + files = list(msg_dir.glob("*.md")) + assert len(files) == 1 + assert "focus on quality gates" in files[0].read_text() + + def test_message_subcommand_parsing(self) -> None: + from factory.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["message", "/tmp/project", "hello CEO"]) + assert args.path == "/tmp/project" + assert args.text == "hello CEO" + + +class TestMessageInjection: + def test_build_ceo_task_includes_messages(self, tmp_path: Path) -> None: + from factory.cli import _build_ceo_task + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + write_message(project, "fix quality gates first") + + pending = read_pending(project) + task = _build_ceo_task(project, "improve", messages=pending) + assert "User Messages" in task + assert "fix quality gates first" in task + assert "HIGH PRIORITY" in task + + def test_build_ceo_task_no_messages(self, tmp_path: Path) -> None: + from factory.cli import _build_ceo_task + + project = tmp_path / "proj" + project.mkdir() + task = _build_ceo_task(project, "improve") + assert "User Messages" not in task + + def test_build_ceo_task_does_not_mark_read(self, tmp_path: Path) -> None: + from factory.cli import _build_ceo_task + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + write_message(project, "test message") + pending = read_pending(project) + assert len(pending) == 1 + + _build_ceo_task(project, "improve", messages=pending) + assert len(read_pending(project)) == 1 + + +class TestMessageValidation: + def test_cmd_message_rejects_nonexistent_path(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + args = Namespace(path=str(tmp_path / "nonexistent"), text="hello") + assert cmd_message(args) == 1 + + def test_cmd_message_rejects_non_factory_project(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + args = Namespace(path=str(project), text="hello") + assert cmd_message(args) == 1 + + def test_cmd_message_rejects_empty_text(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + args = Namespace(path=str(project), text="") + assert cmd_message(args) == 1 + + def test_cmd_message_rejects_whitespace_text(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + args = Namespace(path=str(project), text=" \n ") + assert cmd_message(args) == 1 + + +class TestReadPendingCaps: + def test_read_pending_caps_at_max_messages(self, tmp_path: Path) -> None: + """Writing 25 messages, read_pending returns only 20 (MAX_PENDING_MESSAGES).""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Write 25 messages directly to bypass rate limiting + for i in range(25): + (msg_dir / f"20260504T120000{i:06d}-abcd{i:04d}.md").write_text( + f"timestamp: 2026-05-04T12:00:00.{i:06d}+00:00\n\nhello {i}\n" + ) + + pending = read_pending(project) + assert len(pending) == MAX_PENDING_MESSAGES # 20 + + def test_read_pending_truncates_at_max_total_chars(self, tmp_path: Path) -> None: + """Messages exceeding MAX_TOTAL_CHARS are truncated.""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Each message is ~10k chars; at 50k cap we should get ~5 + for i in range(10): + text = "x" * 10_000 + (msg_dir / f"20260504T120000{i:06d}-abcd{i:04d}.md").write_text( + f"timestamp: 2026-05-04T12:00:00.{i:06d}+00:00\n\n{text}\n" + ) + + pending = read_pending(project) + total = sum(len(m.text) for m in pending) + assert total <= MAX_TOTAL_CHARS + assert len(pending) < 10 + + def test_single_large_message_bypasses_char_cap(self, tmp_path: Path) -> None: + """First message is always included even if it alone exceeds max_chars.""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Single message larger than MAX_TOTAL_CHARS — the `and messages` guard + # means the first message always passes through. + big_text = "y" * (MAX_TOTAL_CHARS + 1_000) + (msg_dir / "20260504T120000000000-aaaaaaaa.md").write_text( + f"timestamp: 2026-05-04T12:00:00+00:00\n\n{big_text}\n" + ) + + pending = read_pending(project) + assert len(pending) == 1 + assert len(pending[0].text) > MAX_TOTAL_CHARS + + +class TestMessageSizeValidation: + def test_write_rejects_empty_text(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + with pytest.raises(ValueError, match="empty"): + write_message(project, "") + + def test_write_rejects_whitespace_only_text(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + with pytest.raises(ValueError, match="empty"): + write_message(project, " \n\t ") + + def test_write_rejects_oversized_message(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + with pytest.raises(ValueError, match="exceeds maximum"): + write_message(project, "x" * (MAX_MESSAGE_CHARS + 1)) + + +class TestRateLimiting: + def test_write_rejects_when_too_many_pending(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Create MAX_PENDING_MESSAGES files directly + for i in range(MAX_PENDING_MESSAGES): + (msg_dir / f"20260504T120000{i:06d}-abcd{i:04d}.md").write_text( + f"timestamp: 2026-05-04T12:00:00.{i:06d}+00:00\n\nmsg {i}\n" + ) + + with pytest.raises(ValueError, match="Too many pending"): + write_message(project, "one more") + + +class TestMalformedMessages: + def test_malformed_file_no_timestamp_header(self, tmp_path: Path) -> None: + """A message file without a valid timestamp header is still read gracefully.""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + (msg_dir / "20260504T120000000000-badbadbad.md").write_text( + "no timestamp here\n\njust some text\n" + ) + + pending = read_pending(project) + assert len(pending) == 1 + # The text should still be extracted from line index 2 + assert pending[0].text == "just some text"