From d0274ba8cf134276f46846bbbe577e855562447c Mon Sep 17 00:00:00 2001 From: Mike Roberts Date: Sat, 7 Mar 2026 18:44:09 -0800 Subject: [PATCH 1/6] Unify TinyClaw runtime logging with structured pino logs - Switch queue, API, channels, daemon, and heartbeat to structured NDJSON logging - Add log rotation (10 MB, 5 archives) and expose merged history via `GET /api/logs` - Wire `LOG_LEVEL` through runtime config and update docs/TinyOffice log references --- README.md | 11 +- docs/QUEUE.md | 8 + lib/common.sh | 66 +++++- lib/daemon.sh | 3 + lib/heartbeat-cron.sh | 51 +++- package-lock.json | 133 ++++++++++- package.json | 1 + src/channels/discord-client.ts | 80 ++++--- src/channels/telegram-client.ts | 100 ++++---- src/channels/whatsapp-client.ts | 92 ++++---- src/lib/config.ts | 13 +- src/lib/conversation.ts | 43 +++- src/lib/events.ts | 22 ++ src/lib/invoke.ts | 18 +- src/lib/logging.ts | 339 +++++++++++++++++++++++++-- src/lib/plugins.ts | 28 ++- src/lib/response.ts | 5 +- src/lib/routing.ts | 17 +- src/queue-processor.ts | 118 ++++++++-- src/server/index.ts | 7 +- src/server/routes/agents.ts | 11 +- src/server/routes/logs.ts | 30 ++- src/server/routes/messages.ts | 12 +- src/server/routes/queue.ts | 9 +- src/server/routes/settings.ts | 5 +- src/server/routes/tasks.ts | 9 +- src/server/routes/teams.ts | 7 +- src/server/sse.ts | 2 +- tinyoffice/README.md | 3 +- tinyoffice/src/app/logs/page.tsx | 88 +++++-- tinyoffice/src/app/settings/page.tsx | 2 +- tinyoffice/src/lib/api.ts | 26 +- 32 files changed, 1085 insertions(+), 274 deletions(-) create mode 100644 src/lib/events.ts diff --git a/README.md b/README.md index dd0e4e77..5ea9ed4b 100644 --- a/README.md +++ b/README.md @@ -141,7 +141,7 @@ TinyClaw includes `tinyoffice/`, a Next.js web portal for operating TinyClaw fro - **Chat Console** - Send messages to default agent, `@agent`, or `@team` - **Agents & Teams** - Create, edit, and remove agents/teams - **Tasks (Kanban)** - Create tasks, drag across stages, assign to agent/team -- **Logs & Events** - Inspect queue logs and streaming events +- **Logs & Events** - Inspect structured runtime logs and streaming events - **Settings** - Edit TinyClaw configuration (`settings.json`) via UI - **Office View** - Visual simulation of agent interactions @@ -180,6 +180,13 @@ Commands work with `tinyclaw` (if CLI installed) or `./tinyclaw.sh` (direct scri | `logs [type]` | View logs (discord/telegram/whatsapp/queue/heartbeat/all) | `tinyclaw logs queue` | | `attach` | Attach to tmux session | `tinyclaw attach` | +### Logging + +- TinyClaw uses structured JSON logs backed by `pino` for Node runtimes. +- Set `LOG_LEVEL=debug|info|warn|error` before starting TinyClaw to control verbosity. +- `/api/logs` returns merged historical entries across `queue`, `api`, `telegram`, `discord`, `whatsapp`, `daemon`, and `heartbeat`. +- Log files rotate at `10 MB` with `5` retained archives per source. + ### Agent Commands | Command | Description | Example | @@ -423,7 +430,7 @@ tinyclaw/ │ │ ├── incoming/ │ │ ├── processing/ │ │ └── outgoing/ -│ ├── logs/ # All logs +│ ├── logs/ # Structured NDJSON logs + rotated archives │ ├── channels/ # Channel state │ ├── files/ # Uploaded files │ ├── pairing.json # Sender allowlist state (pending + approved) diff --git a/docs/QUEUE.md b/docs/QUEUE.md index 7a3bc0dd..c749431a 100644 --- a/docs/QUEUE.md +++ b/docs/QUEUE.md @@ -251,6 +251,7 @@ The API server runs on port 3777 (configurable via `TINYCLAW_API_PORT`): | `POST /api/message` | Enqueue a message | | `GET /api/queue/status` | Queue depth (pending, processing, dead) | | `GET /api/responses` | Recent responses | +| `GET /api/logs` | Unified structured log history across queue, API, channels, daemon, and heartbeat | | `GET /api/queue/dead` | Dead messages | | `POST /api/queue/dead/:id/retry` | Retry a dead message | | `DELETE /api/queue/dead/:id` | Delete a dead message | @@ -264,6 +265,13 @@ Periodic cleanup tasks run automatically: - **Acked response pruning**: Every hour (responses acked > 24h ago) - **Conversation TTL**: Every 30 minutes (team conversations older than 30 min) +## Logging + +- Node runtimes write structured NDJSON logs with `pino`. +- `LOG_LEVEL=debug|info|warn|error` controls verbosity for queue, API, and channel clients. +- `/api/logs` merges current and rotated files for `queue`, `api`, `telegram`, `discord`, `whatsapp`, `daemon`, and `heartbeat`. +- Log files rotate at `10 MB` and retain the previous `5` archives per source. + ## Debugging ### Check Queue Status diff --git a/lib/common.sh b/lib/common.sh index 398744be..b1a898b1 100644 --- a/lib/common.sh +++ b/lib/common.sh @@ -86,9 +86,71 @@ get_channel_token() { done } -# Logging function +# Structured log helpers +rotate_log_file() { + local file="$1" + local max_bytes=$((10 * 1024 * 1024)) + local max_files=5 + + [ -f "$file" ] || return 0 + + local size + size=$(wc -c < "$file" | tr -d ' ') + if [ "$size" -lt "$max_bytes" ]; then + return 0 + fi + + local ext="${file##*.}" + local base="${file%.*}" + local i + for ((i=max_files; i>=1; i--)); do + local current="${base}.${i}.${ext}" + local previous + if [ "$i" -eq 1 ]; then + previous="$file" + else + previous="${base}.$((i-1)).${ext}" + fi + + [ -f "$previous" ] || continue + [ ! -f "$current" ] || rm -f "$current" + mv "$previous" "$current" + done +} + +write_structured_log() { + local source="$1" + local component="$2" + local level="$3" + shift 3 + + local msg="$*" + local file="$LOG_DIR/${source}.log" + local timestamp + timestamp=$(date -u '+%Y-%m-%dT%H:%M:%SZ') + + mkdir -p "$LOG_DIR" + rotate_log_file "$file" + + if command -v jq >/dev/null 2>&1; then + jq -nc \ + --arg time "$timestamp" \ + --arg level "$level" \ + --arg source "$source" \ + --arg component "$component" \ + --arg msg "$msg" \ + '{time:$time,level:$level,source:$source,component:$component,msg:$msg}' >> "$file" + else + node -e 'const [time, level, source, component, msg] = process.argv.slice(1); console.log(JSON.stringify({ time, level, source, component, msg }));' \ + "$timestamp" "$level" "$source" "$component" "$msg" >> "$file" + fi + printf '\n' >> "$file" +} + log() { - echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_DIR/daemon.log" + local msg="$*" + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $msg" + write_structured_log "daemon" "daemon" "info" "$msg" } # Load settings from JSON diff --git a/lib/daemon.sh b/lib/daemon.sh index 807860a3..56f85a7c 100644 --- a/lib/daemon.sh +++ b/lib/daemon.sh @@ -102,6 +102,9 @@ start_daemon() { # Write tokens to .env for the Node.js clients local env_file="$SCRIPT_DIR/.env" : > "$env_file" + if [ -n "${LOG_LEVEL:-}" ]; then + echo "LOG_LEVEL=${LOG_LEVEL}" >> "$env_file" + fi for ch in "${ACTIVE_CHANNELS[@]}"; do local env_var env_var="$(channel_token_env "$ch")" diff --git a/lib/heartbeat-cron.sh b/lib/heartbeat-cron.sh index 00fa3a7c..33649173 100755 --- a/lib/heartbeat-cron.sh +++ b/lib/heartbeat-cron.sh @@ -25,8 +25,57 @@ INTERVAL=${INTERVAL:-3600} mkdir -p "$(dirname "$LOG_FILE")" +rotate_log_file() { + local file="$1" + local max_bytes=$((10 * 1024 * 1024)) + local max_files=5 + + [ -f "$file" ] || return 0 + + local size + size=$(wc -c < "$file" | tr -d ' ') + if [ "$size" -lt "$max_bytes" ]; then + return 0 + fi + + local ext="${file##*.}" + local base="${file%.*}" + local i + for ((i=max_files; i>=1; i--)); do + local current="${base}.${i}.${ext}" + local previous + if [ "$i" -eq 1 ]; then + previous="$file" + else + previous="${base}.$((i-1)).${ext}" + fi + + [ -f "$previous" ] || continue + [ ! -f "$current" ] || rm -f "$current" + mv "$previous" "$current" + done +} + log() { - echo "[$(date '+%Y-%m-%d %H:%M:%S')] $*" | tee -a "$LOG_FILE" + local msg="$*" + local timestamp + timestamp=$(date -u '+%Y-%m-%dT%H:%M:%SZ') + + echo "[$(date '+%Y-%m-%d %H:%M:%S')] $msg" + rotate_log_file "$LOG_FILE" + if command -v jq >/dev/null 2>&1; then + jq -nc \ + --arg time "$timestamp" \ + --arg level "info" \ + --arg source "heartbeat" \ + --arg component "heartbeat" \ + --arg msg "$msg" \ + '{time:$time,level:$level,source:$source,component:$component,msg:$msg}' >> "$LOG_FILE" + else + node -e 'const [time, level, source, component, msg] = process.argv.slice(1); console.log(JSON.stringify({ time, level, source, component, msg }));' \ + "$timestamp" "info" "heartbeat" "heartbeat" "$msg" >> "$LOG_FILE" + fi + printf '\n' >> "$LOG_FILE" } log "Heartbeat started (interval: ${INTERVAL}s, API: ${API_URL})" diff --git a/package-lock.json b/package-lock.json index 033badb1..220341c8 100644 --- a/package-lock.json +++ b/package-lock.json @@ -1,12 +1,12 @@ { "name": "tinyclaw", - "version": "0.0.6", + "version": "0.0.8", "lockfileVersion": 3, "requires": true, "packages": { "": { "name": "tinyclaw", - "version": "0.0.6", + "version": "0.0.8", "dependencies": { "@hono/node-server": "^1.19.9", "@types/better-sqlite3": "^7.6.13", @@ -20,6 +20,7 @@ "ink-spinner": "^5.0.0", "jsonrepair": "^3.13.2", "node-telegram-bot-api": "^0.67.0", + "pino": "^9.14.0", "qrcode-terminal": "^0.12.0", "react": "^19.2.4", "whatsapp-web.js": "^1.34.6" @@ -319,6 +320,12 @@ "integrity": "sha512-wtnBAETBVYZ9GvcbgdswRVSLkFkYAGv1KzwBBTeRXvGT9sb9cPllOgFFWXCn9PyARQ0H+Ijz6mmoRrGateUDxQ==", "license": "MIT" }, + "node_modules/@pinojs/redact": { + "version": "0.4.0", + "resolved": "https://registry.npmjs.org/@pinojs/redact/-/redact-0.4.0.tgz", + "integrity": "sha512-k2ENnmBugE/rzQfEcdWHcCY+/FM3VLzH9cYEsbdsoqrvzAKRhUZeRNhAZvB8OitQJ1TBed3yqWtdjzS6wJKBwg==", + "license": "MIT" + }, "node_modules/@puppeteer/browsers": { "version": "2.12.0", "resolved": "https://registry.npmjs.org/@puppeteer/browsers/-/browsers-2.12.0.tgz", @@ -764,6 +771,15 @@ "integrity": "sha512-Oei9OH4tRh0YqU3GxhX79dM/mwVgvbZJaSNaRk+bshkj0S5cfHcgYakreBjrHwatXKbz+IoIdYLxrKim2MjW0Q==", "license": "MIT" }, + "node_modules/atomic-sleep": { + "version": "1.0.0", + "resolved": "https://registry.npmjs.org/atomic-sleep/-/atomic-sleep-1.0.0.tgz", + "integrity": "sha512-kNOjDqAh7px0XWNI+4QbzoiR/nTkHAWNud2uvnJquD1/x5a7EQZMJT0AczqK0Qn67oY/TTQ1LbUKajZpp3I9tQ==", + "license": "MIT", + "engines": { + "node": ">=8.0.0" + } + }, "node_modules/auto-bind": { "version": "5.0.1", "resolved": "https://registry.npmjs.org/auto-bind/-/auto-bind-5.0.1.tgz", @@ -3808,6 +3824,15 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/on-exit-leak-free": { + "version": "2.1.2", + "resolved": "https://registry.npmjs.org/on-exit-leak-free/-/on-exit-leak-free-2.1.2.tgz", + "integrity": "sha512-0eJJY6hXLGf1udHwfNftBqH+g73EU4B504nZeKpz1sYRKafAghwxEJunB2O7rDZkL4PGfsMVnTXZ2EjibbqcsA==", + "license": "MIT", + "engines": { + "node": ">=14.0.0" + } + }, "node_modules/once": { "version": "1.4.0", "resolved": "https://registry.npmjs.org/once/-/once-1.4.0.tgz", @@ -3948,6 +3973,43 @@ "integrity": "sha512-xceH2snhtb5M9liqDsmEw56le376mTZkEX/jEb/RxNFyegNul7eNslCXP9FDj/Lcu0X8KEyMceP2ntpaHrDEVA==", "license": "ISC" }, + "node_modules/pino": { + "version": "9.14.0", + "resolved": "https://registry.npmjs.org/pino/-/pino-9.14.0.tgz", + "integrity": "sha512-8OEwKp5juEvb/MjpIc4hjqfgCNysrS94RIOMXYvpYCdm/jglrKEiAYmiumbmGhCvs+IcInsphYDFwqrjr7398w==", + "license": "MIT", + "dependencies": { + "@pinojs/redact": "^0.4.0", + "atomic-sleep": "^1.0.0", + "on-exit-leak-free": "^2.1.0", + "pino-abstract-transport": "^2.0.0", + "pino-std-serializers": "^7.0.0", + "process-warning": "^5.0.0", + "quick-format-unescaped": "^4.0.3", + "real-require": "^0.2.0", + "safe-stable-stringify": "^2.3.1", + "sonic-boom": "^4.0.1", + "thread-stream": "^3.0.0" + }, + "bin": { + "pino": "bin.js" + } + }, + "node_modules/pino-abstract-transport": { + "version": "2.0.0", + "resolved": "https://registry.npmjs.org/pino-abstract-transport/-/pino-abstract-transport-2.0.0.tgz", + "integrity": "sha512-F63x5tizV6WCh4R6RHyi2Ml+M70DNRXt/+HANowMflpgGFMAym/VKm6G7ZOQRjqN7XbGxK1Lg9t6ZrtzOaivMw==", + "license": "MIT", + "dependencies": { + "split2": "^4.0.0" + } + }, + "node_modules/pino-std-serializers": { + "version": "7.1.0", + "resolved": "https://registry.npmjs.org/pino-std-serializers/-/pino-std-serializers-7.1.0.tgz", + "integrity": "sha512-BndPH67/JxGExRgiX1dX0w1FvZck5Wa4aal9198SrRhZjH3GxKQUKIBnYJTdj2HDN3UQAS06HlfcSbQj2OHmaw==", + "license": "MIT" + }, "node_modules/possible-typed-array-names": { "version": "1.1.0", "resolved": "https://registry.npmjs.org/possible-typed-array-names/-/possible-typed-array-names-1.1.0.tgz", @@ -4002,6 +4064,22 @@ "integrity": "sha512-3ouUOpQhtgrbOa17J7+uxOTpITYWaGP7/AhoR3+A+/1e9skrzelGi/dXzEYyvbxubEF6Wn2ypscTKiKJFFn1ag==", "license": "MIT" }, + "node_modules/process-warning": { + "version": "5.0.0", + "resolved": "https://registry.npmjs.org/process-warning/-/process-warning-5.0.0.tgz", + "integrity": "sha512-a39t9ApHNx2L4+HBnQKqxxHNs1r7KF+Intd8Q/g1bUh6q0WIp9voPXJ/x0j+ZL45KF1pJd9+q2jLIRMfvEshkA==", + "funding": [ + { + "type": "github", + "url": "https://github.com/sponsors/fastify" + }, + { + "type": "opencollective", + "url": "https://opencollective.com/fastify" + } + ], + "license": "MIT" + }, "node_modules/progress": { "version": "2.0.3", "resolved": "https://registry.npmjs.org/progress/-/progress-2.0.3.tgz", @@ -4135,6 +4213,12 @@ "integrity": "sha512-FIqgj2EUvTa7R50u0rGsyTftzjYmv/a3hO345bZNrqabNqjtgiDMgmo4mkUjd+nzU5oF3dClKqFIPUKybUyqoQ==", "license": "MIT" }, + "node_modules/quick-format-unescaped": { + "version": "4.0.4", + "resolved": "https://registry.npmjs.org/quick-format-unescaped/-/quick-format-unescaped-4.0.4.tgz", + "integrity": "sha512-tYC1Q1hgyRuHgloV/YXs2w15unPVh8qfu/qCTfhTYamaw7fyhumKa2yGpdSo87vY32rIclj+4fWYQXUMs9EHvg==", + "license": "MIT" + }, "node_modules/rc": { "version": "1.2.8", "resolved": "https://registry.npmjs.org/rc/-/rc-1.2.8.tgz", @@ -4221,6 +4305,15 @@ "node": ">=10" } }, + "node_modules/real-require": { + "version": "0.2.0", + "resolved": "https://registry.npmjs.org/real-require/-/real-require-0.2.0.tgz", + "integrity": "sha512-57frrGM/OCTLqLOAh0mhVA9VBMHd+9U7Zb2THMGdBUoZVOtGbJzjxsYGDJ3A9AYYCP4hn6y1TVbaOfzWtm5GFg==", + "license": "MIT", + "engines": { + "node": ">= 12.13.0" + } + }, "node_modules/reflect.getprototypeof": { "version": "1.0.10", "resolved": "https://registry.npmjs.org/reflect.getprototypeof/-/reflect.getprototypeof-1.0.10.tgz", @@ -4531,6 +4624,15 @@ "url": "https://github.com/sponsors/ljharb" } }, + "node_modules/safe-stable-stringify": { + "version": "2.5.0", + "resolved": "https://registry.npmjs.org/safe-stable-stringify/-/safe-stable-stringify-2.5.0.tgz", + "integrity": "sha512-b3rppTKm9T+PsVCBEOUR46GWI7fdOs00VKZ1+9c1EWDaDMvjQc6tUwuFyIprgGgTcWoVHSKrU8H31ZHA2e0RHA==", + "license": "MIT", + "engines": { + "node": ">=10" + } + }, "node_modules/safer-buffer": { "version": "2.1.2", "resolved": "https://registry.npmjs.org/safer-buffer/-/safer-buffer-2.1.2.tgz", @@ -4812,6 +4914,15 @@ "node": ">= 14" } }, + "node_modules/sonic-boom": { + "version": "4.2.1", + "resolved": "https://registry.npmjs.org/sonic-boom/-/sonic-boom-4.2.1.tgz", + "integrity": "sha512-w6AxtubXa2wTXAUsZMMWERrsIRAdrK0Sc+FUytWvYAhBJLyuI4llrMIC1DtlNSdI99EI86KZum2MMq3EAZlF9Q==", + "license": "MIT", + "dependencies": { + "atomic-sleep": "^1.0.0" + } + }, "node_modules/source-map": { "version": "0.6.1", "resolved": "https://registry.npmjs.org/source-map/-/source-map-0.6.1.tgz", @@ -4822,6 +4933,15 @@ "node": ">=0.10.0" } }, + "node_modules/split2": { + "version": "4.2.0", + "resolved": "https://registry.npmjs.org/split2/-/split2-4.2.0.tgz", + "integrity": "sha512-UcjcJOWknrNkF6PLX83qcHM6KHgVKNkV62Y8a5uYDVv9ydGQVwAHMKqHdJje1VTWpljG0WYpCDhrCdAOYH4TWg==", + "license": "ISC", + "engines": { + "node": ">= 10.x" + } + }, "node_modules/sshpk": { "version": "1.18.0", "resolved": "https://registry.npmjs.org/sshpk/-/sshpk-1.18.0.tgz", @@ -5066,6 +5186,15 @@ "b4a": "^1.6.4" } }, + "node_modules/thread-stream": { + "version": "3.1.0", + "resolved": "https://registry.npmjs.org/thread-stream/-/thread-stream-3.1.0.tgz", + "integrity": "sha512-OqyPZ9u96VohAyMfJykzmivOrY2wfMSf3C5TtFJVgN+Hm6aj+voFhlK+kZEIv2FBh1X6Xp3DlnCOfEQ3B2J86A==", + "license": "MIT", + "dependencies": { + "real-require": "^0.2.0" + } + }, "node_modules/tinycolor2": { "version": "1.6.0", "resolved": "https://registry.npmjs.org/tinycolor2/-/tinycolor2-1.6.0.tgz", diff --git a/package.json b/package.json index 84a98dc7..7ac3af2c 100644 --- a/package.json +++ b/package.json @@ -25,6 +25,7 @@ "ink-spinner": "^5.0.0", "jsonrepair": "^3.13.2", "node-telegram-bot-api": "^0.67.0", + "pino": "^9.14.0", "qrcode-terminal": "^0.12.0", "react": "^19.2.4", "whatsapp-web.js": "^1.34.6" diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index 00304525..c4a36fad 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -12,6 +12,7 @@ import path from 'path'; import https from 'https'; import http from 'http'; import { ensureSenderPaired } from '../lib/pairing'; +import { createLogger, excerptText, logError } from '../lib/logging'; const API_PORT = parseInt(process.env.TINYCLAW_API_PORT || '3777', 10); const API_BASE = `http://localhost:${API_PORT}`; @@ -22,13 +23,13 @@ const TINYCLAW_HOME = process.env.TINYCLAW_HOME || (fs.existsSync(path.join(_localTinyclaw, 'settings.json')) ? _localTinyclaw : path.join(require('os').homedir(), '.tinyclaw')); -const LOG_FILE = path.join(TINYCLAW_HOME, 'logs/discord.log'); const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json'); const FILES_DIR = path.join(TINYCLAW_HOME, 'files'); const PAIRING_FILE = path.join(TINYCLAW_HOME, 'pairing.json'); +const logger = createLogger({ runtime: 'discord', source: 'discord', component: 'client' }); // Ensure directories exist -[path.dirname(LOG_FILE), FILES_DIR].forEach(dir => { +[FILES_DIR].forEach(dir => { if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } @@ -37,7 +38,7 @@ const PAIRING_FILE = path.join(TINYCLAW_HOME, 'pairing.json'); // Validate bot token const DISCORD_BOT_TOKEN = process.env.DISCORD_BOT_TOKEN; if (!DISCORD_BOT_TOKEN || DISCORD_BOT_TOKEN === 'your_token_here') { - console.error('ERROR: DISCORD_BOT_TOKEN is not set in .env file'); + logger.error('DISCORD_BOT_TOKEN is not set in .env file'); process.exit(1); } @@ -96,14 +97,6 @@ function downloadFile(url: string, destPath: string): Promise { const pendingMessages = new Map(); let processingOutgoingQueue = false; -// Logger -function log(level: string, message: string): void { - const timestamp = new Date().toISOString(); - const logMessage = `[${timestamp}] [${level}] ${message}\n`; - console.log(logMessage.trim()); - fs.appendFileSync(LOG_FILE, logMessage); -} - // Load teams from settings for /team command function getTeamListText(): string { try { @@ -209,8 +202,8 @@ const client = new Client({ // Client ready client.on(Events.ClientReady, (readyClient) => { - log('INFO', `Discord bot connected as ${readyClient.user.tag}`); - log('INFO', 'Listening for DMs...'); + logger.info({ context: { userTag: readyClient.user.tag } }, 'Discord bot connected'); + logger.info('Listening for DMs'); }); // Message received - Write to queue @@ -250,31 +243,37 @@ client.on(Events.MessageCreate, async (message: Message) => { await downloadFile(attachment.url, localPath); downloadedFiles.push(localPath); - log('INFO', `Downloaded attachment: ${path.basename(localPath)} (${attachment.contentType || 'unknown'})`); + logger.info({ messageId, context: { file: path.basename(localPath), contentType: attachment.contentType || 'unknown' } }, 'Downloaded attachment'); } catch (dlErr) { - log('ERROR', `Failed to download attachment ${attachment.name}: ${(dlErr as Error).message}`); + logError(logger, dlErr, 'Failed to download attachment', { messageId, attachmentName: attachment.name || undefined }); } } } let messageText = message.content || ''; - log('INFO', `Message from ${sender}: ${messageText.substring(0, 50)}${downloadedFiles.length > 0 ? ` [+${downloadedFiles.length} file(s)]` : ''}...`); + logger.info({ + channel: 'discord', + sender, + messageId, + excerpt: excerptText(messageText || '[attachment only]'), + context: { fileCount: downloadedFiles.length, senderId: message.author.id }, + }, 'Message received'); const pairing = ensureSenderPaired(PAIRING_FILE, 'discord', message.author.id, sender); if (!pairing.approved && pairing.code) { if (pairing.isNewPending) { - log('INFO', `Blocked unpaired Discord sender ${sender} (${message.author.id}) with code ${pairing.code}`); + logger.info({ channel: 'discord', sender, context: { senderId: message.author.id, pairingCode: pairing.code } }, 'Blocked unpaired sender'); await message.reply(pairingMessage(pairing.code)); } else { - log('INFO', `Blocked pending Discord sender ${sender} (${message.author.id}) without re-sending pairing message`); + logger.info({ channel: 'discord', sender, context: { senderId: message.author.id } }, 'Blocked pending sender without re-sending pairing message'); } return; } // Check for agent list command if (message.content.trim().match(/^[!/]agent$/i)) { - log('INFO', 'Agent list command received'); + logger.info({ channel: 'discord', sender, messageId }, 'Agent list command received'); const agentList = getAgentListText(); await message.reply(agentList); return; @@ -282,7 +281,7 @@ client.on(Events.MessageCreate, async (message: Message) => { // Check for team list command if (message.content.trim().match(/^[!/]team$/i)) { - log('INFO', 'Team list command received'); + logger.info({ channel: 'discord', sender, messageId }, 'Team list command received'); const teamList = getTeamListText(); await message.reply(teamList); return; @@ -295,7 +294,7 @@ client.on(Events.MessageCreate, async (message: Message) => { return; } if (resetMatch) { - log('INFO', 'Per-agent reset command received'); + logger.info({ channel: 'discord', sender, messageId }, 'Per-agent reset command received'); const agentArgs = resetMatch[1].split(/\s+/).map(a => a.replace(/^@/, '').toLowerCase()); try { const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); @@ -322,7 +321,7 @@ client.on(Events.MessageCreate, async (message: Message) => { // Check for restart command if (message.content.trim().match(/^[!/]restart$/i)) { - log('INFO', 'Restart command received'); + logger.info({ channel: 'discord', sender, messageId }, 'Restart command received'); await message.reply('Restarting TinyClaw...'); const { exec } = require('child_process'); exec(`"${path.join(SCRIPT_DIR, 'tinyclaw.sh')}" restart`, { detached: true, stdio: 'ignore' }); @@ -353,7 +352,7 @@ client.on(Events.MessageCreate, async (message: Message) => { }), }); - log('INFO', `Queued message ${messageId}`); + logger.info({ channel: 'discord', sender, messageId, context: { fileCount: downloadedFiles.length, senderId: message.author.id } }, 'Queued message'); // Store pending message for response pendingMessages.set(messageId, { @@ -371,7 +370,7 @@ client.on(Events.MessageCreate, async (message: Message) => { } } catch (error) { - log('ERROR', `Message handling error: ${(error as Error).message}`); + logError(logger, error, 'Message handling error'); } }); @@ -405,7 +404,7 @@ async function checkOutgoingQueue(): Promise { const user = await client.users.fetch(senderId); dmChannel = await user.createDM(); } catch (err) { - log('ERROR', `Could not open DM for senderId ${senderId}: ${(err as Error).message}`); + logError(logger, err, 'Could not open DM for senderId', { senderId, messageId }); } } @@ -418,12 +417,12 @@ async function checkOutgoingQueue(): Promise { if (!fs.existsSync(file)) continue; attachments.push(new AttachmentBuilder(file)); } catch (fileErr) { - log('ERROR', `Failed to prepare file ${file}: ${(fileErr as Error).message}`); + logError(logger, fileErr, 'Failed to prepare file for Discord', { messageId, file }); } } if (attachments.length > 0) { await dmChannel.send({ files: attachments }); - log('INFO', `Sent ${attachments.length} file(s) to Discord`); + logger.info({ channel: 'discord', sender, messageId, context: { fileCount: attachments.length } }, 'Sent files to Discord'); } } @@ -443,21 +442,30 @@ async function checkOutgoingQueue(): Promise { } } - log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${files.length > 0 ? `, ${files.length} file(s)` : ''})`); + logger.info({ + channel: 'discord', + sender, + messageId, + context: { + kind: pending ? 'response' : 'proactive message', + responseLength: responseText.length, + fileCount: files.length, + }, + }, 'Sent outbound message'); if (pending) pendingMessages.delete(messageId); await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); } else { - log('WARN', `No pending message for ${messageId} and no senderId, acking`); + logger.warn({ channel: 'discord', sender, messageId, context: { senderId } }, 'No pending message and no senderId; acking'); await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); } } catch (error) { - log('ERROR', `Error processing response ${resp.id}: ${(error as Error).message}`); + logError(logger, error, 'Error processing Discord response', { responseId: resp.id }); // Don't ack on error, will retry next poll } } } catch (error) { - log('ERROR', `Outgoing queue error: ${(error as Error).message}`); + logError(logger, error, 'Outgoing queue error'); } finally { processingOutgoingQueue = false; } @@ -477,25 +485,25 @@ setInterval(() => { // Catch unhandled errors so we can see what kills the bot process.on('unhandledRejection', (reason) => { - log('ERROR', `Unhandled rejection: ${reason}`); + logError(logger, reason, 'Unhandled rejection'); }); process.on('uncaughtException', (error) => { - log('ERROR', `Uncaught exception: ${error.message}\n${error.stack}`); + logError(logger, error, 'Uncaught exception'); }); // Graceful shutdown process.on('SIGINT', () => { - log('INFO', 'Shutting down Discord client...'); + logger.info('Shutting down Discord client'); client.destroy(); process.exit(0); }); process.on('SIGTERM', () => { - log('INFO', 'Shutting down Discord client...'); + logger.info('Shutting down Discord client'); client.destroy(); process.exit(0); }); // Start client -log('INFO', 'Starting Discord client...'); +logger.info('Starting Discord client'); client.login(DISCORD_BOT_TOKEN); diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index aa73b8d8..cadfe20c 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -14,6 +14,7 @@ import path from 'path'; import https from 'https'; import http from 'http'; import { ensureSenderPaired } from '../lib/pairing'; +import { createLogger, excerptText, logError } from '../lib/logging'; const API_PORT = parseInt(process.env.TINYCLAW_API_PORT || '3777', 10); const API_BASE = `http://localhost:${API_PORT}`; @@ -24,13 +25,13 @@ const TINYCLAW_HOME = process.env.TINYCLAW_HOME || (fs.existsSync(path.join(_localTinyclaw, 'settings.json')) ? _localTinyclaw : path.join(require('os').homedir(), '.tinyclaw')); -const LOG_FILE = path.join(TINYCLAW_HOME, 'logs/telegram.log'); const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json'); const FILES_DIR = path.join(TINYCLAW_HOME, 'files'); const PAIRING_FILE = path.join(TINYCLAW_HOME, 'pairing.json'); +const logger = createLogger({ runtime: 'telegram', source: 'telegram', component: 'client' }); // Ensure directories exist -[path.dirname(LOG_FILE), FILES_DIR].forEach(dir => { +[FILES_DIR].forEach(dir => { if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } @@ -39,7 +40,7 @@ const PAIRING_FILE = path.join(TINYCLAW_HOME, 'pairing.json'); // Validate bot token const TELEGRAM_BOT_TOKEN = process.env.TELEGRAM_BOT_TOKEN; if (!TELEGRAM_BOT_TOKEN || TELEGRAM_BOT_TOKEN === 'your_token_here') { - console.error('ERROR: TELEGRAM_BOT_TOKEN is not set in .env file'); + logger.error('TELEGRAM_BOT_TOKEN is not set in .env file'); process.exit(1); } @@ -80,14 +81,6 @@ let processingOutgoingQueue = false; let lastPollingActivity = Date.now(); let pollingRestartInProgress = false; -// Logger -function log(level: string, message: string): void { - const timestamp = new Date().toISOString(); - const logMessage = `[${timestamp}] [${level}] ${message}\n`; - console.log(logMessage.trim()); - fs.appendFileSync(LOG_FILE, logMessage); -} - // Load teams from settings for /team command function getTeamListText(): string { try { @@ -185,7 +178,7 @@ async function sendTelegramMessage( throw error; } - log('WARN', 'Failed to parse Telegram Markdown, retrying without Markdown parsing'); + logger.warn('Failed to parse Telegram Markdown, retrying without Markdown parsing'); await bot.sendMessage(chatId, text, options); } } @@ -231,10 +224,10 @@ async function downloadTelegramFile(fileId: string, ext: string, messageId: stri const localPath = buildUniqueFilePath(FILES_DIR, filename); await downloadFile(url, localPath); - log('INFO', `Downloaded file: ${path.basename(localPath)}`); + logger.info({ messageId, context: { file: path.basename(localPath) } }, 'Downloaded file'); return localPath; } catch (error) { - log('ERROR', `Failed to download file: ${(error as Error).message}`); + logError(logger, error, 'Failed to download file', { messageId, fileId }); return null; } } @@ -264,7 +257,7 @@ const bot = new TelegramBot(TELEGRAM_BOT_TOKEN, { polling: true }); // Bot ready bot.getMe().then(async (me: TelegramBot.User) => { - log('INFO', `Telegram bot connected as @${me.username}`); + logger.info({ context: { username: me.username } }, 'Telegram bot connected'); lastPollingActivity = Date.now(); // Register bot commands so they appear in Telegram's "/" menu @@ -273,11 +266,11 @@ bot.getMe().then(async (me: TelegramBot.User) => { { command: 'team', description: 'List available teams' }, { command: 'reset', description: 'Reset conversation history' }, { command: 'restart', description: 'Restart TinyClaw' }, - ]).catch((err: Error) => log('WARN', `Failed to register commands: ${err.message}`)); + ]).catch((err: Error) => logger.warn({ context: { error: err.message } }, 'Failed to register commands')); - log('INFO', 'Listening for messages...'); + logger.info('Listening for messages'); }).catch((err: Error) => { - log('ERROR', `Failed to connect: ${err.message}`); + logError(logger, err, 'Failed to connect Telegram bot'); process.exit(1); }); @@ -357,24 +350,30 @@ bot.on('message', async (msg: TelegramBot.Message) => { : 'Unknown'; const senderId = msg.chat.id.toString(); - log('INFO', `Message from ${sender}: ${messageText.substring(0, 50)}${downloadedFiles.length > 0 ? ` [+${downloadedFiles.length} file(s)]` : ''}...`); + logger.info({ + channel: 'telegram', + sender, + messageId: queueMessageId, + excerpt: excerptText(messageText || '[media only]'), + context: { fileCount: downloadedFiles.length }, + }, 'Message received'); const pairing = ensureSenderPaired(PAIRING_FILE, 'telegram', senderId, sender); if (!pairing.approved && pairing.code) { if (pairing.isNewPending) { - log('INFO', `Blocked unpaired Telegram sender ${sender} (${senderId}) with code ${pairing.code}`); + logger.info({ channel: 'telegram', sender, context: { senderId, pairingCode: pairing.code } }, 'Blocked unpaired sender'); await bot.sendMessage(msg.chat.id, pairingMessage(pairing.code), { reply_to_message_id: msg.message_id, }); } else { - log('INFO', `Blocked pending Telegram sender ${sender} (${senderId}) without re-sending pairing message`); + logger.info({ channel: 'telegram', sender, context: { senderId } }, 'Blocked pending sender without re-sending pairing message'); } return; } // Check for agent list command if (msg.text && msg.text.trim().match(/^[!/]agent$/i)) { - log('INFO', 'Agent list command received'); + logger.info({ channel: 'telegram', sender, messageId: queueMessageId }, 'Agent list command received'); const agentList = getAgentListText(); await bot.sendMessage(msg.chat.id, agentList, { reply_to_message_id: msg.message_id, @@ -384,7 +383,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { // Check for team list command if (msg.text && msg.text.trim().match(/^[!/]team$/i)) { - log('INFO', 'Team list command received'); + logger.info({ channel: 'telegram', sender, messageId: queueMessageId }, 'Team list command received'); const teamList = getTeamListText(); await bot.sendMessage(msg.chat.id, teamList, { reply_to_message_id: msg.message_id, @@ -401,7 +400,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { return; } if (resetMatch) { - log('INFO', 'Per-agent reset command received'); + logger.info({ channel: 'telegram', sender, messageId: queueMessageId }, 'Per-agent reset command received'); const agentArgs = resetMatch[1].split(/\s+/).map(a => a.replace(/^@/, '').toLowerCase()); try { const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); @@ -432,7 +431,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { // Check for restart command if (messageText.trim().match(/^[!/]restart$/i)) { - log('INFO', 'Restart command received'); + logger.info({ channel: 'telegram', sender, messageId: queueMessageId }, 'Restart command received'); await bot.sendMessage(msg.chat.id, 'Restarting TinyClaw...', { reply_to_message_id: msg.message_id, }); @@ -465,7 +464,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { }), }); - log('INFO', `Queued message ${queueMessageId}`); + logger.info({ channel: 'telegram', sender, messageId: queueMessageId, context: { senderId, fileCount: downloadedFiles.length } }, 'Queued message'); // Store pending message for response pendingMessages.set(queueMessageId, { @@ -483,7 +482,7 @@ bot.on('message', async (msg: TelegramBot.Message) => { } } catch (error) { - log('ERROR', `Message handling error: ${(error as Error).message}`); + logError(logger, error, 'Message handling error'); } }); @@ -528,9 +527,9 @@ async function checkOutgoingQueue(): Promise { } else { await bot.sendDocument(targetChatId, file); } - log('INFO', `Sent file to Telegram: ${path.basename(file)}`); + logger.info({ channel: 'telegram', sender, messageId, context: { file: path.basename(file) } }, 'Sent file to Telegram'); } catch (fileErr) { - log('ERROR', `Failed to send file ${file}: ${(fileErr as Error).message}`); + logError(logger, fileErr, 'Failed to send file to Telegram', { messageId, file }); } } } @@ -552,21 +551,30 @@ async function checkOutgoingQueue(): Promise { } } - log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${files.length > 0 ? `, ${files.length} file(s)` : ''})`); + logger.info({ + channel: 'telegram', + sender, + messageId, + context: { + kind: pending ? 'response' : 'proactive message', + responseLength: responseText.length, + fileCount: files.length, + }, + }, 'Sent outbound message'); if (pending) pendingMessages.delete(messageId); await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); } else { - log('WARN', `No pending message for ${messageId} and no valid senderId, acking`); + logger.warn({ channel: 'telegram', sender, messageId, context: { senderId } }, 'No pending message and no valid senderId; acking'); await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); } } catch (error) { - log('ERROR', `Error processing response ${resp.id}: ${(error as Error).message}`); + logError(logger, error, 'Error processing Telegram response', { responseId: resp.id }); // Don't ack on error, will retry next poll } } } catch (error) { - log('ERROR', `Outgoing queue error: ${(error as Error).message}`); + logError(logger, error, 'Outgoing queue error'); } finally { processingOutgoingQueue = false; @@ -588,27 +596,27 @@ setInterval(() => { // Restart polling with proper cleanup to avoid duplicate polling loops async function restartPolling(reason: string, delayMs = 5000): Promise { if (pollingRestartInProgress) { - log('INFO', `Polling restart already in progress, skipping (${reason})`); + logger.info({ context: { reason } }, 'Polling restart already in progress, skipping'); return; } pollingRestartInProgress = true; - log('WARN', `${reason} — stopping polling, will restart in ${delayMs / 1000}s...`); + logger.warn({ context: { reason, delayMs } }, 'Stopping polling before restart'); try { await bot.stopPolling(); } catch (e) { - log('WARN', `stopPolling error (ignored): ${(e as Error).message}`); + logError(logger, e, 'stopPolling error (ignored)'); } await new Promise(resolve => setTimeout(resolve, delayMs)); try { - log('INFO', `Restarting polling (${reason})...`); + logger.info({ context: { reason } }, 'Restarting polling'); await bot.startPolling(); lastPollingActivity = Date.now(); - log('INFO', 'Polling restarted successfully'); + logger.info('Polling restarted successfully'); } catch (e) { - log('ERROR', `Failed to restart polling: ${(e as Error).message}`); + logError(logger, e, 'Failed to restart polling', { reason }); } finally { pollingRestartInProgress = false; } @@ -616,7 +624,7 @@ async function restartPolling(reason: string, delayMs = 5000): Promise { // Handle polling errors with automatic recovery bot.on('polling_error', (error: Error) => { - log('ERROR', `Polling error: ${error.message}`); + logError(logger, error, 'Polling error'); // ETELEGRAM 409 = another instance running (stale connection after sleep) // EFATAL = unrecoverable @@ -637,7 +645,7 @@ setInterval(async () => { await bot.getMe(); // API works fine — polling is just idle (no messages). Reset timer. lastPollingActivity = Date.now(); - log('INFO', `Watchdog: no messages for ${Math.round(silentMs / 1000)}s but API reachable, polling is healthy`); + logger.info({ context: { silentSeconds: Math.round(silentMs / 1000) } }, 'Watchdog check passed; polling is healthy'); } catch { // API unreachable — polling is actually broken, restart it restartPolling(`No polling activity for ${Math.round(silentMs / 1000)}s and API unreachable (watchdog)`, 5000); @@ -647,24 +655,24 @@ setInterval(async () => { // Catch unhandled errors so we can see what kills the bot process.on('unhandledRejection', (reason) => { - log('ERROR', `Unhandled rejection: ${reason}`); + logError(logger, reason, 'Unhandled rejection'); }); process.on('uncaughtException', (error) => { - log('ERROR', `Uncaught exception: ${error.message}\n${error.stack}`); + logError(logger, error, 'Uncaught exception'); }); // Graceful shutdown process.on('SIGINT', () => { - log('INFO', 'Shutting down Telegram client...'); + logger.info('Shutting down Telegram client'); bot.stopPolling(); process.exit(0); }); process.on('SIGTERM', () => { - log('INFO', 'Shutting down Telegram client...'); + logger.info('Shutting down Telegram client'); bot.stopPolling(); process.exit(0); }); // Start -log('INFO', 'Starting Telegram client...'); +logger.info('Starting Telegram client'); diff --git a/src/channels/whatsapp-client.ts b/src/channels/whatsapp-client.ts index fe0c6a71..95595956 100644 --- a/src/channels/whatsapp-client.ts +++ b/src/channels/whatsapp-client.ts @@ -10,6 +10,7 @@ import qrcode from 'qrcode-terminal'; import fs from 'fs'; import path from 'path'; import { ensureSenderPaired } from '../lib/pairing'; +import { createLogger, excerptText, logError } from '../lib/logging'; const API_PORT = parseInt(process.env.TINYCLAW_API_PORT || '3777', 10); const API_BASE = `http://localhost:${API_PORT}`; @@ -20,14 +21,14 @@ const TINYCLAW_HOME = process.env.TINYCLAW_HOME || (fs.existsSync(path.join(_localTinyclaw, 'settings.json')) ? _localTinyclaw : path.join(require('os').homedir(), '.tinyclaw')); -const LOG_FILE = path.join(TINYCLAW_HOME, 'logs/whatsapp.log'); const SESSION_DIR = path.join(SCRIPT_DIR, '.tinyclaw/whatsapp-session'); const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json'); const FILES_DIR = path.join(TINYCLAW_HOME, 'files'); const PAIRING_FILE = path.join(TINYCLAW_HOME, 'pairing.json'); +const logger = createLogger({ runtime: 'whatsapp', source: 'whatsapp', component: 'client' }); // Ensure directories exist -[path.dirname(LOG_FILE), SESSION_DIR, FILES_DIR].forEach(dir => { +[SESSION_DIR, FILES_DIR].forEach(dir => { if (!fs.existsSync(dir)) { fs.mkdirSync(dir, { recursive: true }); } @@ -78,10 +79,10 @@ async function downloadWhatsAppMedia(message: Message, queueMessageId: string): // Write base64 data to file fs.writeFileSync(localPath, Buffer.from(media.data, 'base64')); - log('INFO', `Downloaded media: ${filename} (${media.mimetype})`); + logger.info({ messageId: queueMessageId, context: { file: filename, mimeType: media.mimetype } }, 'Downloaded media'); return localPath; } catch (error) { - log('ERROR', `Failed to download media: ${(error as Error).message}`); + logError(logger, error, 'Failed to download media', { messageId: queueMessageId }); return null; } } @@ -90,14 +91,6 @@ async function downloadWhatsAppMedia(message: Message, queueMessageId: string): const pendingMessages = new Map(); let processingOutgoingQueue = false; -// Logger -function log(level: string, message: string): void { - const timestamp = new Date().toISOString(); - const logMessage = `[${timestamp}] [${level}] ${message}\n`; - console.log(logMessage.trim()); - fs.appendFileSync(LOG_FILE, logMessage); -} - // Load teams from settings for /team command function getTeamListText(): string { try { @@ -174,7 +167,7 @@ const client = new Client({ // QR Code for authentication client.on('qr', (qr: string) => { - log('INFO', 'Scan this QR code with WhatsApp:'); + logger.info('Scan this QR code with WhatsApp'); console.log('\n'); // Display in tmux pane @@ -188,22 +181,22 @@ client.on('qr', (qr: string) => { const qrFile = path.join(channelsDir, 'whatsapp_qr.txt'); qrcode.generate(qr, { small: true }, (code: string) => { fs.writeFileSync(qrFile, code); - log('INFO', 'QR code saved to .tinyclaw/channels/whatsapp_qr.txt'); + logger.info('QR code saved to .tinyclaw/channels/whatsapp_qr.txt'); }); console.log('\n'); - log('INFO', 'Open WhatsApp → Settings → Linked Devices → Link a Device'); + logger.info('Open WhatsApp → Settings → Linked Devices → Link a Device'); }); // Authentication success client.on('authenticated', () => { - log('INFO', 'WhatsApp authenticated successfully!'); + logger.info('WhatsApp authenticated successfully'); }); // Client ready client.on('ready', () => { - log('INFO', '✓ WhatsApp client connected and ready!'); - log('INFO', 'Listening for messages...'); + logger.info('WhatsApp client connected and ready'); + logger.info('Listening for messages'); // Create ready flag for tinyclaw.sh const readyFile = path.join(TINYCLAW_HOME, 'channels/whatsapp_ready'); @@ -259,22 +252,28 @@ client.on('message_create', async (message: Message) => { return; } - log('INFO', `📱 Message from ${sender}: ${messageText.substring(0, 50)}${downloadedFiles.length > 0 ? ` [+${downloadedFiles.length} file(s)]` : ''}...`); + logger.info({ + channel: 'whatsapp', + sender, + messageId, + excerpt: excerptText(messageText || '[media only]'), + context: { fileCount: downloadedFiles.length, senderId: message.from }, + }, 'Message received'); const pairing = ensureSenderPaired(PAIRING_FILE, 'whatsapp', message.from, sender); if (!pairing.approved && pairing.code) { if (pairing.isNewPending) { - log('INFO', `Blocked unpaired WhatsApp sender ${sender} (${message.from}) with code ${pairing.code}`); + logger.info({ channel: 'whatsapp', sender, context: { senderId: message.from, pairingCode: pairing.code } }, 'Blocked unpaired sender'); await message.reply(pairingMessage(pairing.code)); } else { - log('INFO', `Blocked pending WhatsApp sender ${sender} (${message.from}) without re-sending pairing message`); + logger.info({ channel: 'whatsapp', sender, context: { senderId: message.from } }, 'Blocked pending sender without re-sending pairing message'); } return; } // Check for agent list command if (message.body.trim().match(/^[!/]agent$/i)) { - log('INFO', 'Agent list command received'); + logger.info({ channel: 'whatsapp', sender, messageId }, 'Agent list command received'); const agentList = getAgentListText(); await message.reply(agentList); return; @@ -282,7 +281,7 @@ client.on('message_create', async (message: Message) => { // Check for team list command if (message.body.trim().match(/^[!/]team$/i)) { - log('INFO', 'Team list command received'); + logger.info({ channel: 'whatsapp', sender, messageId }, 'Team list command received'); const teamList = getTeamListText(); await message.reply(teamList); return; @@ -295,7 +294,7 @@ client.on('message_create', async (message: Message) => { return; } if (resetMatch) { - log('INFO', 'Per-agent reset command received'); + logger.info({ channel: 'whatsapp', sender, messageId }, 'Per-agent reset command received'); const agentArgs = resetMatch[1].split(/\s+/).map(a => a.replace(/^@/, '').toLowerCase()); try { const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); @@ -322,7 +321,7 @@ client.on('message_create', async (message: Message) => { // Check for restart command if (messageText.trim().match(/^[!/]restart$/i)) { - log('INFO', 'Restart command received'); + logger.info({ channel: 'whatsapp', sender, messageId }, 'Restart command received'); await message.reply('Restarting TinyClaw...'); const { exec } = require('child_process'); exec(`"${path.join(SCRIPT_DIR, 'tinyclaw.sh')}" restart`, { detached: true, stdio: 'ignore' }); @@ -353,7 +352,7 @@ client.on('message_create', async (message: Message) => { }), }); - log('INFO', `✓ Queued message ${messageId}`); + logger.info({ channel: 'whatsapp', sender, messageId, context: { fileCount: downloadedFiles.length, senderId: message.from } }, 'Queued message'); // Store pending message for response pendingMessages.set(messageId, { @@ -371,7 +370,7 @@ client.on('message_create', async (message: Message) => { } } catch (error) { - log('ERROR', `Message handling error: ${(error as Error).message}`); + logError(logger, error, 'Message handling error'); } }); @@ -405,7 +404,7 @@ async function checkOutgoingQueue(): Promise { const chatId = senderId.includes('@') ? senderId : `${senderId}@c.us`; targetChat = await client.getChatById(chatId); } catch (err) { - log('ERROR', `Could not get chat for senderId ${senderId}: ${(err as Error).message}`); + logError(logger, err, 'Could not get chat for senderId', { senderId, messageId }); } } @@ -417,9 +416,9 @@ async function checkOutgoingQueue(): Promise { if (!fs.existsSync(file)) continue; const media = MessageMedia.fromFilePath(file); await targetChat.sendMessage(media); - log('INFO', `Sent file to WhatsApp: ${path.basename(file)}`); + logger.info({ channel: 'whatsapp', sender, messageId, context: { file: path.basename(file) } }, 'Sent file to WhatsApp'); } catch (fileErr) { - log('ERROR', `Failed to send file ${file}: ${(fileErr as Error).message}`); + logError(logger, fileErr, 'Failed to send file to WhatsApp', { messageId, file }); } } } @@ -433,21 +432,30 @@ async function checkOutgoingQueue(): Promise { } } - log('INFO', `Sent ${pending ? 'response' : 'proactive message'} to ${sender} (${responseText.length} chars${files.length > 0 ? `, ${files.length} file(s)` : ''})`); + logger.info({ + channel: 'whatsapp', + sender, + messageId, + context: { + kind: pending ? 'response' : 'proactive message', + responseLength: responseText.length, + fileCount: files.length, + }, + }, 'Sent outbound message'); if (pending) pendingMessages.delete(messageId); await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); } else { - log('WARN', `No pending message for ${messageId} and no senderId, acking`); + logger.warn({ channel: 'whatsapp', sender, messageId, context: { senderId } }, 'No pending message and no senderId; acking'); await fetch(`${API_BASE}/api/responses/${resp.id}/ack`, { method: 'POST' }); } } catch (error) { - log('ERROR', `Error processing response ${resp.id}: ${(error as Error).message}`); + logError(logger, error, 'Error processing WhatsApp response', { responseId: resp.id }); // Don't ack on error, will retry next poll } } } catch (error) { - log('ERROR', `Outgoing queue error: ${(error as Error).message}`); + logError(logger, error, 'Outgoing queue error'); } finally { processingOutgoingQueue = false; } @@ -458,12 +466,12 @@ setInterval(checkOutgoingQueue, 1000); // Error handlers client.on('auth_failure', (msg: string) => { - log('ERROR', `Authentication failure: ${msg}`); + logger.error({ context: { reason: msg } }, 'Authentication failure'); process.exit(1); }); client.on('disconnected', (reason: string) => { - log('WARN', `WhatsApp disconnected: ${reason}, attempting reconnect in 10s...`); + logger.warn({ context: { reason } }, 'WhatsApp disconnected; attempting reconnect in 10s'); // Remove ready flag const readyFile = path.join(TINYCLAW_HOME, 'channels/whatsapp_ready'); @@ -472,22 +480,22 @@ client.on('disconnected', (reason: string) => { } setTimeout(() => { - log('INFO', 'Reconnecting WhatsApp client...'); + logger.info('Reconnecting WhatsApp client'); client.initialize(); }, 10000); }); // Catch unhandled errors so we can see what kills the bot process.on('unhandledRejection', (reason) => { - log('ERROR', `Unhandled rejection: ${reason}`); + logError(logger, reason, 'Unhandled rejection'); }); process.on('uncaughtException', (error) => { - log('ERROR', `Uncaught exception: ${error.message}\n${error.stack}`); + logError(logger, error, 'Uncaught exception'); }); // Graceful shutdown process.on('SIGINT', async () => { - log('INFO', 'Shutting down WhatsApp client...'); + logger.info('Shutting down WhatsApp client'); // Remove ready flag const readyFile = path.join(TINYCLAW_HOME, 'channels/whatsapp_ready'); @@ -500,7 +508,7 @@ process.on('SIGINT', async () => { }); process.on('SIGTERM', async () => { - log('INFO', 'Shutting down WhatsApp client...'); + logger.info('Shutting down WhatsApp client'); // Remove ready flag const readyFile = path.join(TINYCLAW_HOME, 'channels/whatsapp_ready'); @@ -513,5 +521,5 @@ process.on('SIGTERM', async () => { }); // Start client -log('INFO', 'Starting WhatsApp client...'); +logger.info('Starting WhatsApp client'); client.initialize(); diff --git a/src/lib/config.ts b/src/lib/config.ts index c80b33fa..6acda21b 100644 --- a/src/lib/config.ts +++ b/src/lib/config.ts @@ -9,11 +9,16 @@ export const TINYCLAW_HOME = process.env.TINYCLAW_HOME || (fs.existsSync(path.join(_localTinyclaw, 'settings.json')) ? _localTinyclaw : path.join(require('os').homedir(), '.tinyclaw')); -export const LOG_FILE = path.join(TINYCLAW_HOME, 'logs/queue.log'); +export const LOG_DIR = path.join(TINYCLAW_HOME, 'logs'); +export const LOG_FILE = path.join(LOG_DIR, 'queue.log'); export const SETTINGS_FILE = path.join(TINYCLAW_HOME, 'settings.json'); export const CHATS_DIR = path.join(TINYCLAW_HOME, 'chats'); export const FILES_DIR = path.join(TINYCLAW_HOME, 'files'); +function writeStderr(message: string): void { + process.stderr.write(`${message}\n`); +} + export function getSettings(): Settings { try { const settingsData = fs.readFileSync(SETTINGS_FILE, 'utf8'); @@ -23,7 +28,7 @@ export function getSettings(): Settings { settings = JSON.parse(settingsData); } catch (parseError) { // JSON is invalid — attempt auto-fix with jsonrepair - console.error(`[WARN] settings.json contains invalid JSON: ${(parseError as Error).message}`); + writeStderr(`[WARN] settings.json contains invalid JSON: ${(parseError as Error).message}`); try { const repaired = jsonrepair(settingsData); @@ -33,9 +38,9 @@ export function getSettings(): Settings { const backupPath = SETTINGS_FILE + '.bak'; fs.copyFileSync(SETTINGS_FILE, backupPath); fs.writeFileSync(SETTINGS_FILE, JSON.stringify(settings, null, 2) + '\n'); - console.error(`[WARN] Auto-fixed settings.json (backup: ${backupPath})`); + writeStderr(`[WARN] Auto-fixed settings.json (backup: ${backupPath})`); } catch { - console.error(`[ERROR] Could not auto-fix settings.json — returning empty config`); + writeStderr('[ERROR] Could not auto-fix settings.json — returning empty config'); return {}; } } diff --git a/src/lib/conversation.ts b/src/lib/conversation.ts index 8b5a8d63..e0df3f58 100644 --- a/src/lib/conversation.ts +++ b/src/lib/conversation.ts @@ -2,12 +2,14 @@ import fs from 'fs'; import path from 'path'; import { Conversation } from './types'; import { CHATS_DIR, getSettings, getAgents } from './config'; -import { log, emitEvent } from './logging'; +import { emitEvent } from './events'; +import { createLogger, excerptText, isDebugEnabled, logError } from './logging'; import { enqueueMessage, enqueueResponse } from './db'; import { handleLongResponse, collectFiles } from './response'; // Active conversations — tracks in-flight team message passing export const conversations = new Map(); +const logger = createLogger({ runtime: 'queue', source: 'queue', component: 'conversation' }); export const MAX_CONVERSATION_MESSAGES = 50; @@ -53,7 +55,7 @@ export async function withConversationLock( */ export function incrementPending(conv: Conversation, count: number): void { conv.pending += count; - log('DEBUG', `Conversation ${conv.id}: pending incremented to ${conv.pending} (+${count})`); + logger.debug({ conversationId: conv.id, context: { pending: conv.pending, increment: count } }, 'Conversation pending incremented'); } /** @@ -62,10 +64,10 @@ export function incrementPending(conv: Conversation, count: number): void { */ export function decrementPending(conv: Conversation): boolean { conv.pending--; - log('DEBUG', `Conversation ${conv.id}: pending decremented to ${conv.pending}`); + logger.debug({ conversationId: conv.id, context: { pending: conv.pending } }, 'Conversation pending decremented'); if (conv.pending < 0) { - log('WARN', `Conversation ${conv.id}: pending went negative (${conv.pending}), resetting to 0`); + logger.warn({ conversationId: conv.id, context: { pending: conv.pending } }, 'Conversation pending went negative, resetting to 0'); conv.pending = 0; } @@ -93,7 +95,17 @@ export function enqueueInternalMessage( conversationId, fromAgent, }); - log('INFO', `Enqueued internal message: @${fromAgent} → @${targetAgent}`); + const bindings: Record = { + conversationId, + messageId, + fromAgent, + toAgent: targetAgent, + channel: originalData.channel, + }; + if (isDebugEnabled(logger)) { + bindings.excerpt = excerptText(message); + } + logger.info(bindings, 'Enqueued internal message'); } /** @@ -103,7 +115,13 @@ export function completeConversation(conv: Conversation): void { const settings = getSettings(); const agents = getAgents(settings); - log('INFO', `Conversation ${conv.id} complete — ${conv.responses.length} response(s), ${conv.totalMessages} total message(s)`); + logger.info({ + conversationId: conv.id, + channel: conv.channel, + sender: conv.sender, + teamId: conv.teamContext.teamId, + context: { responseCount: conv.responses.length, totalMessages: conv.totalMessages }, + }, 'Conversation complete'); emitEvent('team_chain_end', { teamId: conv.teamContext.teamId, totalSteps: conv.responses.length, @@ -152,9 +170,9 @@ export function completeConversation(conv: Conversation): void { const now = new Date(); const dateTime = now.toISOString().replace(/[:.]/g, '-').replace('T', '_').replace('Z', ''); fs.writeFileSync(path.join(teamChatsDir, `${dateTime}.md`), chatLines.join('\n')); - log('INFO', `Chat history saved`); + logger.info({ conversationId: conv.id, teamId: conv.teamContext.teamId }, 'Chat history saved'); } catch (e) { - log('ERROR', `Failed to save chat history: ${(e as Error).message}`); + logError(logger, e, 'Failed to save chat history', { conversationId: conv.id, teamId: conv.teamContext.teamId }); } // Detect file references @@ -184,7 +202,14 @@ export function completeConversation(conv: Conversation): void { files: allFiles.length > 0 ? allFiles : undefined, }); - log('INFO', `✓ Response ready [${conv.channel}] ${conv.sender} (${finalResponse.length} chars)`); + logger.info({ + conversationId: conv.id, + channel: conv.channel, + sender: conv.sender, + messageId: conv.messageId, + teamId: conv.teamContext.teamId, + context: { responseLength: finalResponse.length }, + }, 'Team response ready'); emitEvent('response_ready', { channel: conv.channel, sender: conv.sender, responseLength: finalResponse.length, responseText: finalResponse, messageId: conv.messageId }); // Clean up diff --git a/src/lib/events.ts b/src/lib/events.ts new file mode 100644 index 00000000..573b7c67 --- /dev/null +++ b/src/lib/events.ts @@ -0,0 +1,22 @@ +/** + * Pluggable event listeners. The API server registers an SSE listener and the + * plugin system registers event handlers here. This remains separate from the + * file-based logger so live events and historical logs can evolve independently. + */ +type EventListener = (type: string, data: Record) => void; + +const eventListeners: EventListener[] = []; + +export function onEvent(listener: EventListener): void { + eventListeners.push(listener); +} + +export function emitEvent(type: string, data: Record): void { + for (const listener of eventListeners) { + try { + listener(type, data); + } catch { + // Event subscribers should never break the runtime path. + } + } +} diff --git a/src/lib/invoke.ts b/src/lib/invoke.ts index c763939f..1cd795f9 100644 --- a/src/lib/invoke.ts +++ b/src/lib/invoke.ts @@ -3,9 +3,11 @@ import fs from 'fs'; import path from 'path'; import { AgentConfig, TeamConfig } from './types'; import { SCRIPT_DIR, resolveClaudeModel, resolveCodexModel, resolveOpenCodeModel } from './config'; -import { log } from './logging'; +import { createLogger } from './logging'; import { ensureAgentDirectory, updateAgentTeammates } from './agent'; +const logger = createLogger({ runtime: 'queue', source: 'queue', component: 'invoke' }); + export async function runCommand(command: string, args: string[], cwd?: string): Promise { return new Promise((resolve, reject) => { const env = { ...process.env }; @@ -65,7 +67,7 @@ export async function invokeAgent( const isNewAgent = !fs.existsSync(agentDir); ensureAgentDirectory(agentDir); if (isNewAgent) { - log('INFO', `Initialized agent directory with config files: ${agentDir}`); + logger.info({ agentId, context: { agentDir } }, 'Initialized agent directory with config files'); } // Update AGENTS.md with current teammate info @@ -81,12 +83,12 @@ export async function invokeAgent( const provider = agent.provider || 'anthropic'; if (provider === 'openai') { - log('INFO', `Using Codex CLI (agent: ${agentId})`); + logger.info({ agentId }, 'Using Codex CLI'); const shouldResume = !shouldReset; if (shouldReset) { - log('INFO', `🔄 Resetting Codex conversation for agent: ${agentId}`); + logger.info({ agentId }, 'Resetting Codex conversation'); } const modelId = resolveCodexModel(agent.model); @@ -122,12 +124,12 @@ export async function invokeAgent( // Model passed via --model in provider/model format (e.g. opencode/claude-sonnet-4-5). // Supports -c flag for conversation continuation (resumes last session). const modelId = resolveOpenCodeModel(agent.model); - log('INFO', `Using OpenCode CLI (agent: ${agentId}, model: ${modelId})`); + logger.info({ agentId, context: { modelId } }, 'Using OpenCode CLI'); const continueConversation = !shouldReset; if (shouldReset) { - log('INFO', `🔄 Resetting OpenCode conversation for agent: ${agentId}`); + logger.info({ agentId }, 'Resetting OpenCode conversation'); } const opencodeArgs = ['run', '--format', 'json']; @@ -158,12 +160,12 @@ export async function invokeAgent( return response || 'Sorry, I could not generate a response from OpenCode.'; } else { // Default to Claude (Anthropic) - log('INFO', `Using Claude provider (agent: ${agentId})`); + logger.info({ agentId }, 'Using Claude provider'); const continueConversation = !shouldReset; if (shouldReset) { - log('INFO', `🔄 Resetting conversation for agent: ${agentId}`); + logger.info({ agentId }, 'Resetting conversation'); } const modelId = resolveClaudeModel(agent.model); diff --git a/src/lib/logging.ts b/src/lib/logging.ts index 93cdadd1..ad051f75 100644 --- a/src/lib/logging.ts +++ b/src/lib/logging.ts @@ -1,31 +1,326 @@ import fs from 'fs'; -import { LOG_FILE } from './config'; +import path from 'path'; +import { Writable } from 'stream'; +import pino, { type Logger } from 'pino'; +import { LOG_DIR } from './config'; -export function log(level: string, message: string): void { - const timestamp = new Date().toISOString(); - const logMessage = `[${timestamp}] [${level}] ${message}\n`; - console.log(logMessage.trim()); - fs.appendFileSync(LOG_FILE, logMessage); +export type RuntimeLogFile = 'queue' | 'telegram' | 'discord' | 'whatsapp' | 'daemon' | 'heartbeat'; +export type LogSource = RuntimeLogFile | 'api'; +export type LogLevel = 'debug' | 'info' | 'warn' | 'error'; + +export interface LogEntry { + time: string; + level: LogLevel; + source: LogSource | string; + component: string; + msg: string; + channel?: string; + agentId?: string; + messageId?: string; + conversationId?: string; + fromAgent?: string; + toAgent?: string; + teamId?: string; + sender?: string; + excerpt?: string; + context?: Record; + err?: { + type?: string; + message?: string; + stack?: string; + [key: string]: unknown; + }; + [key: string]: unknown; +} + +interface CreateLoggerOptions { + runtime: RuntimeLogFile; + source?: LogSource; + component: string; + bindings?: Record; +} + +interface ReadLogsOptions { + limit?: number; + source?: string[]; + level?: string; + channel?: string; + agentId?: string; + messageId?: string; + conversationId?: string; + search?: string; +} + +const MAX_LOG_BYTES = 10 * 1024 * 1024; +const MAX_ROTATED_FILES = 5; +const LOG_LEVELS: LogLevel[] = ['debug', 'info', 'warn', 'error']; +const LEVEL_MAP: Record = { + DEBUG: 'debug', + INFO: 'info', + WARN: 'warn', + WARNING: 'warn', + ERROR: 'error', + debug: 'debug', + info: 'info', + warn: 'warn', + warning: 'warn', + error: 'error', +}; +const SOURCE_TO_RUNTIME: Record = { + queue: 'queue', + api: 'queue', + telegram: 'telegram', + discord: 'discord', + whatsapp: 'whatsapp', + daemon: 'daemon', + heartbeat: 'heartbeat', +}; + +fs.mkdirSync(LOG_DIR, { recursive: true }); + +class RotatingFileStream extends Writable { + private stream: fs.WriteStream; + + private bytesWritten: number; + + constructor(private readonly filePath: string) { + super(); + this.bytesWritten = fs.existsSync(filePath) ? fs.statSync(filePath).size : 0; + this.stream = fs.createWriteStream(filePath, { flags: 'a' }); + } + + _write(chunk: Buffer | string, encoding: BufferEncoding, callback: (error?: Error | null) => void): void { + try { + const buffer = Buffer.isBuffer(chunk) ? chunk : Buffer.from(chunk, encoding); + this.rotateIfNeeded(buffer.length); + this.stream.write(buffer, (error) => { + if (!error) { + this.bytesWritten += buffer.length; + } + callback(error ?? undefined); + }); + } catch (error) { + callback(error as Error); + } + } + + _final(callback: (error?: Error | null) => void): void { + this.stream.end(() => callback()); + } + + private rotateIfNeeded(incomingBytes: number): void { + if (this.bytesWritten + incomingBytes <= MAX_LOG_BYTES) { + return; + } + + this.stream.end(); + + for (let index = MAX_ROTATED_FILES; index >= 1; index--) { + const current = rotatedFilePath(this.filePath, index); + const previous = index === 1 ? this.filePath : rotatedFilePath(this.filePath, index - 1); + + if (!fs.existsSync(previous)) { + continue; + } + + if (fs.existsSync(current)) { + fs.unlinkSync(current); + } + + fs.renameSync(previous, current); + } + + this.bytesWritten = 0; + this.stream = fs.createWriteStream(this.filePath, { flags: 'a' }); + } } -/** - * Pluggable event listeners. The API server registers a listener so that - * every event emitted by the queue processor is also broadcast over SSE. - * The plugin system also registers a listener for plugin event handlers. - */ -type EventListener = (type: string, data: Record) => void; -const eventListeners: EventListener[] = []; +const destinations = new Map(); +const runtimeLoggers = new Map(); -/** Register a listener that is called on every emitEvent. */ -export function onEvent(listener: EventListener): void { - eventListeners.push(listener); +function rotatedFilePath(filePath: string, index: number): string { + const ext = path.extname(filePath); + const base = filePath.slice(0, filePath.length - ext.length); + return `${base}.${index}${ext}`; } -/** - * Emit a structured event — dispatched to in-memory listeners (e.g. SSE broadcast, plugins). - */ -export function emitEvent(type: string, data: Record): void { - for (const listener of eventListeners) { - try { listener(type, data); } catch { /* never break the queue processor */ } +function normalizeLevel(level?: string): LogLevel { + if (!level) { + return 'info'; } + return LEVEL_MAP[level] || 'info'; +} + +function getConfiguredLogLevel(): LogLevel { + return normalizeLevel(process.env.LOG_LEVEL); } + +function getDestination(runtime: RuntimeLogFile): RotatingFileStream { + let destination = destinations.get(runtime); + if (!destination) { + destination = new RotatingFileStream(path.join(LOG_DIR, `${runtime}.log`)); + destinations.set(runtime, destination); + } + return destination; +} + +function getRuntimeLogger(runtime: RuntimeLogFile): Logger { + let logger = runtimeLoggers.get(runtime); + if (!logger) { + logger = pino({ + level: getConfiguredLogLevel(), + base: undefined, + messageKey: 'msg', + timestamp: pino.stdTimeFunctions.isoTime, + formatters: { + level: (label) => ({ level: label }), + }, + serializers: { + err: pino.stdSerializers.err, + }, + }, getDestination(runtime)); + runtimeLoggers.set(runtime, logger); + } + return logger; +} + +export function createLogger(options: CreateLoggerOptions): Logger { + const { runtime, source = options.runtime, component, bindings = {} } = options; + return getRuntimeLogger(runtime).child({ source, component, ...bindings }); +} + +export function logAtLevel( + logger: Logger, + level: string, + msg: string, + bindings?: Record +): void { + const normalized = normalizeLevel(level); + const payload = bindings ?? {}; + if (normalized === 'debug') { + logger.debug(payload, msg); + } else if (normalized === 'warn') { + logger.warn(payload, msg); + } else if (normalized === 'error') { + logger.error(payload, msg); + } else { + logger.info(payload, msg); + } +} + +export function logError( + logger: Logger, + error: unknown, + msg: string, + context?: Record +): void { + const err = error instanceof Error ? error : new Error(String(error)); + if (context && Object.keys(context).length > 0) { + logger.error({ err, context }, msg); + return; + } + logger.error({ err }, msg); +} + +export function isDebugEnabled(logger: Logger): boolean { + return logger.isLevelEnabled('debug'); +} + +export function excerptText(value: string, maxLength = 160): string { + const compact = value.replace(/\s+/g, ' ').trim(); + if (compact.length <= maxLength) { + return compact; + } + return `${compact.slice(0, maxLength - 1)}...`; +} + +function listFilesForRuntime(runtime: RuntimeLogFile): string[] { + const filePath = path.join(LOG_DIR, `${runtime}.log`); + const files = [filePath]; + for (let index = 1; index <= MAX_ROTATED_FILES; index++) { + files.push(rotatedFilePath(filePath, index)); + } + return files.filter(file => fs.existsSync(file)); +} + +function readEntriesFromFile(filePath: string): LogEntry[] { + try { + const raw = fs.readFileSync(filePath, 'utf8'); + if (!raw.trim()) { + return []; + } + return raw + .split('\n') + .map((line) => line.trim()) + .filter(Boolean) + .flatMap((line) => { + try { + return [JSON.parse(line) as LogEntry]; + } catch { + return []; + } + }); + } catch { + return []; + } +} + +function includesSearch(entry: LogEntry, search: string): boolean { + const haystacks = [ + entry.msg, + entry.excerpt, + entry.messageId, + entry.conversationId, + entry.agentId, + entry.fromAgent, + entry.toAgent, + entry.sender, + entry.channel, + entry.teamId, + entry.err?.message, + entry.err?.stack, + entry.context ? JSON.stringify(entry.context) : '', + ]; + + return haystacks.some((item) => typeof item === 'string' && item.toLowerCase().includes(search)); +} + +export function readLogEntries(options: ReadLogsOptions = {}): LogEntry[] { + const sourceFilter = (options.source ?? []).map(item => item.trim()).filter(Boolean); + const runtimes = sourceFilter.length > 0 + ? Array.from(new Set(sourceFilter.map(source => SOURCE_TO_RUNTIME[source]).filter(Boolean))) + : (Object.keys(SOURCE_TO_RUNTIME) as Array) + .map(source => SOURCE_TO_RUNTIME[source]) + .filter((runtime, index, list) => list.indexOf(runtime) === index); + + const entries = runtimes + .flatMap(runtime => listFilesForRuntime(runtime).flatMap(readEntriesFromFile)) + .filter((entry) => { + if (sourceFilter.length > 0 && !sourceFilter.includes(String(entry.source))) { + return false; + } + if (options.level && String(entry.level) !== options.level) { + return false; + } + if (options.channel && String(entry.channel ?? '') !== options.channel) { + return false; + } + if (options.agentId && String(entry.agentId ?? '') !== options.agentId) { + return false; + } + if (options.messageId && String(entry.messageId ?? '') !== options.messageId) { + return false; + } + if (options.conversationId && String(entry.conversationId ?? '') !== options.conversationId) { + return false; + } + if (options.search && !includesSearch(entry, options.search.toLowerCase())) { + return false; + } + return true; + }) + .sort((a, b) => Date.parse(String(b.time)) - Date.parse(String(a.time))); + + return entries.slice(0, options.limit ?? 100); +} + diff --git a/src/lib/plugins.ts b/src/lib/plugins.ts index 649fae5e..df9725a4 100644 --- a/src/lib/plugins.ts +++ b/src/lib/plugins.ts @@ -8,7 +8,8 @@ import fs from 'fs'; import path from 'path'; import { TINYCLAW_HOME } from './config'; -import { log, onEvent } from './logging'; +import { onEvent } from './events'; +import { createLogger, logAtLevel, logError } from './logging'; // Types export interface PluginEvent { @@ -53,6 +54,7 @@ interface LoadedPlugin { // Internal state const loadedPlugins: LoadedPlugin[] = []; const eventHandlers = new Map void>>(); +const logger = createLogger({ runtime: 'queue', source: 'queue', component: 'plugins' }); /** * Create the plugin context passed to activate() functions. @@ -65,7 +67,11 @@ function createPluginContext(pluginName: string): PluginContext { eventHandlers.set(eventType, handlers); }, log(level: string, message: string): void { - log(level, `[plugin:${pluginName}] ${message}`); + logAtLevel( + logger.child({ context: { pluginName }, component: 'plugin' }), + level, + message + ); }, getTinyClawHome(): string { return TINYCLAW_HOME; @@ -83,7 +89,7 @@ export async function loadPlugins(): Promise { const pluginsDir = path.join(TINYCLAW_HOME, 'plugins'); if (!fs.existsSync(pluginsDir)) { - log('DEBUG', 'No plugins directory found'); + logger.debug('No plugins directory found'); return; } @@ -107,7 +113,7 @@ export async function loadPlugins(): Promise { } if (!indexPath) { - log('WARN', `Plugin '${pluginName}' has no index.js or index.ts, skipping`); + logger.warn({ context: { pluginName } }, 'Plugin has no index.js or index.ts, skipping'); continue; } @@ -128,14 +134,14 @@ export async function loadPlugins(): Promise { } loadedPlugins.push(plugin); - log('INFO', `Loaded plugin: ${pluginName}`); + logger.info({ context: { pluginName } }, 'Loaded plugin'); } catch (error) { - log('ERROR', `Failed to load plugin '${pluginName}': ${(error as Error).message}`); + logError(logger, error, 'Failed to load plugin', { pluginName }); } } if (loadedPlugins.length > 0) { - log('INFO', `${loadedPlugins.length} plugin(s) loaded`); + logger.info({ context: { loadedPlugins: loadedPlugins.length } }, 'Plugins loaded'); // Register as an event listener so all emitEvent() calls get broadcast to plugins onEvent((type, data) => { @@ -162,7 +168,7 @@ export async function runOutgoingHooks(message: string, ctx: HookContext): Promi metadata = { ...metadata, ...result.metadata }; } } catch (error) { - log('ERROR', `Plugin '${plugin.name}' transformOutgoing error: ${(error as Error).message}`); + logError(logger, error, 'Plugin transformOutgoing error', { pluginName: plugin.name }); } } } @@ -188,7 +194,7 @@ export async function runIncomingHooks(message: string, ctx: HookContext): Promi metadata = { ...metadata, ...result.metadata }; } } catch (error) { - log('ERROR', `Plugin '${plugin.name}' transformIncoming error: ${(error as Error).message}`); + logError(logger, error, 'Plugin transformIncoming error', { pluginName: plugin.name }); } } } @@ -206,7 +212,7 @@ export function broadcastEvent(event: PluginEvent): void { try { handler(event); } catch (error) { - log('ERROR', `Plugin event handler error: ${(error as Error).message}`); + logError(logger, error, 'Plugin event handler error', { eventType: event.type }); } } @@ -216,7 +222,7 @@ export function broadcastEvent(event: PluginEvent): void { try { handler(event); } catch (error) { - log('ERROR', `Plugin wildcard handler error: ${(error as Error).message}`); + logError(logger, error, 'Plugin wildcard handler error', { eventType: event.type }); } } } diff --git a/src/lib/response.ts b/src/lib/response.ts index 9cc3616a..715d1db4 100644 --- a/src/lib/response.ts +++ b/src/lib/response.ts @@ -1,9 +1,10 @@ import fs from 'fs'; import path from 'path'; import { FILES_DIR } from './config'; -import { log } from './logging'; +import { createLogger } from './logging'; export const LONG_RESPONSE_THRESHOLD = 4000; +const logger = createLogger({ runtime: 'queue', source: 'queue', component: 'response' }); /** * If a response exceeds the threshold, save full text as a .md file @@ -21,7 +22,7 @@ export function handleLongResponse( const filename = `response_${Date.now()}.md`; const filePath = path.join(FILES_DIR, filename); fs.writeFileSync(filePath, response); - log('INFO', `Long response (${response.length} chars) saved to ${filename}`); + logger.info({ context: { filename, responseLength: response.length } }, 'Long response saved to file'); // Truncate to preview const preview = response.substring(0, LONG_RESPONSE_THRESHOLD) + '\n\n_(Full response attached as file)_'; diff --git a/src/lib/routing.ts b/src/lib/routing.ts index 06efb549..49e7d766 100644 --- a/src/lib/routing.ts +++ b/src/lib/routing.ts @@ -1,6 +1,8 @@ import path from 'path'; import { AgentConfig, TeamConfig } from './types'; -import { log } from './logging'; +import { createLogger } from './logging'; + +const logger = createLogger({ runtime: 'queue', source: 'queue', component: 'routing' }); /** * Find the first team that contains the given agent. @@ -26,22 +28,27 @@ export function isTeammate( ): boolean { const team = teams[teamId]; if (!team) { - log('WARN', `isTeammate check failed: Team '${teamId}' not found`); + logger.warn({ teamId, agentId: currentAgentId, toAgent: mentionedId }, 'Teammate check failed: team not found'); return false; } if (mentionedId === currentAgentId) { - log('DEBUG', `isTeammate check failed: Self-mention (agent: ${mentionedId})`); + logger.debug({ teamId, agentId: currentAgentId, toAgent: mentionedId }, 'Teammate check failed: self mention'); return false; } if (!team.agents.includes(mentionedId)) { - log('WARN', `isTeammate check failed: Agent '${mentionedId}' not in team '${teamId}' (members: ${team.agents.join(', ')})`); + logger.warn({ + teamId, + agentId: currentAgentId, + toAgent: mentionedId, + context: { members: team.agents }, + }, 'Teammate check failed: agent not in team'); return false; } if (!agents[mentionedId]) { - log('WARN', `isTeammate check failed: Agent '${mentionedId}' not found in agents config`); + logger.warn({ teamId, agentId: currentAgentId, toAgent: mentionedId }, 'Teammate check failed: agent missing from config'); return false; } diff --git a/src/queue-processor.ts b/src/queue-processor.ts index 32bec7f1..64a219c1 100644 --- a/src/queue-processor.ts +++ b/src/queue-processor.ts @@ -21,7 +21,8 @@ import { LOG_FILE, CHATS_DIR, FILES_DIR, getSettings, getAgents, getTeams } from './lib/config'; -import { log, emitEvent } from './lib/logging'; +import { emitEvent } from './lib/events'; +import { createLogger, excerptText, isDebugEnabled, logError } from './lib/logging'; import { parseAgentRouting, findTeamForAgent, getAgentResetFlag, extractTeammateMentions } from './lib/routing'; import { invokeAgent } from './lib/invoke'; import { loadPlugins, runIncomingHooks, runOutgoingHooks } from './lib/plugins'; @@ -37,6 +38,8 @@ import { withConversationLock, incrementPending, decrementPending, } from './lib/conversation'; +const logger = createLogger({ runtime: 'queue', source: 'queue', component: 'processor' }); + // Ensure directories exist [FILES_DIR, path.dirname(LOG_FILE), CHATS_DIR].forEach(dir => { if (!fs.existsSync(dir)) { @@ -68,7 +71,18 @@ async function processMessage(dbMsg: DbMessage): Promise { fromAgent: dbMsg.from_agent ?? undefined, }; - log('INFO', `Processing [${isInternal ? 'internal' : channel}] ${isInternal ? `@${dbMsg.from_agent}→@${dbMsg.agent}` : `from ${sender}`}: ${rawMessage.substring(0, 50)}...`); + const processingBindings: Record = { + channel, + sender, + messageId, + conversationId: messageData.conversationId, + fromAgent: messageData.fromAgent, + toAgent: dbMsg.agent ?? undefined, + }; + if (isDebugEnabled(logger)) { + processingBindings.excerpt = excerptText(rawMessage); + } + logger.info(processingBindings, isInternal ? 'Processing internal message' : 'Processing inbound message'); if (!isInternal) { emitEvent('message_received', { channel, sender, message: rawMessage.substring(0, 120), messageId }); } @@ -110,7 +124,13 @@ async function processMessage(dbMsg: DbMessage): Promise { } const agent = agents[agentId]; - log('INFO', `Routing to agent: ${agent.name} (${agentId}) [${agent.provider}/${agent.model}]`); + logger.info({ + channel, + sender, + messageId, + agentId, + context: { agentName: agent.name, provider: agent.provider, model: agent.model }, + }, 'Routing to agent'); if (!isInternal) { emitEvent('agent_routed', { agentId, agentName: agent.name, provider: agent.provider, model: agent.model, isTeamRouted }); } @@ -168,7 +188,12 @@ async function processMessage(dbMsg: DbMessage): Promise { } catch (error) { const provider = agent.provider || 'anthropic'; const providerLabel = provider === 'openai' ? 'Codex' : provider === 'opencode' ? 'OpenCode' : 'Claude'; - log('ERROR', `${providerLabel} error (agent: ${agentId}): ${(error as Error).message}`); + logError(logger, error, `${providerLabel} error during agent invocation`, { + agentId, + channel, + sender, + messageId, + }); response = "Sorry, I encountered an error processing your request. Please check the queue logs."; } @@ -204,7 +229,13 @@ async function processMessage(dbMsg: DbMessage): Promise { metadata: Object.keys(metadata).length > 0 ? metadata : undefined, }); - log('INFO', `✓ Response ready [${channel}] ${sender} via agent:${agentId} (${finalResponse.length} chars)`); + logger.info({ + channel, + sender, + messageId, + agentId, + context: { responseLength: finalResponse.length }, + }, 'Response ready'); emitEvent('response_ready', { channel, sender, agentId, responseLength: finalResponse.length, responseText: finalResponse, messageId }); dbCompleteMessage(dbMsg.id); @@ -237,7 +268,14 @@ async function processMessage(dbMsg: DbMessage): Promise { pendingAgents: new Set([agentId]), }; conversations.set(convId, conv); - log('INFO', `Conversation started: ${convId} (team: ${teamContext.team.name})`); + logger.info({ + conversationId: convId, + channel, + sender, + messageId, + teamId: teamContext.teamId, + context: { teamName: teamContext.team.name }, + }, 'Conversation started'); emitEvent('team_chain_start', { teamId: teamContext.teamId, teamName: teamContext.team.name, agents: teamContext.team.agents, leader: teamContext.team.leader_agent }); } @@ -258,7 +296,18 @@ async function processMessage(dbMsg: DbMessage): Promise { conv.outgoingMentions.set(agentId, teammateMentions.length); for (const mention of teammateMentions) { conv.pendingAgents.add(mention.teammateId); - log('INFO', `@${agentId} → @${mention.teammateId}`); + const handoffBindings: Record = { + conversationId: conv.id, + channel: messageData.channel, + messageId: messageData.messageId, + teamId: conv.teamContext.teamId, + fromAgent: agentId, + toAgent: mention.teammateId, + }; + if (isDebugEnabled(logger)) { + handoffBindings.excerpt = excerptText(mention.message); + } + logger.info(handoffBindings, 'Agent handoff queued'); emitEvent('chain_handoff', { teamId: conv.teamContext.teamId, fromAgent: agentId, toAgent: mention.teammateId }); const internalMsg = `[Message from teammate @${agentId}]:\n${mention.message}`; @@ -270,7 +319,11 @@ async function processMessage(dbMsg: DbMessage): Promise { }); } } else if (teammateMentions.length > 0) { - log('WARN', `Conversation ${conv.id} hit max messages (${conv.maxMessages}) — not enqueuing further mentions`); + logger.warn({ + conversationId: conv.id, + teamId: conv.teamContext.teamId, + context: { maxMessages: conv.maxMessages }, + }, 'Conversation hit max messages; skipping further mentions'); } // This branch is done - use atomic decrement with locking @@ -280,7 +333,7 @@ async function processMessage(dbMsg: DbMessage): Promise { if (shouldComplete) { completeConversation(conv); } else { - log('INFO', `Conversation ${conv.id}: ${conv.pending} branch(es) still pending`); + logger.info({ conversationId: conv.id, context: { pending: conv.pending } }, 'Conversation branches still pending'); } }); @@ -288,7 +341,13 @@ async function processMessage(dbMsg: DbMessage): Promise { dbCompleteMessage(dbMsg.id); } catch (error) { - log('ERROR', `Processing error: ${(error as Error).message}`); + logError(logger, error, 'Processing error', { + channel: dbMsg.channel, + sender: dbMsg.sender, + messageId: dbMsg.message_id, + conversationId: dbMsg.conversation_id ?? undefined, + agentId: dbMsg.agent ?? undefined, + }); failMessage(dbMsg.id, (error as Error).message); } } @@ -316,7 +375,7 @@ async function processQueue(): Promise { const newChain = currentChain .then(() => processMessage(dbMsg)) .catch(error => { - log('ERROR', `Error processing message for agent ${agentId}: ${error.message}`); + logError(logger, error, 'Error processing message for agent', { agentId }); }); // Update the chain @@ -330,7 +389,7 @@ async function processQueue(): Promise { }); } } catch (error) { - log('ERROR', `Queue processing error: ${(error as Error).message}`); + logError(logger, error, 'Queue processing error'); } } @@ -341,16 +400,27 @@ function logAgentConfig(): void { const teams = getTeams(settings); const agentCount = Object.keys(agents).length; - log('INFO', `Loaded ${agentCount} agent(s):`); + logger.info({ context: { agentCount } }, 'Loaded agents'); for (const [id, agent] of Object.entries(agents)) { - log('INFO', ` ${id}: ${agent.name} [${agent.provider}/${agent.model}] cwd=${agent.working_directory}`); + logger.info({ + agentId: id, + context: { + agentName: agent.name, + provider: agent.provider, + model: agent.model, + workingDirectory: agent.working_directory, + }, + }, 'Agent configuration loaded'); } const teamCount = Object.keys(teams).length; if (teamCount > 0) { - log('INFO', `Loaded ${teamCount} team(s):`); + logger.info({ context: { teamCount } }, 'Loaded teams'); for (const [id, team] of Object.entries(teams)) { - log('INFO', ` ${id}: ${team.name} [agents: ${team.agents.join(', ')}] leader=${team.leader_agent}`); + logger.info({ + teamId: id, + context: { teamName: team.name, agents: team.agents, leader: team.leader_agent }, + }, 'Team configuration loaded'); } } } @@ -363,7 +433,7 @@ initQueueDb(); // Recover stale messages from previous crash const recovered = recoverStaleMessages(); if (recovered > 0) { - log('INFO', `Recovered ${recovered} stale message(s) from previous session`); + logger.info({ context: { recovered } }, 'Recovered stale messages from previous session'); } // Start the API server (passes conversations for queue status reporting) @@ -374,7 +444,7 @@ const apiServer = startApiServer(conversations); await loadPlugins(); })(); -log('INFO', 'Queue processor started (SQLite-backed)'); +logger.info('Queue processor started (SQLite-backed)'); logAgentConfig(); emitEvent('processor_start', { agents: Object.keys(getAgents(getSettings())), teams: Object.keys(getTeams(getSettings())) }); @@ -384,7 +454,7 @@ queueEvents.on('message:enqueued', () => processQueue()); // Periodic maintenance setInterval(() => { const count = recoverStaleMessages(); - if (count > 0) log('INFO', `Recovered ${count} stale message(s)`); + if (count > 0) logger.info({ context: { recovered: count } }, 'Recovered stale messages'); }, 5 * 60 * 1000); // every 5 min setInterval(() => { @@ -392,7 +462,7 @@ setInterval(() => { const cutoff = Date.now() - 30 * 60 * 1000; for (const [id, conv] of conversations.entries()) { if (conv.startTime < cutoff) { - log('WARN', `Conversation ${id} timed out after 30 min — cleaning up`); + logger.warn({ conversationId: id }, 'Conversation timed out after 30 minutes; cleaning up'); conversations.delete(id); } } @@ -400,24 +470,24 @@ setInterval(() => { setInterval(() => { const pruned = pruneAckedResponses(); - if (pruned > 0) log('INFO', `Pruned ${pruned} acked response(s)`); + if (pruned > 0) logger.info({ context: { pruned } }, 'Pruned acked responses'); }, 60 * 60 * 1000); // every 1 hr setInterval(() => { const pruned = pruneCompletedMessages(); - if (pruned > 0) log('INFO', `Pruned ${pruned} completed message(s)`); + if (pruned > 0) logger.info({ context: { pruned } }, 'Pruned completed messages'); }, 60 * 60 * 1000); // every 1 hr // Graceful shutdown process.on('SIGINT', () => { - log('INFO', 'Shutting down queue processor...'); + logger.info('Shutting down queue processor'); closeQueueDb(); apiServer.close(); process.exit(0); }); process.on('SIGTERM', () => { - log('INFO', 'Shutting down queue processor...'); + logger.info('Shutting down queue processor'); closeQueueDb(); apiServer.close(); process.exit(0); diff --git a/src/server/index.ts b/src/server/index.ts index 05c9c9f5..180752ab 100644 --- a/src/server/index.ts +++ b/src/server/index.ts @@ -12,7 +12,7 @@ import { cors } from 'hono/cors'; import { serve } from '@hono/node-server'; import { RESPONSE_ALREADY_SENT } from '@hono/node-server/utils/response'; import { Conversation } from '../lib/types'; -import { log } from '../lib/logging'; +import { createLogger, logError } from '../lib/logging'; import { addSSEClient, removeSSEClient } from './sse'; import messagesRoutes from './routes/messages'; @@ -25,6 +25,7 @@ import logsRoutes from './routes/logs'; import chatsRoutes from './routes/chats'; const API_PORT = parseInt(process.env.TINYCLAW_API_PORT || '3777', 10); +const logger = createLogger({ runtime: 'queue', source: 'api', component: 'server' }); /** * Create and start the API server. @@ -73,7 +74,7 @@ export function startApiServer( // Error handler app.onError((err, c) => { - log('ERROR', `[API] ${err.message}`); + logError(logger, err, 'API request failed'); return c.json({ error: 'Internal server error' }, 500); }); @@ -81,7 +82,7 @@ export function startApiServer( fetch: app.fetch, port: API_PORT, }, () => { - log('INFO', `API server listening on http://localhost:${API_PORT}`); + logger.info({ context: { port: API_PORT } }, `API server listening on http://localhost:${API_PORT}`); }); return server as unknown as http.Server; diff --git a/src/server/routes/agents.ts b/src/server/routes/agents.ts index e0950209..5e0dde13 100644 --- a/src/server/routes/agents.ts +++ b/src/server/routes/agents.ts @@ -3,10 +3,11 @@ import path from 'path'; import { Hono } from 'hono'; import { AgentConfig } from '../../lib/types'; import { SCRIPT_DIR, getSettings, getAgents } from '../../lib/config'; -import { log } from '../../lib/logging'; +import { createLogger, logError } from '../../lib/logging'; import { mutateSettings } from './settings'; const app = new Hono(); +const logger = createLogger({ runtime: 'queue', source: 'api', component: 'agents-route' }); // ── Agent workspace provisioning ───────────────────────────────────────────── @@ -107,13 +108,13 @@ app.put('/api/agents/:id', async (c) => { if (isNew) { try { provisionSteps = provisionAgentWorkspace(workingDir, agentId); - log('INFO', `[API] Agent '${agentId}' provisioned: ${provisionSteps.join(', ')}`); + logger.info({ agentId, context: { provisionSteps } }, 'Agent provisioned'); } catch (err) { - log('ERROR', `[API] Agent '${agentId}' provisioning failed: ${(err as Error).message}`); + logError(logger, err, 'Agent provisioning failed', { agentId }); } } - log('INFO', `[API] Agent '${agentId}' saved`); + logger.info({ agentId }, 'Agent saved'); return c.json({ ok: true, agent: settings.agents![agentId], @@ -130,7 +131,7 @@ app.delete('/api/agents/:id', (c) => { return c.json({ error: `agent '${agentId}' not found` }, 404); } mutateSettings(s => { delete s.agents![agentId]; }); - log('INFO', `[API] Agent '${agentId}' deleted`); + logger.info({ agentId }, 'Agent deleted'); return c.json({ ok: true }); }); diff --git a/src/server/routes/logs.ts b/src/server/routes/logs.ts index e0b9a5ba..7f3b8848 100644 --- a/src/server/routes/logs.ts +++ b/src/server/routes/logs.ts @@ -1,19 +1,31 @@ -import fs from 'fs'; import { Hono } from 'hono'; -import { LOG_FILE } from '../../lib/config'; +import { readLogEntries } from '../../lib/logging'; const app = new Hono(); // GET /api/logs app.get('/api/logs', (c) => { const limit = parseInt(c.req.query('limit') || '100', 10); - try { - const logContent = fs.readFileSync(LOG_FILE, 'utf8'); - const lines = logContent.trim().split('\n').slice(-limit); - return c.json({ lines }); - } catch { - return c.json({ lines: [] }); - } + const source = c.req.query('source')?.split(',').map(item => item.trim()).filter(Boolean) ?? []; + const level = c.req.query('level') || undefined; + const channel = c.req.query('channel') || undefined; + const agentId = c.req.query('agentId') || undefined; + const messageId = c.req.query('messageId') || undefined; + const conversationId = c.req.query('conversationId') || undefined; + const search = c.req.query('search') || undefined; + + return c.json({ + entries: readLogEntries({ + limit, + source, + level, + channel, + agentId, + messageId, + conversationId, + search, + }), + }); }); export default app; diff --git a/src/server/routes/messages.ts b/src/server/routes/messages.ts index 5b274479..51aa21f7 100644 --- a/src/server/routes/messages.ts +++ b/src/server/routes/messages.ts @@ -1,8 +1,10 @@ import { Hono } from 'hono'; -import { log, emitEvent } from '../../lib/logging'; +import { emitEvent } from '../../lib/events'; +import { createLogger, excerptText } from '../../lib/logging'; import { enqueueMessage } from '../../lib/db'; const app = new Hono(); +const logger = createLogger({ runtime: 'queue', source: 'api', component: 'messages-route' }); // POST /api/message app.post('/api/message', async (c) => { @@ -33,7 +35,13 @@ app.post('/api/message', async (c) => { files: files && files.length > 0 ? files : undefined, }); - log('INFO', `[API] Message enqueued: ${message.substring(0, 60)}...`); + logger.info({ + channel: resolvedChannel, + sender: resolvedSender, + agentId: agent || undefined, + messageId, + excerpt: excerptText(message, 120), + }, 'Message enqueued'); emitEvent('message_enqueued', { messageId, agent: agent || null, diff --git a/src/server/routes/queue.ts b/src/server/routes/queue.ts index 1ee100ba..bd061a01 100644 --- a/src/server/routes/queue.ts +++ b/src/server/routes/queue.ts @@ -1,6 +1,6 @@ import { Hono } from 'hono'; import { Conversation } from '../../lib/types'; -import { log } from '../../lib/logging'; +import { createLogger } from '../../lib/logging'; import { getQueueStatus, getRecentResponses, getResponsesForChannel, ackResponse, enqueueResponse, getDeadMessages, retryDeadMessage, deleteDeadMessage, @@ -8,6 +8,7 @@ import { export function createQueueRoutes(conversations: Map) { const app = new Hono(); + const logger = createLogger({ runtime: 'queue', source: 'api', component: 'queue-route' }); // GET /api/queue/status app.get('/api/queue/status', (c) => { @@ -81,7 +82,7 @@ export function createQueueRoutes(conversations: Map) { files: files && files.length > 0 ? files : undefined, }); - log('INFO', `[API] Proactive response enqueued for ${channel}/${sender}`); + logger.info({ channel, sender, agentId: agent, messageId }, 'Proactive response enqueued'); return c.json({ ok: true, messageId }); }); @@ -102,7 +103,7 @@ export function createQueueRoutes(conversations: Map) { const id = parseInt(c.req.param('id'), 10); const ok = retryDeadMessage(id); if (!ok) return c.json({ error: 'dead message not found' }, 404); - log('INFO', `[API] Dead message ${id} retried`); + logger.info({ context: { deadMessageId: id } }, 'Dead message retried'); return c.json({ ok: true }); }); @@ -111,7 +112,7 @@ export function createQueueRoutes(conversations: Map) { const id = parseInt(c.req.param('id'), 10); const ok = deleteDeadMessage(id); if (!ok) return c.json({ error: 'dead message not found' }, 404); - log('INFO', `[API] Dead message ${id} deleted`); + logger.info({ context: { deadMessageId: id } }, 'Dead message deleted'); return c.json({ ok: true }); }); diff --git a/src/server/routes/settings.ts b/src/server/routes/settings.ts index 215768b7..d7842314 100644 --- a/src/server/routes/settings.ts +++ b/src/server/routes/settings.ts @@ -2,7 +2,7 @@ import fs from 'fs'; import { Hono } from 'hono'; import { Settings } from '../../lib/types'; import { SETTINGS_FILE, getSettings } from '../../lib/config'; -import { log } from '../../lib/logging'; +import { createLogger } from '../../lib/logging'; /** Read, mutate, and persist settings.json atomically. */ export function mutateSettings(fn: (settings: Settings) => void): Settings { @@ -13,6 +13,7 @@ export function mutateSettings(fn: (settings: Settings) => void): Settings { } const app = new Hono(); +const logger = createLogger({ runtime: 'queue', source: 'api', component: 'settings-route' }); // GET /api/settings app.get('/api/settings', (c) => { @@ -25,7 +26,7 @@ app.put('/api/settings', async (c) => { const current = getSettings(); const merged = { ...current, ...body } as Settings; fs.writeFileSync(SETTINGS_FILE, JSON.stringify(merged, null, 2) + '\n'); - log('INFO', '[API] Settings updated'); + logger.info('Settings updated'); return c.json({ ok: true, settings: merged }); }); diff --git a/src/server/routes/tasks.ts b/src/server/routes/tasks.ts index 2e096ed3..c67ffa70 100644 --- a/src/server/routes/tasks.ts +++ b/src/server/routes/tasks.ts @@ -3,9 +3,10 @@ import path from 'path'; import { Hono } from 'hono'; import { Task, TaskStatus } from '../../lib/types'; import { TINYCLAW_HOME } from '../../lib/config'; -import { log } from '../../lib/logging'; +import { createLogger } from '../../lib/logging'; const TASKS_FILE = path.join(TINYCLAW_HOME, 'tasks.json'); +const logger = createLogger({ runtime: 'queue', source: 'api', component: 'tasks-route' }); function readTasks(): Task[] { try { @@ -46,7 +47,7 @@ app.post('/api/tasks', async (c) => { }; tasks.push(task); writeTasks(tasks); - log('INFO', `[API] Task created: ${task.title}`); + logger.info({ context: { taskId: task.id }, excerpt: task.title }, 'Task created'); return c.json({ ok: true, task }); }); @@ -79,7 +80,7 @@ app.put('/api/tasks/:id', async (c) => { if (idx === -1) return c.json({ error: 'task not found' }, 404); tasks[idx] = { ...tasks[idx], ...body, id: taskId, updatedAt: Date.now() }; writeTasks(tasks); - log('INFO', `[API] Task updated: ${taskId}`); + logger.info({ context: { taskId } }, 'Task updated'); return c.json({ ok: true, task: tasks[idx] }); }); @@ -91,7 +92,7 @@ app.delete('/api/tasks/:id', (c) => { if (idx === -1) return c.json({ error: 'task not found' }, 404); tasks.splice(idx, 1); writeTasks(tasks); - log('INFO', `[API] Task deleted: ${taskId}`); + logger.info({ context: { taskId } }, 'Task deleted'); return c.json({ ok: true }); }); diff --git a/src/server/routes/teams.ts b/src/server/routes/teams.ts index 5f56b9a0..635ed033 100644 --- a/src/server/routes/teams.ts +++ b/src/server/routes/teams.ts @@ -1,10 +1,11 @@ import { Hono } from 'hono'; import { TeamConfig } from '../../lib/types'; import { getSettings, getTeams } from '../../lib/config'; -import { log } from '../../lib/logging'; +import { createLogger } from '../../lib/logging'; import { mutateSettings } from './settings'; const app = new Hono(); +const logger = createLogger({ runtime: 'queue', source: 'api', component: 'teams-route' }); // GET /api/teams app.get('/api/teams', (c) => { @@ -26,7 +27,7 @@ app.put('/api/teams/:id', async (c) => { leader_agent: body.leader_agent!, }; }); - log('INFO', `[API] Team '${teamId}' saved`); + logger.info({ teamId }, 'Team saved'); return c.json({ ok: true, team: settings.teams![teamId] }); }); @@ -38,7 +39,7 @@ app.delete('/api/teams/:id', (c) => { return c.json({ error: `team '${teamId}' not found` }, 404); } mutateSettings(s => { delete s.teams![teamId]; }); - log('INFO', `[API] Team '${teamId}' deleted`); + logger.info({ teamId }, 'Team deleted'); return c.json({ ok: true }); }); diff --git a/src/server/sse.ts b/src/server/sse.ts index 27e97fc7..dead9db9 100644 --- a/src/server/sse.ts +++ b/src/server/sse.ts @@ -1,5 +1,5 @@ import http from 'http'; -import { onEvent } from '../lib/logging'; +import { onEvent } from '../lib/events'; const sseClients = new Set(); diff --git a/tinyoffice/README.md b/tinyoffice/README.md index 10e715f1..e1bcfebb 100644 --- a/tinyoffice/README.md +++ b/tinyoffice/README.md @@ -83,10 +83,11 @@ TinyOffice calls TinyClaw API endpoints such as: - `PUT /api/settings` - `GET /api/queue/status` - `GET /api/responses` -- `GET /api/logs` +- `GET /api/logs` - Unified structured log history (`limit`, `source`, `level`, `channel`, `agentId`, `messageId`, `conversationId`, `search`) - `GET /api/events/stream` (SSE) ## Notes - TinyOffice is UI-only; it does not replace TinyClaw daemon processes. - Start TinyClaw first so queue processor, channels, and API are available. +- `LOG_LEVEL=debug|info|warn|error` controls backend verbosity; `debug` includes agent handoff excerpts in the Logs view. diff --git a/tinyoffice/src/app/logs/page.tsx b/tinyoffice/src/app/logs/page.tsx index ebb9a7ae..bdd75620 100644 --- a/tinyoffice/src/app/logs/page.tsx +++ b/tinyoffice/src/app/logs/page.tsx @@ -2,7 +2,7 @@ import { useState } from "react"; import { usePolling, useSSE, timeAgo } from "@/lib/hooks"; -import { getLogs, type EventData } from "@/lib/api"; +import { getLogs, type EventData, type LogEntry } from "@/lib/api"; import { Card, CardContent, CardHeader, CardTitle } from "@/components/ui/card"; import { Button } from "@/components/ui/button"; import { Badge } from "@/components/ui/badge"; @@ -10,7 +10,7 @@ import { ScrollText, Activity, RefreshCw } from "lucide-react"; export default function LogsPage() { const [tab, setTab] = useState<"logs" | "events">("logs"); - const { data: logs, refresh: refreshLogs } = usePolling<{ lines: string[] }>( + const { data: logs, refresh: refreshLogs } = usePolling<{ entries: LogEntry[] }>( () => getLogs(200), 5000 ); @@ -25,7 +25,7 @@ export default function LogsPage() { Logs & Events

- Queue processor logs and system events + Structured runtime logs and live system events

+ {tab === "logs" ? ( @@ -78,7 +173,10 @@ export default function LogsPage() { {logs && logs.entries.length > 0 ? (
{logs.entries.map((entry, i) => ( - + ))}
) : ( @@ -89,7 +187,7 @@ export default function LogsPage() { - ) : ( + ) : tab === "events" ? ( System Events @@ -108,11 +206,162 @@ export default function LogsPage() { + ) : ( +
+ + + Queue Browser + + +
+ + + + + +
+ +
+
+

Incoming statuses

+
+ {(["pending", "processing", "dead", "completed"] as QueueMessageStatus[]).map((status) => ( + toggleMessageStatus(status)} + /> + ))} +
+
+ +
+

Outgoing statuses

+
+ {(["pending", "acked"] as QueueResponseStatus[]).map((status) => ( + toggleResponseStatus(status)} + /> + ))} +
+
+ +
+ setSearchDraft(e.target.value)} + onKeyDown={(e) => { + if (e.key === "Enter") applySearch(); + }} + placeholder="Search message, sender, message ID, agent..." + className="md:flex-1" + /> + + + +
+
+ + {queueError ? ( +

Failed to load queue rows: {queueError}

+ ) : null} +
+
+ + + + Incoming Messages + + +
+ {queue.messages.length > 0 ? ( + queue.messages.map((entry) => ( + + )) + ) : ( + + )} +
+
+
+ + + + Outgoing Responses + + +
+ {queue.responses.length > 0 ? ( + queue.responses.map((entry) => ( + + )) + ) : ( + + )} +
+
+
+
)} ); } +function QueueCountCard({ label, value }: { label: string; value: number }) { + return ( +
+

{label}

+

{value}

+
+ ); +} + +function StatusToggle({ + active, + label, + onClick, +}: { + active: boolean; + label: string; + onClick: () => void; +}) { + return ( + + ); +} + +function EmptyQueueState({ loading, label }: { loading: boolean; label: string }) { + return ( +

+ {loading ? "Loading queue rows..." : label} +

+ ); +} + function EventEntry({ event }: { event: EventData }) { const typeColors: Record = { message_received: "bg-blue-500", @@ -157,17 +406,21 @@ function EventEntry({ event }: { event: EventData }) { function LogEntryCard({ entry }: { entry: LogEntry }) { const levelClass: Record = { + trace: "border-slate-400/20", debug: "border-slate-500/30", info: "border-border/50", warn: "border-yellow-500/30", error: "border-destructive/30", + fatal: "border-destructive/50", }; const levelBadge: Record = { + trace: "secondary", debug: "secondary", info: "outline", warn: "secondary", error: "destructive", + fatal: "destructive", }; return ( @@ -211,3 +464,81 @@ function LogEntryCard({ entry }: { entry: LogEntry }) { ); } + +function QueueMessageCard({ entry }: { entry: QueueMessageRow }) { + return ( +
+
+ + {entry.channel} + {entry.sender} + {entry.agent ? @{entry.agent} : null} + {entry.fromAgent ? from:@{entry.fromAgent} : null} + {entry.messageId} + {timeAgo(entry.createdAt)} +
+ +

{entry.message}

+ +
+ created {timeAgo(entry.createdAt)} + updated {timeAgo(entry.updatedAt)} + {entry.conversationId ? conv:{entry.conversationId} : null} + {entry.claimedBy ? claimed by {entry.claimedBy} : null} + {entry.retryCount > 0 ? retries {entry.retryCount} : null} + {entry.files.length > 0 ? {entry.files.length} file{entry.files.length === 1 ? "" : "s"} : null} +
+ + {entry.lastError ? ( +
+          {entry.lastError}
+        
+ ) : null} +
+ ); +} + +function QueueResponseCard({ entry }: { entry: QueueResponseRow }) { + const metadataKeys = entry.metadata ? Object.keys(entry.metadata) : []; + + return ( +
+
+ + {entry.channel} + {entry.sender} + {entry.agent ? @{entry.agent} : null} + {entry.messageId} + {timeAgo(entry.createdAt)} +
+ +

{entry.message}

+ {entry.originalMessage ? ( +

+ Original: {entry.originalMessage} +

+ ) : null} + +
+ created {timeAgo(entry.createdAt)} + {entry.ackedAt ? acked {timeAgo(entry.ackedAt)} : null} + {entry.files.length > 0 ? {entry.files.length} file{entry.files.length === 1 ? "" : "s"} : null} + {metadataKeys.length > 0 ? metadata: {metadataKeys.join(", ")} : null} +
+
+ ); +} + +function QueueStatusBadge({ status }: { status: QueueMessageStatus | QueueResponseStatus }) { + const variant = status === "dead" + ? "destructive" + : status === "processing" || status === "acked" + ? "secondary" + : "outline"; + + return ( + + {status} + + ); +} diff --git a/tinyoffice/src/lib/api.ts b/tinyoffice/src/lib/api.ts index a6604473..07325192 100644 --- a/tinyoffice/src/lib/api.ts +++ b/tinyoffice/src/lib/api.ts @@ -7,7 +7,7 @@ async function apiFetch(path: string, options?: RequestInit): Promise { }); if (!res.ok) { const body = await res.json().catch(() => ({ error: res.statusText })); - throw new Error(body.error || res.statusText); + throw new Error(body.error || body.message || res.statusText); } return res.json(); } @@ -66,6 +66,57 @@ export interface ResponseData { files?: string[]; } +export type QueueMessageStatus = "pending" | "processing" | "completed" | "dead"; +export type QueueResponseStatus = "pending" | "acked"; + +export interface QueueMessageRow { + id: number; + messageId: string; + channel: string; + sender: string; + senderId: string | null; + agent: string | null; + conversationId: string | null; + fromAgent: string | null; + status: QueueMessageStatus; + message: string; + files: string[]; + retryCount: number; + lastError: string | null; + claimedBy: string | null; + createdAt: number; + updatedAt: number; +} + +export interface QueueResponseRow { + id: number; + messageId: string; + channel: string; + sender: string; + senderId: string | null; + agent: string | null; + message: string; + originalMessage: string | null; + files: string[]; + metadata: Record | null; + status: QueueResponseStatus; + createdAt: number; + ackedAt: number | null; +} + +export interface QueueRowsResponse { + messages: QueueMessageRow[]; + responses: QueueResponseRow[]; + counts: { + pending: number; + processing: number; + completed: number; + dead: number; + responsesPending: number; + responsesAcked: number; + }; +} + export interface EventData { type: string; timestamp: number; @@ -74,7 +125,7 @@ export interface EventData { export interface LogEntry { time: string; - level: "debug" | "info" | "warn" | "error"; + level: "trace" | "debug" | "info" | "warn" | "error" | "fatal"; source: string; component: string; msg: string; @@ -126,6 +177,37 @@ export async function getLogs(limit = 100): Promise<{ entries: LogEntry[] }> { return apiFetch(`/api/logs?limit=${limit}`); } +export async function getQueueRows(params?: { + messageStatus?: QueueMessageStatus[]; + responseStatus?: QueueResponseStatus[]; + search?: string; + limit?: number; + channel?: string; + agentId?: string; + sender?: string; + messageId?: string; + conversationId?: string; +}): Promise { + const searchParams = new URLSearchParams(); + + if (params?.messageStatus && params.messageStatus.length > 0) { + searchParams.set("messageStatus", params.messageStatus.join(",")); + } + if (params?.responseStatus && params.responseStatus.length > 0) { + searchParams.set("responseStatus", params.responseStatus.join(",")); + } + if (params?.search) searchParams.set("search", params.search); + if (params?.limit) searchParams.set("limit", String(params.limit)); + if (params?.channel) searchParams.set("channel", params.channel); + if (params?.agentId) searchParams.set("agentId", params.agentId); + if (params?.sender) searchParams.set("sender", params.sender); + if (params?.messageId) searchParams.set("messageId", params.messageId); + if (params?.conversationId) searchParams.set("conversationId", params.conversationId); + + const query = searchParams.toString(); + return apiFetch(`/api/queue/rows${query ? `?${query}` : ""}`); +} + export async function saveAgent( id: string, agent: AgentConfig From f9fbdcbec35c4119fc032ed85302c429cbfe5db7 Mon Sep 17 00:00:00 2001 From: Mike Roberts Date: Sun, 8 Mar 2026 14:21:40 -0700 Subject: [PATCH 3/6] Track conversation IDs in queued responses and tighten logging - Store and expose `conversationId` for queued responses across DB, API, and UI types - Include `conversationId` when enqueueing completed conversation responses and allow filtering/searching by it - Prevent extra blank JSON log lines, validate/clamp log limits, and lower noisy queue status logs to debug - Stop auto-shutdown on unhandled rejections in Discord/Telegram/WhatsApp clients --- lib/common.sh | 1 - lib/heartbeat-cron.sh | 1 - src/channels/discord-client.ts | 1 - src/channels/telegram-client.ts | 1 - src/channels/whatsapp-client.ts | 1 - src/lib/conversation.ts | 1 + src/lib/db.ts | 20 +++++++++++++++++--- src/lib/logging.ts | 3 ++- src/server/routes/logs.ts | 3 ++- src/server/routes/queue.ts | 2 +- tinyoffice/src/lib/api.ts | 1 + 11 files changed, 24 insertions(+), 11 deletions(-) diff --git a/lib/common.sh b/lib/common.sh index f787d8c9..9cadfd3f 100644 --- a/lib/common.sh +++ b/lib/common.sh @@ -144,7 +144,6 @@ write_structured_log() { node -e 'const [time, level, source, component, msg] = process.argv.slice(1); console.log(JSON.stringify({ time, level, source, component, msg }));' \ "$timestamp" "$level" "$source" "$component" "$msg" >> "$file" fi - printf '\n' >> "$file" } normalize_log_level() { diff --git a/lib/heartbeat-cron.sh b/lib/heartbeat-cron.sh index 5feccdab..eeda8c04 100755 --- a/lib/heartbeat-cron.sh +++ b/lib/heartbeat-cron.sh @@ -126,7 +126,6 @@ log() { node -e 'const [time, level, source, component, msg] = process.argv.slice(1); console.log(JSON.stringify({ time, level, source, component, msg }));' \ "$timestamp" "$level" "heartbeat" "heartbeat" "$msg" >> "$LOG_FILE" fi - printf '\n' >> "$LOG_FILE" } log "Heartbeat started (interval: ${INTERVAL}s, API: ${API_URL})" diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index e4bc18a4..1841e894 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -492,7 +492,6 @@ function shutdownDiscord(exitCode: number): void { // Catch unhandled errors so we can see what kills the bot process.on('unhandledRejection', (reason) => { logError(logger, reason, 'Unhandled rejection'); - shutdownDiscord(1); }); process.on('uncaughtException', (error) => { logError(logger, error, 'Uncaught exception'); diff --git a/src/channels/telegram-client.ts b/src/channels/telegram-client.ts index e08a0d91..311dbc85 100644 --- a/src/channels/telegram-client.ts +++ b/src/channels/telegram-client.ts @@ -666,7 +666,6 @@ async function shutdownTelegram(exitCode: number): Promise { // Catch unhandled errors so we can see what kills the bot process.on('unhandledRejection', (reason) => { logError(logger, reason, 'Unhandled rejection'); - void shutdownTelegram(1); }); process.on('uncaughtException', (error) => { logError(logger, error, 'Uncaught exception'); diff --git a/src/channels/whatsapp-client.ts b/src/channels/whatsapp-client.ts index b5745279..5ed911df 100644 --- a/src/channels/whatsapp-client.ts +++ b/src/channels/whatsapp-client.ts @@ -505,7 +505,6 @@ async function shutdownWhatsApp(exitCode: number): Promise { // Catch unhandled errors so we can see what kills the bot process.on('unhandledRejection', (reason) => { logError(logger, reason, 'Unhandled rejection'); - void shutdownWhatsApp(1); }); process.on('uncaughtException', (error) => { logError(logger, error, 'Uncaught exception'); diff --git a/src/lib/conversation.ts b/src/lib/conversation.ts index e0df3f58..457a397d 100644 --- a/src/lib/conversation.ts +++ b/src/lib/conversation.ts @@ -199,6 +199,7 @@ export function completeConversation(conv: Conversation): void { message: responseMessage, originalMessage: conv.originalMessage, messageId: conv.messageId, + conversationId: conv.id, files: allFiles.length > 0 ? allFiles : undefined, }); diff --git a/src/lib/db.ts b/src/lib/db.ts index 6c1a54c2..7d3e722e 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -40,6 +40,7 @@ export interface DbResponse { message: string; original_message: string; agent: string | null; + conversation_id: string | null; files: string | null; // JSON array metadata: string | null; // JSON object (plugin hook metadata) status: 'pending' | 'acked'; @@ -67,6 +68,7 @@ export interface EnqueueResponseData { originalMessage: string; messageId: string; agent?: string; + conversationId?: string; files?: string[]; metadata?: Record; } @@ -100,6 +102,7 @@ export interface QueueResponseRow { sender: string; senderId: string | null; agent: string | null; + conversationId: string | null; message: string; originalMessage: string | null; files: string[]; @@ -187,6 +190,7 @@ export function initQueueDb(): void { message TEXT NOT NULL, original_message TEXT NOT NULL, agent TEXT, + conversation_id TEXT, files TEXT, metadata TEXT, status TEXT NOT NULL DEFAULT 'pending', @@ -209,6 +213,9 @@ export function initQueueDb(): void { if (!cols.some(c => c.name === 'metadata')) { db.exec('ALTER TABLE responses ADD COLUMN metadata TEXT'); } + if (!cols.some(c => c.name === 'conversation_id')) { + db.exec('ALTER TABLE responses ADD COLUMN conversation_id TEXT'); + } } function getDb(): Database.Database { @@ -268,6 +275,7 @@ function mapQueueResponseRow(row: DbResponse): QueueResponseRow { sender: row.sender, senderId: row.sender_id, agent: row.agent, + conversationId: row.conversation_id, message: row.message, originalMessage: row.original_message ?? null, files: safeParseStringArray(row.files), @@ -366,8 +374,8 @@ export function enqueueResponse(data: EnqueueResponseData): number { const d = getDb(); const now = Date.now(); const result = d.prepare(` - INSERT INTO responses (message_id, channel, sender, sender_id, message, original_message, agent, files, metadata, status, created_at) - VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?) + INSERT INTO responses (message_id, channel, sender, sender_id, message, original_message, agent, conversation_id, files, metadata, status, created_at) + VALUES (?, ?, ?, ?, ?, ?, ?, ?, ?, ?, 'pending', ?) `).run( data.messageId, data.channel, @@ -376,6 +384,7 @@ export function enqueueResponse(data: EnqueueResponseData): number { data.message, data.originalMessage, data.agent ?? null, + data.conversationId ?? null, data.files ? JSON.stringify(data.files) : null, data.metadata ? JSON.stringify(data.metadata) : null, now, @@ -475,6 +484,10 @@ export function getQueueResponses(options: GetQueueResponsesOptions): QueueRespo clauses.push('message_id = ?'); params.push(options.messageId); } + if (options.conversationId) { + clauses.push('conversation_id = ?'); + params.push(options.conversationId); + } const searchTerm = normalizeSearchTerm(options.search); if (searchTerm) { clauses.push(`( @@ -483,8 +496,9 @@ export function getQueueResponses(options: GetQueueResponsesOptions): QueueRespo OR LOWER(sender) LIKE ? OR LOWER(message_id) LIKE ? OR LOWER(COALESCE(agent, '')) LIKE ? + OR LOWER(COALESCE(conversation_id, '')) LIKE ? )`); - params.push(searchTerm, searchTerm, searchTerm, searchTerm, searchTerm); + params.push(searchTerm, searchTerm, searchTerm, searchTerm, searchTerm, searchTerm); } params.push(options.limit); diff --git a/src/lib/logging.ts b/src/lib/logging.ts index 763f5001..3b8886f8 100644 --- a/src/lib/logging.ts +++ b/src/lib/logging.ts @@ -388,7 +388,8 @@ function includesSearch(entry: LogEntry, search: string): boolean { export async function readLogEntries(options: ReadLogsOptions = {}): Promise { const sourceFilter = (options.source ?? []).map(item => item.trim()).filter(Boolean); - const limit = options.limit ?? 100; + const rawLimit = options.limit ?? 100; + const limit = Number.isFinite(rawLimit) ? rawLimit : 100; if (limit <= 0) { return []; } diff --git a/src/server/routes/logs.ts b/src/server/routes/logs.ts index b799b60b..9781d30b 100644 --- a/src/server/routes/logs.ts +++ b/src/server/routes/logs.ts @@ -5,7 +5,8 @@ const app = new Hono(); // GET /api/logs app.get('/api/logs', async (c) => { - const limit = parseInt(c.req.query('limit') || '100', 10); + const rawLimit = parseInt(c.req.query('limit') || '100', 10); + const limit = Number.isFinite(rawLimit) ? Math.min(Math.max(rawLimit, 1), 1000) : 100; const source = c.req.query('source')?.split(',').map(item => item.trim()).filter(Boolean) ?? []; const level = c.req.query('level') || undefined; const channel = c.req.query('channel') || undefined; diff --git a/src/server/routes/queue.ts b/src/server/routes/queue.ts index 70e06eb9..a79291c7 100644 --- a/src/server/routes/queue.ts +++ b/src/server/routes/queue.ts @@ -146,7 +146,7 @@ export function createQueueRoutes(conversations: Map) { }); const counts = getQueueRowCounts(); - logger.info({ + logger.debug({ messageStatuses, responseStatuses, channel, diff --git a/tinyoffice/src/lib/api.ts b/tinyoffice/src/lib/api.ts index 07325192..e64a0673 100644 --- a/tinyoffice/src/lib/api.ts +++ b/tinyoffice/src/lib/api.ts @@ -95,6 +95,7 @@ export interface QueueResponseRow { sender: string; senderId: string | null; agent: string | null; + conversationId: string | null; message: string; originalMessage: string | null; files: string[]; From fdd4398a621cd02e3acb8abb5084b7b7d165d8b8 Mon Sep 17 00:00:00 2001 From: Mike Roberts Date: Sun, 8 Mar 2026 14:32:03 -0700 Subject: [PATCH 4/6] Unify heartbeat logging with shared structured logger - Source heartbeat cron logging from `lib/common.sh` to reuse level parsing and rotation - Replace inline heartbeat JSON logging with `write_structured_log` - Tighten TypeScript logging source types to runtime log files and document safe stream rotation --- lib/common.sh | 26 +++++------- lib/heartbeat-cron.sh | 94 ++++--------------------------------------- src/lib/logging.ts | 8 ++-- 3 files changed, 24 insertions(+), 104 deletions(-) diff --git a/lib/common.sh b/lib/common.sh index 9cadfd3f..f811a3b6 100644 --- a/lib/common.sh +++ b/lib/common.sh @@ -159,6 +159,13 @@ normalize_log_level() { esac } +is_explicit_log_level() { + case "$(printf '%s' "${1:-}" | tr '[:upper:]' '[:lower:]')" in + trace|verbose|debug|info|warn|warning|error|err|fatal) return 0 ;; + *) return 1 ;; + esac +} + log_level_priority() { case "$(normalize_log_level "$1")" in debug) echo 0 ;; @@ -175,21 +182,10 @@ log() { local threshold local msg - case "$(normalize_log_level "$candidate_level")" in - debug|info|warn|error) - if [ "$candidate_level" = "$(normalize_log_level "$candidate_level")" ] || \ - [ "$candidate_level" = "DEBUG" ] || [ "$candidate_level" = "INFO" ] || \ - [ "$candidate_level" = "WARN" ] || [ "$candidate_level" = "WARNING" ] || \ - [ "$candidate_level" = "ERROR" ] || [ "$candidate_level" = "verbose" ] || \ - [ "$candidate_level" = "VERBOSE" ] || [ "$candidate_level" = "trace" ] || \ - [ "$candidate_level" = "TRACE" ] || [ "$candidate_level" = "fatal" ] || \ - [ "$candidate_level" = "FATAL" ] || [ "$candidate_level" = "err" ] || \ - [ "$candidate_level" = "ERR" ]; then - level="$(normalize_log_level "$candidate_level")" - shift - fi - ;; - esac + if is_explicit_log_level "$candidate_level"; then + level="$(normalize_log_level "$candidate_level")" + shift + fi msg="$*" [ -n "$msg" ] || return 0 diff --git a/lib/heartbeat-cron.sh b/lib/heartbeat-cron.sh index eeda8c04..8818f97c 100755 --- a/lib/heartbeat-cron.sh +++ b/lib/heartbeat-cron.sh @@ -3,6 +3,7 @@ SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" PROJECT_ROOT="$(cd "$SCRIPT_DIR/.." && pwd)" +source "$PROJECT_ROOT/lib/common.sh" if [ -z "$TINYCLAW_HOME" ]; then if [ -f "$PROJECT_ROOT/.tinyclaw/settings.json" ]; then TINYCLAW_HOME="$PROJECT_ROOT/.tinyclaw" @@ -11,6 +12,7 @@ if [ -z "$TINYCLAW_HOME" ]; then fi fi LOG_FILE="$TINYCLAW_HOME/logs/heartbeat.log" +LOG_DIR="$(dirname "$LOG_FILE")" SETTINGS_FILE="$TINYCLAW_HOME/settings.json" API_PORT="${TINYCLAW_API_PORT:-3777}" API_URL="http://localhost:${API_PORT}" @@ -25,82 +27,16 @@ INTERVAL=${INTERVAL:-3600} mkdir -p "$(dirname "$LOG_FILE")" -rotate_log_file() { - local file="$1" - local max_bytes=$((10 * 1024 * 1024)) - local max_files=5 - - [ -f "$file" ] || return 0 - - local size - size=$(wc -c < "$file" | tr -d ' ') - if [ "$size" -lt "$max_bytes" ]; then - return 0 - fi - - local ext="${file##*.}" - local base="${file%.*}" - local i - for ((i=max_files; i>=1; i--)); do - local current="${base}.${i}.${ext}" - local previous - if [ "$i" -eq 1 ]; then - previous="$file" - else - previous="${base}.$((i-1)).${ext}" - fi - - [ -f "$previous" ] || continue - [ ! -f "$current" ] || rm -f "$current" - mv "$previous" "$current" - done -} - -normalize_log_level() { - local raw - raw=$(printf '%s' "${1:-info}" | tr '[:upper:]' '[:lower:]') - case "$raw" in - trace|verbose) echo "debug" ;; - debug) echo "debug" ;; - info|"") echo "info" ;; - warn|warning) echo "warn" ;; - error|err|fatal) echo "error" ;; - *) echo "info" ;; - esac -} - -log_level_priority() { - case "$(normalize_log_level "$1")" in - debug) echo 0 ;; - info) echo 1 ;; - warn) echo 2 ;; - error) echo 3 ;; - *) echo 1 ;; - esac -} - log() { local candidate_level="${1:-}" local level="info" local threshold local msg - local timestamp - - case "$(normalize_log_level "$candidate_level")" in - debug|info|warn|error) - if [ "$candidate_level" = "$(normalize_log_level "$candidate_level")" ] || \ - [ "$candidate_level" = "DEBUG" ] || [ "$candidate_level" = "INFO" ] || \ - [ "$candidate_level" = "WARN" ] || [ "$candidate_level" = "WARNING" ] || \ - [ "$candidate_level" = "ERROR" ] || [ "$candidate_level" = "verbose" ] || \ - [ "$candidate_level" = "VERBOSE" ] || [ "$candidate_level" = "trace" ] || \ - [ "$candidate_level" = "TRACE" ] || [ "$candidate_level" = "fatal" ] || \ - [ "$candidate_level" = "FATAL" ] || [ "$candidate_level" = "err" ] || \ - [ "$candidate_level" = "ERR" ]; then - level="$(normalize_log_level "$candidate_level")" - shift - fi - ;; - esac + + if is_explicit_log_level "$candidate_level"; then + level="$(normalize_log_level "$candidate_level")" + shift + fi msg="$*" [ -n "$msg" ] || return 0 @@ -110,22 +46,8 @@ log() { return 0 fi - timestamp=$(date -u '+%Y-%m-%dT%H:%M:%SZ') - echo "[$(date '+%Y-%m-%d %H:%M:%S')] $msg" - rotate_log_file "$LOG_FILE" - if command -v jq >/dev/null 2>&1; then - jq -nc \ - --arg time "$timestamp" \ - --arg level "$level" \ - --arg source "heartbeat" \ - --arg component "heartbeat" \ - --arg msg "$msg" \ - '{time:$time,level:$level,source:$source,component:$component,msg:$msg}' >> "$LOG_FILE" - else - node -e 'const [time, level, source, component, msg] = process.argv.slice(1); console.log(JSON.stringify({ time, level, source, component, msg }));' \ - "$timestamp" "$level" "heartbeat" "heartbeat" "$msg" >> "$LOG_FILE" - fi + write_structured_log "heartbeat" "heartbeat" "$level" "$msg" } log "Heartbeat started (interval: ${INTERVAL}s, API: ${API_URL})" diff --git a/src/lib/logging.ts b/src/lib/logging.ts index 3b8886f8..864828ed 100644 --- a/src/lib/logging.ts +++ b/src/lib/logging.ts @@ -6,13 +6,12 @@ import pino, { type Logger } from 'pino'; import { LOG_DIR } from './config'; export type RuntimeLogFile = 'queue' | 'api' | 'telegram' | 'discord' | 'whatsapp' | 'daemon' | 'heartbeat'; -export type LogSource = RuntimeLogFile | 'api'; export type LogLevel = 'debug' | 'info' | 'warn' | 'error'; export interface LogEntry { time: string; level: LogLevel; - source: LogSource | string; + source: RuntimeLogFile | string; component: string; msg: string; channel?: string; @@ -36,7 +35,7 @@ export interface LogEntry { interface CreateLoggerOptions { runtime: RuntimeLogFile; - source?: LogSource; + source?: RuntimeLogFile; component: string; bindings?: Record; } @@ -113,6 +112,9 @@ class RotatingFileStream extends Writable { return; } + // Closing the old stream is asynchronous, but rotating immediately is safe here: + // the file descriptor keeps the old inode alive while renameSync moves the path, + // so in-flight writes still land in the pre-rotation file on Unix-like systems. this.stream.end(); for (let index = MAX_ROTATED_FILES; index >= 1; index--) { From a8c60a173bb7b522b823af66eabbff5316c98872 Mon Sep 17 00:00:00 2001 From: Mike Roberts Date: Sun, 8 Mar 2026 14:35:02 -0700 Subject: [PATCH 5/6] Harden logging paths and return 500 on agent provision errors - Catch ENOENT when reading rotated log files to avoid transient failures - Split Discord message excerpt into debug logs and stop logging pairing codes - Return HTTP 500 from PUT /api/agents/:id when provisioning fails --- src/channels/discord-client.ts | 9 +++++++-- src/lib/logging.ts | 14 ++++++++++++-- src/server/routes/agents.ts | 10 ++++++++-- 3 files changed, 27 insertions(+), 6 deletions(-) diff --git a/src/channels/discord-client.ts b/src/channels/discord-client.ts index 1841e894..9d3047c5 100644 --- a/src/channels/discord-client.ts +++ b/src/channels/discord-client.ts @@ -256,14 +256,19 @@ client.on(Events.MessageCreate, async (message: Message) => { channel: 'discord', sender, messageId, - excerpt: excerptText(messageText || '[attachment only]'), context: { fileCount: downloadedFiles.length, senderId: message.author.id }, }, 'Message received'); + logger.debug({ + channel: 'discord', + sender, + messageId, + excerpt: excerptText(messageText || '[attachment only]'), + }, 'Message received excerpt'); const pairing = ensureSenderPaired(PAIRING_FILE, 'discord', message.author.id, sender); if (!pairing.approved && pairing.code) { if (pairing.isNewPending) { - logger.info({ channel: 'discord', sender, context: { senderId: message.author.id, pairingCode: pairing.code } }, 'Blocked unpaired sender'); + logger.info({ channel: 'discord', sender, context: { senderId: message.author.id } }, 'Blocked unpaired sender'); await message.reply(pairingMessage(pairing.code)); } else { logger.info({ channel: 'discord', sender, context: { senderId: message.author.id } }, 'Blocked pending sender without re-sending pairing message'); diff --git a/src/lib/logging.ts b/src/lib/logging.ts index 864828ed..d79403d6 100644 --- a/src/lib/logging.ts +++ b/src/lib/logging.ts @@ -331,8 +331,18 @@ async function readEntriesForRuntime( if (remaining <= 0) { break; } - const entries = await readNewestEntriesFromFile(filePath, remaining, matcher); - results.push(...entries); + try { + const entries = await readNewestEntriesFromFile(filePath, remaining, matcher); + results.push(...entries); + } catch (error) { + const fsError = error as NodeJS.ErrnoException; + // listFilesForRuntime() can race with RotatingFileStream.rotateIfNeeded(), + // which may rename or remove a file between enumeration and read. + if (fsError.code === 'ENOENT' || /no such file/i.test(fsError.message ?? '')) { + continue; + } + throw error; + } } return results; } diff --git a/src/server/routes/agents.ts b/src/server/routes/agents.ts index 8f8a8bf1..74140cca 100644 --- a/src/server/routes/agents.ts +++ b/src/server/routes/agents.ts @@ -117,13 +117,19 @@ app.put('/api/agents/:id', async (c) => { } logger.info({ agentId }, 'Agent saved'); - return c.json({ + const response = { ok: !provisionError, agent: settings.agents![agentId], provisioned: isNew && !provisionError, provisionSteps, ...(provisionError ? { provisionError } : {}), - }); + }; + + if (provisionError) { + return c.json(response, 500); + } + + return c.json(response); }); // DELETE /api/agents/:id From 00e06397d89880c82bc7e9aea8df4215a62f216f Mon Sep 17 00:00:00 2001 From: Mike Roberts Date: Sun, 8 Mar 2026 14:49:20 -0700 Subject: [PATCH 6/6] Harden log search matching and normalize level filters - Escape `%`, `_`, and `\` in queue search terms and use `LIKE ... ESCAPE '\\'` - Make log level filtering case-insensitive in API parsing and log matching - Preserve explicit `limit=0` by checking `limit !== undefined` in TinyOffice API params --- src/lib/db.ts | 30 ++++++++++++++++++------------ src/lib/logging.ts | 2 +- src/server/routes/logs.ts | 2 +- tinyoffice/src/lib/api.ts | 2 +- 4 files changed, 21 insertions(+), 15 deletions(-) diff --git a/src/lib/db.ts b/src/lib/db.ts index 7d3e722e..7898c39e 100644 --- a/src/lib/db.ts +++ b/src/lib/db.ts @@ -292,7 +292,13 @@ function buildInClause(values: readonly string[]): string { function normalizeSearchTerm(search?: string): string | undefined { const trimmed = search?.trim(); - return trimmed ? `%${trimmed.toLowerCase()}%` : undefined; + if (!trimmed) return undefined; + const escaped = trimmed + .toLowerCase() + .replace(/\\/g, '\\\\') + .replace(/%/g, '\\%') + .replace(/_/g, '\\_'); + return `%${escaped}%`; } // ── Messages (incoming queue) ──────────────────────────────────────────────── @@ -441,11 +447,11 @@ export function getQueueMessages(options: GetQueueMessagesOptions): QueueMessage const searchTerm = normalizeSearchTerm(options.search); if (searchTerm) { clauses.push(`( - LOWER(message) LIKE ? - OR LOWER(sender) LIKE ? - OR LOWER(message_id) LIKE ? - OR LOWER(COALESCE(agent, '')) LIKE ? - OR LOWER(COALESCE(conversation_id, '')) LIKE ? + LOWER(message) LIKE ? ESCAPE '\\' + OR LOWER(sender) LIKE ? ESCAPE '\\' + OR LOWER(message_id) LIKE ? ESCAPE '\\' + OR LOWER(COALESCE(agent, '')) LIKE ? ESCAPE '\\' + OR LOWER(COALESCE(conversation_id, '')) LIKE ? ESCAPE '\\' )`); params.push(searchTerm, searchTerm, searchTerm, searchTerm, searchTerm); } @@ -491,12 +497,12 @@ export function getQueueResponses(options: GetQueueResponsesOptions): QueueRespo const searchTerm = normalizeSearchTerm(options.search); if (searchTerm) { clauses.push(`( - LOWER(message) LIKE ? - OR LOWER(COALESCE(original_message, '')) LIKE ? - OR LOWER(sender) LIKE ? - OR LOWER(message_id) LIKE ? - OR LOWER(COALESCE(agent, '')) LIKE ? - OR LOWER(COALESCE(conversation_id, '')) LIKE ? + LOWER(message) LIKE ? ESCAPE '\\' + OR LOWER(COALESCE(original_message, '')) LIKE ? ESCAPE '\\' + OR LOWER(sender) LIKE ? ESCAPE '\\' + OR LOWER(message_id) LIKE ? ESCAPE '\\' + OR LOWER(COALESCE(agent, '')) LIKE ? ESCAPE '\\' + OR LOWER(COALESCE(conversation_id, '')) LIKE ? ESCAPE '\\' )`); params.push(searchTerm, searchTerm, searchTerm, searchTerm, searchTerm, searchTerm); } diff --git a/src/lib/logging.ts b/src/lib/logging.ts index d79403d6..70f0268c 100644 --- a/src/lib/logging.ts +++ b/src/lib/logging.ts @@ -255,7 +255,7 @@ function matchesFilters(entry: LogEntry, options: ReadLogsOptions, sourceFilter: if (sourceFilter.length > 0 && !sourceFilter.includes(String(entry.source))) { return false; } - if (options.level && String(entry.level) !== options.level) { + if (options.level && String(entry.level).toLowerCase() !== options.level.toLowerCase()) { return false; } if (options.channel && String(entry.channel ?? '') !== options.channel) { diff --git a/src/server/routes/logs.ts b/src/server/routes/logs.ts index 9781d30b..b45dab56 100644 --- a/src/server/routes/logs.ts +++ b/src/server/routes/logs.ts @@ -8,7 +8,7 @@ app.get('/api/logs', async (c) => { const rawLimit = parseInt(c.req.query('limit') || '100', 10); const limit = Number.isFinite(rawLimit) ? Math.min(Math.max(rawLimit, 1), 1000) : 100; const source = c.req.query('source')?.split(',').map(item => item.trim()).filter(Boolean) ?? []; - const level = c.req.query('level') || undefined; + const level = c.req.query('level')?.trim().toLowerCase() || undefined; const channel = c.req.query('channel') || undefined; const agentId = c.req.query('agentId') || undefined; const messageId = c.req.query('messageId') || undefined; diff --git a/tinyoffice/src/lib/api.ts b/tinyoffice/src/lib/api.ts index e64a0673..a10be819 100644 --- a/tinyoffice/src/lib/api.ts +++ b/tinyoffice/src/lib/api.ts @@ -198,7 +198,7 @@ export async function getQueueRows(params?: { searchParams.set("responseStatus", params.responseStatus.join(",")); } if (params?.search) searchParams.set("search", params.search); - if (params?.limit) searchParams.set("limit", String(params.limit)); + if (params?.limit !== undefined) searchParams.set("limit", String(params.limit)); if (params?.channel) searchParams.set("channel", params.channel); if (params?.agentId) searchParams.set("agentId", params.agentId); if (params?.sender) searchParams.set("sender", params.sender);