diff --git a/cookoff/FIXES.md b/cookoff/FIXES.md
new file mode 100644
index 0000000..43b003c
--- /dev/null
+++ b/cookoff/FIXES.md
@@ -0,0 +1,159 @@
+# FIXES.md — Cosmos Cookoff Training Record
+
+Labeled training tuples from the 2026-03-05 sorting controller rebuild session.
+Each entry captures: **(observation, state, root cause, fix)** for future Layer 0 codification.
+
+---
+
+## Fix 1: Controller crashes on startup — missing FactoryIOClient
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Factory I/O scene running but no sorting happening; Python traceback in terminal |
+| **System state** | `sorting_controller.py` fails to import — `FactoryIOClient` undefined |
+| **Root cause** | Original controller was split across multiple files; the rebuild required a single-file version with an embedded `FactoryIOClient` HTTP client for the Web API |
+| **Fix applied** | Rebuilt `sim/sorting_controller.py` from scratch — embedded `FactoryIOClient` class with httpx, id-based tag lookup, name→UUID mapping |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
+---
+
+## Fix 2: Tag writes silently fail — wrong API contract
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Conveyor entry light stays OFF; boxes never move despite state machine showing RUNNING |
+| **System state** | `write_tag()` returns 200 but Factory I/O ignores the values |
+| **Root cause** | Factory I/O Web API expects `[{id, value}]` with UUIDs, not `{name: value}` dicts. Previous code used name-based writes. |
+| **Fix applied** | Rewrote `FactoryIOClient.write_tag()` to use UUID lookup via `_name_to_id` map; PUT to `/api/tag/values` with `[{"id": uuid, "value": val}]` |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
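+A minimal sketch of the corrected write contract, assuming a `_name_to_id` map built from the tag listing; the map contents below are illustrative, not the scene's real UUIDs:
+
+```python
+def build_write_payload(name_to_id: dict, name: str, value) -> list:
+    """Factory I/O's PUT /api/tag/values expects [{id, value}] with UUIDs,
+    not a {name: value} dict -- a name-keyed body returns 200 but is ignored."""
+    return [{"id": name_to_id[name], "value": value}]
+
+# build_write_payload({"Entry conveyor": "6f9c"}, "Entry conveyor", True)
+# -> [{"id": "6f9c", "value": True}]
+```
+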
+---
+
+## Fix 3: Height sensors never trigger — input/output UUID collision
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Boxes reach sort zone but always go right (default); never sorted left for tall boxes |
+| **System state** | `High sensor` and `Low sensor` always read `False` even when box is present |
+| **Root cause** | Factory I/O has duplicate tag names for Input and Output. `_name_to_id` was storing the Output UUID, overwriting the Input UUID. Reads need the Input UUID. |
+| **Fix applied** | Separated lookups: `_name_to_id` prefers Output (for writes), added `_input_name_to_id` for Input tags. `read_all_values()` returns Input values for sensor tags. |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
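+A sketch of the split lookup, assuming each tag record carries `name`, `id`, and a `kind` field distinguishing Input from Output (the field names here are assumptions, not the exact Web API schema):
+
+```python
+def build_tag_maps(tags: list) -> tuple:
+    """Duplicate tag names collide: keep Output UUIDs (for writes) and
+    Input UUIDs (for reads) in separate maps so neither overwrites the other."""
+    name_to_id, input_name_to_id = {}, {}
+    for t in tags:
+        if t["kind"] == "Input":
+            input_name_to_id[t["name"]] = t["id"]
+        else:
+            name_to_id[t["name"]] = t["id"]  # Output wins for writes
+    return name_to_id, input_name_to_id
+```
+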
+---
+
+## Fix 4: State machine stuck in SETTLING — settle counter never advances
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Box arrives at pallet sensor, conveyor stops, but nothing happens — box sits forever |
+| **System state** | State = SETTLING, `settle_count` stays at 0 each scan |
+| **Root cause** | `settle_count` was being reset to 0 at the top of `tick()` instead of being preserved across scans. Instance variable was shadowed by local variable. |
+| **Fix applied** | Made `settle_count` a proper instance variable (`self._settle_count`) initialized in `__init__`, incremented in SETTLING state, reset on state transitions |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
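+The counter bug in miniature -- a toy sketch, not the real controller; names and the scan count are illustrative:
+
+```python
+class SettleDemo:
+    """The settle counter must live on the instance; a local variable
+    (or a reset at the top of tick) makes it restart at 0 every scan."""
+    SETTLE_SCANS = 5
+
+    def __init__(self):
+        self._settle_count = 0  # persists across scans
+        self.state = "SETTLING"
+
+    def tick(self):
+        if self.state == "SETTLING":
+            self._settle_count += 1
+            if self._settle_count >= self.SETTLE_SCANS:
+                self._settle_count = 0  # reset on state transition
+                self.state = "MEASURING"
+```
+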
+---
+
+## Fix 5: Transfer arms don't activate — wrong tag names in write map
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Box sorted correctly (state goes to SORTING_LEFT) but no physical arm movement; box stays on conveyor |
+| **System state** | State = SORTING_LEFT, but `Transf. left` Output tag stays False |
+| **Root cause** | Tag name mismatch: code used `"Transfer left"` but Factory I/O scene uses `"Transf. left"` (abbreviated) |
+| **Fix applied** | Updated all tag name references to match exact Factory I/O "Sorting by Height (Basic)" scene names: `"Transf. left"`, `"Transf. right"` |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
+---
+
+## Fix 6: Watchdog timer fires during normal sort — too short timeout
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Box is mid-transfer on left arm, then suddenly arm retracts and state jumps to RUNNING |
+| **System state** | `WATCHDOG_SCANS=50` fires before transfer completes (50 scans at 20ms scan = 1s) |
+| **Root cause** | Transfer arms in Factory I/O take 3-4 seconds to complete a full push. 50 scans at 20ms = 1s was too short. |
+| **Fix applied** | Increased `WATCHDOG_SCANS` from 50 to 150 (3 seconds at 20ms scan rate), giving arms time to complete |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
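+The timeout arithmetic behind the change, as a small helper (a sketch; the real controller may just hard-code the constant):
+
+```python
+SCAN_MS = 20  # controller scan period in milliseconds
+
+def scans_for(seconds):
+    """Convert a wall-clock timeout into a scan count at the 20ms rate."""
+    return int(seconds * 1000 / SCAN_MS)
+
+# scans_for(1) == 50  (old value -- shorter than a 3-4s arm push)
+# scans_for(3) == 150 (new value)
+```
+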
+---
+
+## Fix 7: Emergency stop doesn't halt — inverted logic
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Pressing E-stop button in Factory I/O has no effect; conveyor keeps running |
+| **System state** | `Emergency stop` tag reads `True` when NOT pressed (NC contact), `False` when pressed |
+| **Root cause** | E-stop in Factory I/O is normally-closed (NC). Code was checking `if emergency_stop == True` to trigger ESTOP, but True means safe. |
+| **Fix applied** | Inverted the check: `if not tags["Emergency stop"]:` triggers ESTOP state. Added comment documenting NC logic. |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
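+The inverted check, reduced to a one-liner sketch over a read tags dict:
+
+```python
+def estop_active(tags):
+    """NC contact: 'Emergency stop' reads True while the circuit is closed
+    (safe) and False when the button is pressed -- so invert for the check."""
+    return not tags["Emergency stop"]
+```
+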
+---
+
+## Fix 8: Fault injector can't connect — missing force/release endpoints
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | `fault_injector.py --fault jam` exits with "Cannot force tag" error |
+| **System state** | `FactoryIOClient` had no `force_tag()` or `release_tag()` methods |
+| **Root cause** | Original rebuild focused on read/write only. Force/release are separate Factory I/O endpoints: PUT `/api/tag/values-force` and PUT `/api/tag/values-release`. |
+| **Fix applied** | Added `force_tag(name, value)` → PUT `[{id, value}]` to `/api/tag/values-force` and `release_tag(name)` → PUT `["id"]` to `/api/tag/values-release` |
+| **Files changed** | `sim/sorting_controller.py` |
+| **Outcome** | verified working |
+
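+The two endpoints take different payload shapes -- force takes `[{id, value}]` objects, release takes bare UUID strings. A sketch of the body builders, assuming the same `_name_to_id` map used for writes:
+
+```python
+def force_payload(name_to_id, name, value):
+    """PUT body for /api/tag/values-force: same [{id, value}] shape as writes."""
+    return [{"id": name_to_id[name], "value": value}]
+
+def release_payload(name_to_id, name):
+    """PUT body for /api/tag/values-release: a bare list of UUID strings."""
+    return [name_to_id[name]]
+```
+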
+---
+
+## Fix 9: diagnosis_engine.py Windows encoding crash on R2 Unicode output
+
+| Field | Value |
+|-------|-------|
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff setup |
+| **Visual observation** | Cosmos R2 returns diagnosis but script crashes with `UnicodeEncodeError` when printing to terminal |
+| **System state** | Windows console (cp1252) can't encode em-dash, bullet, and other Unicode chars in R2's chain-of-thought |
+| **Root cause** | Python defaults to system encoding (cp1252 on Windows) for stdout. R2 generates UTF-8 output with rich Unicode punctuation. |
+| **Fix applied** | Added `sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")` at top of `diagnosis_engine.py` |
+| **Files changed** | `cookoff/diagnosis_engine.py` |
+| **Outcome** | verified working |
+
+---
+
+## Session Summary
+
+| Metric | Value |
+|--------|-------|
+| **Total fixes** | 9 |
+| **Date** | 2026-03-05 |
+| **Session** | Cosmos Cookoff — sorting controller rebuild |
+| **Components** | sorting_controller (8), diagnosis_engine (1) |
+| **All verified** | Yes |
+
+### By Component
+
+| Component | Fixes | IDs |
+|-----------|-------|-----|
+| `sim/sorting_controller.py` | 8 | 1, 2, 3, 4, 5, 6, 7, 8 |
+| `cookoff/diagnosis_engine.py` | 1 | 9 |
diff --git a/cookoff/__init__.py b/cookoff/__init__.py
new file mode 100644
index 0000000..d98188b
--- /dev/null
+++ b/cookoff/__init__.py
@@ -0,0 +1 @@
+# cookoff — Cosmos Cookoff demo tools
diff --git a/cookoff/capture_fio.py b/cookoff/capture_fio.py
index 3c6b3ac..2d0c757 100644
--- a/cookoff/capture_fio.py
+++ b/cookoff/capture_fio.py
@@ -8,6 +8,8 @@
python cookoff/capture_fio.py record --duration 15 --label normal
python cookoff/capture_fio.py record --duration 30 --label box_jam
python cookoff/capture_fio.py screenshot --label snapshot_01
+ python cookoff/capture_fio.py session --chunk-duration 8
+ python cookoff/capture_fio.py stop
python cookoff/capture_fio.py auto --scenarios normal,jam,stop --duration 20
Outputs to cookoff/clips/ as labeled MP4/PNG files.
@@ -19,8 +21,11 @@
import time
from pathlib import Path
+import cv2
import mss
import mss.tools
+import numpy as np
+import pygetwindow as gw
# Add repo root to path for imports
REPO_ROOT = Path(__file__).parent.parent
@@ -32,9 +37,229 @@
TARGET_FPS = 4
FRAME_INTERVAL = 1.0 / TARGET_FPS
+STOP_FILE = CLIPS_DIR / ".stop_recording"
+
+
+def focus_factoryio():
+ """Bring the Factory I/O window to the foreground.
+
+ Returns the window object, or None if the window is not found.
+ """
+ try:
+ windows = gw.getWindowsWithTitle("Factory IO")
+ if not windows:
+            # Try alternate title with the slash
+ windows = gw.getWindowsWithTitle("Factory I/O")
+ if not windows:
+ print("WARNING: Factory I/O window not found -- capturing whatever is on screen")
+ return None
+ win = windows[0]
+ if win.isMinimized:
+ win.restore()
+ time.sleep(0.3)
+ win.activate()
+ time.sleep(0.5)
+ return win
+ except Exception as e:
+ print(f"WARNING: Could not focus Factory I/O window: {e}")
+ return None
+
+
+def capture_clip(
+ duration_seconds: float = 8,
+ fps: int = TARGET_FPS,
+ label: str = "clip",
+ monitor_index: int = 1,
+) -> Path:
+ """Record a short MP4 clip of Factory I/O for Cosmos R2.
+
+ Uses OpenCV VideoWriter (mp4v codec) for direct frame-to-MP4 encoding
+ without needing ffmpeg.
+ """
+ focus_factoryio()
+
+ timestamp = time.strftime("%Y%m%d_%H%M%S")
+ output_path = CLIPS_DIR / f"{label}_{timestamp}.mp4"
+
+ with mss.mss() as sct:
+ monitor = sct.monitors[monitor_index]
+ width, height = monitor["width"], monitor["height"]
+
+ fourcc = cv2.VideoWriter_fourcc(*"mp4v")
+ out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height))
+
+ frame_count = int(duration_seconds * fps)
+ interval = 1.0 / fps
+
+ print(f"Recording: {duration_seconds}s at {fps} FPS ({frame_count} frames, {width}x{height})")
+
+ for i in range(frame_count):
+ t0 = time.perf_counter()
+ img = np.array(sct.grab(monitor))
+ frame = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
+ out.write(frame)
+
+ if i % fps == 0:
+ elapsed_s = i / fps
+ print(f" {elapsed_s:.0f}s / {duration_seconds:.0f}s ({i}/{frame_count} frames)")
+
+ elapsed = time.perf_counter() - t0
+ time.sleep(max(0, interval - elapsed))
+
+ out.release()
+
+ file_size = output_path.stat().st_size / (1024 * 1024)
+ print(f"Clip saved: {output_path} ({duration_seconds}s @ {fps}fps, {file_size:.1f} MB)")
+ return output_path
+
+
+def record_session(
+ fps: int = TARGET_FPS,
+ label: str = "session",
+ monitor_index: int = 1,
+ chunk_duration: float = 8,
+) -> list[Path]:
+ """Record Factory IO as rolling 8s chunks until stop file appears.
+
+ Each chunk is a complete MP4 -- crash-safe, no data loss.
+ Stop by creating: cookoff/clips/.stop_recording
+ """
+ # Clean up any stale stop file
+ STOP_FILE.unlink(missing_ok=True)
+
+ focus_factoryio()
+
+ timestamp = time.strftime("%Y%m%d_%H%M%S")
+ session_dir = CLIPS_DIR / f"{label}_{timestamp}"
+ session_dir.mkdir(parents=True, exist_ok=True)
+
+ frames_per_chunk = int(chunk_duration * fps)
+ interval = 1.0 / fps
+ chunk_paths: list[Path] = []
+ chunk_idx = 0
+
+ with mss.mss() as sct:
+ monitor = sct.monitors[monitor_index]
+ width, height = monitor["width"], monitor["height"]
+ fourcc = cv2.VideoWriter_fourcc(*"mp4v")
+
+ print(f"Recording session: {width}x{height} @ {fps} FPS, {chunk_duration}s chunks")
+ print(f"Output: {session_dir}")
+ print(f"To stop: create {STOP_FILE}\n")
+
+ try:
+ while not STOP_FILE.exists():
+ # Start new chunk
+ chunk_path = session_dir / f"chunk_{chunk_idx:03d}.mp4"
+ out = cv2.VideoWriter(str(chunk_path), fourcc, fps, (width, height))
+
+ for frame_i in range(frames_per_chunk):
+ t0 = time.perf_counter()
+ img = np.array(sct.grab(monitor))
+ frame = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR)
+ out.write(frame)
+ elapsed = time.perf_counter() - t0
+ time.sleep(max(0, interval - elapsed))
+
+ out.release() # chunk is now a valid MP4
+ chunk_paths.append(chunk_path)
+
+ elapsed_total = (chunk_idx + 1) * chunk_duration
+ size_mb = chunk_path.stat().st_size / (1024 * 1024)
+ print(f" Chunk {chunk_idx}: {elapsed_total:.0f}s ({size_mb:.1f} MB)")
+ chunk_idx += 1
+
+ except KeyboardInterrupt:
+ pass
+
+ # Clean up stop file
+ STOP_FILE.unlink(missing_ok=True)
+
+ total_s = len(chunk_paths) * chunk_duration
+ print(f"\nSession complete: {len(chunk_paths)} chunks, {total_s:.0f}s total")
+ print(f"Chunks in: {session_dir}")
+ return chunk_paths
+
+
+def stop_recording():
+ """Signal a running record_session() to stop."""
+ STOP_FILE.touch()
+ print(f"Stop signal sent: {STOP_FILE}")
+
+
+def chunk_session(video_path: Path, chunk_duration: float = 8, output_dir: Path | None = None) -> list[dict]:
+ """Split an MP4 into fixed-length chunks using OpenCV (no ffmpeg needed).
+
+ Returns list of chunk metadata dicts with keys:
+ chunk_file, start_time, end_time, duration
+ """
+ cap = cv2.VideoCapture(str(video_path))
+ if not cap.isOpened():
+ print(f"ERROR: Cannot open {video_path}")
+ return []
+
+ fps = cap.get(cv2.CAP_PROP_FPS) or TARGET_FPS
+ total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT))
+ width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
+ height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
+ total_duration = total_frames / fps
+
+ if output_dir is None:
+ output_dir = CLIPS_DIR / "chunks"
+ output_dir.mkdir(parents=True, exist_ok=True)
+
+ fourcc = cv2.VideoWriter_fourcc(*"mp4v")
+ frames_per_chunk = int(chunk_duration * fps)
+ chunks = []
+ chunk_idx = 0
+ frame_num = 0
+
+ print(f"Chunking {video_path.name}: {total_duration:.1f}s total, {chunk_duration}s per chunk, {fps} FPS")
+
+ while True:
+ start_time = chunk_idx * chunk_duration
+ if start_time >= total_duration:
+ break
+
+ # Skip tiny tail chunks (< 3s)
+ remaining = total_duration - start_time
+ if remaining < 3:
+ break
+
+ chunk_path = output_dir / f"{video_path.stem}_chunk{chunk_idx:03d}.mp4"
+ out = cv2.VideoWriter(str(chunk_path), fourcc, fps, (width, height))
+
+ frames_written = 0
+ for _ in range(frames_per_chunk):
+ ret, frame = cap.read()
+ if not ret:
+ break
+ out.write(frame)
+ frames_written += 1
+ frame_num += 1
+
+ out.release()
+
+ if frames_written > 0:
+ end_time = start_time + (frames_written / fps)
+ chunks.append({
+ "chunk_file": str(chunk_path),
+ "start_time": round(start_time, 2),
+ "end_time": round(end_time, 2),
+ "duration": round(frames_written / fps, 2),
+ })
+ print(f" Chunk {chunk_idx+1}: {start_time:.0f}s-{end_time:.0f}s ({frames_written} frames)")
+
+ chunk_idx += 1
+
+ cap.release()
+ print(f"Done: {len(chunks)} chunks created in {output_dir}")
+ return chunks
+
def capture_screenshot(label: str = "screenshot", monitor_index: int = 1) -> Path:
"""Capture a single screenshot and save as PNG."""
+ focus_factoryio()
timestamp = time.strftime("%Y%m%d_%H%M%S")
filename = f"{label}_{timestamp}.png"
output_path = CLIPS_DIR / filename
@@ -151,6 +376,16 @@ def main():
rec.add_argument("--fps", type=int, default=TARGET_FPS, help="Frames per second")
rec.add_argument("--monitor", type=int, default=1, help="Monitor index (1=primary)")
+ # Session command (rolling chunks until stop file)
+ sess = subparsers.add_parser("session", help="Record rolling chunks until stop file")
+ sess.add_argument("--label", default="session", help="Label for the session")
+ sess.add_argument("--fps", type=int, default=TARGET_FPS, help="Frames per second")
+ sess.add_argument("--monitor", type=int, default=1, help="Monitor index (1=primary)")
+ sess.add_argument("--chunk-duration", type=float, default=8, help="Seconds per chunk")
+
+ # Stop command
+ subparsers.add_parser("stop", help="Signal a running session to stop")
+
# Auto command (multiple scenarios)
auto = subparsers.add_parser("auto", help="Record multiple scenarios")
auto.add_argument("--scenarios", default="normal,box_jam,conveyor_stop",
@@ -163,6 +398,10 @@ def main():
capture_screenshot(label=args.label, monitor_index=args.monitor)
elif args.command == "record":
record_clip(duration=args.duration, label=args.label, fps=args.fps, monitor_index=args.monitor)
+ elif args.command == "session":
+ record_session(fps=args.fps, label=args.label, monitor_index=args.monitor, chunk_duration=args.chunk_duration)
+ elif args.command == "stop":
+ stop_recording()
elif args.command == "auto":
scenarios = [s.strip() for s in args.scenarios.split(",")]
record_scenarios(scenarios, duration=args.duration)
diff --git a/cookoff/diagnosis_engine.py b/cookoff/diagnosis_engine.py
index 528a165..2441d43 100644
--- a/cookoff/diagnosis_engine.py
+++ b/cookoff/diagnosis_engine.py
@@ -33,8 +33,13 @@
import time
# Fix Windows console encoding for Unicode output from R2
-sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
-sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
+if sys.platform == "win32" and hasattr(sys.stdout, "buffer"):
+ if getattr(sys.stdout, "encoding", "").lower() != "utf-8":
+ try:
+ sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+ sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
+ except (AttributeError, ValueError):
+ pass
from pathlib import Path
from typing import Optional
diff --git a/cookoff/prompts/factory_diagnosis.yaml b/cookoff/prompts/factory_diagnosis.yaml
index 99cca93..fe865fd 100644
--- a/cookoff/prompts/factory_diagnosis.yaml
+++ b/cookoff/prompts/factory_diagnosis.yaml
@@ -72,3 +72,29 @@ user_describe: |
Your observations about the physical scene.
Then provide your description.
+
+# User prompt for visual test loop — diagnosis + code prescription
+# Variables: {plc_registers}, {fault_analysis}, {code_snippet}
+user_prescribe: |
+ ## Live PLC Register Data
+ {plc_registers}
+
+ ## Automated Fault Analysis
+ {fault_analysis}
+
+ ## Current Controller Code (sorting state machine tick method)
+ ```python
+ {code_snippet}
+ ```
+
+ ## Task
+ You can see the factory simulation AND the controller code that drives it.
+ 1. Describe what you SEE happening (boxes moving, piling up, stopped, etc.)
+ 2. Cross-reference with PLC tags to identify the fault
+ 3. State your DIAGNOSIS: what is wrong and why
+ 4. Give a PRESCRIPTION: the exact code change needed (function, variable, logic)
+ 5. Rate confidence: HIGH / MEDIUM / LOW
+
+ Reason step-by-step about visual observations, tag values, and code logic.
+
+ Then provide your diagnosis and prescription clearly.
diff --git a/cookoff/visual_test_loop.py b/cookoff/visual_test_loop.py
new file mode 100644
index 0000000..9f2d37d
--- /dev/null
+++ b/cookoff/visual_test_loop.py
@@ -0,0 +1,673 @@
+#!/usr/bin/env python3
+"""
+FactoryLM Visual Test Loop — AI-Driven Fault Diagnosis Demo
+============================================================
+Cosmos R2 watches Factory I/O, diagnoses faults, and prescribes code fixes.
+
+Loop: record clip -> read FIO tags -> send to Cosmos R2 -> display diagnosis
+
+Usage:
+ python cookoff/visual_test_loop.py # Interactive menu
+ python cookoff/visual_test_loop.py --scenario jam # Single scenario
+ python cookoff/visual_test_loop.py --all # Run all scenarios
+ python cookoff/visual_test_loop.py --dry-run # Skip Cosmos call
+ python cookoff/visual_test_loop.py --vllm-url http://... # Custom vLLM endpoint
+ python cookoff/visual_test_loop.py --fio-url http://... # Custom FIO API
+
+Requires:
+ - Factory I/O running with Web API enabled (default :7410)
+ - vLLM serving Cosmos R2 (default localhost:8000)
+"""
+
+import argparse
+import io
+import json
+import os
+import sys
+import time
+from pathlib import Path
+
+# Fix Windows console encoding (skip if already wrapped by diagnosis_engine)
+if sys.platform == "win32" and hasattr(sys.stdout, "buffer"):
+ if getattr(sys.stdout, "encoding", "").lower() != "utf-8":
+ try:
+ sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")
+ sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace")
+ except (AttributeError, ValueError):
+ pass
+
+# Add repo root to path
+REPO_ROOT = Path(__file__).resolve().parent.parent
+sys.path.insert(0, str(REPO_ROOT))
+
+from cookoff.capture_fio import capture_clip, capture_screenshot, chunk_session, record_session, stop_recording
+from cookoff.diagnosis_engine import (
+ diagnose,
+ encode_media,
+ format_fault_analysis,
+ format_plc_registers,
+ load_prompts,
+ VLLM_URL,
+ MODEL_NAME,
+ DEFAULT_TEMPERATURE,
+ DEFAULT_TOP_P,
+ DEFAULT_MAX_TOKENS,
+)
+from sim.sorting_controller import FactoryIOClient
+from sim.fault_injector import FaultInjector, FAULTS
+
+import requests
+import yaml
+
+
+# ---------------------------------------------------------------------------
+# Fault scenarios for the visual test loop
+# ---------------------------------------------------------------------------
+SCENARIOS = {
+ "normal": {
+ "id": None,
+ "note": "Baseline -- healthy operation",
+ "settle_time": 2.0,
+ },
+ "pile_up": {
+ "id": "jam",
+ "note": "Pallet sensor stuck ON -- boxes pile up",
+ "settle_time": 3.0,
+ },
+ "blind": {
+ "id": "sensor",
+ "note": "Height sensors dead -- all boxes route wrong",
+ "settle_time": 2.0,
+ },
+ "estop": {
+ "id": "estop",
+ "note": "Emergency stop engaged -- immediate halt",
+ "settle_time": 1.0,
+ },
+ "stuck_left": {
+ "id": "sorter_stuck_left",
+ "note": "Left transfer arm locked -- all boxes go left",
+ "settle_time": 2.0,
+ },
+ "no_parts": {
+ "id": "emitter_off",
+ "note": "No new parts entering system",
+ "settle_time": 3.0,
+ },
+}
+
+
+# ---------------------------------------------------------------------------
+# Visual Test Loop
+# ---------------------------------------------------------------------------
+class VisualTestLoop:
+ """Three-mode visual testing harness for Cosmos R2 + Factory I/O.
+
+ Modes:
+ 1. Observe-only (--dry-run): capture + tags, no Cosmos call
+ 2. Diagnose: capture + tags + Cosmos R2 diagnosis
+ 3. Prescribe: capture + tags + code snippet + Cosmos R2 diagnosis + fix
+ """
+
+ def __init__(
+ self,
+ vllm_url: str = VLLM_URL,
+ fio_url: str = "http://localhost:7410",
+ controller_path: str = "sim/sorting_controller.py",
+ dry_run: bool = False,
+ obs_clip: str | None = None,
+ clip_duration: float = 8,
+ chunk_duration: float = 8,
+ ):
+ self.vllm_url = vllm_url
+ self.fio_url = fio_url
+ self.controller_path = Path(REPO_ROOT) / controller_path
+ self.dry_run = dry_run
+ self.obs_clip = obs_clip
+ self.clip_duration = clip_duration
+ self.chunk_duration = chunk_duration
+ self.iteration = 0
+ self.results: list[dict] = []
+
+ # Initialize Factory I/O client and fault injector
+ self.fio = FactoryIOClient(base_url=fio_url)
+ self.injector = FaultInjector(self.fio)
+
+ def check_prerequisites(self) -> bool:
+ """Verify Factory I/O is reachable."""
+ if not self.fio.check_connection():
+ print(f"ERROR: Cannot connect to Factory I/O at {self.fio_url}")
+ print("Make sure Factory I/O is running with Web API enabled (Edit > Options > Web API).")
+ return False
+ print(f"Factory I/O connected: {self.fio_url}")
+
+ if not self.dry_run:
+ try:
+ r = requests.get(
+ self.vllm_url.replace("/chat/completions", "/models"),
+ timeout=5,
+ )
+ if r.status_code == 200:
+ print(f"vLLM connected: {self.vllm_url}")
+ else:
+ print(f"WARNING: vLLM returned {r.status_code} -- diagnoses may fail")
+ except requests.ConnectionError:
+ print(f"WARNING: Cannot reach vLLM at {self.vllm_url}")
+ print("Diagnoses will fail. Use --dry-run for capture-only mode.")
+
+ return True
+
+ def run_iteration(self, scenario_name: str) -> dict:
+ """One full observe -> diagnose -> prescribe cycle."""
+ self.iteration += 1
+ scenario = SCENARIOS.get(scenario_name)
+ if not scenario:
+ print(f"ERROR: Unknown scenario '{scenario_name}'. Available: {list(SCENARIOS.keys())}")
+ return {"error": f"Unknown scenario: {scenario_name}"}
+
+ fault_id = scenario["id"]
+ settle_time = scenario.get("settle_time", 2.0)
+
+ print(f"\n{'='*70}")
+ print(f" VISUAL TEST LOOP -- Iteration {self.iteration}")
+ print(f" Scenario: {scenario_name} ({scenario['note']})")
+ print(f"{'='*70}\n")
+
+ # 1. Inject fault (if any)
+ if fault_id:
+ print(f"[1/6] Injecting fault: {fault_id}")
+ self.injector.inject(fault_id)
+ print(f" Settling for {settle_time}s...")
+ time.sleep(settle_time)
+ else:
+ print("[1/6] No fault injection (baseline)")
+
+ # 2. Record clip (or use pre-recorded)
+ if self.obs_clip:
+ print(f"[2/6] Using pre-recorded clip: {self.obs_clip}")
+ frame_path = Path(self.obs_clip)
+ else:
+ print(f"[2/6] Recording {self.clip_duration}s clip...")
+ frame_path = capture_clip(
+ duration_seconds=self.clip_duration,
+ label=f"iter{self.iteration}_{scenario_name}",
+ )
+ print(f" Media: {frame_path}")
+
+ # 3. Read live FIO tags
+ print("[3/6] Reading Factory I/O tags...")
+ tags = self.fio.read_all_values()
+ if tags:
+ # Show compact tag summary
+ bool_tags = {k: v for k, v in tags.items() if isinstance(v, bool)}
+ on_tags = [k for k, v in bool_tags.items() if v]
+ print(f" {len(tags)} tags read. Active: {on_tags or 'none'}")
+ else:
+ print(" WARNING: Could not read tags")
+ tags = {}
+
+ # 4. Extract controller code snippet
+ print("[4/6] Extracting tick() method...")
+ code_snippet = self._extract_tick_method()
+ print(f" {len(code_snippet.splitlines())} lines extracted")
+
+ # 5. Call Cosmos R2 (or skip in dry-run)
+ result = {
+ "iteration": self.iteration,
+ "scenario": scenario_name,
+ "fault_id": fault_id,
+ "note": scenario["note"],
+ "frame_path": str(frame_path),
+ "tags": tags,
+ "code_lines": len(code_snippet.splitlines()),
+ "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
+ }
+
+ if self.dry_run:
+ print("[5/6] DRY RUN -- skipping Cosmos R2 call")
+ result["reasoning"] = "(dry run)"
+ result["diagnosis"] = "(dry run -- no Cosmos R2 call)"
+ result["prescription"] = "(dry run)"
+ result["confidence"] = "N/A"
+ result["elapsed_s"] = 0
+ result["tokens"] = 0
+ else:
+ print("[5/6] Sending to Cosmos R2...")
+ cosmos_result = self._call_cosmos(frame_path, tags, code_snippet, scenario_name)
+ result.update(cosmos_result)
+
+ # 6. Print formatted output
+ print(f"\n[6/6] Results:")
+ self._print_result(result)
+
+ # 7. Clear fault
+ if fault_id:
+ self.injector.clear(fault_id)
+ print(f"\nFault '{fault_id}' cleared.")
+
+ # 8. Log to results
+ self.results.append(result)
+ return result
+
+ def _call_cosmos(self, frame_path: Path, tags: dict, code_snippet: str, scenario_name: str) -> dict:
+ """Send frame + tags + code to Cosmos R2 and parse response."""
+ prompts = load_prompts()
+ system_prompt = prompts["system"]
+
+ # Build the prescribe prompt
+ user_template = prompts.get("user_prescribe", prompts["user_diagnosis"])
+ plc_registers = format_plc_registers(tags) if tags else "No PLC data available"
+ fault_analysis = format_fault_analysis(tags) if tags else "No fault data available"
+
+ try:
+ user_prompt = user_template.format(
+ plc_registers=plc_registers,
+ fault_analysis=fault_analysis,
+ code_snippet=code_snippet,
+ )
+ except KeyError:
+ # Fall back to diagnosis template if prescribe template missing code_snippet
+ user_prompt = prompts["user_diagnosis"].format(
+ plc_registers=plc_registers,
+ fault_analysis=fault_analysis,
+ )
+
+ # Encode frame/clip — encode_media returns correct MIME for .mp4 vs .png
+ b64, mime = encode_media(str(frame_path))
+ if mime.startswith("video"):
+ media_content = {
+ "type": "video_url",
+ "video_url": {"url": f"data:{mime};base64,{b64}"},
+ }
+ else:
+ media_content = {
+ "type": "image_url",
+ "image_url": {"url": f"data:{mime};base64,{b64}"},
+ }
+ user_content = [media_content, {"type": "text", "text": user_prompt}]
+
+ payload = {
+ "model": MODEL_NAME,
+ "messages": [
+ {"role": "system", "content": system_prompt},
+ {"role": "user", "content": user_content},
+ ],
+ "max_tokens": DEFAULT_MAX_TOKENS,
+ "temperature": DEFAULT_TEMPERATURE,
+ "top_p": DEFAULT_TOP_P,
+ }
+
+ # Tell vLLM how to sample video frames
+ if mime.startswith("video"):
+ payload["media_io_kwargs"] = {"video": {"fps": 1.0}}
+
+ t_start = time.time()
+ try:
+ response = requests.post(
+ self.vllm_url,
+ headers={"Content-Type": "application/json"},
+ json=payload,
+ timeout=300,
+ )
+ elapsed = time.time() - t_start
+
+ if response.status_code != 200:
+ return {
+ "error": f"HTTP {response.status_code}: {response.text[:500]}",
+ "elapsed_s": round(elapsed, 1),
+ "reasoning": "",
+ "diagnosis": f"Error: HTTP {response.status_code}",
+ "prescription": "",
+ "confidence": "N/A",
+ "tokens": 0,
+ }
+
+ data = response.json()
+ content = data["choices"][0]["message"]["content"]
+ usage = data.get("usage", {})
+
+        # Parse the <think>...</think> reasoning block out of the response
+        reasoning = ""
+        diagnosis_text = content
+        if "<think>" in content and "</think>" in content:
+            think_start = content.index("<think>") + len("<think>")
+            think_end = content.index("</think>")
+            reasoning = content[think_start:think_end].strip()
+            diagnosis_text = content[think_end + len("</think>"):].strip()
+
+ # Try to extract confidence from response
+ confidence = "UNKNOWN"
+ for level in ["HIGH", "MEDIUM", "LOW"]:
+ if level in diagnosis_text.upper():
+ confidence = level
+ break
+
+ return {
+ "reasoning": reasoning,
+ "diagnosis": diagnosis_text,
+ "prescription": self._extract_prescription(diagnosis_text),
+ "confidence": confidence,
+ "elapsed_s": round(elapsed, 1),
+ "tokens": usage.get("completion_tokens", 0),
+ "raw_response": content,
+ }
+
+ except requests.ConnectionError as e:
+ return {
+ "error": f"Connection failed: {e}",
+ "elapsed_s": 0,
+ "reasoning": "",
+ "diagnosis": "Error: Cannot connect to vLLM",
+ "prescription": "",
+ "confidence": "N/A",
+ "tokens": 0,
+ }
+
+ def _extract_prescription(self, diagnosis_text: str) -> str:
+ """Extract the PRESCRIPTION section from diagnosis text."""
+ lines = diagnosis_text.splitlines()
+ in_prescription = False
+ prescription_lines = []
+
+ for line in lines:
+ upper = line.strip().upper()
+ if upper.startswith("PRESCRIPTION") or upper.startswith("**PRESCRIPTION"):
+ in_prescription = True
+ continue
+ elif in_prescription and (
+ upper.startswith("CONFIDENCE") or upper.startswith("**CONFIDENCE")
+ or upper.startswith("DIAGNOSIS") or upper.startswith("**DIAGNOSIS")
+ ):
+ break
+ elif in_prescription:
+ prescription_lines.append(line)
+
+ return "\n".join(prescription_lines).strip() if prescription_lines else "(no prescription section found)"
+
+ def _extract_tick_method(self) -> str:
+ """Extract the tick() method from sorting_controller.py for the prompt."""
+ try:
+ source = self.controller_path.read_text(encoding="utf-8")
+ except FileNotFoundError:
+ return "# Controller file not found"
+
+ lines = source.splitlines()
+ # Find "def tick(self):" and extract until next "def " at same indent level
+ start = None
+ for i, line in enumerate(lines):
+ if "def tick(self)" in line:
+ start = i
+ break
+
+ if start is None:
+ return "# tick() method not found in controller"
+
+ end = len(lines)
+ for i in range(start + 1, len(lines)):
+            # Next method at the same indent level (4 spaces for a class method)
+            if lines[i].startswith("    def ") and i > start + 5:
+ end = i
+ break
+
+ # Cap at ~120 lines to avoid blowing context
+ if end - start > 120:
+ end = start + 120
+
+ return "\n".join(lines[start:end])
+
+ def _print_result(self, result: dict):
+ """Print formatted iteration result."""
+ print(f"\n{'- '*35}")
+ print(f"Scenario: {result['scenario']} ({result.get('note', '')})")
+ print(f"Frame: {result['frame_path']}")
+
+ if result.get("reasoning") and result["reasoning"] != "(dry run)":
+ print(f"\nCOSMOS R2 CHAIN-OF-THOUGHT:")
+ # Truncate reasoning for display
+ reasoning = result["reasoning"]
+ if len(reasoning) > 500:
+ reasoning = reasoning[:500] + "..."
+ for line in reasoning.splitlines():
+ print(f" {line}")
+
+ if result.get("diagnosis") and result["diagnosis"] != "(dry run -- no Cosmos R2 call)":
+ print(f"\nDIAGNOSIS:")
+ print(f" {result['diagnosis'][:300]}")
+
+ if result.get("prescription") and result["prescription"] != "(dry run)":
+ print(f"\nPRESCRIPTION:")
+ for line in result["prescription"].splitlines()[:10]:
+ print(f" {line}")
+
+ confidence = result.get("confidence", "N/A")
+ elapsed = result.get("elapsed_s", 0)
+ tokens = result.get("tokens", 0)
+ print(f"\nConfidence: {confidence} | Elapsed: {elapsed}s | Tokens: {tokens}")
+
+ if result.get("error"):
+ print(f"\nERROR: {result['error']}")
+
+ def run_session(self):
+ """Record continuous session, chunk it, diagnose each chunk."""
+ if self.obs_clip:
+ # Pre-recorded file -- still needs chunking
+ session_path = Path(self.obs_clip)
+ print(f"Using pre-recorded session: {session_path}")
+ print(f"\nSplitting into {self.chunk_duration}s chunks...")
+ chunks = chunk_session(session_path, chunk_duration=self.chunk_duration)
+ print(f"Split into {len(chunks)} chunks.\n")
+ if not chunks:
+ print("ERROR: No chunks produced. Recording may be too short.")
+ return
+ chunk_paths = [Path(c["chunk_file"]) for c in chunks]
+ else:
+ # Live recording -- already produces rolling chunks
+ print("Recording rolling chunks. To stop:")
+ print(" python cookoff/capture_fio.py stop")
+ chunk_paths = record_session(
+ label="session",
+ chunk_duration=self.chunk_duration,
+ )
+
+ if not chunk_paths:
+ print("ERROR: No chunks produced.")
+ return
+
+ # Diagnose each chunk
+ for i, chunk_path in enumerate(chunk_paths):
+ print(f"\n{'='*70}")
+ print(f" CHUNK {i+1}/{len(chunk_paths)}: {chunk_path.name}")
+ print(f"{'='*70}")
+
+ # Read live tags
+ tags = self.fio.read_all_values() or {}
+ if tags:
+ on_tags = [k for k, v in tags.items() if isinstance(v, bool) and v]
+ print(f" Tags: {len(tags)} total, active: {on_tags or 'none'}")
+
+ if self.dry_run:
+ print(f" DRY RUN -- skipping R2 for {chunk_path.name}")
+ self.results.append({
+ "iteration": i + 1,
+ "scenario": f"chunk_{i+1}",
+ "frame_path": str(chunk_path),
+ "tags": tags,
+ "diagnosis": "(dry run)",
+ "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
+ })
+ continue
+
+ code_snippet = self._extract_tick_method()
+ cosmos_result = self._call_cosmos(chunk_path, tags, code_snippet, f"chunk_{i+1}")
+
+ result = {
+ "iteration": i + 1,
+ "scenario": f"chunk_{i+1}",
+ "frame_path": str(chunk_path),
+ "tags": tags,
+ "code_lines": len(code_snippet.splitlines()),
+ "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"),
+ **cosmos_result,
+ }
+ self.results.append(result)
+ self._print_result(result)
+
+ self._print_summary()
+
+ def run_all(self):
+ """Run all scenarios sequentially."""
+ for name in SCENARIOS:
+ self.run_iteration(name)
+ if name != list(SCENARIOS.keys())[-1]:
+ print("\n Waiting 3s before next scenario...")
+ time.sleep(3.0)
+ self._print_summary()
+
+ def run_interactive(self):
+ """Interactive menu for running scenarios."""
+ print("\n" + "="*70)
+ print(" FACTORYLM VISUAL TEST LOOP -- Interactive Mode")
+ print("="*70)
+ print(f"\n Factory I/O: {self.fio_url}")
+ print(f" vLLM: {self.vllm_url}")
+ print(f" Controller: {self.controller_path}")
+ print(f" Dry run: {self.dry_run}")
+ print()
+
+ while True:
+ print("\nAvailable scenarios:")
+ for i, (name, sc) in enumerate(SCENARIOS.items(), 1):
+ fault = sc["id"] or "none"
+ print(f" {i}. {name:<15} (fault: {fault}) -- {sc['note']}")
+
+ print(f"\n a. Run ALL scenarios")
+ print(f" s. Show summary")
+ print(f" q. Quit")
+
+ try:
+ choice = input("\nSelect scenario (name, number, a/s/q): ").strip().lower()
+ except (EOFError, KeyboardInterrupt):
+ print()
+ break
+
+ if choice in ("q", "quit", "exit"):
+ break
+ elif choice in ("a", "all"):
+ self.run_all()
+ elif choice in ("s", "summary"):
+ self._print_summary()
+ elif choice in SCENARIOS:
+ self.run_iteration(choice)
+ self._prompt_apply_fix()
+ elif choice.isdigit():
+ idx = int(choice) - 1
+ names = list(SCENARIOS.keys())
+ if 0 <= idx < len(names):
+ self.run_iteration(names[idx])
+ self._prompt_apply_fix()
+ else:
+ print(f" Invalid number. Choose 1-{len(names)}.")
+ else:
+ print(f" Unknown: '{choice}'")
+
+ self._print_summary()
+
+ def _prompt_apply_fix(self):
+ """Ask operator whether to apply the prescribed fix."""
+ if self.dry_run or not self.results:
+ return
+ last = self.results[-1]
+ if last.get("prescription") and last["prescription"] != "(no prescription section found)":
+ try:
+ answer = input("\nApply fix? (y/n/skip): ").strip().lower()
+ except (EOFError, KeyboardInterrupt):
+ return
+ if answer == "y":
+ print(" -> Manual fix application mode: review prescription above and edit code.")
+ print(" (Automatic code mutation not implemented -- safe by design)")
+ elif answer == "n":
+ print(" -> Fix skipped.")
+
+ def _print_summary(self):
+ """Print summary of all iterations."""
+ if not self.results:
+ print("\nNo iterations completed yet.")
+ return
+
+ print(f"\n{'='*70}")
+ print(f" SESSION SUMMARY -- {len(self.results)} iterations")
+ print(f"{'='*70}")
+
+ for r in self.results:
+ status = "OK" if not r.get("error") else "ERR"
+ conf = r.get("confidence", "N/A")
+ elapsed = r.get("elapsed_s", 0)
+ print(f" [{status}] iter{r['iteration']} {r['scenario']:<15} "
+ f"confidence={conf:<7} elapsed={elapsed}s")
+
+ # Save results to JSON
+ out_path = REPO_ROOT / "cookoff" / "clips" / f"session_{time.strftime('%Y%m%d_%H%M%S')}.json"
+ out_path.parent.mkdir(parents=True, exist_ok=True)
+ with open(out_path, "w") as f:
+ # Strip raw_response and tags to keep file small
+ slim = []
+ for r in self.results:
+ entry = {k: v for k, v in r.items() if k not in ("raw_response", "tags")}
+ slim.append(entry)
+ json.dump(slim, f, indent=2, default=str)
+ print(f"\n Results saved: {out_path}")
+
+
+# ---------------------------------------------------------------------------
+# CLI
+# ---------------------------------------------------------------------------
+def main():
+ parser = argparse.ArgumentParser(
+ description="FactoryLM Visual Test Loop -- Cosmos R2 + Factory I/O",
+ formatter_class=argparse.RawDescriptionHelpFormatter,
+ )
+ parser.add_argument("--scenario", help="Run a single scenario by name")
+ parser.add_argument("--all", action="store_true", help="Run all scenarios")
+ parser.add_argument("--dry-run", action="store_true", help="Skip Cosmos R2 call")
+ parser.add_argument("--vllm-url", default=os.getenv("VLLM_URL", VLLM_URL),
+ help="vLLM endpoint URL")
+ parser.add_argument("--fio-url", default=os.getenv("FACTORYIO_URL", "http://localhost:7410"),
+ help="Factory I/O Web API URL")
+ parser.add_argument("--controller", default="sim/sorting_controller.py",
+ help="Path to sorting controller (relative to repo root)")
+ parser.add_argument("--obs-clip", default=None,
+ help="Path to a pre-recorded clip (skip live capture)")
+ parser.add_argument("--clip-duration", type=float, default=8,
+ help="Seconds to record per clip (default: 8)")
+ parser.add_argument("--session", action="store_true",
+ help="Record continuous session, chunk it, diagnose each chunk")
+ parser.add_argument("--chunk-duration", type=float, default=8,
+ help="Seconds per chunk in session mode (default: 8)")
+ args = parser.parse_args()
+
+ loop = VisualTestLoop(
+ vllm_url=args.vllm_url,
+ fio_url=args.fio_url,
+ controller_path=args.controller,
+ dry_run=args.dry_run,
+ obs_clip=args.obs_clip,
+ clip_duration=args.clip_duration,
+ chunk_duration=args.chunk_duration,
+ )
+
+ if not loop.check_prerequisites():
+ sys.exit(1)
+
+ if args.session:
+ loop.run_session()
+ elif args.all:
+ loop.run_all()
+ elif args.scenario:
+ loop.run_iteration(args.scenario)
+ else:
+ loop.run_interactive()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/sim/fault_injector.py b/sim/fault_injector.py
new file mode 100644
index 0000000..318d7ba
--- /dev/null
+++ b/sim/fault_injector.py
@@ -0,0 +1,231 @@
+"""
+Factory I/O Fault Injector — Demo fault injection via Web API.
+
+Uses Factory I/O's force/release endpoints to simulate equipment faults
+for the Cosmos Cookoff diagnosis demo.
+
+Usage:
+ python sim/fault_injector.py --fault sensor # Force sensor failure
+ python sim/fault_injector.py --fault jam # Simulate conveyor jam
+ python sim/fault_injector.py --fault estop # Trigger E-stop
+ python sim/fault_injector.py --fault motor # Motor overload (stop conveyor)
+ python sim/fault_injector.py --clear # Release all forced tags
+ python sim/fault_injector.py --list # Show available faults
+ python sim/fault_injector.py --interactive # Interactive mode
+"""
+
+import argparse
+import json
+import logging
+import os
+import sys
+import time
+from pathlib import Path
+
+_repo_root = str(Path(__file__).resolve().parent.parent)
+if _repo_root not in sys.path:
+ sys.path.insert(0, _repo_root)
+
+from sim.sorting_controller import FactoryIOClient, load_config
+
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s %(levelname)s %(message)s",
+ datefmt="%H:%M:%S",
+)
+logger = logging.getLogger("fault_injector")
+
+
+# Fault definitions using actual Factory I/O tag names for "Sorting by Height (Basic)"
+FAULTS = {
+ "sensor": {
+ "name": "Sensor Failure",
+ "description": "Height sensors forced OFF — parts pass unsorted",
+ "force": {"High sensor": False, "Low sensor": False},
+ },
+ "jam": {
+ "name": "Conveyor Jam",
+ "description": "Pallet sensor stuck ON — appears as permanent blockage",
+ "force": {"Pallet sensor": True},
+ },
+ "estop": {
+ "name": "Emergency Stop",
+ "description": "E-stop forced active (False = pressed in Factory I/O)",
+ "force": {"Emergency stop": False},
+ },
+ "motor": {
+ "name": "Motor Failure",
+ "description": "Entry conveyor stops — parts pile up",
+ "force": {"Conveyor entry": False},
+ },
+ "sorter_stuck_left": {
+ "name": "Sorter Stuck Left",
+ "description": "Left transfer arm stays engaged — all parts go left",
+ "force": {"Transf. left": True, "Transf. right": False},
+ },
+ "sorter_stuck_right": {
+ "name": "Sorter Stuck Right",
+ "description": "Right transfer arm stays engaged — all parts go right",
+ "force": {"Transf. right": True, "Transf. left": False},
+ },
+ "blind": {
+ "name": "Sensor Blind",
+ "description": "Low sensor forced OFF — controller can't detect any parts",
+ "force": {"Low sensor": False},
+ },
+ "emitter_off": {
+ "name": "No Parts",
+ "description": "Emitter forced OFF — no new parts entering system",
+ "force": {"Emitter": False},
+ },
+}
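Each `force` map above ultimately becomes a Web API payload of the form `[{"id": uuid, "value": val}]`. A minimal sketch of that translation; the `name_to_id` table here is a stand-in for the one `FactoryIOClient` builds from `GET /api/tags`:

```python
def build_force_payload(force: dict, name_to_id: dict) -> list[dict]:
    """Translate a {tag_name: value} force map into the id-based payload."""
    payload = []
    for name, value in force.items():
        tid = name_to_id.get(name)
        if tid is None:
            continue  # unknown tag name -- skip rather than send a bad id
        payload.append({"id": tid, "value": value})
    return payload
```

In practice `FaultInjector.inject()` delegates this to `FactoryIOClient.force_tag()` one tag at a time, so partial failures can be logged per tag.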
+
+
+class FaultInjector:
+ """Inject and clear faults via Factory I/O Web API force endpoints."""
+
+ def __init__(self, client: FactoryIOClient):
+ self.fio = client
+ self._active_faults: dict[str, list[str]] = {}
+
+ def inject(self, fault_id: str, duration: float = 0) -> bool:
+ if fault_id not in FAULTS:
+ logger.error("Unknown fault: %s. Available: %s", fault_id, list(FAULTS.keys()))
+ return False
+
+ fault = FAULTS[fault_id]
+ logger.info("Injecting: %s — %s", fault["name"], fault["description"])
+
+ forced_tags = []
+ for tag_name, value in fault["force"].items():
+ if self.fio.force_tag(tag_name, value):
+ forced_tags.append(tag_name)
+ logger.info(" Forced: %s = %s", tag_name, value)
+ else:
+ logger.warning(" FAILED to force: %s", tag_name)
+
+ self._active_faults[fault_id] = forced_tags
+
+ if duration > 0:
+ logger.info("Auto-clear in %.1fs", duration)
+ time.sleep(duration)
+ self.clear(fault_id)
+
+ return len(forced_tags) > 0
+
+ def clear(self, fault_id: str = "") -> bool:
+ if fault_id and fault_id in self._active_faults:
+ for tag_name in self._active_faults.pop(fault_id):
+ self.fio.release_tag(tag_name)
+ logger.info(" Released: %s", tag_name)
+ logger.info("Cleared: %s", fault_id)
+ return True
+ elif not fault_id:
+ # Clear everything
+ all_tags = set()
+ for tags in self._active_faults.values():
+ all_tags.update(tags)
+ for tag_name in all_tags:
+ self.fio.release_tag(tag_name)
+ logger.info(" Released: %s", tag_name)
+ # Safety: also release all known fault tags
+ for fault in FAULTS.values():
+ for tag_name in fault["force"]:
+ self.fio.release_tag(tag_name)
+ count = len(self._active_faults)
+ self._active_faults.clear()
+ logger.info("Cleared all faults (%d)", count)
+ return True
+ else:
+ logger.warning("Fault '%s' not active", fault_id)
+ return False
+
+ @property
+ def active_faults(self) -> list[str]:
+ return list(self._active_faults.keys())
+
+ def status(self) -> dict:
+ return {
+ "active": [
+ {"id": fid, "name": FAULTS[fid]["name"], "forced_tags": tags}
+ for fid, tags in self._active_faults.items()
+ ],
+ "available": [
+ {"id": fid, "name": f["name"], "description": f["description"]}
+ for fid, f in FAULTS.items()
+ ],
+ }
+
+
+def main():
+ parser = argparse.ArgumentParser(description="Factory I/O Fault Injector")
+ parser.add_argument("--api-url", default=os.getenv("FACTORYIO_URL", "http://localhost:7410"))
+ parser.add_argument("--config", default="config/sorting_tags.yaml")
+ parser.add_argument("--fault", help="Fault ID to inject")
+ parser.add_argument("--duration", type=float, default=0, help="Auto-clear after N seconds")
+ parser.add_argument("--clear", action="store_true", help="Clear all faults")
+ parser.add_argument("--list", action="store_true", help="List available faults")
+ parser.add_argument("--interactive", action="store_true", help="Interactive mode")
+ args = parser.parse_args()
+
+ if args.list:
+ print(f"\n{'='*60}")
+ print("Available Faults")
+ print(f"{'='*60}")
+ for fid, f in FAULTS.items():
+ print(f" {fid:<20} {f['name']}")
+ print(f" {'':20} {f['description']}")
+ print(f" {'':20} Forces: {json.dumps(f['force'])}")
+ print()
+ return
+
+ config = load_config(args.config)
+ client = FactoryIOClient(base_url=args.api_url)
+
+ if not client.check_connection():
+ logger.error("Cannot connect to Factory I/O at %s", args.api_url)
+ sys.exit(1)
+
+ injector = FaultInjector(client)
+
+ if args.clear:
+ injector.clear()
+ return
+
+ if args.fault:
+ injector.inject(args.fault, duration=args.duration)
+ return
+
+ if args.interactive:
+ print("\nInteractive Fault Injector")
+ print("Commands: inject , clear [fault], status, list, quit\n")
+ while True:
+ try:
+ cmd = input("fault> ").strip().split()
+ except (EOFError, KeyboardInterrupt):
+ print()
+ break
+ if not cmd:
+ continue
+ if cmd[0] in ("quit", "exit"):
+ break
+ elif cmd[0] == "inject" and len(cmd) > 1:
+ dur = float(cmd[2]) if len(cmd) > 2 else 0
+ injector.inject(cmd[1], duration=dur)
+ elif cmd[0] == "clear":
+ injector.clear(cmd[1] if len(cmd) > 1 else "")
+ elif cmd[0] == "status":
+ print(json.dumps(injector.status(), indent=2))
+ elif cmd[0] == "list":
+ for fid in FAULTS:
+ print(f" {fid}: {FAULTS[fid]['name']}")
+ else:
+ print(f"Unknown: {cmd[0]}")
+ injector.clear()
+ return
+
+ parser.print_help()
+
+
+if __name__ == "__main__":
+ main()
diff --git a/sim/sorting_controller.py b/sim/sorting_controller.py
new file mode 100644
index 0000000..6855e48
--- /dev/null
+++ b/sim/sorting_controller.py
@@ -0,0 +1,791 @@
+"""
+Factory I/O Sorting Controller — Python Soft PLC.
+
+Drives the "Sorting by Height (Basic)" scene via Factory I/O's Web API.
+No CODESYS or PLC hardware required.
+
+Architecture:
+ Factory I/O (scene running, Web API enabled on :7410)
+ | HTTP REST (id-based, lowercase fields)
+ This script (reads sensors, runs state machine, writes actuators)
+ | Optional
+ /api/plc/live endpoint for Cosmos diagnosis
+
+API contract (Factory I/O Web API — EmbedIO server):
+ GET /api/tags -> [{name, id, address, type, kind, value, isForced, ...}]
+ GET /api/tag/values -> [{id, value}]
+ PUT /api/tag/values <- [{id, value}] (output tags only)
+ PUT /api/tag/values-force <- [{id, value}]
+ PUT /api/tag/values-release <- ["id"]
+
+Usage:
+ python sim/sorting_controller.py # Run controller
+ python sim/sorting_controller.py --discover # Dump all tags
+ python sim/sorting_controller.py --api-url http://host:7410
+ python sim/sorting_controller.py --dry-run # Read-only mode
+"""
+
+import argparse
+import datetime
+import json
+import logging
+import os
+import signal
+import sys
+import time
+from enum import IntEnum
+from pathlib import Path
+from threading import Event, Lock, Thread
+
+_repo_root = str(Path(__file__).resolve().parent.parent)
+if _repo_root not in sys.path:
+ sys.path.insert(0, _repo_root)
+
+import httpx
+
+logging.basicConfig(
+ level=logging.INFO,
+ format="%(asctime)s %(levelname)s %(message)s",
+ datefmt="%H:%M:%S",
+)
+logger = logging.getLogger("sorting_controller")
+# Suppress noisy httpx request logging
+logging.getLogger("httpx").setLevel(logging.WARNING)
+logging.getLogger("httpcore").setLevel(logging.WARNING)
+
+
+# ---------------------------------------------------------------------------
+# State machine (modeled after recovery/Micro820_v1.9.st CASE pattern)
+# ---------------------------------------------------------------------------
+class State(IntEnum):
+ INIT = 0 # Receiving conveyors and removers ON, start light ON, waiting for Start
+ RUNNING = 1 # Entry conveyor ON, green light ON, waiting for box
+ SETTLING = 2 # Conveyor stopped, waiting 3 scans for sensors to settle
+ SORTING_LEFT = 3 # Transfer left active, waiting for exit sensor
+ SORTING_RIGHT = 4 # Transfer right active, waiting for exit sensor
+ STOPPED = 5 # All outputs OFF, yellow light ON, waiting for Reset
+ ESTOP = 6 # All outputs OFF immediately, red light ON
+
+
+STATE_NAMES = {s: s.name for s in State}
+
+
+# ---------------------------------------------------------------------------
+# Factory I/O Web API client (id-based, lowercase fields)
+# ---------------------------------------------------------------------------
+class FactoryIOClient:
+ """HTTP client for Factory I/O Web API (port 7410).
+
+ Factory I/O uses UUIDs (id) to identify tags, not names.
+ On init, we fetch /api/tags to build name<->id lookup tables.
+ All read/write operations use IDs internally.
+ """
+
+ def __init__(self, base_url: str = "http://localhost:7410", timeout: float = 2.0):
+ self.base_url = base_url.rstrip("/")
+ self.client = httpx.Client(timeout=timeout)
+ self._connected = False
+ # Lookup tables built from /api/tags
+ self._name_to_id: dict[str, str] = {} # "Conveyor entry" -> "uuid" (prefers Output)
+ self._id_to_name: dict[str, str] = {} # "uuid" -> "Conveyor entry"
+ self._tag_meta: dict[str, dict] = {} # name -> full tag dict
+ self._output_names: set[str] = set()
+ self._input_name_to_id: dict[str, str] = {} # "Start" -> input UUID
+ self._kind_by_id: dict[str, str] = {} # UUID -> "Input"/"Output"
+
+ def check_connection(self) -> bool:
+ """Connect and build name<->id lookup from /api/tags."""
+ try:
+ r = self.client.get(f"{self.base_url}/api/tags")
+ r.raise_for_status()
+ tags = r.json()
+ self._name_to_id.clear()
+ self._id_to_name.clear()
+ self._tag_meta.clear()
+ self._output_names.clear()
+ self._input_name_to_id.clear()
+ self._kind_by_id.clear()
+ collisions = []
+ input_count = 0
+ output_count = 0
+ for t in tags:
+ name = t["name"]
+ tid = t["id"]
+ kind = t.get("kind", "")
+ # Track kind for every UUID
+ self._kind_by_id[tid] = kind
+ # Map ALL UUIDs to names (not just the winner)
+ self._id_to_name[tid] = name
+ if kind == "Input":
+ input_count += 1
+ self._input_name_to_id[name] = tid
+ if kind == "Output":
+ output_count += 1
+ self._output_names.add(name)
+ # _name_to_id prefers Output (for writes via _name_id)
+ if name in self._name_to_id:
+ collisions.append(name)
+ if kind == "Output":
+ self._name_to_id[name] = tid
+ # else: keep existing id (Output already stored, or Input-first duplicate)
+ else:
+ self._name_to_id[name] = tid
+ self._tag_meta[name] = t
+ self._connected = True
+ for c in collisions:
+ logger.warning("Tag name collision: '%s' has both Input and Output — reads will prefer Input", c)
+ logger.info("Loaded %d tags (%d inputs, %d outputs)", len(self._name_to_id), input_count, output_count)
+ return True
+ except Exception as e:
+ logger.warning("Factory I/O not reachable at %s: %s", self.base_url, e)
+ self._connected = False
+ return False
+
+ @property
+ def connected(self) -> bool:
+ return self._connected
+
+ def _name_id(self, name: str) -> str | None:
+ tid = self._name_to_id.get(name)
+ if tid is None:
+ logger.warning("Unknown tag name: %s", name)
+ return tid
+
+ def get_all_tags(self) -> list[dict] | None:
+ """GET /api/tags — full tag metadata."""
+ try:
+ r = self.client.get(f"{self.base_url}/api/tags")
+ r.raise_for_status()
+ return r.json()
+ except Exception as e:
+ logger.error("Failed to get tags: %s", e)
+ return None
+
+ def read_all_values(self) -> dict | None:
+ """GET /api/tag/values -> dict of tag_name: value.
+
+ When a name has both Input and Output tags, the Input value wins.
+ This ensures sensor reads are never shadowed by actuator values.
+ """
+ try:
+ r = self.client.get(f"{self.base_url}/api/tag/values")
+ r.raise_for_status()
+ raw = r.json() # [{id, value}, ...]
+ result = {}
+ for item in raw:
+ tid = item["id"]
+ name = self._id_to_name.get(tid)
+ if not name:
+ continue
+ kind = self._kind_by_id.get(tid, "")
+ # If name already stored and current tag is NOT Input, skip —
+ # Input (sensor) values always win over Output (actuator) values.
+ if name in result and kind != "Input":
+ continue
+ result[name] = item["value"]
+ return result
+ except Exception as e:
+ logger.warning("Read values failed: %s", e)
+ self._connected = False
+ return None
+
+ def write_tags(self, tag_values: dict) -> bool:
+ """Write multiple tags by name. Only writes Output tags."""
+ payload = []
+ for name, value in tag_values.items():
+ tid = self._name_id(name)
+ if tid and name in self._output_names:
+ payload.append({"id": tid, "value": value})
+ if not payload:
+ return True
+ try:
+ r = self.client.put(
+ f"{self.base_url}/api/tag/values",
+ json=payload,
+ )
+ r.raise_for_status()
+ # Check for errors in response
+ resp = r.json()
+ for item in resp:
+ if item.get("error"):
+ logger.warning("Write error: %s", item)
+ return True
+ except Exception as e:
+ logger.warning("Write tags failed: %s", e)
+ return False
+
+ def force_tag(self, tag_name: str, value) -> bool:
+ """Force a tag value (overrides scene logic)."""
+ tid = self._name_id(tag_name)
+ if not tid:
+ return False
+ try:
+ r = self.client.put(
+ f"{self.base_url}/api/tag/values-force",
+ json=[{"id": tid, "value": value}],
+ )
+ r.raise_for_status()
+ return True
+ except Exception as e:
+ logger.warning("Force %s=%s failed: %s", tag_name, value, e)
+ return False
+
+ def release_tag(self, tag_name: str) -> bool:
+ """Release a forced tag (return to scene logic)."""
+ tid = self._name_id(tag_name)
+ if not tid:
+ return False
+ try:
+ r = self.client.put(
+ f"{self.base_url}/api/tag/values-release",
+ json=[tid],
+ )
+ r.raise_for_status()
+ return True
+ except Exception as e:
+ logger.warning("Release %s failed: %s", tag_name, e)
+ return False
+
+
+# ---------------------------------------------------------------------------
+# Config loader
+# ---------------------------------------------------------------------------
+def load_config(path: str = "config/sorting_tags.yaml") -> dict:
+ """Load tag mapping config. Returns defaults matching 'Sorting by Height (Basic)' scene."""
+ defaults = {
+ "api_url": "http://localhost:7410",
+ "poll_interval_ms": 100,
+ # Actual Factory I/O tag names for "Sorting by Height (Basic)"
+ "sensors": {
+ "high_sensor": "High sensor",
+ "low_sensor": "Low sensor",
+ "pallet_sensor": "Pallet sensor",
+ "at_left_entry": "At left entry",
+ "at_left_exit": "At left exit",
+ "at_right_entry": "At right entry",
+ "at_right_exit": "At right exit",
+ "start": "Start",
+ "stop": "Stop",
+ "reset": "Reset",
+ "emergency_stop": "Emergency stop",
+ "auto": "Auto",
+ "manual": "Manual",
+ "loaded": "Loaded",
+ },
+ "actuators": {
+ "entry_conveyor": "Conveyor entry",
+ "left_conveyor": "Conveyor left",
+ "right_conveyor": "Conveyor right",
+ "transfer_left": "Transf. left",
+ "transfer_right": "Transf. right",
+ "load": "Load",
+ "unload": "Unload",
+ "emitter": "Emitter",
+ "remover_left": "Remover left",
+ "remover_right": "Remover right",
+ "counter": "Counter",
+ "start_light": "Start light",
+ "stop_light": "Stop light",
+ "reset_light": "Reset light",
+ },
+ }
+
+ cfg_file = Path(path)
+ if cfg_file.exists():
+ try:
+ import yaml
+ with cfg_file.open("r", encoding="utf-8") as f:
+ raw = yaml.safe_load(f) or {}
+ fio = raw.get("factoryio", raw)
+ defaults["api_url"] = fio.get("api_url", defaults["api_url"])
+ defaults["poll_interval_ms"] = fio.get("poll_interval_ms", defaults["poll_interval_ms"])
+ for section in ("sensors", "actuators"):
+ if section in fio:
+ defaults[section].update(fio[section])
+ logger.info("Config loaded from %s", cfg_file)
+ except ImportError:
+ logger.warning("PyYAML not installed — using defaults")
+ except Exception:
+ logger.exception("Config load error from %s", cfg_file)
+ return defaults
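For reference, a minimal `config/sorting_tags.yaml` matching the loader's expectations; only the keys shown are read, and omitted sections fall back to the defaults above:

```yaml
factoryio:
  api_url: http://localhost:7410
  poll_interval_ms: 100
  sensors:
    high_sensor: High sensor
    low_sensor: Low sensor
  actuators:
    entry_conveyor: Conveyor entry
    transfer_left: Transf. left
```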
+
+
+# ---------------------------------------------------------------------------
+# Sorting Controller
+# ---------------------------------------------------------------------------
+class SortingController:
+ """State machine for "Sorting by Height (Basic)" via Factory I/O Web API.
+
+ Rebuilt from official Factory I/O template logic:
+ - Rising edge detection on Start/Stop/Reset (no re-trigger on held buttons)
+ - E-stop and Stop use NC convention (False=engaged/pressed)
+ - Height classification uses BOTH high and low sensors
+ - Settling delay (3 scans) before reading height after conveyor stop
+ - Sensor-based transfer completion with 15s watchdog safety net
+ """
+
+ # Settling delay: number of scans to wait after conveyor stop
+ SETTLE_SCANS = 3
+ # Watchdog: max scans before force-completing a transfer (15s at 100ms)
+ WATCHDOG_SCANS = 150
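Both constants derive from wall time and the default 100 ms poll interval from `load_config()`. A hedged helper showing the arithmetic (`scans_for` is illustrative, not used by the class):

```python
def scans_for(seconds: float, poll_interval_ms: int = 100) -> int:
    """Convert a wall-clock duration into a scan count at the poll rate."""
    return int(round(seconds * 1000.0 / poll_interval_ms))
```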
+
+ def __init__(self, client: FactoryIOClient, config: dict, dry_run: bool = False):
+ self.fio = client
+ self.cfg = config
+ self.dry_run = dry_run
+
+ self.s = config["sensors"]
+ self.a = config["actuators"]
+
+ # State
+ self.state = State.INIT
+ self.tags: dict = {}
+ self.sorted_count = 0
+ self.total_count = 0
+ self.tall_count = 0
+ self.short_count = 0
+ self.last_sort_result = ""
+ self.start_time = time.monotonic()
+
+ # Settling / watchdog counters
+ self._settle_count = 0
+ self._watchdog_count = 0
+
+ # Rising edge detection: track "active" state (True=action requested)
+ # Start/Reset are NO (True=pressed), Stop is NC (True=released, invert)
+ self._prev_start = False
+ self._prev_stop = False # tracks stop_active (inverted NC signal)
+ self._prev_reset = False
+ # Exit sensor edges for transfer completion
+ self._prev_at_left_exit = False
+ self._prev_at_right_exit = False
+
+ # Thread-safe tag snapshot for diagnosis API
+ self._lock = Lock()
+ self._snapshot: dict = {}
+
+ def get_snapshot(self) -> dict:
+ with self._lock:
+ return dict(self._snapshot)
+
+ def _update_snapshot(self):
+ with self._lock:
+ self._snapshot = {
+ "timestamp": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(),
+ "state": STATE_NAMES.get(self.state, "UNKNOWN"),
+ "state_id": int(self.state),
+ "sorted_count": self.sorted_count,
+ "tall_count": self.tall_count,
+ "short_count": self.short_count,
+ "total_count": self.total_count,
+ "last_sort_result": self.last_sort_result,
+ "uptime_s": round(time.monotonic() - self.start_time, 1),
+ "dry_run": self.dry_run,
+ "tags": dict(self.tags),
+ "tag_source": "factoryio_webapi",
+ }
+
+ def _read_sensors(self) -> bool:
+ values = self.fio.read_all_values()
+ if values is None:
+ return False
+ self.tags = values
+ return True
+
+ def _write(self, tag_values: dict):
+ if self.dry_run:
+ return
+ self.fio.write_tags(tag_values)
+
+ def _sensor(self, logical_name: str, default=False):
+ tag_name = self.s.get(logical_name, logical_name)
+ return self.tags.get(tag_name, default)
+
+ def _all_outputs_off(self):
+ """Kill all actuators immediately."""
+ self._write({
+ self.a["entry_conveyor"]: False,
+ self.a["left_conveyor"]: False,
+ self.a["right_conveyor"]: False,
+ self.a["emitter"]: False,
+ self.a["transfer_left"]: False,
+ self.a["transfer_right"]: False,
+ self.a["start_light"]: False,
+ self.a["stop_light"]: False,
+ self.a["reset_light"]: False,
+ })
+
+ def tick(self):
+ """One scan cycle of the sorting state machine."""
+ if not self._read_sensors():
+ return
+
+ prev_state = self.state
+
+ # --- Button active states (normalized: True = action requested) ---
+ start_active = self._sensor("start") # NO: True=pressed
+ stop_active = not self._sensor("stop") # NC: True=released -> invert
+ reset_active = self._sensor("reset") # NO: True=pressed
+ estop_engaged = not self._sensor("emergency_stop") # NC: True=safe -> invert
+
+ # --- Rising edge detection (curr AND NOT prev) ---
+ start_rising = start_active and not self._prev_start
+ stop_rising = stop_active and not self._prev_stop
+ reset_rising = reset_active and not self._prev_reset
+
+ # --- Exit sensor rising edges (for transfer completion) ---
+ at_left_exit = self._sensor("at_left_exit")
+ at_right_exit = self._sensor("at_right_exit")
+ left_exit_rising = at_left_exit and not self._prev_at_left_exit
+ right_exit_rising = at_right_exit and not self._prev_at_right_exit
+
+ # --- E-stop check: highest priority, every tick, before state logic ---
+ if estop_engaged and self.state != State.ESTOP:
+ self._all_outputs_off()
+ self._write({self.a["stop_light"]: True}) # red indicator
+ self.state = State.ESTOP
+ logger.warning("E-STOP ENGAGED — all outputs OFF")
+
+ # --- State machine ---
+ elif self.state == State.INIT:
+ # Receiving conveyors and removers on, start light on, waiting for Start
+ self._write({
+ self.a["entry_conveyor"]: False,
+ self.a["left_conveyor"]: True,
+ self.a["right_conveyor"]: True,
+ self.a["emitter"]: False,
+ self.a["remover_left"]: True,
+ self.a["remover_right"]: True,
+ self.a["transfer_left"]: False,
+ self.a["transfer_right"]: False,
+ self.a["start_light"]: True,
+ self.a["stop_light"]: False,
+ self.a["reset_light"]: False,
+ self.a["counter"]: 0,
+ })
+ if start_rising:
+ self.state = State.RUNNING
+ logger.info("INIT -> RUNNING: start pressed")
+
+ elif self.state == State.RUNNING:
+ if stop_rising:
+ self.state = State.STOPPED
+ logger.info("RUNNING -> STOPPED: stop pressed")
+ else:
+ # Entry conveyor ON, waiting for box at pallet sensor
+ pallet = self._sensor("pallet_sensor")
+ if pallet:
+ # Box arrived — stop conveyor, begin settling
+ self._write({self.a["entry_conveyor"]: False})
+ self._settle_count = 0
+ self.state = State.SETTLING
+ logger.info("RUNNING -> SETTLING: box at pallet sensor")
+ else:
+ # Keep running
+ self._write({
+ self.a["entry_conveyor"]: True,
+ self.a["left_conveyor"]: True,
+ self.a["right_conveyor"]: True,
+ self.a["emitter"]: True,
+ self.a["remover_left"]: True,
+ self.a["remover_right"]: True,
+ self.a["transfer_left"]: False,
+ self.a["transfer_right"]: False,
+ self.a["start_light"]: True,
+ self.a["stop_light"]: False,
+ })
+
+ elif self.state == State.SETTLING:
+ if stop_rising:
+ self.state = State.STOPPED
+ logger.info("SETTLING -> STOPPED: stop pressed")
+ else:
+ # Wait for sensors to settle after conveyor stop
+ self._settle_count += 1
+ if self._settle_count >= self.SETTLE_SCANS:
+ # Read height sensors
+ high = self._sensor("high_sensor")
+ low = self._sensor("low_sensor")
+ if high and low:
+ # TALL box -> route LEFT
+ self._write({
+ self.a["transfer_left"]: True,
+ self.a["transfer_right"]: False,
+ self.a["left_conveyor"]: True,
+ })
+ self._watchdog_count = 0
+ self.state = State.SORTING_LEFT
+ logger.info("SETTLING -> SORTING_LEFT: tall box (high=%s low=%s)", high, low)
+ elif low and not high:
+ # SHORT box -> route RIGHT
+ self._write({
+ self.a["transfer_left"]: False,
+ self.a["transfer_right"]: True,
+ self.a["right_conveyor"]: True,
+ })
+ self._watchdog_count = 0
+ self.state = State.SORTING_RIGHT
+ logger.info("SETTLING -> SORTING_RIGHT: short box (high=%s low=%s)", high, low)
+ else:
+ # Neither sensor triggered — box may have moved, go back to RUNNING
+ logger.warning("SETTLING: no height detected (high=%s low=%s) — returning to RUNNING", high, low)
+ self.state = State.RUNNING
+
+ elif self.state == State.SORTING_LEFT:
+ if stop_rising:
+ self._write({self.a["transfer_left"]: False})
+ self.state = State.STOPPED
+ logger.info("SORTING_LEFT -> STOPPED: stop pressed")
+ else:
+ self._watchdog_count += 1
+ if left_exit_rising:
+ # Box reached left exit — transfer complete
+ self._write({self.a["transfer_left"]: False})
+ self.tall_count += 1
+ self.sorted_count += 1
+ self.last_sort_result = "LEFT"
+ self._write({self.a["counter"]: self.sorted_count})
+ self.state = State.RUNNING
+ logger.info("Part sorted LEFT (tall) [#%d]", self.sorted_count)
+ elif self._watchdog_count >= self.WATCHDOG_SCANS:
+ # Watchdog: exit sensor never fired — force-complete
+ self._write({self.a["transfer_left"]: False})
+ self.tall_count += 1
+ self.sorted_count += 1
+ self.last_sort_result = "LEFT"
+ self._write({self.a["counter"]: self.sorted_count})
+ self.state = State.RUNNING
+ logger.warning("WATCHDOG: left transfer timed out after %d scans — force-completing", self._watchdog_count)
+
+ elif self.state == State.SORTING_RIGHT:
+ if stop_rising:
+ self._write({self.a["transfer_right"]: False})
+ self.state = State.STOPPED
+ logger.info("SORTING_RIGHT -> STOPPED: stop pressed")
+ else:
+ self._watchdog_count += 1
+ if right_exit_rising:
+ # Box reached right exit — transfer complete
+ self._write({self.a["transfer_right"]: False})
+ self.short_count += 1
+ self.sorted_count += 1
+ self.last_sort_result = "RIGHT"
+ self._write({self.a["counter"]: self.sorted_count})
+ self.state = State.RUNNING
+ logger.info("Part sorted RIGHT (short) [#%d]", self.sorted_count)
+ elif self._watchdog_count >= self.WATCHDOG_SCANS:
+ # Watchdog: exit sensor never fired — force-complete
+ self._write({self.a["transfer_right"]: False})
+ self.short_count += 1
+ self.sorted_count += 1
+ self.last_sort_result = "RIGHT"
+ self._write({self.a["counter"]: self.sorted_count})
+ self.state = State.RUNNING
+ logger.warning("WATCHDOG: right transfer timed out after %d scans — force-completing", self._watchdog_count)
+
+ elif self.state == State.STOPPED:
+ self._write({
+ self.a["entry_conveyor"]: False,
+ self.a["emitter"]: False,
+ self.a["transfer_left"]: False,
+ self.a["transfer_right"]: False,
+ self.a["start_light"]: False,
+ self.a["stop_light"]: True,
+ self.a["reset_light"]: True,
+ })
+ if reset_rising:
+ self.sorted_count = 0
+ self.tall_count = 0
+ self.short_count = 0
+ self.total_count = 0
+ self._write({self.a["counter"]: 0})
+ self.state = State.INIT
+ logger.info("STOPPED -> INIT: reset pressed (counters cleared)")
+
+ elif self.state == State.ESTOP:
+ # Hold all outputs off, stop light on
+ self._all_outputs_off()
+ self._write({self.a["stop_light"]: True})
+ # Exit: e-stop released AND reset rising edge
+ if not estop_engaged and reset_rising:
+ self._write({self.a["stop_light"]: False, self.a["reset_light"]: False})
+ self.state = State.INIT
+ logger.info("ESTOP -> INIT: e-stop released + reset pressed")
+
+ # --- Update edge detection state (MUST be after all state logic) ---
+ self._prev_start = start_active
+ self._prev_stop = stop_active
+ self._prev_reset = reset_active
+ self._prev_at_left_exit = at_left_exit
+ self._prev_at_right_exit = at_right_exit
+
+ if self.state != prev_state:
+ logger.info("State: %s -> %s", STATE_NAMES[prev_state], STATE_NAMES[self.state])
+
+ self._update_snapshot()
+
+ def stop(self):
+ """Graceful shutdown."""
+ self.state = State.STOPPED
+ self._all_outputs_off()
+ self._write({self.a["stop_light"]: True})
+ logger.info("Controller stopped. Sorted %d parts (%d tall, %d short).",
+ self.sorted_count, self.tall_count, self.short_count)
+
+
+# ---------------------------------------------------------------------------
+# Optional HTTP API for diagnosis integration
+# ---------------------------------------------------------------------------
+def start_api_server(controller: SortingController, port: int = 8765):
+ """Start a minimal HTTP server exposing /api/plc/live."""
+ from http.server import HTTPServer, BaseHTTPRequestHandler
+
+ class Handler(BaseHTTPRequestHandler):
+ def log_message(self, *args):
+ pass
+
+ def do_GET(self):
+ if self.path in ("/api/plc/live", "/api/plc/live/"):
+ snapshot = controller.get_snapshot()
+ body = json.dumps(snapshot, default=str).encode()
+ self.send_response(200)
+ self.send_header("Content-Type", "application/json")
+ self.send_header("Access-Control-Allow-Origin", "*")
+ self.end_headers()
+ self.wfile.write(body)
+ elif self.path in ("/health", "/health/"):
+ self.send_response(200)
+ self.send_header("Content-Type", "application/json")
+ self.end_headers()
+ self.wfile.write(json.dumps({"status": "ok"}).encode())
+ else:
+ self.send_response(404)
+ self.end_headers()
+
+ server = HTTPServer(("0.0.0.0", port), Handler)
+ thread = Thread(target=server.serve_forever, daemon=True)
+ thread.start()
+ logger.info("API server listening on http://0.0.0.0:%d/api/plc/live", port)
+ return server
+
+
+# ---------------------------------------------------------------------------
+# Tag discovery
+# ---------------------------------------------------------------------------
+def discover_tags(client: FactoryIOClient):
+ """Print all Factory I/O tags with actual field names."""
+ tags = client.get_all_tags()
+ if tags is None:
+ logger.error("Could not retrieve tags. Is Factory I/O running with Web API enabled?")
+ return
+
+ # Filter system vs scene tags
+ system = [t for t in tags if t["name"].startswith("FACTORY I/O")]
+ scene = [t for t in tags if not t["name"].startswith("FACTORY I/O")]
+
+ print(f"\n{'='*70}")
+ print(f"Scene Tags ({len(scene)} tags)")
+ print(f"{'='*70}")
+ for t in sorted(scene, key=lambda x: (x["kind"], x["name"])):
+ forced = " [FORCED]" if t.get("isForced") else ""
+ print(f" {t['kind']:<8} {t['type']:<5} addr={t['address']:<4} "
+ f"{t['name']:<25} = {t['value']}{forced}")
+ print(f" id={t['id']}")
+
+ print(f"\n{'='*70}")
+ print(f"System Tags ({len(system)} tags)")
+ print(f"{'='*70}")
+ for t in sorted(system, key=lambda x: x["name"]):
+ print(f" {t['kind']:<8} {t['name']:<30} = {t['value']}")
+
+ # Dump values via /api/tag/values to confirm format
+ print(f"\n{'='*70}")
+ print("Live Values (via /api/tag/values)")
+ print(f"{'='*70}")
+ values = client.read_all_values()
+ if values:
+ for name, val in sorted(values.items()):
+ print(f" {name:<30} = {val}")
+ else:
+ print(" (failed to read values)")
+
+
+# ---------------------------------------------------------------------------
+# Main
+# ---------------------------------------------------------------------------
+def main():
+ parser = argparse.ArgumentParser(description="Factory I/O Sorting Controller (Python Soft PLC)")
+ parser.add_argument("--api-url", default=os.getenv("FACTORYIO_URL", "http://localhost:7410"))
+ parser.add_argument("--config", default="config/sorting_tags.yaml")
+ parser.add_argument("--discover", action="store_true", help="Dump all tags and exit")
+ parser.add_argument("--dry-run", action="store_true", help="Read-only mode")
+ parser.add_argument("--interval", type=int, default=0, help="Override poll interval (ms)")
+ parser.add_argument("--port", type=int, default=8765, help="API server port")
+ parser.add_argument("--no-api", action="store_true", help="Disable API server")
+ args = parser.parse_args()
+
+ config = load_config(args.config)
+ api_url = args.api_url or config["api_url"]
+ interval_ms = args.interval or config["poll_interval_ms"]
+
+ client = FactoryIOClient(base_url=api_url)
+
+ if not client.check_connection():
+ logger.error("Cannot connect to Factory I/O at %s", api_url)
+ logger.info("Enable: press \\ in Factory I/O, type: app.web_server = True")
+ sys.exit(1)
+
+ logger.info("Connected to Factory I/O at %s", api_url)
+
+ # Check if the scene is actually running (F5)
+ all_tags = client.get_all_tags()
+ if all_tags:
+ running_tag = next((t for t in all_tags if t["name"] == "FACTORY I/O (Running)"), None)
+ if running_tag and not running_tag.get("value"):
+ logger.warning("Scene is NOT running — press F5 in Factory I/O to start the scene")
+
+ if args.discover:
+ discover_tags(client)
+ return
+
+ controller = SortingController(client, config, dry_run=args.dry_run)
+
+ api_server = None
+ if not args.no_api:
+ api_server = start_api_server(controller, port=args.port)
+
+ stop_event = Event()
+
+ def on_signal(signum, frame):
+ logger.info("Shutdown signal received")
+ stop_event.set()
+
+ signal.signal(signal.SIGINT, on_signal)
+ signal.signal(signal.SIGTERM, on_signal)
+
+ logger.info("Starting sorting controller (poll=%dms, dry_run=%s)", interval_ms, args.dry_run)
+ cycle = 0
+ interval_s = interval_ms / 1000.0
+
+ while not stop_event.is_set():
+ controller.tick()
+ cycle += 1
+
+ if cycle % max(1, int(10000 / interval_ms)) == 0:
+ snap = controller.get_snapshot()
+ logger.info(
+ "Cycle %d | state=%s | sorted=%d (tall=%d short=%d) | total=%d | uptime=%.0fs",
+ cycle, snap["state"], snap["sorted_count"],
+ snap["tall_count"], snap["short_count"],
+ snap["total_count"], snap["uptime_s"],
+ )
+
+ stop_event.wait(interval_s)
+
+ controller.stop()
+ if api_server:
+ api_server.shutdown()
+
+
+if __name__ == "__main__":
+ main()