From 98d0357c70bffe89d23fba451ad741292c992644 Mon Sep 17 00:00:00 2001 From: Mikecranesync Date: Thu, 5 Mar 2026 11:26:18 -0500 Subject: [PATCH] feat(capture): crash-safe rolling chunk recording with stop file Rewrite record_session() to write 8s MP4 chunks on-the-fly instead of one big file. Each chunk is released before the next starts, so a crash only loses the current partial chunk. Stop mechanism uses a sentinel file (.stop_recording) instead of Ctrl+C, which doesn't work reliably from Claude Code's Bash tool. - Add stop_recording() helper and `capture_fio.py stop` CLI subcommand - Update visual_test_loop.py run_session() to use rolling chunks directly - Include sim controllers and cookoff source files Co-Authored-By: Claude Opus 4.6 --- cookoff/FIXES.md | 159 +++++ cookoff/__init__.py | 1 + cookoff/capture_fio.py | 239 ++++++++ cookoff/diagnosis_engine.py | 9 +- cookoff/prompts/factory_diagnosis.yaml | 26 + cookoff/visual_test_loop.py | 673 +++++++++++++++++++++ sim/fault_injector.py | 231 ++++++++ sim/sorting_controller.py | 791 +++++++++++++++++++++++++ 8 files changed, 2127 insertions(+), 2 deletions(-) create mode 100644 cookoff/FIXES.md create mode 100644 cookoff/__init__.py create mode 100644 cookoff/visual_test_loop.py create mode 100644 sim/fault_injector.py create mode 100644 sim/sorting_controller.py diff --git a/cookoff/FIXES.md b/cookoff/FIXES.md new file mode 100644 index 0000000..43b003c --- /dev/null +++ b/cookoff/FIXES.md @@ -0,0 +1,159 @@ +# FIXES.md — Cosmos Cookoff Training Record + +Labeled training tuples from the 2026-03-05 sorting controller rebuild session. +Each entry captures: **(observation, state, root cause, fix)** for future Layer 0 codification. + +--- + +## Fix 1: Controller crashes on startup — missing FactoryIOClient + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Factory I/O scene running but no sorting happening; Python traceback in terminal | +| **System state** | `sorting_controller.py` fails to import — `FactoryIOClient` undefined | +| **Root cause** | Original controller split across multiple files; monolithic rewrite needed single-file `FactoryIOClient` with Web API HTTP client | +| **Fix applied** | Rebuilt `sim/sorting_controller.py` from scratch — embedded `FactoryIOClient` class with httpx, id-based tag lookup, name→UUID mapping | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 2: Tag writes silently fail — wrong API contract + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Conveyor entry light stays OFF; boxes never move despite state machine showing RUNNING | +| **System state** | `write_tag()` returns 200 but Factory I/O ignores the values | +| **Root cause** | Factory I/O Web API expects `[{id, value}]` with UUIDs, not `{name: value}` dicts. Previous code used name-based writes. | +| **Fix applied** | Rewrote `FactoryIOClient.write_tag()` to use UUID lookup via `_name_to_id` map; PUT to `/api/tag/values` with `[{"id": uuid, "value": val}]` | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 3: Height sensors never trigger — input/output UUID collision + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Boxes reach sort zone but always go right (default); never sorted left for tall boxes | +| **System state** | `High sensor` and `Low sensor` always read `False` even when box is present | +| **Root cause** | Factory I/O has duplicate tag names for Input and Output. `_name_to_id` was storing the Output UUID, overwriting the Input UUID. Reads need the Input UUID. | +| **Fix applied** | Separated lookups: `_name_to_id` prefers Output (for writes), added `_input_name_to_id` for Input tags. `read_all_values()` returns Input values for sensor tags. | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 4: State machine stuck in SETTLING — settle counter never advances + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Box arrives at pallet sensor, conveyor stops, but nothing happens — box sits forever | +| **System state** | State = SETTLING, `settle_count` stays at 0 each scan | +| **Root cause** | `settle_count` was being reset to 0 at the top of `tick()` instead of being preserved across scans. Instance variable was shadowed by local variable. | +| **Fix applied** | Made `settle_count` a proper instance variable (`self._settle_count`) initialized in `__init__`, incremented in SETTLING state, reset on state transitions | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 5: Transfer arms don't activate — wrong tag names in write map + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Box sorted correctly (state goes to SORTING_LEFT) but no physical arm movement; box stays on conveyor | +| **System state** | State = SORTING_LEFT, but `Transf. left` Output tag stays False | +| **Root cause** | Tag name mismatch: code used `"Transfer left"` but Factory I/O scene uses `"Transf. left"` (abbreviated) | +| **Fix applied** | Updated all tag name references to match exact Factory I/O "Sorting by Height (Basic)" scene names: `"Transf. left"`, `"Transf. right"` | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 6: Watchdog timer fires during normal sort — too short timeout + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Box is mid-transfer on left arm, then suddenly arm retracts and state jumps to RUNNING | +| **System state** | `WATCHDOG_SCANS=50` fires before transfer completes (~2.5s at 20ms scan) | +| **Root cause** | Transfer arms in Factory I/O take 3-4 seconds to complete a full push. 50 scans at 20ms = 1s was too short. | +| **Fix applied** | Increased `WATCHDOG_SCANS` from 50 to 150 (3 seconds at 20ms scan rate), giving arms time to complete | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 7: Emergency stop doesn't halt — inverted logic + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Pressing E-stop button in Factory I/O has no effect; conveyor keeps running | +| **System state** | `Emergency stop` tag reads `True` when NOT pressed (NC contact), `False` when pressed | +| **Root cause** | E-stop in Factory I/O is normally-closed (NC). Code was checking `if emergency_stop == True` to trigger ESTOP, but True means safe. | +| **Fix applied** | Inverted the check: `if not tags["Emergency stop"]:` triggers ESTOP state. Added comment documenting NC logic. | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 8: Fault injector can't connect — missing force/release endpoints + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | `fault_injector.py --fault jam` exits with "Cannot force tag" error | +| **System state** | `FactoryIOClient` had no `force_tag()` or `release_tag()` methods | +| **Root cause** | Original rebuild focused on read/write only. Force/release are separate Factory I/O endpoints: PUT `/api/tag/values-force` and PUT `/api/tag/values-release`. | +| **Fix applied** | Added `force_tag(name, value)` → PUT `[{id, value}]` to `/api/tag/values-force` and `release_tag(name)` → PUT `["id"]` to `/api/tag/values-release` | +| **Files changed** | `sim/sorting_controller.py` | +| **Outcome** | verified working | + +--- + +## Fix 9: diagnosis_engine.py Windows encoding crash on R2 Unicode output + +| Field | Value | +|-------|-------| +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff setup | +| **Visual observation** | Cosmos R2 returns diagnosis but script crashes with `UnicodeEncodeError` when printing to terminal | +| **System state** | Windows console (cp1252) can't encode em-dash, bullet, and other Unicode chars in R2's chain-of-thought | +| **Root cause** | Python defaults to system encoding (cp1252 on Windows) for stdout. R2 generates UTF-8 output with rich Unicode punctuation. | +| **Fix applied** | Added `sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace")` at top of `diagnosis_engine.py` | +| **Files changed** | `cookoff/diagnosis_engine.py` | +| **Outcome** | verified working | + +--- + +## Session Summary + +| Metric | Value | +|--------|-------| +| **Total fixes** | 9 | +| **Date** | 2026-03-05 | +| **Session** | Cosmos Cookoff — sorting controller rebuild | +| **Components** | sorting_controller (7), diagnosis_engine (1), fault_injector (1) | +| **All verified** | Yes | + +### By Component + +| Component | Fixes | IDs | +|-----------|-------|-----| +| `sim/sorting_controller.py` | 7 | 1, 2, 3, 4, 5, 6, 7 | +| `sim/fault_injector.py` | 1 | 8 | +| `cookoff/diagnosis_engine.py` | 1 | 9 | diff --git a/cookoff/__init__.py b/cookoff/__init__.py new file mode 100644 index 0000000..d98188b --- /dev/null +++ b/cookoff/__init__.py @@ -0,0 +1 @@ +# cookoff — Cosmos Cookoff demo tools diff --git a/cookoff/capture_fio.py b/cookoff/capture_fio.py index 3c6b3ac..2d0c757 100644 --- a/cookoff/capture_fio.py +++ b/cookoff/capture_fio.py @@ -8,6 +8,8 @@ python cookoff/capture_fio.py record --duration 15 --label normal python cookoff/capture_fio.py record --duration 30 --label box_jam python cookoff/capture_fio.py screenshot --label snapshot_01 + python cookoff/capture_fio.py session --chunk-duration 8 + python cookoff/capture_fio.py stop python cookoff/capture_fio.py auto --scenarios normal,jam,stop --duration 20 Outputs to cookoff/clips/ as labeled MP4/PNG files. @@ -19,8 +21,11 @@ import time from pathlib import Path +import cv2 import mss import mss.tools +import numpy as np +import pygetwindow as gw # Add repo root to path for imports REPO_ROOT = Path(__file__).parent.parent @@ -32,9 +37,229 @@ TARGET_FPS = 4 FRAME_INTERVAL = 1.0 / TARGET_FPS +STOP_FILE = CLIPS_DIR / ".stop_recording" + + +def focus_factoryio(): + """Bring the Factory I/O window to the foreground. + + Returns the window object, or None if the window is not found. + """ + try: + windows = gw.getWindowsWithTitle("Factory IO") + if not windows: + # Try alternate title with hyphen + windows = gw.getWindowsWithTitle("Factory I/O") + if not windows: + print("WARNING: Factory I/O window not found -- capturing whatever is on screen") + return None + win = windows[0] + if win.isMinimized: + win.restore() + time.sleep(0.3) + win.activate() + time.sleep(0.5) + return win + except Exception as e: + print(f"WARNING: Could not focus Factory I/O window: {e}") + return None + + +def capture_clip( + duration_seconds: float = 8, + fps: int = TARGET_FPS, + label: str = "clip", + monitor_index: int = 1, +) -> Path: + """Record a short MP4 clip of Factory I/O for Cosmos R2. + + Uses OpenCV VideoWriter (mp4v codec) for direct frame-to-MP4 encoding + without needing ffmpeg. + """ + focus_factoryio() + + timestamp = time.strftime("%Y%m%d_%H%M%S") + output_path = CLIPS_DIR / f"{label}_{timestamp}.mp4" + + with mss.mss() as sct: + monitor = sct.monitors[monitor_index] + width, height = monitor["width"], monitor["height"] + + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + out = cv2.VideoWriter(str(output_path), fourcc, fps, (width, height)) + + frame_count = int(duration_seconds * fps) + interval = 1.0 / fps + + print(f"Recording: {duration_seconds}s at {fps} FPS ({frame_count} frames, {width}x{height})") + + for i in range(frame_count): + t0 = time.perf_counter() + img = np.array(sct.grab(monitor)) + frame = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) + out.write(frame) + + if i % fps == 0: + elapsed_s = i / fps + print(f" {elapsed_s:.0f}s / {duration_seconds:.0f}s ({i}/{frame_count} frames)") + + elapsed = time.perf_counter() - t0 + time.sleep(max(0, interval - elapsed)) + + out.release() + + file_size = output_path.stat().st_size / (1024 * 1024) + print(f"Clip saved: {output_path} ({duration_seconds}s @ {fps}fps, {file_size:.1f} MB)") + return output_path + + +def record_session( + fps: int = TARGET_FPS, + label: str = "session", + monitor_index: int = 1, + chunk_duration: float = 8, +) -> list[Path]: + """Record Factory IO as rolling 8s chunks until stop file appears. + + Each chunk is a complete MP4 -- crash-safe, no data loss. + Stop by creating: cookoff/clips/.stop_recording + """ + # Clean up any stale stop file + STOP_FILE.unlink(missing_ok=True) + + focus_factoryio() + + timestamp = time.strftime("%Y%m%d_%H%M%S") + session_dir = CLIPS_DIR / f"{label}_{timestamp}" + session_dir.mkdir(parents=True, exist_ok=True) + + frames_per_chunk = int(chunk_duration * fps) + interval = 1.0 / fps + chunk_paths: list[Path] = [] + chunk_idx = 0 + + with mss.mss() as sct: + monitor = sct.monitors[monitor_index] + width, height = monitor["width"], monitor["height"] + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + + print(f"Recording session: {width}x{height} @ {fps} FPS, {chunk_duration}s chunks") + print(f"Output: {session_dir}") + print(f"To stop: create {STOP_FILE}\n") + + try: + while not STOP_FILE.exists(): + # Start new chunk + chunk_path = session_dir / f"chunk_{chunk_idx:03d}.mp4" + out = cv2.VideoWriter(str(chunk_path), fourcc, fps, (width, height)) + + for frame_i in range(frames_per_chunk): + t0 = time.perf_counter() + img = np.array(sct.grab(monitor)) + frame = cv2.cvtColor(img, cv2.COLOR_BGRA2BGR) + out.write(frame) + elapsed = time.perf_counter() - t0 + time.sleep(max(0, interval - elapsed)) + + out.release() # chunk is now a valid MP4 + chunk_paths.append(chunk_path) + + elapsed_total = (chunk_idx + 1) * chunk_duration + size_mb = chunk_path.stat().st_size / (1024 * 1024) + print(f" Chunk {chunk_idx}: {elapsed_total:.0f}s ({size_mb:.1f} MB)") + chunk_idx += 1 + + except KeyboardInterrupt: + pass + + # Clean up stop file + STOP_FILE.unlink(missing_ok=True) + + total_s = len(chunk_paths) * chunk_duration + print(f"\nSession complete: {len(chunk_paths)} chunks, {total_s:.0f}s total") + print(f"Chunks in: {session_dir}") + return chunk_paths + + +def stop_recording(): + """Signal a running record_session() to stop.""" + STOP_FILE.touch() + print(f"Stop signal sent: {STOP_FILE}") + + +def chunk_session(video_path: Path, chunk_duration: float = 8, output_dir: Path | None = None) -> list[dict]: + """Split an MP4 into fixed-length chunks using OpenCV (no ffmpeg needed). + + Returns list of chunk metadata dicts with keys: + chunk_file, start_time, end_time, duration + """ + cap = cv2.VideoCapture(str(video_path)) + if not cap.isOpened(): + print(f"ERROR: Cannot open {video_path}") + return [] + + fps = cap.get(cv2.CAP_PROP_FPS) or TARGET_FPS + total_frames = int(cap.get(cv2.CAP_PROP_FRAME_COUNT)) + width = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH)) + height = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT)) + total_duration = total_frames / fps + + if output_dir is None: + output_dir = CLIPS_DIR / "chunks" + output_dir.mkdir(parents=True, exist_ok=True) + + fourcc = cv2.VideoWriter_fourcc(*"mp4v") + frames_per_chunk = int(chunk_duration * fps) + chunks = [] + chunk_idx = 0 + frame_num = 0 + + print(f"Chunking {video_path.name}: {total_duration:.1f}s total, {chunk_duration}s per chunk, {fps} FPS") + + while True: + start_time = chunk_idx * chunk_duration + if start_time >= total_duration: + break + + # Skip tiny tail chunks (< 3s) + remaining = total_duration - start_time + if remaining < 3: + break + + chunk_path = output_dir / f"{video_path.stem}_chunk{chunk_idx:03d}.mp4" + out = cv2.VideoWriter(str(chunk_path), fourcc, fps, (width, height)) + + frames_written = 0 + for _ in range(frames_per_chunk): + ret, frame = cap.read() + if not ret: + break + out.write(frame) + frames_written += 1 + frame_num += 1 + + out.release() + + if frames_written > 0: + end_time = start_time + (frames_written / fps) + chunks.append({ + "chunk_file": str(chunk_path), + "start_time": round(start_time, 2), + "end_time": round(end_time, 2), + "duration": round(frames_written / fps, 2), + }) + print(f" Chunk {chunk_idx+1}: {start_time:.0f}s-{end_time:.0f}s ({frames_written} frames)") + + chunk_idx += 1 + + cap.release() + print(f"Done: {len(chunks)} chunks created in {output_dir}") + return chunks + def capture_screenshot(label: str = "screenshot", monitor_index: int = 1) -> Path: """Capture a single screenshot and save as PNG.""" + focus_factoryio() timestamp = time.strftime("%Y%m%d_%H%M%S") filename = f"{label}_{timestamp}.png" output_path = CLIPS_DIR / filename @@ -151,6 +376,16 @@ def main(): rec.add_argument("--fps", type=int, default=TARGET_FPS, help="Frames per second") rec.add_argument("--monitor", type=int, default=1, help="Monitor index (1=primary)") + # Session command (rolling chunks until stop file) + sess = subparsers.add_parser("session", help="Record rolling chunks until stop file") + sess.add_argument("--label", default="session", help="Label for the session") + sess.add_argument("--fps", type=int, default=TARGET_FPS, help="Frames per second") + sess.add_argument("--monitor", type=int, default=1, help="Monitor index (1=primary)") + sess.add_argument("--chunk-duration", type=float, default=8, help="Seconds per chunk") + + # Stop command + subparsers.add_parser("stop", help="Signal a running session to stop") + # Auto command (multiple scenarios) auto = subparsers.add_parser("auto", help="Record multiple scenarios") auto.add_argument("--scenarios", default="normal,box_jam,conveyor_stop", @@ -163,6 +398,10 @@ def main(): capture_screenshot(label=args.label, monitor_index=args.monitor) elif args.command == "record": record_clip(duration=args.duration, label=args.label, fps=args.fps, monitor_index=args.monitor) + elif args.command == "session": + record_session(fps=args.fps, label=args.label, monitor_index=args.monitor, chunk_duration=args.chunk_duration) + elif args.command == "stop": + stop_recording() elif args.command == "auto": scenarios = [s.strip() for s in args.scenarios.split(",")] record_scenarios(scenarios, duration=args.duration) diff --git a/cookoff/diagnosis_engine.py b/cookoff/diagnosis_engine.py index 528a165..2441d43 100644 --- a/cookoff/diagnosis_engine.py +++ b/cookoff/diagnosis_engine.py @@ -33,8 +33,13 @@ import time # Fix Windows console encoding for Unicode output from R2 -sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace") -sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace") +if sys.platform == "win32" and hasattr(sys.stdout, "buffer"): + if getattr(sys.stdout, "encoding", "").lower() != "utf-8": + try: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace") + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace") + except (AttributeError, ValueError): + pass from pathlib import Path from typing import Optional diff --git a/cookoff/prompts/factory_diagnosis.yaml b/cookoff/prompts/factory_diagnosis.yaml index 99cca93..fe865fd 100644 --- a/cookoff/prompts/factory_diagnosis.yaml +++ b/cookoff/prompts/factory_diagnosis.yaml @@ -72,3 +72,29 @@ user_describe: | Your observations about the physical scene. Then provide your description. + +# User prompt for visual test loop — diagnosis + code prescription +# Variables: {plc_registers}, {fault_analysis}, {code_snippet} +user_prescribe: | + ## Live PLC Register Data + {plc_registers} + + ## Automated Fault Analysis + {fault_analysis} + + ## Current Controller Code (sorting state machine tick method) + ```python + {code_snippet} + ``` + + ## Task + You can see the factory simulation AND the controller code that drives it. + 1. Describe what you SEE happening (boxes moving, piling up, stopped, etc.) + 2. Cross-reference with PLC tags to identify the fault + 3. State your DIAGNOSIS: what is wrong and why + 4. Give a PRESCRIPTION: the exact code change needed (function, variable, logic) + 5. Rate confidence: HIGH / MEDIUM / LOW + + Reason step-by-step about visual observations, tag values, and code logic. + + Then provide your diagnosis and prescription clearly. diff --git a/cookoff/visual_test_loop.py b/cookoff/visual_test_loop.py new file mode 100644 index 0000000..9f2d37d --- /dev/null +++ b/cookoff/visual_test_loop.py @@ -0,0 +1,673 @@ +#!/usr/bin/env python3 +""" +FactoryLM Visual Test Loop — AI-Driven Fault Diagnosis Demo +============================================================ +Cosmos R2 watches Factory I/O, diagnoses faults, and prescribes code fixes. + +Loop: record clip -> read FIO tags -> send to Cosmos R2 -> display diagnosis + +Usage: + python cookoff/visual_test_loop.py # Interactive menu + python cookoff/visual_test_loop.py --scenario jam # Single scenario + python cookoff/visual_test_loop.py --all # Run all scenarios + python cookoff/visual_test_loop.py --dry-run # Skip Cosmos call + python cookoff/visual_test_loop.py --vllm-url http://... # Custom vLLM endpoint + python cookoff/visual_test_loop.py --fio-url http://... # Custom FIO API + +Requires: + - Factory I/O running with Web API enabled (default :7410) + - vLLM serving Cosmos R2 (default localhost:8000) +""" + +import argparse +import io +import json +import os +import sys +import time +from pathlib import Path + +# Fix Windows console encoding (skip if already wrapped by diagnosis_engine) +if sys.platform == "win32" and hasattr(sys.stdout, "buffer"): + if getattr(sys.stdout, "encoding", "").lower() != "utf-8": + try: + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8", errors="replace") + sys.stderr = io.TextIOWrapper(sys.stderr.buffer, encoding="utf-8", errors="replace") + except (AttributeError, ValueError): + pass + +# Add repo root to path +REPO_ROOT = Path(__file__).resolve().parent.parent +sys.path.insert(0, str(REPO_ROOT)) + +from cookoff.capture_fio import capture_clip, capture_screenshot, chunk_session, record_session, stop_recording +from cookoff.diagnosis_engine import ( + diagnose, + encode_media, + format_fault_analysis, + format_plc_registers, + load_prompts, + VLLM_URL, + MODEL_NAME, + DEFAULT_TEMPERATURE, + DEFAULT_TOP_P, + DEFAULT_MAX_TOKENS, +) +from sim.sorting_controller import FactoryIOClient +from sim.fault_injector import FaultInjector, FAULTS + +import requests +import yaml + + +# --------------------------------------------------------------------------- +# Fault scenarios for the visual test loop +# --------------------------------------------------------------------------- +SCENARIOS = { + "normal": { + "id": None, + "note": "Baseline -- healthy operation", + "settle_time": 2.0, + }, + "pile_up": { + "id": "jam", + "note": "Pallet sensor stuck ON -- boxes pile up", + "settle_time": 3.0, + }, + "blind": { + "id": "sensor", + "note": "Height sensors dead -- all boxes route wrong", + "settle_time": 2.0, + }, + "estop": { + "id": "estop", + "note": "Emergency stop engaged -- immediate halt", + "settle_time": 1.0, + }, + "stuck_left": { + "id": "sorter_stuck_left", + "note": "Left transfer arm locked -- all boxes go left", + "settle_time": 2.0, + }, + "no_parts": { + "id": "emitter_off", + "note": "No new parts entering system", + "settle_time": 3.0, + }, +} + + +# --------------------------------------------------------------------------- +# Visual Test Loop +# --------------------------------------------------------------------------- +class VisualTestLoop: + """Three-mode visual testing harness for Cosmos R2 + Factory I/O. + + Modes: + 1. Observe-only (--dry-run): capture + tags, no Cosmos call + 2. Diagnose: capture + tags + Cosmos R2 diagnosis + 3. Prescribe: capture + tags + code snippet + Cosmos R2 diagnosis + fix + """ + + def __init__( + self, + vllm_url: str = VLLM_URL, + fio_url: str = "http://localhost:7410", + controller_path: str = "sim/sorting_controller.py", + dry_run: bool = False, + obs_clip: str | None = None, + clip_duration: float = 8, + chunk_duration: float = 8, + ): + self.vllm_url = vllm_url + self.fio_url = fio_url + self.controller_path = Path(REPO_ROOT) / controller_path + self.dry_run = dry_run + self.obs_clip = obs_clip + self.clip_duration = clip_duration + self.chunk_duration = chunk_duration + self.iteration = 0 + self.results: list[dict] = [] + + # Initialize Factory I/O client and fault injector + self.fio = FactoryIOClient(base_url=fio_url) + self.injector = FaultInjector(self.fio) + + def check_prerequisites(self) -> bool: + """Verify Factory I/O is reachable.""" + if not self.fio.check_connection(): + print(f"ERROR: Cannot connect to Factory I/O at {self.fio_url}") + print("Make sure Factory I/O is running with Web API enabled (Edit > Options > Web API).") + return False + print(f"Factory I/O connected: {self.fio_url}") + + if not self.dry_run: + try: + r = requests.get( + self.vllm_url.replace("/chat/completions", "/models"), + timeout=5, + ) + if r.status_code == 200: + print(f"vLLM connected: {self.vllm_url}") + else: + print(f"WARNING: vLLM returned {r.status_code} -- diagnoses may fail") + except requests.ConnectionError: + print(f"WARNING: Cannot reach vLLM at {self.vllm_url}") + print("Diagnoses will fail. Use --dry-run for capture-only mode.") + + return True + + def run_iteration(self, scenario_name: str) -> dict: + """One full observe -> diagnose -> prescribe cycle.""" + self.iteration += 1 + scenario = SCENARIOS.get(scenario_name) + if not scenario: + print(f"ERROR: Unknown scenario '{scenario_name}'. Available: {list(SCENARIOS.keys())}") + return {"error": f"Unknown scenario: {scenario_name}"} + + fault_id = scenario["id"] + settle_time = scenario.get("settle_time", 2.0) + + print(f"\n{'='*70}") + print(f" VISUAL TEST LOOP -- Iteration {self.iteration}") + print(f" Scenario: {scenario_name} ({scenario['note']})") + print(f"{'='*70}\n") + + # 1. Inject fault (if any) + if fault_id: + print(f"[1/6] Injecting fault: {fault_id}") + self.injector.inject(fault_id) + print(f" Settling for {settle_time}s...") + time.sleep(settle_time) + else: + print("[1/6] No fault injection (baseline)") + + # 2. Record clip (or use pre-recorded) + if self.obs_clip: + print(f"[2/6] Using pre-recorded clip: {self.obs_clip}") + frame_path = Path(self.obs_clip) + else: + print(f"[2/6] Recording {self.clip_duration}s clip...") + frame_path = capture_clip( + duration_seconds=self.clip_duration, + label=f"iter{self.iteration}_{scenario_name}", + ) + print(f" Media: {frame_path}") + + # 3. Read live FIO tags + print("[3/6] Reading Factory I/O tags...") + tags = self.fio.read_all_values() + if tags: + # Show compact tag summary + bool_tags = {k: v for k, v in tags.items() if isinstance(v, bool)} + on_tags = [k for k, v in bool_tags.items() if v] + print(f" {len(tags)} tags read. Active: {on_tags or 'none'}") + else: + print(" WARNING: Could not read tags") + tags = {} + + # 4. Extract controller code snippet + print("[4/6] Extracting tick() method...") + code_snippet = self._extract_tick_method() + print(f" {len(code_snippet.splitlines())} lines extracted") + + # 5. Call Cosmos R2 (or skip in dry-run) + result = { + "iteration": self.iteration, + "scenario": scenario_name, + "fault_id": fault_id, + "note": scenario["note"], + "frame_path": str(frame_path), + "tags": tags, + "code_lines": len(code_snippet.splitlines()), + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + } + + if self.dry_run: + print("[5/6] DRY RUN -- skipping Cosmos R2 call") + result["reasoning"] = "(dry run)" + result["diagnosis"] = "(dry run -- no Cosmos R2 call)" + result["prescription"] = "(dry run)" + result["confidence"] = "N/A" + result["elapsed_s"] = 0 + result["tokens"] = 0 + else: + print("[5/6] Sending to Cosmos R2...") + cosmos_result = self._call_cosmos(frame_path, tags, code_snippet, scenario_name) + result.update(cosmos_result) + + # 6. Print formatted output + print(f"\n[6/6] Results:") + self._print_result(result) + + # 7. Clear fault + if fault_id: + self.injector.clear(fault_id) + print(f"\nFault '{fault_id}' cleared.") + + # 8. Log to results + self.results.append(result) + return result + + def _call_cosmos(self, frame_path: Path, tags: dict, code_snippet: str, scenario_name: str) -> dict: + """Send frame + tags + code to Cosmos R2 and parse response.""" + prompts = load_prompts() + system_prompt = prompts["system"] + + # Build the prescribe prompt + user_template = prompts.get("user_prescribe", prompts["user_diagnosis"]) + plc_registers = format_plc_registers(tags) if tags else "No PLC data available" + fault_analysis = format_fault_analysis(tags) if tags else "No fault data available" + + try: + user_prompt = user_template.format( + plc_registers=plc_registers, + fault_analysis=fault_analysis, + code_snippet=code_snippet, + ) + except KeyError: + # Fall back to diagnosis template if prescribe template missing code_snippet + user_prompt = prompts["user_diagnosis"].format( + plc_registers=plc_registers, + fault_analysis=fault_analysis, + ) + + # Encode frame/clip — encode_media returns correct MIME for .mp4 vs .png + b64, mime = encode_media(str(frame_path)) + if mime.startswith("video"): + media_content = { + "type": "video_url", + "video_url": {"url": f"data:{mime};base64,{b64}"}, + } + else: + media_content = { + "type": "image_url", + "image_url": {"url": f"data:{mime};base64,{b64}"}, + } + user_content = [media_content, {"type": "text", "text": user_prompt}] + + payload = { + "model": MODEL_NAME, + "messages": [ + {"role": "system", "content": system_prompt}, + {"role": "user", "content": user_content}, + ], + "max_tokens": DEFAULT_MAX_TOKENS, + "temperature": DEFAULT_TEMPERATURE, + "top_p": DEFAULT_TOP_P, + } + + # Tell vLLM how to sample video frames + if mime.startswith("video"): + payload["media_io_kwargs"] = {"video": {"fps": 1.0}} + + t_start = time.time() + try: + response = requests.post( + self.vllm_url, + headers={"Content-Type": "application/json"}, + json=payload, + timeout=300, + ) + elapsed = time.time() - t_start + + if response.status_code != 200: + return { + "error": f"HTTP {response.status_code}: {response.text[:500]}", + "elapsed_s": round(elapsed, 1), + "reasoning": "", + "diagnosis": f"Error: HTTP {response.status_code}", + "prescription": "", + "confidence": "N/A", + "tokens": 0, + } + + data = response.json() + content = data["choices"][0]["message"]["content"] + usage = data.get("usage", {}) + + # Parse blocks + reasoning = "" + diagnosis_text = content + if "" in content and "" in content: + think_start = content.index("") + len("") + think_end = content.index("") + reasoning = content[think_start:think_end].strip() + diagnosis_text = content[think_end + len(""):].strip() + + # Try to extract confidence from response + confidence = "UNKNOWN" + for level in ["HIGH", "MEDIUM", "LOW"]: + if level in diagnosis_text.upper(): + confidence = level + break + + return { + "reasoning": reasoning, + "diagnosis": diagnosis_text, + "prescription": self._extract_prescription(diagnosis_text), + "confidence": confidence, + "elapsed_s": round(elapsed, 1), + "tokens": usage.get("completion_tokens", 0), + "raw_response": content, + } + + except requests.ConnectionError as e: + return { + "error": f"Connection failed: {e}", + "elapsed_s": 0, + "reasoning": "", + "diagnosis": "Error: Cannot connect to vLLM", + "prescription": "", + "confidence": "N/A", + "tokens": 0, + } + + def _extract_prescription(self, diagnosis_text: str) -> str: + """Extract the PRESCRIPTION section from diagnosis text.""" + lines = diagnosis_text.splitlines() + in_prescription = False + prescription_lines = [] + + for line in lines: + upper = line.strip().upper() + if upper.startswith("PRESCRIPTION") or upper.startswith("**PRESCRIPTION"): + in_prescription = True + continue + elif in_prescription and ( + upper.startswith("CONFIDENCE") or upper.startswith("**CONFIDENCE") + or upper.startswith("DIAGNOSIS") or upper.startswith("**DIAGNOSIS") + ): + break + elif in_prescription: + prescription_lines.append(line) + + return "\n".join(prescription_lines).strip() if prescription_lines else "(no prescription section found)" + + def _extract_tick_method(self) -> str: + """Extract the tick() method from sorting_controller.py for the prompt.""" + try: + source = self.controller_path.read_text(encoding="utf-8") + except FileNotFoundError: + return "# Controller file not found" + + lines = source.splitlines() + # Find "def tick(self):" and extract until next "def " at same indent level + start = None + for i, line in enumerate(lines): + if "def tick(self)" in line: + start = i + break + + if start is None: + return "# tick() method not found in controller" + + end = len(lines) + for i in range(start + 1, len(lines)): + # Next method at same indent level (4 spaces for class method) + stripped = lines[i] + if stripped.startswith(" def ") and i > start + 5: + end = i + break + + # Cap at ~120 lines to avoid blowing context + if end - start > 120: + end = start + 120 + + return "\n".join(lines[start:end]) + + def _print_result(self, result: dict): + """Print formatted iteration result.""" + print(f"\n{'- '*35}") + print(f"Scenario: {result['scenario']} ({result.get('note', '')})") + print(f"Frame: {result['frame_path']}") + + if result.get("reasoning") and result["reasoning"] != "(dry run)": + print(f"\nCOSMOS R2 CHAIN-OF-THOUGHT:") + # Truncate reasoning for display + reasoning = result["reasoning"] + if len(reasoning) > 500: + reasoning = reasoning[:500] + "..." + for line in reasoning.splitlines(): + print(f" {line}") + + if result.get("diagnosis") and result["diagnosis"] != "(dry run -- no Cosmos R2 call)": + print(f"\nDIAGNOSIS:") + print(f" {result['diagnosis'][:300]}") + + if result.get("prescription") and result["prescription"] != "(dry run)": + print(f"\nPRESCRIPTION:") + for line in result["prescription"].splitlines()[:10]: + print(f" {line}") + + confidence = result.get("confidence", "N/A") + elapsed = result.get("elapsed_s", 0) + tokens = result.get("tokens", 0) + print(f"\nConfidence: {confidence} | Elapsed: {elapsed}s | Tokens: {tokens}") + + if result.get("error"): + print(f"\nERROR: {result['error']}") + + def run_session(self): + """Record continuous session, chunk it, diagnose each chunk.""" + if self.obs_clip: + # Pre-recorded file -- still needs chunking + session_path = Path(self.obs_clip) + print(f"Using pre-recorded session: {session_path}") + print(f"\nSplitting into {self.chunk_duration}s chunks...") + chunks = chunk_session(session_path, chunk_duration=self.chunk_duration) + print(f"Split into {len(chunks)} chunks.\n") + if not chunks: + print("ERROR: No chunks produced. Recording may be too short.") + return + chunk_paths = [Path(c["chunk_file"]) for c in chunks] + else: + # Live recording -- already produces rolling chunks + print("Recording rolling chunks. To stop:") + print(" python cookoff/capture_fio.py stop") + chunk_paths = record_session( + label="session", + chunk_duration=self.chunk_duration, + ) + + if not chunk_paths: + print("ERROR: No chunks produced.") + return + + # Diagnose each chunk + for i, chunk_path in enumerate(chunk_paths): + print(f"\n{'='*70}") + print(f" CHUNK {i+1}/{len(chunk_paths)}: {chunk_path.name}") + print(f"{'='*70}") + + # Read live tags + tags = self.fio.read_all_values() or {} + if tags: + on_tags = [k for k, v in tags.items() if isinstance(v, bool) and v] + print(f" Tags: {len(tags)} total, active: {on_tags or 'none'}") + + if self.dry_run: + print(f" DRY RUN -- skipping R2 for {chunk_path.name}") + self.results.append({ + "iteration": i + 1, + "scenario": f"chunk_{i+1}", + "frame_path": str(chunk_path), + "tags": tags, + "diagnosis": "(dry run)", + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + }) + continue + + code_snippet = self._extract_tick_method() + cosmos_result = self._call_cosmos(chunk_path, tags, code_snippet, f"chunk_{i+1}") + + result = { + "iteration": i + 1, + "scenario": f"chunk_{i+1}", + "frame_path": str(chunk_path), + "tags": tags, + "code_lines": len(code_snippet.splitlines()), + "timestamp": time.strftime("%Y-%m-%d %H:%M:%S"), + **cosmos_result, + } + self.results.append(result) + self._print_result(result) + + self._print_summary() + + def run_all(self): + """Run all scenarios sequentially.""" + for name in SCENARIOS: + self.run_iteration(name) + if name != list(SCENARIOS.keys())[-1]: + print("\n Waiting 3s before next scenario...") + time.sleep(3.0) + self._print_summary() + + def run_interactive(self): + """Interactive menu for running scenarios.""" + print("\n" + "="*70) + print(" FACTORYLM VISUAL TEST LOOP -- Interactive Mode") + print("="*70) + print(f"\n Factory I/O: {self.fio_url}") + print(f" vLLM: {self.vllm_url}") + print(f" Controller: {self.controller_path}") + print(f" Dry run: {self.dry_run}") + print() + + while True: + print("\nAvailable scenarios:") + for i, (name, sc) in enumerate(SCENARIOS.items(), 1): + fault = sc["id"] or "none" + print(f" {i}. {name:<15} (fault: {fault}) -- {sc['note']}") + + print(f"\n a. Run ALL scenarios") + print(f" s. Show summary") + print(f" q. Quit") + + try: + choice = input("\nSelect scenario (name, number, a/s/q): ").strip().lower() + except (EOFError, KeyboardInterrupt): + print() + break + + if choice in ("q", "quit", "exit"): + break + elif choice in ("a", "all"): + self.run_all() + elif choice in ("s", "summary"): + self._print_summary() + elif choice in SCENARIOS: + self.run_iteration(choice) + self._prompt_apply_fix() + elif choice.isdigit(): + idx = int(choice) - 1 + names = list(SCENARIOS.keys()) + if 0 <= idx < len(names): + self.run_iteration(names[idx]) + self._prompt_apply_fix() + else: + print(f" Invalid number. Choose 1-{len(names)}.") + else: + print(f" Unknown: '{choice}'") + + self._print_summary() + + def _prompt_apply_fix(self): + """Ask operator whether to apply the prescribed fix.""" + if self.dry_run or not self.results: + return + last = self.results[-1] + if last.get("prescription") and last["prescription"] != "(no prescription section found)": + try: + answer = input("\nApply fix? (y/n/skip): ").strip().lower() + except (EOFError, KeyboardInterrupt): + return + if answer == "y": + print(" -> Manual fix application mode: review prescription above and edit code.") + print(" (Automatic code mutation not implemented -- safe by design)") + elif answer == "n": + print(" -> Fix skipped.") + + def _print_summary(self): + """Print summary of all iterations.""" + if not self.results: + print("\nNo iterations completed yet.") + return + + print(f"\n{'='*70}") + print(f" SESSION SUMMARY -- {len(self.results)} iterations") + print(f"{'='*70}") + + for r in self.results: + status = "OK" if not r.get("error") else "ERR" + conf = r.get("confidence", "N/A") + elapsed = r.get("elapsed_s", 0) + print(f" [{status}] iter{r['iteration']} {r['scenario']:<15} " + f"confidence={conf:<7} elapsed={elapsed}s") + + # Save results to JSON + out_path = REPO_ROOT / "cookoff" / "clips" / f"session_{time.strftime('%Y%m%d_%H%M%S')}.json" + out_path.parent.mkdir(parents=True, exist_ok=True) + with open(out_path, "w") as f: + # Strip raw_response and tags to keep file small + slim = [] + for r in self.results: + entry = {k: v for k, v in r.items() if k not in ("raw_response", "tags")} + slim.append(entry) + json.dump(slim, f, indent=2, default=str) + print(f"\n Results saved: {out_path}") + + +# --------------------------------------------------------------------------- +# CLI +# --------------------------------------------------------------------------- +def main(): + parser = argparse.ArgumentParser( + description="FactoryLM Visual Test Loop -- Cosmos R2 + Factory I/O", + formatter_class=argparse.RawDescriptionHelpFormatter, + ) + parser.add_argument("--scenario", help="Run a single scenario by name") + parser.add_argument("--all", action="store_true", help="Run all scenarios") + parser.add_argument("--dry-run", action="store_true", help="Skip Cosmos R2 call") + parser.add_argument("--vllm-url", default=os.getenv("VLLM_URL", VLLM_URL), + help="vLLM endpoint URL") + parser.add_argument("--fio-url", default=os.getenv("FACTORYIO_URL", "http://localhost:7410"), + help="Factory I/O Web API URL") + parser.add_argument("--controller", default="sim/sorting_controller.py", + help="Path to sorting controller (relative to repo root)") + parser.add_argument("--obs-clip", default=None, + help="Path to a pre-recorded clip (skip live capture)") + parser.add_argument("--clip-duration", type=float, default=8, + help="Seconds to record per clip (default: 8)") + parser.add_argument("--session", action="store_true", + help="Record continuous session, chunk it, diagnose each chunk") + parser.add_argument("--chunk-duration", type=float, default=8, + help="Seconds per chunk in session mode (default: 8)") + args = parser.parse_args() + + loop = VisualTestLoop( + vllm_url=args.vllm_url, + fio_url=args.fio_url, + controller_path=args.controller, + dry_run=args.dry_run, + obs_clip=args.obs_clip, + clip_duration=args.clip_duration, + chunk_duration=args.chunk_duration, + ) + + if not loop.check_prerequisites(): + sys.exit(1) + + if args.session: + loop.run_session() + elif args.all: + loop.run_all() + elif args.scenario: + loop.run_iteration(args.scenario) + else: + loop.run_interactive() + + +if __name__ == "__main__": + main() diff --git a/sim/fault_injector.py b/sim/fault_injector.py new file mode 100644 index 0000000..318d7ba --- /dev/null +++ b/sim/fault_injector.py @@ -0,0 +1,231 @@ +""" +Factory I/O Fault Injector — Demo fault injection via Web API. + +Uses Factory I/O's force/release endpoints to simulate equipment faults +for the Cosmos Cookoff diagnosis demo. + +Usage: + python sim/fault_injector.py --fault sensor # Force sensor failure + python sim/fault_injector.py --fault jam # Simulate conveyor jam + python sim/fault_injector.py --fault estop # Trigger E-stop + python sim/fault_injector.py --fault motor # Motor overload (stop conveyor) + python sim/fault_injector.py --clear # Release all forced tags + python sim/fault_injector.py --list # Show available faults + python sim/fault_injector.py --interactive # Interactive mode +""" + +import argparse +import json +import logging +import os +import sys +import time +from pathlib import Path + +_repo_root = str(Path(__file__).resolve().parent.parent) +if _repo_root not in sys.path: + sys.path.insert(0, _repo_root) + +from sim.sorting_controller import FactoryIOClient, load_config + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger("fault_injector") + + +# Fault definitions using actual Factory I/O tag names for "Sorting by Height (Basic)" +FAULTS = { + "sensor": { + "name": "Sensor Failure", + "description": "Height sensors forced OFF — parts pass unsorted", + "force": {"High sensor": False, "Low sensor": False}, + }, + "jam": { + "name": "Conveyor Jam", + "description": "Pallet sensor stuck ON — appears as permanent blockage", + "force": {"Pallet sensor": True}, + }, + "estop": { + "name": "Emergency Stop", + "description": "E-stop forced active (False = pressed in Factory I/O)", + "force": {"Emergency stop": False}, + }, + "motor": { + "name": "Motor Failure", + "description": "Entry conveyor stops — parts pile up", + "force": {"Conveyor entry": False}, + }, + "sorter_stuck_left": { + "name": "Sorter Stuck Left", + "description": "Left transfer arm stays engaged — all parts go left", + "force": {"Transf. left": True, "Transf. right": False}, + }, + "sorter_stuck_right": { + "name": "Sorter Stuck Right", + "description": "Right transfer arm stays engaged — all parts go right", + "force": {"Transf. right": True, "Transf. left": False}, + }, + "blind": { + "name": "Sensor Blind", + "description": "Low sensor forced OFF — controller can't detect any parts", + "force": {"Low sensor": False}, + }, + "emitter_off": { + "name": "No Parts", + "description": "Emitter forced OFF — no new parts entering system", + "force": {"Emitter": False}, + }, +} + + +class FaultInjector: + """Inject and clear faults via Factory I/O Web API force endpoints.""" + + def __init__(self, client: FactoryIOClient): + self.fio = client + self._active_faults: dict[str, list[str]] = {} + + def inject(self, fault_id: str, duration: float = 0) -> bool: + if fault_id not in FAULTS: + logger.error("Unknown fault: %s. Available: %s", fault_id, list(FAULTS.keys())) + return False + + fault = FAULTS[fault_id] + logger.info("Injecting: %s — %s", fault["name"], fault["description"]) + + forced_tags = [] + for tag_name, value in fault["force"].items(): + if self.fio.force_tag(tag_name, value): + forced_tags.append(tag_name) + logger.info(" Forced: %s = %s", tag_name, value) + else: + logger.warning(" FAILED to force: %s", tag_name) + + self._active_faults[fault_id] = forced_tags + + if duration > 0: + logger.info("Auto-clear in %.1fs", duration) + time.sleep(duration) + self.clear(fault_id) + + return len(forced_tags) > 0 + + def clear(self, fault_id: str = "") -> bool: + if fault_id and fault_id in self._active_faults: + for tag_name in self._active_faults.pop(fault_id): + self.fio.release_tag(tag_name) + logger.info(" Released: %s", tag_name) + logger.info("Cleared: %s", fault_id) + return True + elif not fault_id: + # Clear everything + all_tags = set() + for tags in self._active_faults.values(): + all_tags.update(tags) + for tag_name in all_tags: + self.fio.release_tag(tag_name) + logger.info(" Released: %s", tag_name) + # Safety: also release all known fault tags + for fault in FAULTS.values(): + for tag_name in fault["force"]: + self.fio.release_tag(tag_name) + count = len(self._active_faults) + self._active_faults.clear() + logger.info("Cleared all faults (%d)", count) + return True + else: + logger.warning("Fault '%s' not active", fault_id) + return False + + @property + def active_faults(self) -> list[str]: + return list(self._active_faults.keys()) + + def status(self) -> dict: + return { + "active": [ + {"id": fid, "name": FAULTS[fid]["name"], "forced_tags": tags} + for fid, tags in self._active_faults.items() + ], + "available": [ + {"id": fid, "name": f["name"], "description": f["description"]} + for fid, f in FAULTS.items() + ], + } + + +def main(): + parser = argparse.ArgumentParser(description="Factory I/O Fault Injector") + parser.add_argument("--api-url", default=os.getenv("FACTORYIO_URL", "http://localhost:7410")) + parser.add_argument("--config", default="config/sorting_tags.yaml") + parser.add_argument("--fault", help="Fault ID to inject") + parser.add_argument("--duration", type=float, default=0, help="Auto-clear after N seconds") + parser.add_argument("--clear", action="store_true", help="Clear all faults") + parser.add_argument("--list", action="store_true", help="List available faults") + parser.add_argument("--interactive", action="store_true", help="Interactive mode") + args = parser.parse_args() + + if args.list: + print(f"\n{'='*60}") + print("Available Faults") + print(f"{'='*60}") + for fid, f in FAULTS.items(): + print(f" {fid:<20} {f['name']}") + print(f" {'':20} {f['description']}") + print(f" {'':20} Forces: {json.dumps(f['force'])}") + print() + return + + config = load_config(args.config) + client = FactoryIOClient(base_url=args.api_url) + + if not client.check_connection(): + logger.error("Cannot connect to Factory I/O at %s", args.api_url) + sys.exit(1) + + injector = FaultInjector(client) + + if args.clear: + injector.clear() + return + + if args.fault: + injector.inject(args.fault, duration=args.duration) + return + + if args.interactive: + print("\nInteractive Fault Injector") + print("Commands: inject , clear [fault], status, list, quit\n") + while True: + try: + cmd = input("fault> ").strip().split() + except (EOFError, KeyboardInterrupt): + print() + break + if not cmd: + continue + if cmd[0] in ("quit", "exit"): + break + elif cmd[0] == "inject" and len(cmd) > 1: + dur = float(cmd[2]) if len(cmd) > 2 else 0 + injector.inject(cmd[1], duration=dur) + elif cmd[0] == "clear": + injector.clear(cmd[1] if len(cmd) > 1 else "") + elif cmd[0] == "status": + print(json.dumps(injector.status(), indent=2)) + elif cmd[0] == "list": + for fid in FAULTS: + print(f" {fid}: {FAULTS[fid]['name']}") + else: + print(f"Unknown: {cmd[0]}") + injector.clear() + return + + parser.print_help() + + +if __name__ == "__main__": + main() diff --git a/sim/sorting_controller.py b/sim/sorting_controller.py new file mode 100644 index 0000000..6855e48 --- /dev/null +++ b/sim/sorting_controller.py @@ -0,0 +1,791 @@ +""" +Factory I/O Sorting Controller — Python Soft PLC. + +Drives the "Sorting by Height (Basic)" scene via Factory I/O's Web API. +No CODESYS or PLC hardware required. + +Architecture: + Factory I/O (scene running, Web API enabled on :7410) + | HTTP REST (id-based, lowercase fields) + This script (reads sensors, runs state machine, writes actuators) + | Optional + /api/plc/live endpoint for Cosmos diagnosis + +API contract (Factory I/O Web API — EmbedIO server): + GET /api/tags -> [{name, id, address, type, kind, value, isForced, ...}] + GET /api/tag/values -> [{id, value}] + PUT /api/tag/values <- [{id, value}] (output tags only) + PUT /api/tag/values-force <- [{id, value}] + PUT /api/tag/values-release <- ["id"] + +Usage: + python sim/sorting_controller.py # Run controller + python sim/sorting_controller.py --discover # Dump all tags + python sim/sorting_controller.py --api-url http://host:7410 + python sim/sorting_controller.py --dry-run # Read-only mode +""" + +import argparse +import datetime +import json +import logging +import os +import signal +import sys +import time +from enum import IntEnum +from pathlib import Path +from threading import Event, Lock, Thread + +_repo_root = str(Path(__file__).resolve().parent.parent) +if _repo_root not in sys.path: + sys.path.insert(0, _repo_root) + +import httpx + +logging.basicConfig( + level=logging.INFO, + format="%(asctime)s %(levelname)s %(message)s", + datefmt="%H:%M:%S", +) +logger = logging.getLogger("sorting_controller") +# Suppress noisy httpx request logging +logging.getLogger("httpx").setLevel(logging.WARNING) +logging.getLogger("httpcore").setLevel(logging.WARNING) + + +# --------------------------------------------------------------------------- +# State machine (modeled after recovery/Micro820_v1.9.st CASE pattern) +# --------------------------------------------------------------------------- +class State(IntEnum): + INIT = 0 # All outputs OFF, stop light ON, waiting for Start + RUNNING = 1 # Entry conveyor ON, green light ON, waiting for box + SETTLING = 2 # Conveyor stopped, waiting 3 scans for sensors to settle + SORTING_LEFT = 3 # Transfer left active, waiting for exit sensor + SORTING_RIGHT = 4 # Transfer right active, waiting for exit sensor + STOPPED = 5 # All outputs OFF, yellow light ON, waiting for Reset + ESTOP = 6 # All outputs OFF immediately, red light ON + + +STATE_NAMES = {s: s.name for s in State} + + +# --------------------------------------------------------------------------- +# Factory I/O Web API client (id-based, lowercase fields) +# --------------------------------------------------------------------------- +class FactoryIOClient: + """HTTP client for Factory I/O Web API (port 7410). + + Factory I/O uses UUIDs (id) to identify tags, not names. + On init, we fetch /api/tags to build name<->id lookup tables. + All read/write operations use IDs internally. + """ + + def __init__(self, base_url: str = "http://localhost:7410", timeout: float = 2.0): + self.base_url = base_url.rstrip("/") + self.client = httpx.Client(timeout=timeout) + self._connected = False + # Lookup tables built from /api/tags + self._name_to_id: dict[str, str] = {} # "Conveyor entry" -> "uuid" (prefers Output) + self._id_to_name: dict[str, str] = {} # "uuid" -> "Conveyor entry" + self._tag_meta: dict[str, dict] = {} # name -> full tag dict + self._output_names: set[str] = set() + self._input_name_to_id: dict[str, str] = {} # "Start" -> input UUID + self._kind_by_id: dict[str, str] = {} # UUID -> "Input"/"Output" + + def check_connection(self) -> bool: + """Connect and build name<->id lookup from /api/tags.""" + try: + r = self.client.get(f"{self.base_url}/api/tags") + r.raise_for_status() + tags = r.json() + self._name_to_id.clear() + self._id_to_name.clear() + self._tag_meta.clear() + self._output_names.clear() + self._input_name_to_id.clear() + self._kind_by_id.clear() + collisions = [] + input_count = 0 + output_count = 0 + for t in tags: + name = t["name"] + tid = t["id"] + kind = t.get("kind", "") + # Track kind for every UUID + self._kind_by_id[tid] = kind + # Map ALL UUIDs to names (not just the winner) + self._id_to_name[tid] = name + if kind == "Input": + input_count += 1 + self._input_name_to_id[name] = tid + if kind == "Output": + output_count += 1 + self._output_names.add(name) + # _name_to_id prefers Output (for writes via _name_id) + if name in self._name_to_id: + if kind == "Output": + collisions.append(name) + self._name_to_id[name] = tid + # else: keep existing (Output already stored, or first-seen) + else: + self._name_to_id[name] = tid + self._tag_meta[name] = t + self._connected = True + for c in collisions: + logger.warning("Tag name collision: '%s' has both Input and Output — reads will prefer Input", c) + logger.info("Loaded %d tags (%d inputs, %d outputs)", len(self._name_to_id), input_count, output_count) + return True + except Exception as e: + logger.warning("Factory I/O not reachable at %s: %s", self.base_url, e) + self._connected = False + return False + + @property + def connected(self) -> bool: + return self._connected + + def _name_id(self, name: str) -> str | None: + tid = self._name_to_id.get(name) + if tid is None: + logger.warning("Unknown tag name: %s", name) + return tid + + def get_all_tags(self) -> list[dict] | None: + """GET /api/tags — full tag metadata.""" + try: + r = self.client.get(f"{self.base_url}/api/tags") + r.raise_for_status() + return r.json() + except Exception as e: + logger.error("Failed to get tags: %s", e) + return None + + def read_all_values(self) -> dict | None: + """GET /api/tag/values -> dict of tag_name: value. + + When a name has both Input and Output tags, the Input value wins. + This ensures sensor reads are never shadowed by actuator values. + """ + try: + r = self.client.get(f"{self.base_url}/api/tag/values") + r.raise_for_status() + raw = r.json() # [{id, value}, ...] + result = {} + for item in raw: + tid = item["id"] + name = self._id_to_name.get(tid) + if not name: + continue + kind = self._kind_by_id.get(tid, "") + # If name already stored and current tag is NOT Input, skip — + # Input (sensor) values always win over Output (actuator) values. + if name in result and kind != "Input": + continue + result[name] = item["value"] + return result + except Exception as e: + logger.warning("Read values failed: %s", e) + self._connected = False + return None + + def write_tags(self, tag_values: dict) -> bool: + """Write multiple tags by name. Only writes Output tags.""" + payload = [] + for name, value in tag_values.items(): + tid = self._name_id(name) + if tid and name in self._output_names: + payload.append({"id": tid, "value": value}) + if not payload: + return True + try: + r = self.client.put( + f"{self.base_url}/api/tag/values", + json=payload, + ) + r.raise_for_status() + # Check for errors in response + resp = r.json() + for item in resp: + if item.get("error"): + logger.warning("Write error: %s", item) + return True + except Exception as e: + logger.warning("Write tags failed: %s", e) + return False + + def force_tag(self, tag_name: str, value) -> bool: + """Force a tag value (overrides scene logic).""" + tid = self._name_id(tag_name) + if not tid: + return False + try: + r = self.client.put( + f"{self.base_url}/api/tag/values-force", + json=[{"id": tid, "value": value}], + ) + r.raise_for_status() + return True + except Exception as e: + logger.warning("Force %s=%s failed: %s", tag_name, value, e) + return False + + def release_tag(self, tag_name: str) -> bool: + """Release a forced tag (return to scene logic).""" + tid = self._name_id(tag_name) + if not tid: + return False + try: + r = self.client.put( + f"{self.base_url}/api/tag/values-release", + json=[tid], + ) + r.raise_for_status() + return True + except Exception as e: + logger.warning("Release %s failed: %s", tag_name, e) + return False + + +# --------------------------------------------------------------------------- +# Config loader +# --------------------------------------------------------------------------- +def load_config(path: str = "config/sorting_tags.yaml") -> dict: + """Load tag mapping config. Returns defaults matching 'Sorting by Height (Basic)' scene.""" + defaults = { + "api_url": "http://localhost:7410", + "poll_interval_ms": 100, + # Actual Factory I/O tag names for "Sorting by Height (Basic)" + "sensors": { + "high_sensor": "High sensor", + "low_sensor": "Low sensor", + "pallet_sensor": "Pallet sensor", + "at_left_entry": "At left entry", + "at_left_exit": "At left exit", + "at_right_entry": "At right entry", + "at_right_exit": "At right exit", + "start": "Start", + "stop": "Stop", + "reset": "Reset", + "emergency_stop": "Emergency stop", + "auto": "Auto", + "manual": "Manual", + "loaded": "Loaded", + }, + "actuators": { + "entry_conveyor": "Conveyor entry", + "left_conveyor": "Conveyor left", + "right_conveyor": "Conveyor right", + "transfer_left": "Transf. left", + "transfer_right": "Transf. right", + "load": "Load", + "unload": "Unload", + "emitter": "Emitter", + "remover_left": "Remover left", + "remover_right": "Remover right", + "counter": "Counter", + "start_light": "Start light", + "stop_light": "Stop light", + "reset_light": "Reset light", + }, + } + + cfg_file = Path(path) + if cfg_file.exists(): + try: + import yaml + with cfg_file.open("r", encoding="utf-8") as f: + raw = yaml.safe_load(f) or {} + fio = raw.get("factoryio", raw) + defaults["api_url"] = fio.get("api_url", defaults["api_url"]) + defaults["poll_interval_ms"] = fio.get("poll_interval_ms", defaults["poll_interval_ms"]) + for section in ("sensors", "actuators"): + if section in fio: + defaults[section].update(fio[section]) + logger.info("Config loaded from %s", cfg_file) + except ImportError: + logger.warning("PyYAML not installed — using defaults") + except Exception: + logger.exception("Config load error from %s", cfg_file) + return defaults + + +# --------------------------------------------------------------------------- +# Sorting Controller +# --------------------------------------------------------------------------- +class SortingController: + """State machine for "Sorting by Height (Basic)" via Factory I/O Web API. + + Rebuilt from official Factory I/O template logic: + - Rising edge detection on Start/Stop/Reset (no re-trigger on held buttons) + - E-stop and Stop use NC convention (False=engaged/pressed) + - Height classification uses BOTH high and low sensors + - Settling delay (3 scans) before reading height after conveyor stop + - Sensor-based transfer completion with 15s watchdog safety net + """ + + # Settling delay: number of scans to wait after conveyor stop + SETTLE_SCANS = 3 + # Watchdog: max scans before force-completing a transfer (15s at 100ms) + WATCHDOG_SCANS = 150 + + def __init__(self, client: FactoryIOClient, config: dict, dry_run: bool = False): + self.fio = client + self.cfg = config + self.dry_run = dry_run + + self.s = config["sensors"] + self.a = config["actuators"] + + # State + self.state = State.INIT + self.tags: dict = {} + self.sorted_count = 0 + self.total_count = 0 + self.tall_count = 0 + self.short_count = 0 + self.last_sort_result = "" + self.start_time = time.monotonic() + + # Settling / watchdog counters + self._settle_count = 0 + self._watchdog_count = 0 + + # Rising edge detection: track "active" state (True=action requested) + # Start/Reset are NO (True=pressed), Stop is NC (True=released, invert) + self._prev_start = False + self._prev_stop = False # tracks stop_active (inverted NC signal) + self._prev_reset = False + # Exit sensor edges for transfer completion + self._prev_at_left_exit = False + self._prev_at_right_exit = False + + # Thread-safe tag snapshot for diagnosis API + self._lock = Lock() + self._snapshot: dict = {} + + def get_snapshot(self) -> dict: + with self._lock: + return dict(self._snapshot) + + def _update_snapshot(self): + with self._lock: + self._snapshot = { + "timestamp": datetime.datetime.now(tz=datetime.timezone.utc).isoformat(), + "state": STATE_NAMES.get(self.state, "UNKNOWN"), + "state_id": int(self.state), + "sorted_count": self.sorted_count, + "tall_count": self.tall_count, + "short_count": self.short_count, + "total_count": self.total_count, + "last_sort_result": self.last_sort_result, + "uptime_s": round(time.monotonic() - self.start_time, 1), + "dry_run": self.dry_run, + "tags": dict(self.tags), + "tag_source": "factoryio_webapi", + } + + def _read_sensors(self) -> bool: + values = self.fio.read_all_values() + if values is None: + return False + self.tags = values + return True + + def _write(self, tag_values: dict): + if self.dry_run: + return + self.fio.write_tags(tag_values) + + def _sensor(self, logical_name: str, default=False): + tag_name = self.s.get(logical_name, logical_name) + return self.tags.get(tag_name, default) + + def _all_outputs_off(self): + """Kill all actuators immediately.""" + self._write({ + self.a["entry_conveyor"]: False, + self.a["left_conveyor"]: False, + self.a["right_conveyor"]: False, + self.a["emitter"]: False, + self.a["transfer_left"]: False, + self.a["transfer_right"]: False, + self.a["start_light"]: False, + self.a["stop_light"]: False, + self.a["reset_light"]: False, + }) + + def tick(self): + """One scan cycle of the sorting state machine.""" + if not self._read_sensors(): + return + + prev_state = self.state + + # --- Button active states (normalized: True = action requested) --- + start_active = self._sensor("start") # NO: True=pressed + stop_active = not self._sensor("stop") # NC: True=released -> invert + reset_active = self._sensor("reset") # NO: True=pressed + estop_engaged = not self._sensor("emergency_stop") # NC: True=safe -> invert + + # --- Rising edge detection (curr AND NOT prev) --- + start_rising = start_active and not self._prev_start + stop_rising = stop_active and not self._prev_stop + reset_rising = reset_active and not self._prev_reset + + # --- Exit sensor rising edges (for transfer completion) --- + at_left_exit = self._sensor("at_left_exit") + at_right_exit = self._sensor("at_right_exit") + left_exit_rising = at_left_exit and not self._prev_at_left_exit + right_exit_rising = at_right_exit and not self._prev_at_right_exit + + # --- E-stop check: highest priority, every tick, before state logic --- + if estop_engaged and self.state != State.ESTOP: + self._all_outputs_off() + self._write({self.a["stop_light"]: True}) # red indicator + self.state = State.ESTOP + logger.warning("E-STOP ENGAGED — all outputs OFF") + + # --- State machine --- + elif self.state == State.INIT: + # All outputs off, stop light on, waiting for Start + self._write({ + self.a["entry_conveyor"]: False, + self.a["left_conveyor"]: True, + self.a["right_conveyor"]: True, + self.a["emitter"]: False, + self.a["remover_left"]: True, + self.a["remover_right"]: True, + self.a["transfer_left"]: False, + self.a["transfer_right"]: False, + self.a["start_light"]: True, + self.a["stop_light"]: False, + self.a["reset_light"]: False, + self.a["counter"]: 0, + }) + if start_rising: + self.state = State.RUNNING + logger.info("INIT -> RUNNING: start pressed") + + elif self.state == State.RUNNING: + if stop_rising: + self.state = State.STOPPED + logger.info("RUNNING -> STOPPED: stop pressed") + else: + # Entry conveyor ON, waiting for box at pallet sensor + pallet = self._sensor("pallet_sensor") + if pallet: + # Box arrived — stop conveyor, begin settling + self._write({self.a["entry_conveyor"]: False}) + self._settle_count = 0 + self.state = State.SETTLING + logger.info("RUNNING -> SETTLING: box at pallet sensor") + else: + # Keep running + self._write({ + self.a["entry_conveyor"]: True, + self.a["left_conveyor"]: True, + self.a["right_conveyor"]: True, + self.a["emitter"]: True, + self.a["remover_left"]: True, + self.a["remover_right"]: True, + self.a["transfer_left"]: False, + self.a["transfer_right"]: False, + self.a["start_light"]: True, + self.a["stop_light"]: False, + }) + + elif self.state == State.SETTLING: + if stop_rising: + self.state = State.STOPPED + logger.info("SETTLING -> STOPPED: stop pressed") + else: + # Wait for sensors to settle after conveyor stop + self._settle_count += 1 + if self._settle_count >= self.SETTLE_SCANS: + # Read height sensors + high = self._sensor("high_sensor") + low = self._sensor("low_sensor") + if high and low: + # TALL box -> route LEFT + self._write({ + self.a["transfer_left"]: True, + self.a["transfer_right"]: False, + self.a["left_conveyor"]: True, + }) + self._watchdog_count = 0 + self.state = State.SORTING_LEFT + logger.info("SETTLING -> SORTING_LEFT: tall box (high=%s low=%s)", high, low) + elif low and not high: + # SHORT box -> route RIGHT + self._write({ + self.a["transfer_left"]: False, + self.a["transfer_right"]: True, + self.a["right_conveyor"]: True, + }) + self._watchdog_count = 0 + self.state = State.SORTING_RIGHT + logger.info("SETTLING -> SORTING_RIGHT: short box (high=%s low=%s)", high, low) + else: + # Neither sensor triggered — box may have moved, go back to RUNNING + logger.warning("SETTLING: no height detected (high=%s low=%s) — returning to RUNNING", high, low) + self.state = State.RUNNING + + elif self.state == State.SORTING_LEFT: + if stop_rising: + self._write({self.a["transfer_left"]: False}) + self.state = State.STOPPED + logger.info("SORTING_LEFT -> STOPPED: stop pressed") + else: + self._watchdog_count += 1 + if left_exit_rising: + # Box reached left exit — transfer complete + self._write({self.a["transfer_left"]: False}) + self.tall_count += 1 + self.sorted_count += 1 + self.last_sort_result = "LEFT" + self._write({self.a["counter"]: self.sorted_count}) + self.state = State.RUNNING + logger.info("Part sorted LEFT (tall) [#%d]", self.sorted_count) + elif self._watchdog_count >= self.WATCHDOG_SCANS: + # Watchdog: exit sensor never fired — force-complete + self._write({self.a["transfer_left"]: False}) + self.tall_count += 1 + self.sorted_count += 1 + self.last_sort_result = "LEFT" + self._write({self.a["counter"]: self.sorted_count}) + self.state = State.RUNNING + logger.warning("WATCHDOG: left transfer timed out after %d scans — force-completing", self._watchdog_count) + + elif self.state == State.SORTING_RIGHT: + if stop_rising: + self._write({self.a["transfer_right"]: False}) + self.state = State.STOPPED + logger.info("SORTING_RIGHT -> STOPPED: stop pressed") + else: + self._watchdog_count += 1 + if right_exit_rising: + # Box reached right exit — transfer complete + self._write({self.a["transfer_right"]: False}) + self.short_count += 1 + self.sorted_count += 1 + self.last_sort_result = "RIGHT" + self._write({self.a["counter"]: self.sorted_count}) + self.state = State.RUNNING + logger.info("Part sorted RIGHT (short) [#%d]", self.sorted_count) + elif self._watchdog_count >= self.WATCHDOG_SCANS: + # Watchdog: exit sensor never fired — force-complete + self._write({self.a["transfer_right"]: False}) + self.short_count += 1 + self.sorted_count += 1 + self.last_sort_result = "RIGHT" + self._write({self.a["counter"]: self.sorted_count}) + self.state = State.RUNNING + logger.warning("WATCHDOG: right transfer timed out after %d scans — force-completing", self._watchdog_count) + + elif self.state == State.STOPPED: + self._write({ + self.a["entry_conveyor"]: False, + self.a["emitter"]: False, + self.a["transfer_left"]: False, + self.a["transfer_right"]: False, + self.a["start_light"]: False, + self.a["stop_light"]: True, + self.a["reset_light"]: True, + }) + if reset_rising: + self.sorted_count = 0 + self.tall_count = 0 + self.short_count = 0 + self.total_count = 0 + self._write({self.a["counter"]: 0}) + self.state = State.INIT + logger.info("STOPPED -> INIT: reset pressed (counters cleared)") + + elif self.state == State.ESTOP: + # Hold all outputs off, red light on + self._all_outputs_off() + self._write({self.a["stop_light"]: True}) + # Exit: e-stop released AND reset rising edge + if not estop_engaged and reset_rising: + self._write({self.a["stop_light"]: False, self.a["reset_light"]: False}) + self.state = State.INIT + logger.info("ESTOP -> INIT: e-stop released + reset pressed") + + # --- Update edge detection state (MUST be after all state logic) --- + self._prev_start = start_active + self._prev_stop = stop_active + self._prev_reset = reset_active + self._prev_at_left_exit = at_left_exit + self._prev_at_right_exit = at_right_exit + + if self.state != prev_state: + logger.info("State: %s -> %s", STATE_NAMES[prev_state], STATE_NAMES[self.state]) + + self._update_snapshot() + + def stop(self): + """Graceful shutdown.""" + self.state = State.STOPPED + self._all_outputs_off() + self._write({self.a["stop_light"]: True}) + logger.info("Controller stopped. Sorted %d parts (%d tall, %d short).", + self.sorted_count, self.tall_count, self.short_count) + + +# --------------------------------------------------------------------------- +# Optional HTTP API for diagnosis integration +# --------------------------------------------------------------------------- +def start_api_server(controller: SortingController, port: int = 8765): + """Start a minimal HTTP server exposing /api/plc/live.""" + from http.server import HTTPServer, BaseHTTPRequestHandler + + class Handler(BaseHTTPRequestHandler): + def log_message(self, *args): + pass + + def do_GET(self): + if self.path in ("/api/plc/live", "/api/plc/live/"): + snapshot = controller.get_snapshot() + body = json.dumps(snapshot, default=str).encode() + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.send_header("Access-Control-Allow-Origin", "*") + self.end_headers() + self.wfile.write(body) + elif self.path in ("/health", "/health/"): + self.send_response(200) + self.send_header("Content-Type", "application/json") + self.end_headers() + self.wfile.write(json.dumps({"status": "ok"}).encode()) + else: + self.send_response(404) + self.end_headers() + + server = HTTPServer(("0.0.0.0", port), Handler) + thread = Thread(target=server.serve_forever, daemon=True) + thread.start() + logger.info("API server listening on http://0.0.0.0:%d/api/plc/live", port) + return server + + +# --------------------------------------------------------------------------- +# Tag discovery +# --------------------------------------------------------------------------- +def discover_tags(client: FactoryIOClient): + """Print all Factory I/O tags with actual field names.""" + tags = client.get_all_tags() + if tags is None: + logger.error("Could not retrieve tags. Is Factory I/O running with Web API enabled?") + return + + # Filter system vs scene tags + system = [t for t in tags if t["name"].startswith("FACTORY I/O")] + scene = [t for t in tags if not t["name"].startswith("FACTORY I/O")] + + print(f"\n{'='*70}") + print(f"Scene Tags ({len(scene)} tags)") + print(f"{'='*70}") + for t in sorted(scene, key=lambda x: (x["kind"], x["name"])): + forced = " [FORCED]" if t.get("isForced") else "" + print(f" {t['kind']:<8} {t['type']:<5} addr={t['address']:<4} " + f"{t['name']:<25} = {t['value']}{forced}") + print(f" id={t['id']}") + + print(f"\n{'='*70}") + print(f"System Tags ({len(system)} tags)") + print(f"{'='*70}") + for t in sorted(system, key=lambda x: x["name"]): + print(f" {t['kind']:<8} {t['name']:<30} = {t['value']}") + + # Dump values via /api/tag/values to confirm format + print(f"\n{'='*70}") + print("Live Values (via /api/tag/values)") + print(f"{'='*70}") + values = client.read_all_values() + if values: + for name, val in sorted(values.items()): + print(f" {name:<30} = {val}") + else: + print(" (failed to read values)") + + +# --------------------------------------------------------------------------- +# Main +# --------------------------------------------------------------------------- +def main(): + parser = argparse.ArgumentParser(description="Factory I/O Sorting Controller (Python Soft PLC)") + parser.add_argument("--api-url", default=os.getenv("FACTORYIO_URL", "http://localhost:7410")) + parser.add_argument("--config", default="config/sorting_tags.yaml") + parser.add_argument("--discover", action="store_true", help="Dump all tags and exit") + parser.add_argument("--dry-run", action="store_true", help="Read-only mode") + parser.add_argument("--interval", type=int, default=0, help="Override poll interval (ms)") + parser.add_argument("--port", type=int, default=8765, help="API server port") + parser.add_argument("--no-api", action="store_true", help="Disable API server") + args = parser.parse_args() + + config = load_config(args.config) + api_url = args.api_url or config["api_url"] + interval_ms = args.interval or config["poll_interval_ms"] + + client = FactoryIOClient(base_url=api_url) + + if not client.check_connection(): + logger.error("Cannot connect to Factory I/O at %s", api_url) + logger.info("Enable: press \\ in Factory I/O, type: app.web_server = True") + sys.exit(1) + + logger.info("Connected to Factory I/O at %s", api_url) + + # Check if the scene is actually running (F5) + all_tags = client.get_all_tags() + if all_tags: + running_tag = next((t for t in all_tags if t["name"] == "FACTORY I/O (Running)"), None) + if running_tag and not running_tag.get("value"): + logger.warning("Scene is NOT running — press F5 in Factory I/O to start the scene") + + if args.discover: + discover_tags(client) + return + + controller = SortingController(client, config, dry_run=args.dry_run) + + api_server = None + if not args.no_api: + api_server = start_api_server(controller, port=args.port) + + stop_event = Event() + + def on_signal(signum, frame): + logger.info("Shutdown signal received") + stop_event.set() + + signal.signal(signal.SIGINT, on_signal) + signal.signal(signal.SIGTERM, on_signal) + + logger.info("Starting sorting controller (poll=%dms, dry_run=%s)", interval_ms, args.dry_run) + cycle = 0 + interval_s = interval_ms / 1000.0 + + while not stop_event.is_set(): + controller.tick() + cycle += 1 + + if cycle % max(1, int(10000 / interval_ms)) == 0: + snap = controller.get_snapshot() + logger.info( + "Cycle %d | state=%s | sorted=%d (tall=%d short=%d) | total=%d | uptime=%.0fs", + cycle, snap["state"], snap["sorted_count"], + snap["tall_count"], snap["short_count"], + snap["total_count"], snap["uptime_s"], + ) + + stop_event.wait(interval_s) + + controller.stop() + if api_server: + api_server.shutdown() + + +if __name__ == "__main__": + main()