From d94f62ca11bc9dfe92edc916ab713d763b944734 Mon Sep 17 00:00:00 2001 From: Oleg S <97077423+RobotSail@users.noreply.github.com> Date: Thu, 7 May 2026 15:53:57 -0400 Subject: [PATCH 1/7] feat: user-to-CEO message channel Adds `factory message "text"` to queue directives for the CEO. Messages are injected into the CEO's task string as a high-priority `## User Messages` section at the start of each cycle, then marked read. - New module: factory/messages.py (write_message, read_pending, mark_read) - CLI: cmd_message + _build_ceo_task injection - 13 tests covering lifecycle, CLI, and injection Co-Authored-By: Claude Opus 4.6 (1M context) --- factory/cli.py | 26 +++++++ factory/messages.py | 85 +++++++++++++++++++++ tests/test_messages.py | 162 +++++++++++++++++++++++++++++++++++++++++ 3 files changed, 273 insertions(+) create mode 100644 factory/messages.py create mode 100644 tests/test_messages.py diff --git a/factory/cli.py b/factory/cli.py index 6905356..959e691 100644 --- a/factory/cli.py +++ b/factory/cli.py @@ -314,6 +314,15 @@ def cmd_finalize(args: argparse.Namespace) -> int: return 0 +def cmd_message(args: argparse.Namespace) -> int: + from factory.messages import write_message + + project_path = Path(args.path) + msg = write_message(project_path, args.text) + print(f"Message queued (id={msg.id}). The CEO will see it at the start of the next cycle.") + return 0 + + def cmd_history(args: argparse.Namespace) -> int: from factory.store import ExperimentStore @@ -1818,8 +1827,19 @@ def _build_ceo_task( research_ideation: str | None = None, ) -> str: """Build the CEO agent task string from mode and optional context.""" + from factory.messages import mark_read, read_pending + task = f"Project: {project_path}\nMode: {mode}" + pending = read_pending(project_path) + if pending: + task += "\n\n## User Messages\n" + task += "The user has sent the following directives. Treat these as HIGH PRIORITY:\n\n" + for msg in pending: + ts = msg.timestamp.strftime("%Y-%m-%d %H:%M:%S") + task += f"**[{ts}]** {msg.text}\n\n" + mark_read(project_path, [m.id for m in pending]) + if interactive_idea: task += ( f"\n\n## Interactive Ideation Mode (Phase 0)\n\n" @@ -2390,6 +2410,11 @@ def build_parser() -> argparse.ArgumentParser: # vault-init p = sub.add_parser("vault-init", help="Create the factory Obsidian vault") + # message — send a directive to the CEO + p = sub.add_parser("message", help="Send a message to the CEO for the next cycle") + p.add_argument("path", help="Path to the project") + p.add_argument("text", help="Message text") + # self-update sub.add_parser("self-update", help="Upgrade the factory CLI to the latest version") @@ -2605,6 +2630,7 @@ def main(argv: list[str] | None = None) -> int: "resume": cmd_resume, "log": cmd_log, "vault-init": cmd_vault_init, + "message": cmd_message, "self-update": cmd_self_update, "install": cmd_install, "serve-mcp": cmd_serve_mcp, diff --git a/factory/messages.py b/factory/messages.py new file mode 100644 index 0000000..d156395 --- /dev/null +++ b/factory/messages.py @@ -0,0 +1,85 @@ +"""User-to-CEO message channel. + +Users queue messages via ``factory message``. The factory loop reads pending +messages and injects them into the CEO's task string each cycle. +""" + +from __future__ import annotations + +import shutil +import uuid +from dataclasses import dataclass +from datetime import datetime, timezone +from pathlib import Path + +import structlog + +log = structlog.get_logger() + + +@dataclass +class Message: + id: str + timestamp: datetime + text: str + + +def _messages_dir(project_path: Path) -> Path: + return project_path / ".factory" / "messages" + + +def _read_dir(project_path: Path) -> Path: + return _messages_dir(project_path) / "read" + + +def write_message(project_path: Path, text: str) -> Message: + """Write a message to the pending queue. Returns the created Message.""" + msg_dir = _messages_dir(project_path) + msg_dir.mkdir(parents=True, exist_ok=True) + + msg_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f") + "-" + uuid.uuid4().hex[:8] + ts = datetime.now(timezone.utc) + + path = msg_dir / f"{msg_id}.md" + path.write_text(f"timestamp: {ts.isoformat()}\n\n{text}\n") + + msg = Message(id=msg_id, timestamp=ts, text=text) + log.info("message_written", id=msg_id, chars=len(text)) + return msg + + +def read_pending(project_path: Path) -> list[Message]: + """Read all pending (unread) messages, ordered by timestamp.""" + msg_dir = _messages_dir(project_path) + if not msg_dir.exists(): + return [] + + messages: list[Message] = [] + for path in sorted(msg_dir.glob("*.md")): + if path.parent.name == "read": + continue + content = path.read_text() + lines = content.split("\n", 2) + ts = datetime.now(timezone.utc) + if lines and lines[0].startswith("timestamp:"): + try: + ts = datetime.fromisoformat(lines[0].split(":", 1)[1].strip()) + except ValueError: + pass + text = lines[2].strip() if len(lines) > 2 else content.strip() + messages.append(Message(id=path.stem, timestamp=ts, text=text)) + + return messages + + +def mark_read(project_path: Path, message_ids: list[str]) -> None: + """Move messages to the read/ subdirectory.""" + msg_dir = _messages_dir(project_path) + read_dir = _read_dir(project_path) + read_dir.mkdir(parents=True, exist_ok=True) + + for msg_id in message_ids: + src = msg_dir / f"{msg_id}.md" + if src.exists(): + shutil.move(str(src), str(read_dir / src.name)) + log.debug("message_marked_read", id=msg_id) diff --git a/tests/test_messages.py b/tests/test_messages.py new file mode 100644 index 0000000..aaf4392 --- /dev/null +++ b/tests/test_messages.py @@ -0,0 +1,162 @@ +"""Tests for factory.messages — user-to-CEO message channel.""" + +from pathlib import Path + +from factory.messages import mark_read, read_pending, write_message + + +class TestWriteMessage: + def test_creates_message_file(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg = write_message(project, "focus on quality") + assert msg.text == "focus on quality" + assert msg.id + + msg_dir = project / ".factory" / "messages" + assert msg_dir.exists() + files = list(msg_dir.glob("*.md")) + assert len(files) == 1 + assert "focus on quality" in files[0].read_text() + + def test_multiple_messages(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + write_message(project, "msg 1") + write_message(project, "msg 2") + + files = list((project / ".factory" / "messages").glob("*.md")) + assert len(files) == 2 + + +class TestReadPending: + def test_empty_when_no_messages(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + assert read_pending(project) == [] + + def test_reads_written_messages(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + write_message(project, "hello CEO") + pending = read_pending(project) + assert len(pending) == 1 + assert pending[0].text == "hello CEO" + + def test_ordered_by_filename(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + write_message(project, "first") + write_message(project, "second") + pending = read_pending(project) + assert len(pending) == 2 + assert pending[0].text == "first" + assert pending[1].text == "second" + + +class TestMarkRead: + def test_moves_to_read_dir(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg = write_message(project, "done reading") + mark_read(project, [msg.id]) + + assert read_pending(project) == [] + read_dir = project / ".factory" / "messages" / "read" + assert read_dir.exists() + files = list(read_dir.glob("*.md")) + assert len(files) == 1 + + def test_partial_mark_read(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg1 = write_message(project, "msg 1") + write_message(project, "msg 2") + mark_read(project, [msg1.id]) + + pending = read_pending(project) + assert len(pending) == 1 + assert pending[0].text == "msg 2" + + def test_idempotent(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + + msg = write_message(project, "test") + mark_read(project, [msg.id]) + mark_read(project, [msg.id]) + assert read_pending(project) == [] + + +class TestMessageCLI: + def test_cmd_message_writes_file(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + args = Namespace(path=str(project), text="focus on quality gates") + result = cmd_message(args) + assert result == 0 + msg_dir = project / ".factory" / "messages" + files = list(msg_dir.glob("*.md")) + assert len(files) == 1 + assert "focus on quality gates" in files[0].read_text() + + def test_message_subcommand_parsing(self) -> None: + from factory.cli import build_parser + + parser = build_parser() + args = parser.parse_args(["message", "/tmp/project", "hello CEO"]) + assert args.path == "/tmp/project" + assert args.text == "hello CEO" + + +class TestMessageInjection: + def test_build_ceo_task_includes_pending_messages(self, tmp_path: Path) -> None: + from factory.cli import _build_ceo_task + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + write_message(project, "fix quality gates first") + + task = _build_ceo_task(project, "improve") + assert "User Messages" in task + assert "fix quality gates first" in task + assert "HIGH PRIORITY" in task + + def test_build_ceo_task_no_messages(self, tmp_path: Path) -> None: + from factory.cli import _build_ceo_task + + project = tmp_path / "proj" + project.mkdir() + task = _build_ceo_task(project, "improve") + assert "User Messages" not in task + + def test_messages_marked_read_after_injection(self, tmp_path: Path) -> None: + from factory.cli import _build_ceo_task + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + write_message(project, "test message") + assert len(read_pending(project)) == 1 + + _build_ceo_task(project, "improve") + assert len(read_pending(project)) == 0 From 8e56bb0a1bc09f590598b1a10e6883ab4a34f795 Mon Sep 17 00:00:00 2001 From: Oleg S <97077423+RobotSail@users.noreply.github.com> Date: Thu, 7 May 2026 16:17:04 -0400 Subject: [PATCH 2/7] =?UTF-8?q?fix:=20address=20review=20=E2=80=94=20side?= =?UTF-8?q?=20effect,=20double=20datetime,=20dead=20guard,=20validation?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. Move mark_read out of _build_ceo_task to call sites (_run_single_cycle, cmd_ceo) so messages aren't lost if CEO fails to spawn 2. Use single datetime.now() in write_message 3. Remove unreachable path.parent.name == "read" guard in read_pending 4. Add project path validation in cmd_message (reject nonexistent/non-factory) Co-Authored-By: Claude Opus 4.6 (1M context) --- factory/cli.py | 27 ++++++++++++++++++++++++--- factory/messages.py | 4 +--- tests/test_messages.py | 24 ++++++++++++++++++++++-- 3 files changed, 47 insertions(+), 8 deletions(-) diff --git a/factory/cli.py b/factory/cli.py index 959e691..9779f34 100644 --- a/factory/cli.py +++ b/factory/cli.py @@ -318,6 +318,12 @@ def cmd_message(args: argparse.Namespace) -> int: from factory.messages import write_message project_path = Path(args.path) + if not project_path.exists(): + print(f"Error: project path does not exist: {project_path}", file=sys.stderr) + return 1 + if not (project_path / ".factory").exists(): + print(f"Error: not a factory project (no .factory/ directory): {project_path}", file=sys.stderr) + return 1 msg = write_message(project_path, args.text) print(f"Message queued (id={msg.id}). The CEO will see it at the start of the next cycle.") return 0 @@ -1382,6 +1388,8 @@ def cmd_ceo(args: argparse.Namespace) -> int: from factory.study import add_backlog_item add_backlog_item(project_path, focus) + from factory.messages import mark_read, read_pending + ceo_mode = "build" if mode == "interactive" or research_ideation else mode task = _build_ceo_task( project_path, ceo_mode, context, focus=focus, prompt_file=prompt_file, @@ -1391,9 +1399,11 @@ def cmd_ceo(args: argparse.Namespace) -> int: research_ideation=research_ideation, ) + pending_ids = [m.id for m in read_pending(project_path)] + standup = _run_standup(project_path, ceo_mode, model=model) if standup: - task += f"\n\n## Sprint Standup\n\n{standup}" + task += f"\n\n## Sprint Standup\n\n{standup}" if headless: # Non-interactive pipe mode (for scripting, cron, tmux) @@ -1408,6 +1418,8 @@ def cmd_ceo(args: argparse.Namespace) -> int: model=model, timeout=7200.0, )) + if pending_ids: + mark_read(project_path, pending_ids) print(result) if code != 0: return code @@ -1419,6 +1431,8 @@ def cmd_ceo(args: argparse.Namespace) -> int: ) # Interactive foreground mode: use runner's interactive_exec + if pending_ids: + mark_read(project_path, pending_ids) prompt = resolve_prompt("ceo", project_path) runner = get_runner(runner_name) runner.interactive_exec( @@ -1827,7 +1841,7 @@ def _build_ceo_task( research_ideation: str | None = None, ) -> str: """Build the CEO agent task string from mode and optional context.""" - from factory.messages import mark_read, read_pending + from factory.messages import read_pending task = f"Project: {project_path}\nMode: {mode}" @@ -1838,7 +1852,6 @@ def _build_ceo_task( for msg in pending: ts = msg.timestamp.strftime("%Y-%m-%d %H:%M:%S") task += f"**[{ts}]** {msg.text}\n\n" - mark_read(project_path, [m.id for m in pending]) if interactive_idea: task += ( @@ -2036,6 +2049,10 @@ def _run_single_cycle( if standup: task += f"\n\n## Sprint Standup\n\n{standup}" + from factory.messages import mark_read, read_pending + + pending_ids = [m.id for m in read_pending(project_path)] + result, code = _run(invoke_agent( "ceo", task, @@ -2045,6 +2062,10 @@ def _run_single_cycle( model=model, )) + if code == 0: + if pending_ids: + mark_read(project_path, pending_ids) + print(result) return code diff --git a/factory/messages.py b/factory/messages.py index d156395..ebe9539 100644 --- a/factory/messages.py +++ b/factory/messages.py @@ -37,8 +37,8 @@ def write_message(project_path: Path, text: str) -> Message: msg_dir = _messages_dir(project_path) msg_dir.mkdir(parents=True, exist_ok=True) - msg_id = datetime.now(timezone.utc).strftime("%Y%m%dT%H%M%S%f") + "-" + uuid.uuid4().hex[:8] ts = datetime.now(timezone.utc) + msg_id = ts.strftime("%Y%m%dT%H%M%S%f") + "-" + uuid.uuid4().hex[:8] path = msg_dir / f"{msg_id}.md" path.write_text(f"timestamp: {ts.isoformat()}\n\n{text}\n") @@ -56,8 +56,6 @@ def read_pending(project_path: Path) -> list[Message]: messages: list[Message] = [] for path in sorted(msg_dir.glob("*.md")): - if path.parent.name == "read": - continue content = path.read_text() lines = content.split("\n", 2) ts = datetime.now(timezone.utc) diff --git a/tests/test_messages.py b/tests/test_messages.py index aaf4392..daf6e61 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -149,7 +149,7 @@ def test_build_ceo_task_no_messages(self, tmp_path: Path) -> None: task = _build_ceo_task(project, "improve") assert "User Messages" not in task - def test_messages_marked_read_after_injection(self, tmp_path: Path) -> None: + def test_build_ceo_task_does_not_mark_read(self, tmp_path: Path) -> None: from factory.cli import _build_ceo_task project = tmp_path / "proj" @@ -159,4 +159,24 @@ def test_messages_marked_read_after_injection(self, tmp_path: Path) -> None: assert len(read_pending(project)) == 1 _build_ceo_task(project, "improve") - assert len(read_pending(project)) == 0 + assert len(read_pending(project)) == 1 + + +class TestMessageValidation: + def test_cmd_message_rejects_nonexistent_path(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + args = Namespace(path=str(tmp_path / "nonexistent"), text="hello") + assert cmd_message(args) == 1 + + def test_cmd_message_rejects_non_factory_project(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + args = Namespace(path=str(project), text="hello") + assert cmd_message(args) == 1 From c5afd95e101af25388a3842d9577186e7e5a6116 Mon Sep 17 00:00:00 2001 From: Oleg S <97077423+RobotSail@users.noreply.github.com> Date: Thu, 7 May 2026 16:49:34 -0400 Subject: [PATCH 3/7] fix: only mark messages read on CEO success, document interactive limitation MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit 1. mark_read only fires when code == 0 (headless + _run_single_cycle) 2. Interactive mode: mark before exec with comment explaining why 3. shutil.move → Path.rename (atomic on Unix, same filesystem) Co-Authored-By: Claude Opus 4.6 (1M context) --- factory/cli.py | 11 ++++++++--- factory/messages.py | 3 +-- 2 files changed, 9 insertions(+), 5 deletions(-) diff --git a/factory/cli.py b/factory/cli.py index 9779f34..8c34f18 100644 --- a/factory/cli.py +++ b/factory/cli.py @@ -1418,9 +1418,10 @@ def cmd_ceo(args: argparse.Namespace) -> int: model=model, timeout=7200.0, )) - if pending_ids: - mark_read(project_path, pending_ids) print(result) + if code == 0: + if pending_ids: + mark_read(project_path, pending_ids) if code != 0: return code return _chain_modes( @@ -1430,7 +1431,10 @@ def cmd_ceo(args: argparse.Namespace) -> int: model=model, no_github=no_github, ) - # Interactive foreground mode: use runner's interactive_exec + # Interactive foreground mode: use runner's interactive_exec. + # Mark read before exec — interactive_exec replaces the process via os.execvp + # so there's no post-execution hook. If the session fails to launch, messages + # are lost. This is accepted: the user is at the terminal and can re-send. if pending_ids: mark_read(project_path, pending_ids) prompt = resolve_prompt("ceo", project_path) @@ -2066,6 +2070,7 @@ def _run_single_cycle( if pending_ids: mark_read(project_path, pending_ids) + print(result) return code diff --git a/factory/messages.py b/factory/messages.py index ebe9539..9cf0a80 100644 --- a/factory/messages.py +++ b/factory/messages.py @@ -6,7 +6,6 @@ from __future__ import annotations -import shutil import uuid from dataclasses import dataclass from datetime import datetime, timezone @@ -79,5 +78,5 @@ def mark_read(project_path: Path, message_ids: list[str]) -> None: for msg_id in message_ids: src = msg_dir / f"{msg_id}.md" if src.exists(): - shutil.move(str(src), str(read_dir / src.name)) + src.rename(read_dir / src.name) log.debug("message_marked_read", id=msg_id) From c28c2e3275bae632a3ab9647ff5ce3f7ddb4474f Mon Sep 17 00:00:00 2001 From: Oleg S <97077423+RobotSail@users.noreply.github.com> Date: Fri, 8 May 2026 10:02:49 -0400 Subject: [PATCH 4/7] =?UTF-8?q?fix:=20eliminate=20TOCTOU=20=E2=80=94=20rea?= =?UTF-8?q?d=20pending=20once,=20pass=20messages=20to=20=5Fbuild=5Fceo=5Ft?= =?UTF-8?q?ask?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Read pending messages once at the call site, capture IDs from the same list, and pass messages as a parameter to _build_ceo_task. Prevents a message written between task construction and ID capture from being marked read without injection. Co-Authored-By: Claude Opus 4.6 (1M context) --- factory/cli.py | 24 +++++++++++++----------- tests/test_messages.py | 10 ++++++---- 2 files changed, 19 insertions(+), 15 deletions(-) diff --git a/factory/cli.py b/factory/cli.py index 8c34f18..80cfd35 100644 --- a/factory/cli.py +++ b/factory/cli.py @@ -1390,6 +1390,9 @@ def cmd_ceo(args: argparse.Namespace) -> int: from factory.messages import mark_read, read_pending + pending = read_pending(project_path) + pending_ids = [m.id for m in pending] + ceo_mode = "build" if mode == "interactive" or research_ideation else mode task = _build_ceo_task( project_path, ceo_mode, context, focus=focus, prompt_file=prompt_file, @@ -1397,10 +1400,9 @@ def cmd_ceo(args: argparse.Namespace) -> int: discover_only=discover_only, no_github=no_github, interactive_idea=interactive_idea, research_ideation=research_ideation, + messages=pending, ) - pending_ids = [m.id for m in read_pending(project_path)] - standup = _run_standup(project_path, ceo_mode, model=model) if standup: task += f"\n\n## Sprint Standup\n\n{standup}" @@ -1843,17 +1845,15 @@ def _build_ceo_task( no_github: bool = False, interactive_idea: str | None = None, research_ideation: str | None = None, + messages: list | None = None, ) -> str: """Build the CEO agent task string from mode and optional context.""" - from factory.messages import read_pending - task = f"Project: {project_path}\nMode: {mode}" - pending = read_pending(project_path) - if pending: + if messages: task += "\n\n## User Messages\n" task += "The user has sent the following directives. Treat these as HIGH PRIORITY:\n\n" - for msg in pending: + for msg in messages: ts = msg.timestamp.strftime("%Y-%m-%d %H:%M:%S") task += f"**[{ts}]** {msg.text}\n\n" @@ -2043,20 +2043,22 @@ def _run_single_cycle( from factory.study import add_backlog_item add_backlog_item(project_path, focus) + from factory.messages import mark_read, read_pending + + pending = read_pending(project_path) + pending_ids = [m.id for m in pending] + task = _build_ceo_task( project_path, mode, context, focus=focus, prompt_file=prompt_file, min_growth=min_growth, max_new=max_new, branch=branch, discover_only=discover_only, no_github=no_github, + messages=pending, ) standup = _run_standup(project_path, mode, model=model) if standup: task += f"\n\n## Sprint Standup\n\n{standup}" - from factory.messages import mark_read, read_pending - - pending_ids = [m.id for m in read_pending(project_path)] - result, code = _run(invoke_agent( "ceo", task, diff --git a/tests/test_messages.py b/tests/test_messages.py index daf6e61..d25e29c 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -128,7 +128,7 @@ def test_message_subcommand_parsing(self) -> None: class TestMessageInjection: - def test_build_ceo_task_includes_pending_messages(self, tmp_path: Path) -> None: + def test_build_ceo_task_includes_messages(self, tmp_path: Path) -> None: from factory.cli import _build_ceo_task project = tmp_path / "proj" @@ -136,7 +136,8 @@ def test_build_ceo_task_includes_pending_messages(self, tmp_path: Path) -> None: (project / ".factory").mkdir() write_message(project, "fix quality gates first") - task = _build_ceo_task(project, "improve") + pending = read_pending(project) + task = _build_ceo_task(project, "improve", messages=pending) assert "User Messages" in task assert "fix quality gates first" in task assert "HIGH PRIORITY" in task @@ -156,9 +157,10 @@ def test_build_ceo_task_does_not_mark_read(self, tmp_path: Path) -> None: project.mkdir() (project / ".factory").mkdir() write_message(project, "test message") - assert len(read_pending(project)) == 1 + pending = read_pending(project) + assert len(pending) == 1 - _build_ceo_task(project, "improve") + _build_ceo_task(project, "improve", messages=pending) assert len(read_pending(project)) == 1 From 5a9ebcdc2f09f4109ef967fa8c334a885b1c97b9 Mon Sep 17 00:00:00 2001 From: Oleg S <97077423+RobotSail@users.noreply.github.com> Date: Fri, 8 May 2026 11:23:56 -0400 Subject: [PATCH 5/7] fix: type hint list -> list[Message] with TYPE_CHECKING guard Co-Authored-By: Claude Opus 4.6 (1M context) --- factory/cli.py | 6 +++++- 1 file changed, 5 insertions(+), 1 deletion(-) diff --git a/factory/cli.py b/factory/cli.py index 80cfd35..95f33d0 100644 --- a/factory/cli.py +++ b/factory/cli.py @@ -15,6 +15,10 @@ import time from datetime import datetime from pathlib import Path +from typing import TYPE_CHECKING + +if TYPE_CHECKING: + from factory.messages import Message def _run(coro): # noqa: ANN001, ANN202 @@ -1845,7 +1849,7 @@ def _build_ceo_task( no_github: bool = False, interactive_idea: str | None = None, research_ideation: str | None = None, - messages: list | None = None, + messages: list[Message] | None = None, ) -> str: """Build the CEO agent task string from mode and optional context.""" task = f"Project: {project_path}\nMode: {mode}" From d77ca71b0f06ec8c08a38e926b24ab5df9766f52 Mon Sep 17 00:00:00 2001 From: Oleg S <97077423+RobotSail@users.noreply.github.com> Date: Fri, 8 May 2026 13:13:35 -0400 Subject: [PATCH 6/7] fix: cap pending messages at 20 count / 50k chars to prevent flooding Warns when messages are dropped due to caps. Co-Authored-By: Claude Opus 4.6 (1M context) --- factory/messages.py | 29 ++++++++++++++++++++++++++--- 1 file changed, 26 insertions(+), 3 deletions(-) diff --git a/factory/messages.py b/factory/messages.py index 9cf0a80..ff7a271 100644 --- a/factory/messages.py +++ b/factory/messages.py @@ -47,14 +47,33 @@ def write_message(project_path: Path, text: str) -> Message: return msg -def read_pending(project_path: Path) -> list[Message]: - """Read all pending (unread) messages, ordered by timestamp.""" +MAX_PENDING_MESSAGES = 20 +MAX_TOTAL_CHARS = 50_000 + + +def read_pending( + project_path: Path, + max_messages: int = MAX_PENDING_MESSAGES, + max_chars: int = MAX_TOTAL_CHARS, +) -> list[Message]: + """Read pending (unread) messages, ordered by timestamp. + + Caps at ``max_messages`` messages or ``max_chars`` total characters + to prevent flooding the CEO task string. + """ msg_dir = _messages_dir(project_path) if not msg_dir.exists(): return [] + all_paths = sorted(msg_dir.glob("*.md")) + if len(all_paths) > max_messages: + log.warning("messages_capped", total=len(all_paths), cap=max_messages) + messages: list[Message] = [] - for path in sorted(msg_dir.glob("*.md")): + total_chars = 0 + for path in all_paths: + if len(messages) >= max_messages: + break content = path.read_text() lines = content.split("\n", 2) ts = datetime.now(timezone.utc) @@ -64,6 +83,10 @@ def read_pending(project_path: Path) -> list[Message]: except ValueError: pass text = lines[2].strip() if len(lines) > 2 else content.strip() + if total_chars + len(text) > max_chars and messages: + log.warning("messages_chars_capped", total_chars=total_chars + len(text), cap=max_chars) + break + total_chars += len(text) messages.append(Message(id=path.stem, timestamp=ts, text=text)) return messages From 3204dac79d5043d27b32b1b7a742307b6e896edc Mon Sep 17 00:00:00 2001 From: Oleg S <97077423+RobotSail@users.noreply.github.com> Date: Fri, 8 May 2026 13:39:48 -0400 Subject: [PATCH 7/7] =?UTF-8?q?fix:=20address=20round=201=20review=20?= =?UTF-8?q?=E2=80=94=20validation,=20security=20hardening,=20test=20covera?= =?UTF-8?q?ge?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Round 1 findings from 3 independent reviewers addressed: - Frozen dataclass (Message is now immutable) - Input validation: empty text rejection, MAX_MESSAGE_CHARS=10k, rate limiting - Path containment in mark_read (resolve + is_relative_to) - Symlink checks in read_pending and mark_read (before resolve) - Timestamp parse failure logging - cmd_message: docstring, try/except ValueError, stderr for interactive IDs - PEP 8: removed double blank line in _run_single_cycle - 10 new tests: caps, validation, malformed files, edge cases (25 total) Co-Authored-By: Claude Opus 4.6 (1M context) --- factory/cli.py | 15 ++++- factory/messages.py | 38 +++++++++-- tests/test_messages.py | 143 ++++++++++++++++++++++++++++++++++++++++- 3 files changed, 186 insertions(+), 10 deletions(-) diff --git a/factory/cli.py b/factory/cli.py index 95f33d0..7208e45 100644 --- a/factory/cli.py +++ b/factory/cli.py @@ -319,6 +319,7 @@ def cmd_finalize(args: argparse.Namespace) -> int: def cmd_message(args: argparse.Namespace) -> int: + """Queue a message for the CEO agent.""" from factory.messages import write_message project_path = Path(args.path) @@ -328,7 +329,14 @@ def cmd_message(args: argparse.Namespace) -> int: if not (project_path / ".factory").exists(): print(f"Error: not a factory project (no .factory/ directory): {project_path}", file=sys.stderr) return 1 - msg = write_message(project_path, args.text) + if not args.text or not args.text.strip(): + print("Error: message text must not be empty.", file=sys.stderr) + return 1 + try: + msg = write_message(project_path, args.text) + except ValueError as exc: + print(f"Error: {exc}", file=sys.stderr) + return 1 print(f"Message queued (id={msg.id}). The CEO will see it at the start of the next cycle.") return 0 @@ -1442,6 +1450,10 @@ def cmd_ceo(args: argparse.Namespace) -> int: # so there's no post-execution hook. If the session fails to launch, messages # are lost. This is accepted: the user is at the terminal and can re-send. if pending_ids: + print( + f"Consuming {len(pending_ids)} message(s): {', '.join(pending_ids)}", + file=sys.stderr, + ) mark_read(project_path, pending_ids) prompt = resolve_prompt("ceo", project_path) runner = get_runner(runner_name) @@ -2076,7 +2088,6 @@ def _run_single_cycle( if pending_ids: mark_read(project_path, pending_ids) - print(result) return code diff --git a/factory/messages.py b/factory/messages.py index ff7a271..e1b1489 100644 --- a/factory/messages.py +++ b/factory/messages.py @@ -15,8 +15,12 @@ log = structlog.get_logger() +MAX_MESSAGE_CHARS = 10_000 +MAX_PENDING_MESSAGES = 20 +MAX_TOTAL_CHARS = 50_000 + -@dataclass +@dataclass(frozen=True) class Message: id: str timestamp: datetime @@ -33,9 +37,24 @@ def _read_dir(project_path: Path) -> Path: def write_message(project_path: Path, text: str) -> Message: """Write a message to the pending queue. Returns the created Message.""" + if not text or not text.strip(): + raise ValueError("Message text must not be empty or whitespace-only.") + if len(text) > MAX_MESSAGE_CHARS: + raise ValueError( + f"Message text exceeds maximum length of {MAX_MESSAGE_CHARS} characters " + f"(got {len(text)})." + ) + msg_dir = _messages_dir(project_path) msg_dir.mkdir(parents=True, exist_ok=True) + existing = list(msg_dir.glob("*.md")) + if len(existing) >= MAX_PENDING_MESSAGES: + raise ValueError( + f"Too many pending messages ({len(existing)} >= {MAX_PENDING_MESSAGES}). " + "Wait for existing messages to be consumed before sending more." + ) + ts = datetime.now(timezone.utc) msg_id = ts.strftime("%Y%m%dT%H%M%S%f") + "-" + uuid.uuid4().hex[:8] @@ -47,10 +66,6 @@ def write_message(project_path: Path, text: str) -> Message: return msg -MAX_PENDING_MESSAGES = 20 -MAX_TOTAL_CHARS = 50_000 - - def read_pending( project_path: Path, max_messages: int = MAX_PENDING_MESSAGES, @@ -74,6 +89,8 @@ def read_pending( for path in all_paths: if len(messages) >= max_messages: break + if path.is_symlink(): + continue content = path.read_text() lines = content.split("\n", 2) ts = datetime.now(timezone.utc) @@ -81,7 +98,7 @@ def read_pending( try: ts = datetime.fromisoformat(lines[0].split(":", 1)[1].strip()) except ValueError: - pass + log.warning("message_timestamp_parse_failed", path=str(path)) text = lines[2].strip() if len(lines) > 2 else content.strip() if total_chars + len(text) > max_chars and messages: log.warning("messages_chars_capped", total_chars=total_chars + len(text), cap=max_chars) @@ -99,7 +116,14 @@ def mark_read(project_path: Path, message_ids: list[str]) -> None: read_dir.mkdir(parents=True, exist_ok=True) for msg_id in message_ids: - src = msg_dir / f"{msg_id}.md" + raw = msg_dir / f"{msg_id}.md" + if raw.is_symlink(): + log.warning("mark_read_symlink_skipped", id=msg_id) + continue + src = raw.resolve() + if not src.is_relative_to(msg_dir.resolve()): + log.warning("mark_read_path_traversal", id=msg_id) + continue if src.exists(): src.rename(read_dir / src.name) log.debug("message_marked_read", id=msg_id) diff --git a/tests/test_messages.py b/tests/test_messages.py index d25e29c..9aeaa0c 100644 --- a/tests/test_messages.py +++ b/tests/test_messages.py @@ -2,7 +2,16 @@ from pathlib import Path -from factory.messages import mark_read, read_pending, write_message +import pytest + +from factory.messages import ( + MAX_MESSAGE_CHARS, + MAX_PENDING_MESSAGES, + MAX_TOTAL_CHARS, + mark_read, + read_pending, + write_message, +) class TestWriteMessage: @@ -182,3 +191,135 @@ def test_cmd_message_rejects_non_factory_project(self, tmp_path: Path) -> None: project.mkdir() args = Namespace(path=str(project), text="hello") assert cmd_message(args) == 1 + + def test_cmd_message_rejects_empty_text(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + args = Namespace(path=str(project), text="") + assert cmd_message(args) == 1 + + def test_cmd_message_rejects_whitespace_text(self, tmp_path: Path) -> None: + from argparse import Namespace + + from factory.cli import cmd_message + + project = tmp_path / "proj" + project.mkdir() + (project / ".factory").mkdir() + args = Namespace(path=str(project), text=" \n ") + assert cmd_message(args) == 1 + + +class TestReadPendingCaps: + def test_read_pending_caps_at_max_messages(self, tmp_path: Path) -> None: + """Writing 25 messages, read_pending returns only 20 (MAX_PENDING_MESSAGES).""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Write 25 messages directly to bypass rate limiting + for i in range(25): + (msg_dir / f"20260504T120000{i:06d}-abcd{i:04d}.md").write_text( + f"timestamp: 2026-05-04T12:00:00.{i:06d}+00:00\n\nhello {i}\n" + ) + + pending = read_pending(project) + assert len(pending) == MAX_PENDING_MESSAGES # 20 + + def test_read_pending_truncates_at_max_total_chars(self, tmp_path: Path) -> None: + """Messages exceeding MAX_TOTAL_CHARS are truncated.""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Each message is ~10k chars; at 50k cap we should get ~5 + for i in range(10): + text = "x" * 10_000 + (msg_dir / f"20260504T120000{i:06d}-abcd{i:04d}.md").write_text( + f"timestamp: 2026-05-04T12:00:00.{i:06d}+00:00\n\n{text}\n" + ) + + pending = read_pending(project) + total = sum(len(m.text) for m in pending) + assert total <= MAX_TOTAL_CHARS + assert len(pending) < 10 + + def test_single_large_message_bypasses_char_cap(self, tmp_path: Path) -> None: + """First message is always included even if it alone exceeds max_chars.""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Single message larger than MAX_TOTAL_CHARS — the `and messages` guard + # means the first message always passes through. + big_text = "y" * (MAX_TOTAL_CHARS + 1_000) + (msg_dir / "20260504T120000000000-aaaaaaaa.md").write_text( + f"timestamp: 2026-05-04T12:00:00+00:00\n\n{big_text}\n" + ) + + pending = read_pending(project) + assert len(pending) == 1 + assert len(pending[0].text) > MAX_TOTAL_CHARS + + +class TestMessageSizeValidation: + def test_write_rejects_empty_text(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + with pytest.raises(ValueError, match="empty"): + write_message(project, "") + + def test_write_rejects_whitespace_only_text(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + with pytest.raises(ValueError, match="empty"): + write_message(project, " \n\t ") + + def test_write_rejects_oversized_message(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + with pytest.raises(ValueError, match="exceeds maximum"): + write_message(project, "x" * (MAX_MESSAGE_CHARS + 1)) + + +class TestRateLimiting: + def test_write_rejects_when_too_many_pending(self, tmp_path: Path) -> None: + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + # Create MAX_PENDING_MESSAGES files directly + for i in range(MAX_PENDING_MESSAGES): + (msg_dir / f"20260504T120000{i:06d}-abcd{i:04d}.md").write_text( + f"timestamp: 2026-05-04T12:00:00.{i:06d}+00:00\n\nmsg {i}\n" + ) + + with pytest.raises(ValueError, match="Too many pending"): + write_message(project, "one more") + + +class TestMalformedMessages: + def test_malformed_file_no_timestamp_header(self, tmp_path: Path) -> None: + """A message file without a valid timestamp header is still read gracefully.""" + project = tmp_path / "proj" + project.mkdir() + msg_dir = project / ".factory" / "messages" + msg_dir.mkdir(parents=True) + + (msg_dir / "20260504T120000000000-badbadbad.md").write_text( + "no timestamp here\n\njust some text\n" + ) + + pending = read_pending(project) + assert len(pending) == 1 + # The text should still be extracted from line index 2 + assert pending[0].text == "just some text"