diff --git a/Src/bot_core.py b/Src/bot_core.py index b280abc..36a2cc8 100644 --- a/Src/bot_core.py +++ b/Src/bot_core.py @@ -72,7 +72,8 @@ class TouchConstants: import port_scan import ocr_utils -SLEEP_DELAY = 0.1 +# default delay between sequential actions (seconds) +SLEEP_DELAY = 0.05 class Bot: @@ -407,7 +408,14 @@ def get_current_icons(self, new=True, available=False): img_gray = cv2.cvtColor(img_rgb, cv2.COLOR_BGR2GRAY) self.logger.debug(f'Screenshot shape: {img_gray.shape}') - + + # OCR fallback for chapter headers + try: + ocr_chapters = ocr_utils.find_chapter_headers(img_rgb) + except Exception as e: + ocr_chapters = {} + self.logger.debug(f'Chapter OCR failed: {e}') + # Check every target in dir icon_count = 0 for target in os.listdir("icons"): @@ -426,12 +434,26 @@ def get_current_icons(self, new=True, available=False): max_val = res.max() loc = np.where(res >= threshold) icon_found = len(loc[0]) > 0 - - # Removed verbose per-icon visibility debug logging - + if icon_found: y = loc[0][0] x = loc[1][0] + elif target.startswith('chapter_'): + # Fallback to OCR if chapter icon not found + try: + num = int(target.split('_')[1].split('.')[0]) + if num in ocr_chapters: + x, y = ocr_chapters[num] + icon_found = True + self.logger.debug(f'OCR detected {target} at position {(x, y)}') + except Exception: + pass + + # Debug for key icons + if target in ['home_screen.png', 'battle_icon.png'] or 'chapter_' in target: + self.logger.debug(f'Icon {target}: max_val={max_val:.3f}, found={icon_found}') + + if icon_found: icon_count += 1 current_icons.append([target, icon_found, (x, y)]) diff --git a/Src/gui.py b/Src/gui.py index b147a13..dc270b9 100644 --- a/Src/gui.py +++ b/Src/gui.py @@ -55,9 +55,10 @@ def __init__(self): self.stop_flag = False self.running = False self.info_ready = threading.Event() - self.thread_run = None - self.thread_init = None - self.bot_instance = None + self.thread_run: threading.Thread | None = None + self.thread_init: threading.Thread | None = None + self.bot_instance: Any | None = None + # Read config file self.config = configparser.ConfigParser() self.config.read('config.ini') @@ -206,10 +207,13 @@ def leave_game(self): def _schedule_latency_update(self): try: stats = LT.rolling_stats() + last = LT.last_thinking_ms() if stats.get('count', 0) and stats['count'] > 0: - text = f"RT avg {stats['avg_ms']:.0f} ms p50 {stats['p50_ms']:.0f} p90 {stats['p90_ms']:.0f}" + text = ( + f"RT avg {stats['avg_ms']:.0f} ms p50 {stats['p50_ms']:.0f} p90 {stats['p90_ms']:.0f} last {last:.0f}" + ) else: - text = 'RT: -- ms' + text = f'RT: -- ms last {last:.0f}' self.latency_label.config(text=text) except Exception: # Keep label as-is on errors diff --git a/Src/latency.py b/Src/latency.py index c10a291..24e389e 100644 --- a/Src/latency.py +++ b/Src/latency.py @@ -59,12 +59,19 @@ def __init__(self, csv_path: str = 'latency_metrics.csv') -> None: self.enabled = os.getenv('LATENCY_ENABLED', '1') not in ('0', 'false', 'False') # rolling stats self._roll = deque(maxlen=int(os.getenv('LATENCY_ROLLING_N', '200'))) + + self._flush_count = 0 + self._summary_every = int(os.getenv('LATENCY_SUMMARY_EVERY', '50')) + self._summary_path = os.getenv('LATENCY_SUMMARY_PATH', 'latency_summary.txt') + # store last screenshot->action latency for quick access + self._last_s2a_ms: float = 0.0 self._roll_think = deque(maxlen=int(os.getenv('LATENCY_ROLLING_N_THINK', '200'))) self._flush_count = 0 self._summary_every = int(os.getenv('LATENCY_SUMMARY_EVERY', '50')) self._summary_path = os.getenv('LATENCY_SUMMARY_PATH', 'latency_summary.txt') self._last_think_ms = 0.0 + def _ensure_header(self) -> None: if not os.path.exists(self._csv_path): with open(self._csv_path, 'w', newline='', encoding='utf-8') as f: @@ -126,6 +133,9 @@ def flush(self, extra: Optional[Dict[str, str]] = None) -> None: # update rolling stats with primary KPI (screenshot_to_action_ms) if screenshot_to_action_ms is not None: self._roll.append(float(screenshot_to_action_ms)) + # keep last thinking time regardless of rolling window + if screenshot_to_action_ms is not None: + self._last_s2a_ms = float(screenshot_to_action_ms) self._flush_count += 1 if self._summary_every > 0 and self._flush_count % self._summary_every == 0: self._write_summary() @@ -147,6 +157,11 @@ def _fmt(v: Optional[float]) -> str: return '' return f'{v:.3f}' + # --- Helpers for UI/monitoring --- + def last_thinking_ms(self) -> float: + """Return milliseconds between screenshot capture end and action issue for last flush.""" + return float(self._last_s2a_ms) + # --- Extras --- def set_enabled(self, enabled: bool) -> None: self.enabled = enabled diff --git a/Src/ocr_utils.py b/Src/ocr_utils.py index 562bda7..6330864 100644 --- a/Src/ocr_utils.py +++ b/Src/ocr_utils.py @@ -115,3 +115,29 @@ def read_floor_from_chapter(screen_bgr: np.ndarray, chapter_header_xy: Tuple[int val, conf = val2, conf2 results[slot] = (val, conf) return results + + +def find_chapter_headers(screen_bgr: np.ndarray) -> Dict[int, Tuple[int, int]]: + """Locate positions of 'Chapter N' headers on the given screen using OCR. + Returns mapping {chapter_number: (x, y)} for the top-left of the word 'Chapter'. + """ + if not _TESS: + return {} + try: + data = pytesseract.image_to_data(screen_bgr, output_type=pytesseract.Output.DICT) + except Exception as e: + if isinstance(e, (KeyboardInterrupt, SystemExit)): + raise + return {} + results: Dict[int, Tuple[int, int]] = {} + words = [w.strip().lower() for w in data.get('text', [])] + for i, w in enumerate(words): + if w == 'chapter' and i + 1 < len(words): + try: + num = int(words[i + 1]) + x = int(data['left'][i]) + y = int(data['top'][i]) + results[num] = (x, y) + except (ValueError, TypeError): + continue + return results