From a5aa87ca06d00a51e31bb086698f04b78eff4e50 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Thu, 11 Jun 2026 21:47:50 +0800 Subject: [PATCH 01/22] fix(memory): inject created_at/updated_at into MEMORY_FIELDS MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit The vector DB layer already stores timestamps for all memories, but the Markdown file layer (MEMORY_FIELDS metadata) never included them. This meant that reading a memory file directly gave no indication of when it was created or last updated. Changes: - memory_updater.py: _apply_upsert now injects created_at (preserved on updates, set to now for new files) and updated_at (always refreshed) into MEMORY_FIELDS after merging schema fields. - memory_type_registry.py: initialize_memory_files now includes created_at/updated_at when creating initial identity.md, soul.md, etc. - Added TestUpsertTimestamps with two regression tests covering new-file and update scenarios. The serialization/deserialization infrastructure in memory_file_utils.py already supported these fields — the gap was only in the write path. 
🤖 Generated with [Qoder][https://qoder.com] --- .../session/memory/memory_type_registry.py | 9 +- openviking/session/memory/memory_updater.py | 11 +++ tests/session/memory/test_memory_updater.py | 92 +++++++++++++++++++ 3 files changed, 111 insertions(+), 1 deletion(-) diff --git a/openviking/session/memory/memory_type_registry.py b/openviking/session/memory/memory_type_registry.py index a07c20d179..b8ffb52fea 100644 --- a/openviking/session/memory/memory_type_registry.py +++ b/openviking/session/memory/memory_type_registry.py @@ -301,13 +301,20 @@ async def initialize_memory_files( pass # Add MEMORY_FIELDS comment with field metadata + from datetime import datetime, timezone + from openviking.session.memory.dataclass import MemoryFile from openviking.session.memory.utils.memory_file_utils import MemoryFileUtils + now = datetime.now(timezone.utc) + extra = dict(fields_with_init) + extra["created_at"] = now + extra["updated_at"] = now + mf = MemoryFile( uri=file_uri, memory_type=schema.memory_type, - extra_fields=fields_with_init, + extra_fields=extra, ) full_content = MemoryFileUtils.write( mf, diff --git a/openviking/session/memory/memory_updater.py b/openviking/session/memory/memory_updater.py index b7c0eac2c8..49cc190b5a 100644 --- a/openviking/session/memory/memory_updater.py +++ b/openviking/session/memory/memory_updater.py @@ -11,6 +11,7 @@ import re from dataclasses import dataclass +from datetime import datetime, timezone from typing import TYPE_CHECKING, Any, Dict, List, Optional, Tuple if TYPE_CHECKING: @@ -695,6 +696,16 @@ async def _apply_upsert( if key not in schema_field_names and key not in metadata and val is not None: metadata[key] = val + # Inject system-managed timestamps into MEMORY_FIELDS. + # created_at: inherited from existing file for updates; set to now for new files. + # updated_at: always refreshed to the current time. 
+ now = datetime.now(timezone.utc) + if old_content and old_content.extra_fields.get("created_at"): + metadata["created_at"] = old_content.extra_fields["created_at"] + else: + metadata["created_at"] = now + metadata["updated_at"] = now + # Handle links/backlinks fields: merge with existing incoming_links_by_uri = getattr(resolved_op, "_incoming_links_by_uri", {}) incoming_backlinks_by_uri = getattr(resolved_op, "_incoming_backlinks_by_uri", {}) diff --git a/tests/session/memory/test_memory_updater.py b/tests/session/memory/test_memory_updater.py index 82161d7667..e0a8e5c245 100644 --- a/tests/session/memory/test_memory_updater.py +++ b/tests/session/memory/test_memory_updater.py @@ -847,3 +847,95 @@ async def mock_write_file(uri, content, **kwargs): assert "ALPHA" in parsed["content"] assert "BETA" in parsed["content"] assert "gamma" in parsed["content"] + + +class TestUpsertTimestamps: + """Regression tests: _apply_upsert must inject created_at and updated_at + into MEMORY_FIELDS for both new and updated files.""" + + def _make_updater(self, memory_type="notes"): + schema = MemoryTypeSchema( + memory_type=memory_type, + description="notes", + fields=[ + MemoryField( + name="content", field_type=FieldType.STRING, merge_op=MergeOp.PATCH + ), + ], + ) + registry = MemoryTypeRegistry() + registry.register(schema) + return MemoryUpdater(registry=registry) + + def _make_fs(self): + store: dict[str, str] = {} + mock_fs = MagicMock() + + async def mock_read_file(uri, **kwargs): + return store.get(uri) + + async def mock_write_file(uri, content, **kwargs): + store[uri] = content + + mock_fs.read_file = mock_read_file + mock_fs.write_file = mock_write_file + return store, mock_fs + + @pytest.mark.asyncio + async def test_upsert_injects_timestamps_on_new_file(self): + """A brand-new file (no old_content) must get created_at and updated_at.""" + uri = "viking://user/test/memories/notes/new.md" + updater = self._make_updater() + store, mock_fs = self._make_fs() + 
updater._get_viking_fs = MagicMock(return_value=mock_fs) + + op = ResolvedOperation( + old_memory_file_content=None, + memory_fields={"content": "Hello world"}, + memory_type="notes", + uris=[uri], + ) + await updater._apply_upsert(op, MagicMock()) + + parsed = parse_memory_file_with_fields(store[uri]) + assert "created_at" in parsed, "created_at must be present in MEMORY_FIELDS" + assert "updated_at" in parsed, "updated_at must be present in MEMORY_FIELDS" + assert parsed["created_at"] == parsed["updated_at"] + + @pytest.mark.asyncio + async def test_upsert_preserves_created_at_updates_updated_at(self): + """Updating an existing file must keep created_at and refresh updated_at.""" + from datetime import datetime, timezone + + uri = "viking://user/test/memories/notes/existing.md" + updater = self._make_updater() + store, mock_fs = self._make_fs() + updater._get_viking_fs = MagicMock(return_value=mock_fs) + + old_created = "2026-01-01T00:00:00+00:00" + old_updated = "2026-01-01T00:00:00+00:00" + + # Seed the store with an existing file that has timestamps + existing = MemoryFile( + uri=uri, + content="Old content", + extra_fields={"created_at": old_created, "updated_at": old_updated}, + ) + existing_content = MemoryFileUtils.write(existing) + store[uri] = existing_content + + patch = StrPatch(blocks=[SearchReplaceBlock(search="Old", replace="New")]) + op = ResolvedOperation( + old_memory_file_content=existing, + memory_fields={"content": patch}, + memory_type="notes", + uris=[uri], + ) + await updater._apply_upsert(op, MagicMock()) + + parsed = parse_memory_file_with_fields(store[uri]) + assert parsed["created_at"] == old_created, "created_at must be preserved from original" + assert parsed["updated_at"] != old_updated, "updated_at must be refreshed" + # updated_at should be a recent UTC time + updated_dt = datetime.fromisoformat(parsed["updated_at"]) + assert updated_dt.tzinfo is not None, "updated_at must be timezone-aware" From 
264556cd3612ddee5586380f70f91802e91a087d Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:09:31 +0800 Subject: [PATCH 02/22] feat: add daemon data models --- openviking/daemon/__init__.py | 3 + openviking/daemon/models.py | 85 ++++++++++++++++++++++++++ openviking/daemon/watchers/__init__.py | 0 tests/daemon/__init__.py | 0 4 files changed, 88 insertions(+) create mode 100644 openviking/daemon/__init__.py create mode 100644 openviking/daemon/models.py create mode 100644 openviking/daemon/watchers/__init__.py create mode 100644 tests/daemon/__init__.py diff --git a/openviking/daemon/__init__.py b/openviking/daemon/__init__.py new file mode 100644 index 0000000000..b64223e583 --- /dev/null +++ b/openviking/daemon/__init__.py @@ -0,0 +1,3 @@ +""" +OpenViking Active Daemon - Monitors Claude Code JSONL logs and extracts knowledge into viking:// storage. +""" diff --git a/openviking/daemon/models.py b/openviking/daemon/models.py new file mode 100644 index 0000000000..0ea3d6b5c7 --- /dev/null +++ b/openviking/daemon/models.py @@ -0,0 +1,85 @@ +""" +Data models for OpenViking Active Daemon. 
+""" +from dataclasses import dataclass, field +from typing import Any, Dict, List, Optional + + +@dataclass +class FileCursor: + """Tracks file read position for incremental processing.""" + file_path: str + last_position: int = 0 + last_read_time: float = 0.0 + + def to_dict(self) -> Dict[str, Any]: + return { + "file_path": self.file_path, + "last_position": self.last_position, + "last_read_time": self.last_read_time, + } + + @classmethod + def from_dict(cls, data: Dict[str, Any]) -> "FileCursor": + return cls( + file_path=data["file_path"], + last_position=data.get("last_position", 0), + last_read_time=data.get("last_read_time", 0.0), + ) + + +@dataclass +class BatchBuffer: + """Buffer for accumulating events before batch processing.""" + lines: List[Dict[str, Any]] = field(default_factory=list) + byte_count: int = 0 + created_at: float = 0.0 + + def add_line(self, line: Dict[str, Any], byte_size: int): + self.lines.append(line) + self.byte_count += byte_size + + def is_empty(self) -> bool: + return len(self.lines) == 0 + + def clear(self): + self.lines.clear() + self.byte_count = 0 + self.created_at = 0.0 + + +@dataclass +class ConversationTurn: + """A complete user-assistant conversation turn.""" + user_prompt: str + assistant_response: str + session_id: Optional[str] = None + project_name: Optional[str] = None + timestamp: Optional[str] = None + + +@dataclass +class ExtractedKnowledge: + """Structured knowledge extracted from a conversation.""" + status: str # "EXTRACTED" | "IGNORED" + category: str # "skills" | "memories" | "resources" + title: str + content: str + confidence: float = 0.0 + project_name: Optional[str] = None + entity_links: List[str] = field(default_factory=list) + actionable_steps: List[str] = field(default_factory=list) + timestamp: Optional[str] = None + + def to_dict(self) -> Dict[str, Any]: + return { + "status": self.status, + "category": self.category, + "title": self.title, + "content": self.content, + "confidence": self.confidence, 
+ "project_name": self.project_name, + "entity_links": self.entity_links, + "actionable_steps": self.actionable_steps, + "timestamp": self.timestamp, + } diff --git a/openviking/daemon/watchers/__init__.py b/openviking/daemon/watchers/__init__.py new file mode 100644 index 0000000000..e69de29bb2 diff --git a/tests/daemon/__init__.py b/tests/daemon/__init__.py new file mode 100644 index 0000000000..e69de29bb2 From d1f5f3d0a3d68088a96d154ddde2f012ff1f971b Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:09:43 +0800 Subject: [PATCH 03/22] feat: add cursor manager with SQLite persistence --- openviking/daemon/cursor_manager.py | 92 +++++++++++++++++++++++++++++ tests/daemon/test_cursor_manager.py | 61 +++++++++++++++++++ 2 files changed, 153 insertions(+) create mode 100644 openviking/daemon/cursor_manager.py create mode 100644 tests/daemon/test_cursor_manager.py diff --git a/openviking/daemon/cursor_manager.py b/openviking/daemon/cursor_manager.py new file mode 100644 index 0000000000..6b76bc2565 --- /dev/null +++ b/openviking/daemon/cursor_manager.py @@ -0,0 +1,92 @@ +""" +Cursor Manager for tracking file read positions. +Persists state in SQLite so Daemon can resume after restart. 
+""" +import sqlite3 +import time +from pathlib import Path +from typing import Dict + +from openviking.daemon.models import FileCursor +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class CursorManager: + """Manages file cursor state with SQLite persistence.""" + + def __init__(self, db_path: str): + self.db_path = db_path + self._init_db() + + def _init_db(self): + """Initialize the SQLite database and schema.""" + Path(self.db_path).parent.mkdir(parents=True, exist_ok=True) + + conn = sqlite3.connect(self.db_path) + try: + conn.execute(""" + CREATE TABLE IF NOT EXISTS file_cursors ( + file_path TEXT PRIMARY KEY, + last_position INTEGER NOT NULL DEFAULT 0, + last_read_time REAL NOT NULL DEFAULT 0.0, + updated_at TIMESTAMP DEFAULT CURRENT_TIMESTAMP + ) + """) + conn.commit() + finally: + conn.close() + + def get_cursor(self, file_path: str) -> FileCursor: + """Get the cursor state for a file. Returns zero-position cursor if not found.""" + conn = sqlite3.connect(self.db_path) + try: + row = conn.execute( + "SELECT last_position, last_read_time FROM file_cursors WHERE file_path = ?", + (file_path,), + ).fetchone() + + if row: + return FileCursor( + file_path=file_path, + last_position=row[0], + last_read_time=row[1], + ) + return FileCursor(file_path=file_path) + finally: + conn.close() + + def update_cursor(self, file_path: str, position: int): + """Update the cursor position for a file.""" + conn = sqlite3.connect(self.db_path) + try: + conn.execute( + """ + INSERT OR REPLACE INTO file_cursors (file_path, last_position, last_read_time) + VALUES (?, ?, ?) 
+ """, + (file_path, position, time.time()), + ) + conn.commit() + finally: + conn.close() + + def get_all_cursors(self) -> Dict[str, FileCursor]: + """Get all tracked cursor states.""" + conn = sqlite3.connect(self.db_path) + try: + rows = conn.execute( + "SELECT file_path, last_position, last_read_time FROM file_cursors" + ).fetchall() + + return { + row[0]: FileCursor( + file_path=row[0], + last_position=row[1], + last_read_time=row[2], + ) + for row in rows + } + finally: + conn.close() diff --git a/tests/daemon/test_cursor_manager.py b/tests/daemon/test_cursor_manager.py new file mode 100644 index 0000000000..375df56d09 --- /dev/null +++ b/tests/daemon/test_cursor_manager.py @@ -0,0 +1,61 @@ +"""Tests for CursorManager.""" +import os +import tempfile + +import pytest + +from openviking.daemon.cursor_manager import CursorManager + + +@pytest.fixture +def temp_db(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + yield path + if os.path.exists(path): + os.unlink(path) + + +def test_default_cursor(temp_db): + manager = CursorManager(temp_db) + cursor = manager.get_cursor("/path/to/file.jsonl") + assert cursor.last_position == 0 + assert cursor.last_read_time == 0.0 + + +def test_save_and_load_cursor(temp_db): + manager = CursorManager(temp_db) + manager.update_cursor("/path/to/file.jsonl", 1024) + + cursor = manager.get_cursor("/path/to/file.jsonl") + assert cursor.last_position == 1024 + assert cursor.last_read_time > 0 + + +def test_persist_across_instances(temp_db): + manager1 = CursorManager(temp_db) + manager1.update_cursor("/path/to/file.jsonl", 2048) + + manager2 = CursorManager(temp_db) + cursor = manager2.get_cursor("/path/to/file.jsonl") + assert cursor.last_position == 2048 + + +def test_get_all_cursors(temp_db): + manager = CursorManager(temp_db) + manager.update_cursor("/path/file1.jsonl", 100) + manager.update_cursor("/path/file2.jsonl", 200) + + cursors = manager.get_all_cursors() + assert len(cursors) == 2 + assert 
cursors["/path/file1.jsonl"].last_position == 100 + assert cursors["/path/file2.jsonl"].last_position == 200 + + +def test_update_existing_cursor(temp_db): + manager = CursorManager(temp_db) + manager.update_cursor("/path/file.jsonl", 100) + manager.update_cursor("/path/file.jsonl", 500) + + cursor = manager.get_cursor("/path/file.jsonl") + assert cursor.last_position == 500 From 25c930f4ef301ed821baeb70a984002715afdf3d Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:09:54 +0800 Subject: [PATCH 04/22] feat: add knowledge deduplicator with MD5 hashing --- openviking/daemon/deduplicator.py | 35 ++++++++++++++++++++++++++ tests/daemon/test_deduplicator.py | 42 +++++++++++++++++++++++++++++++ 2 files changed, 77 insertions(+) create mode 100644 openviking/daemon/deduplicator.py create mode 100644 tests/daemon/test_deduplicator.py diff --git a/openviking/daemon/deduplicator.py b/openviking/daemon/deduplicator.py new file mode 100644 index 0000000000..9066b57bb4 --- /dev/null +++ b/openviking/daemon/deduplicator.py @@ -0,0 +1,35 @@ +""" +Knowledge deduplication based on content hashing. +Prevents duplicate knowledge from being written to viking:// storage. 
+""" +import hashlib +from typing import Set + +from openviking.daemon.models import ExtractedKnowledge +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class KnowledgeDeduplicator: + """Deduplicates knowledge items based on MD5 content hash.""" + + def __init__(self): + self.seen_hashes: Set[str] = set() + + def is_duplicate(self, knowledge: ExtractedKnowledge) -> bool: + """Check if this knowledge is a duplicate of something already seen.""" + content_hash = hashlib.md5( + knowledge.content.encode("utf-8") + ).hexdigest() + + if content_hash in self.seen_hashes: + logger.debug("Duplicate knowledge skipped: %s", knowledge.title) + return True + + self.seen_hashes.add(content_hash) + return False + + def clear(self): + """Clear the dedup cache.""" + self.seen_hashes.clear() diff --git a/tests/daemon/test_deduplicator.py b/tests/daemon/test_deduplicator.py new file mode 100644 index 0000000000..569c14a3fb --- /dev/null +++ b/tests/daemon/test_deduplicator.py @@ -0,0 +1,42 @@ +"""Tests for KnowledgeDeduplicator.""" +from openviking.daemon.deduplicator import KnowledgeDeduplicator +from openviking.daemon.models import ExtractedKnowledge + + +def _make_knowledge(title: str, content: str) -> ExtractedKnowledge: + return ExtractedKnowledge( + status="EXTRACTED", + category="memories", + title=title, + content=content, + ) + + +def test_first_occurrence_not_duplicate(): + dedup = KnowledgeDeduplicator() + k = _make_knowledge("Test", "Some unique content") + assert not dedup.is_duplicate(k) + + +def test_same_content_is_duplicate(): + dedup = KnowledgeDeduplicator() + k1 = _make_knowledge("Title A", "Same content") + k2 = _make_knowledge("Title B", "Same content") + assert not dedup.is_duplicate(k1) + assert dedup.is_duplicate(k2) + + +def test_different_content_not_duplicate(): + dedup = KnowledgeDeduplicator() + k1 = _make_knowledge("A", "Content 1") + k2 = _make_knowledge("B", "Content 2") + assert not dedup.is_duplicate(k1) + 
assert not dedup.is_duplicate(k2) + + +def test_clear_resets_cache(): + dedup = KnowledgeDeduplicator() + k = _make_knowledge("Test", "Content") + dedup.is_duplicate(k) + dedup.clear() + assert not dedup.is_duplicate(k) From 195990a6141dbc498494b3bbd574c56ce6538e39 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:18:22 +0800 Subject: [PATCH 05/22] feat: add Claude Code JSONL watcher with watchdog --- .../daemon/watchers/claude_code_watcher.py | 165 ++++++++++++++++++ tests/daemon/test_claude_code_watcher.py | 88 ++++++++++ 2 files changed, 253 insertions(+) create mode 100644 openviking/daemon/watchers/claude_code_watcher.py create mode 100644 tests/daemon/test_claude_code_watcher.py diff --git a/openviking/daemon/watchers/claude_code_watcher.py b/openviking/daemon/watchers/claude_code_watcher.py new file mode 100644 index 0000000000..91eca08159 --- /dev/null +++ b/openviking/daemon/watchers/claude_code_watcher.py @@ -0,0 +1,165 @@ +""" +Claude Code JSONL file watcher using watchdog for incremental monitoring. 
+""" +import json +import os +import time +from typing import Callable, Dict, List, Optional + +from watchdog.observers import Observer +from watchdog.events import FileSystemEventHandler + +from openviking.daemon.models import BatchBuffer +from openviking.daemon.cursor_manager import CursorManager +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class ClaudeCodeLogHandler(FileSystemEventHandler): + """Handles file system events for Claude Code JSONL files.""" + + def __init__( + self, + cursor_manager: CursorManager, + batch_callback: Callable[[List[Dict]], None], + batch_trigger_lines: int = 50, + batch_trigger_seconds: int = 300, + ): + super().__init__() + self.cursor_manager = cursor_manager + self.batch_callback = batch_callback + self.batch_trigger_lines = batch_trigger_lines + self.batch_trigger_seconds = batch_trigger_seconds + self.buffer = BatchBuffer() + self.last_batch_time = time.time() + + def on_modified(self, event): + """Handle file modification events.""" + if event.is_directory or not event.src_path.endswith(".jsonl"): + return + try: + self._process_file(event.src_path) + except Exception as e: + logger.error("Error processing %s: %s", event.src_path, e) + + def _process_file(self, file_path: str): + """Incrementally read new lines from a file and add to buffer.""" + cursor = self.cursor_manager.get_cursor(file_path) + current_size = os.path.getsize(file_path) + + if current_size <= cursor.last_position: + return + + with open(file_path, "r", encoding="utf-8") as f: + f.seek(cursor.last_position) + new_lines = f.readlines() + new_position = f.tell() + + parsed_events = [] + for line in new_lines: + event = self._parse_line(line) + if event: + parsed_events.append(event) + + filtered_events = self._filter_events(parsed_events) + + for event in filtered_events: + self.buffer.add_line(event, len(json.dumps(event))) + + self.cursor_manager.update_cursor(file_path, new_position) + self._check_batch_trigger() 
+ + @staticmethod + def _parse_line(line: str) -> Optional[Dict]: + """Parse a single JSONL line. Returns None on failure.""" + try: + return json.loads(line.strip()) + except (json.JSONDecodeError, AttributeError): + return None + + @staticmethod + def _filter_events(events: List[Dict]) -> List[Dict]: + """Filter to only user/assistant message events.""" + return [ + e for e in events + if e.get("role") in ("user", "assistant") and e.get("type") == "message" + ] + + def _check_batch_trigger(self): + """Check if batch processing should be triggered.""" + should_trigger = False + + if time.time() - self.last_batch_time >= self.batch_trigger_seconds: + should_trigger = True + + if len(self.buffer.lines) >= self.batch_trigger_lines: + should_trigger = True + + if should_trigger and not self.buffer.is_empty(): + self._flush_buffer() + + def _flush_buffer(self): + """Send buffered events to the batch callback and reset buffer.""" + if self.buffer.is_empty(): + return + + logger.info("Flushing batch with %d events", len(self.buffer.lines)) + + try: + self.batch_callback(self.buffer.lines.copy()) + except Exception as e: + logger.error("Error in batch callback: %s", e) + + self.buffer.clear() + self.last_batch_time = time.time() + + def force_flush(self): + """Force flush the buffer regardless of trigger conditions.""" + self._flush_buffer() + + +class ClaudeCodeWatcher: + """Monitors Claude Code JSONL log files for new conversation data.""" + + def __init__( + self, + watch_dir: str, + cursor_manager: CursorManager, + batch_callback: Callable[[List[Dict]], None], + batch_trigger_lines: int = 50, + batch_trigger_seconds: int = 300, + ): + self.watch_dir = watch_dir + self.cursor_manager = cursor_manager + self.batch_callback = batch_callback + self.batch_trigger_lines = batch_trigger_lines + self.batch_trigger_seconds = batch_trigger_seconds + self.observer: Optional[Observer] = None + self.handler: Optional[ClaudeCodeLogHandler] = None + + def start(self): + """Start 
watching for file changes.""" + self.handler = ClaudeCodeLogHandler( + cursor_manager=self.cursor_manager, + batch_callback=self.batch_callback, + batch_trigger_lines=self.batch_trigger_lines, + batch_trigger_seconds=self.batch_trigger_seconds, + ) + + self.observer = Observer() + self.observer.schedule(self.handler, self.watch_dir, recursive=True) + self.observer.start() + logger.info("Claude Code watcher started on %s", self.watch_dir) + + def stop(self): + """Stop watching for file changes.""" + if self.observer: + self.observer.stop() + self.observer.join() + logger.info("Claude Code watcher stopped") + + def flush(self): + """Force flush any buffered events.""" + if self.handler: + self.handler.force_flush() diff --git a/tests/daemon/test_claude_code_watcher.py b/tests/daemon/test_claude_code_watcher.py new file mode 100644 index 0000000000..880a5d2634 --- /dev/null +++ b/tests/daemon/test_claude_code_watcher.py @@ -0,0 +1,88 @@ +"""Tests for ClaudeCodeWatcher parsing and filtering logic.""" +from openviking.daemon.watchers.claude_code_watcher import ClaudeCodeLogHandler + + +class FakeCursorManager: + """Minimal stub for testing.""" + def __init__(self): + self.cursors = {} + self.updates = [] + + def get_cursor(self, file_path): + from openviking.daemon.models import FileCursor + return self.cursors.get(file_path, FileCursor(file_path=file_path)) + + def update_cursor(self, file_path, position): + self.updates.append((file_path, position)) + + +def _make_handler(batch_trigger_lines=50, batch_trigger_seconds=300): + batches = [] + cursor_mgr = FakeCursorManager() + handler = ClaudeCodeLogHandler( + cursor_manager=cursor_mgr, + batch_callback=lambda lines: batches.append(lines), + batch_trigger_lines=batch_trigger_lines, + batch_trigger_seconds=batch_trigger_seconds, + ) + return handler, batches + + +def test_parse_valid_jsonl_line(): + handler, _ = _make_handler() + line = '{"timestamp": "2026-06-15T10:30:00Z", "role": "user", "content": "Hello", "type": 
"message"}' + event = handler._parse_line(line) + assert event is not None + assert event["role"] == "user" + assert event["content"] == "Hello" + + +def test_parse_invalid_line(): + handler, _ = _make_handler() + assert handler._parse_line("not valid json") is None + assert handler._parse_line("") is None + + +def test_filter_keeps_user_and_assistant_messages(): + handler, _ = _make_handler() + events = [ + {"role": "user", "type": "message", "content": "User question"}, + {"role": "assistant", "type": "message", "content": "AI answer"}, + {"role": "assistant", "type": "tool_call", "content": "Tool call"}, + {"role": "system", "type": "message", "content": "System msg"}, + ] + filtered = handler._filter_events(events) + assert len(filtered) == 2 + assert all(e["type"] == "message" for e in filtered) + + +def test_filter_excludes_tool_calls(): + handler, _ = _make_handler() + events = [ + {"role": "assistant", "type": "tool_call", "content": "call"}, + {"role": "assistant", "type": "tool_result", "content": "result"}, + ] + filtered = handler._filter_events(events) + assert len(filtered) == 0 + + +def test_buffer_add_and_flush(): + handler, batches = _make_handler(batch_trigger_lines=2) + handler.buffer.add_line({"role": "user", "content": "a"}, 10) + handler.buffer.add_line({"role": "assistant", "content": "b"}, 10) + handler._check_batch_trigger() + assert len(batches) == 1 + assert len(batches[0]) == 2 + + +def test_force_flush_empty_buffer(): + handler, batches = _make_handler() + handler.force_flush() + assert len(batches) == 0 + + +def test_force_flush_with_data(): + handler, batches = _make_handler() + handler.buffer.add_line({"role": "user", "content": "test"}, 10) + handler.force_flush() + assert len(batches) == 1 From fc844959e322d8720ece3f9c479ec9950e2bb00a Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:18:33 +0800 Subject: [PATCH 06/22] feat: add low-value conversation filter with regex patterns --- 
openviking/daemon/filters.py | 47 ++++++++++++++++++++++++++++++++ tests/daemon/test_filters.py | 53 ++++++++++++++++++++++++++++++++++++ 2 files changed, 100 insertions(+) create mode 100644 openviking/daemon/filters.py create mode 100644 tests/daemon/test_filters.py diff --git a/openviking/daemon/filters.py b/openviking/daemon/filters.py new file mode 100644 index 0000000000..4a3a58bbe2 --- /dev/null +++ b/openviking/daemon/filters.py @@ -0,0 +1,47 @@ +""" +Rule-based filters for low-value conversations. +Removes noise before LLM processing to save cost and improve quality. +""" +import re +from typing import Dict, List + +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class LowValueFilter: + """Filters out low-value conversations using regex rules.""" + + NOISE_PATTERNS = [ + r"^npm\s+(install|update|remove)", + r"^yarn\s+(add|remove)", + r"^pip\s+(install|uninstall)", + r"^git\s+(commit|push|pull|merge)", + r"^(SyntaxError|TypeError|ImportError|ModuleNotFoundError)", + r"^Retry\s+\d+/", + r"^Loading\.+", + r"^(format|indent|align)\s+(this|the)\s+code", + ] + + MIN_CONTENT_LENGTH = 20 + + def apply(self, events: List[Dict]) -> List[Dict]: + """Apply filtering rules to a list of events.""" + filtered = [] + + for event in events: + content = event.get("content", "").strip() + + # Rule 1: too short + if len(content) < self.MIN_CONTENT_LENGTH: + continue + + # Rule 2: noise pattern match + if any(re.match(p, content, re.IGNORECASE) for p in self.NOISE_PATTERNS): + continue + + filtered.append(event) + + logger.debug("Filtered %d events down to %d", len(events), len(filtered)) + return filtered diff --git a/tests/daemon/test_filters.py b/tests/daemon/test_filters.py new file mode 100644 index 0000000000..52d0c07d50 --- /dev/null +++ b/tests/daemon/test_filters.py @@ -0,0 +1,53 @@ +"""Tests for LowValueFilter.""" +from openviking.daemon.filters import LowValueFilter + + +def test_filter_short_content(): + f = LowValueFilter() 
+ events = [ + {"content": "Short"}, + {"content": "This is a longer meaningful conversation about architecture"}, + ] + filtered = f.apply(events) + assert len(filtered) == 1 + + +def test_filter_noise_patterns(): + f = LowValueFilter() + events = [ + {"content": "npm install lodash --save"}, + {"content": "git commit -m 'fix bug'"}, + {"content": "Let's discuss the architecture design pattern for the new module"}, + ] + filtered = f.apply(events) + assert len(filtered) == 1 + assert "architecture" in filtered[0]["content"] + + +def test_filter_pip_install(): + f = LowValueFilter() + events = [ + {"content": "pip install requests library for HTTP calls"}, + ] + filtered = f.apply(events) + assert len(filtered) == 0 + + +def test_preserves_valid_content(): + f = LowValueFilter() + events = [ + {"content": "We decided to use PostgreSQL instead of MySQL for better JSON support"}, + {"content": "The memory leak was caused by unclosed database connections"}, + ] + filtered = f.apply(events) + assert len(filtered) == 2 + + +def test_empty_content_filtered(): + f = LowValueFilter() + events = [ + {"content": ""}, + {"content": " "}, + ] + filtered = f.apply(events) + assert len(filtered) == 0 From f07c29c6b82814b3b092264c89a9f9d693b9fd60 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:18:51 +0800 Subject: [PATCH 07/22] feat: add conversation reconstructor from flat events --- .../daemon/conversation_reconstructor.py | 53 +++++++++++++++++ .../daemon/test_conversation_reconstructor.py | 58 +++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 openviking/daemon/conversation_reconstructor.py create mode 100644 tests/daemon/test_conversation_reconstructor.py diff --git a/openviking/daemon/conversation_reconstructor.py b/openviking/daemon/conversation_reconstructor.py new file mode 100644 index 0000000000..2dba8f6898 --- /dev/null +++ b/openviking/daemon/conversation_reconstructor.py @@ -0,0 +1,53 @@ +""" +Reconstruct 
conversation turns from flat event lists. +Pairs user prompts with assistant responses into structured ConversationTurn objects. +""" +from typing import Dict, List + +from openviking.daemon.models import ConversationTurn +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class ConversationReconstructor: + """Reconstructs paired conversation turns from chronological events.""" + + def reconstruct(self, events: List[Dict]) -> List[ConversationTurn]: + """ + Pair user prompts with assistant responses. + Events are sorted by timestamp. An assistant response is paired + with the most recent unpaired user prompt. + """ + turns: List[ConversationTurn] = [] + current_user_prompt = None + current_metadata: Dict = {} + + sorted_events = sorted(events, key=lambda e: e.get("timestamp", "")) + + for event in sorted_events: + role = event.get("role") + content = event.get("content", "") + + if role == "user": + current_user_prompt = content + current_metadata = { + "session_id": event.get("session_id"), + "project_name": event.get("project_name"), + "timestamp": event.get("timestamp"), + } + elif role == "assistant" and current_user_prompt: + turns.append( + ConversationTurn( + user_prompt=current_user_prompt, + assistant_response=content, + session_id=current_metadata.get("session_id"), + project_name=current_metadata.get("project_name"), + timestamp=current_metadata.get("timestamp"), + ) + ) + current_user_prompt = None + current_metadata = {} + + logger.info("Reconstructed %d conversation turns from %d events", len(turns), len(events)) + return turns diff --git a/tests/daemon/test_conversation_reconstructor.py b/tests/daemon/test_conversation_reconstructor.py new file mode 100644 index 0000000000..b8c588855d --- /dev/null +++ b/tests/daemon/test_conversation_reconstructor.py @@ -0,0 +1,58 @@ +"""Tests for ConversationReconstructor.""" +from openviking.daemon.conversation_reconstructor import ConversationReconstructor + + +def 
test_reconstruct_simple_conversation(): + r = ConversationReconstructor() + events = [ + {"role": "user", "content": "How to configure PostgreSQL?", "timestamp": "2026-06-15T10:00:00Z"}, + {"role": "assistant", "content": "Edit postgresql.conf", "timestamp": "2026-06-15T10:00:01Z"}, + ] + turns = r.reconstruct(events) + assert len(turns) == 1 + assert turns[0].user_prompt == "How to configure PostgreSQL?" + assert turns[0].assistant_response == "Edit postgresql.conf" + + +def test_skip_orphaned_assistant(): + r = ConversationReconstructor() + events = [ + {"role": "assistant", "content": "Orphan answer", "timestamp": "2026-06-15T10:00:00Z"}, + ] + turns = r.reconstruct(events) + assert len(turns) == 0 + + +def test_multiple_turns(): + r = ConversationReconstructor() + events = [ + {"role": "user", "content": "Q1", "timestamp": "2026-06-15T10:00:00Z"}, + {"role": "assistant", "content": "A1", "timestamp": "2026-06-15T10:00:01Z"}, + {"role": "user", "content": "Q2", "timestamp": "2026-06-15T10:00:02Z"}, + {"role": "assistant", "content": "A2", "timestamp": "2026-06-15T10:00:03Z"}, + ] + turns = r.reconstruct(events) + assert len(turns) == 2 + assert turns[0].user_prompt == "Q1" + assert turns[1].user_prompt == "Q2" + + +def test_unpaired_user_prompt(): + r = ConversationReconstructor() + events = [ + {"role": "user", "content": "No answer", "timestamp": "2026-06-15T10:00:00Z"}, + ] + turns = r.reconstruct(events) + assert len(turns) == 0 + + +def test_preserves_metadata(): + r = ConversationReconstructor() + events = [ + {"role": "user", "content": "Q", "timestamp": "2026-06-15T10:00:00Z", "session_id": "s1", "project_name": "proj"}, + {"role": "assistant", "content": "A", "timestamp": "2026-06-15T10:00:01Z"}, + ] + turns = r.reconstruct(events) + assert turns[0].session_id == "s1" + assert turns[0].project_name == "proj" + assert turns[0].timestamp == "2026-06-15T10:00:00Z" From 32982fb18873565d21629dff85ccc94198ff4d37 Mon Sep 17 00:00:00 2001 From: baobaodae 
<2014596548@qq.com> Date: Mon, 15 Jun 2026 15:25:22 +0800 Subject: [PATCH 08/22] feat: add LLM-based knowledge extractor with prompt template --- openviking/daemon/knowledge_extractor.py | 128 +++++++++++++++++++++++ tests/daemon/test_knowledge_extractor.py | 123 ++++++++++++++++++++++ 2 files changed, 251 insertions(+) create mode 100644 openviking/daemon/knowledge_extractor.py create mode 100644 tests/daemon/test_knowledge_extractor.py diff --git a/openviking/daemon/knowledge_extractor.py b/openviking/daemon/knowledge_extractor.py new file mode 100644 index 0000000000..27d5a9ea11 --- /dev/null +++ b/openviking/daemon/knowledge_extractor.py @@ -0,0 +1,128 @@ +""" +LLM-based knowledge extraction from conversations. +Uses OpenViking's existing LLM configuration for intelligent filtering and summarization. +""" +import json +import re +from typing import Dict, Optional + +from openviking.daemon.models import ConversationTurn, ExtractedKnowledge +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + +KNOWLEDGE_EXTRACTION_PROMPT = """# Role: OpenViking Automated Context Extraction Expert + +# Task: +Analyze the AI-assisted development conversation below and convert it into structured knowledge for viking:// storage. 
+ +# Input: +User question: {user_prompt} +AI answer: {assistant_response} + +# Filtering rules: +Output IGNORED if the conversation is just: +- Minor syntax error fixes (missing semicolons, typos) +- Invalid commands or retry loops +- Pure code formatting or indentation changes +- Simple factual questions ("what is Python") + +Extract ONLY when the conversation contains: +- Explicit system configuration decisions +- Root cause analysis for complex bugs +- New architectural rules for the project +- Core development intent +- Reusable skills or best practices + +# Output (strict JSON, no markdown code blocks): +{{ + "status": "EXTRACTED" | "IGNORED", + "category": "skills" | "memories" | "resources", + "confidence": 0.0-1.0, + "title": "One-line summary (max 30 chars)", + "project_name": "project name or null", + "entity_links": ["tech tags", "module names"], + "content": "Concise conclusion. What was the problem, what was the solution, why this choice.", + "actionable_steps": ["steps if skills, else empty"] +}} +""" + + +class KnowledgeExtractor: + """Extracts structured knowledge from conversation turns using LLM.""" + + def __init__(self, llm_service=None): + """ + Args: + llm_service: Optional LLM service. If None, will use OpenViking's default. + """ + self._llm_service = llm_service + + def _get_llm_service(self): + """Lazy-load the LLM service from OpenViking if not provided.""" + if self._llm_service is None: + from openviking.server.dependencies import get_service + self._llm_service = get_service().llm + return self._llm_service + + async def extract(self, turn: ConversationTurn) -> Optional[ExtractedKnowledge]: + """Extract knowledge from a conversation turn.
Returns None if not valuable.""" + prompt = KNOWLEDGE_EXTRACTION_PROMPT.format( + user_prompt=turn.user_prompt, + assistant_response=turn.assistant_response, + ) + + try: + response = await self._call_llm(prompt) + + if not response or response.get("status") != "EXTRACTED": + return None + + if response.get("confidence", 0) < 0.6: + return None + + return ExtractedKnowledge( + status=response["status"], + category=response["category"], + title=response.get("title", "")[:50], + content=self._clean_content(response.get("content", "")), + confidence=response.get("confidence", 0.0), + project_name=response.get("project_name"), + entity_links=response.get("entity_links", []), + actionable_steps=response.get("actionable_steps", []), + timestamp=turn.timestamp, + ) + + except Exception as e: + logger.error("Error extracting knowledge: %s", e) + return None + + async def _call_llm(self, prompt: str) -> Optional[Dict]: + """Call LLM and parse JSON response.""" + try: + llm = self._get_llm_service() + response = await llm.complete( + prompt=prompt, + temperature=0.3, + max_tokens=500, + ) + + text = response.text.strip() + # Strip markdown code block markers if present + text = re.sub(r"```json\s*", "", text) + text = re.sub(r"\s*```", "", text) + + return json.loads(text) + + except json.JSONDecodeError as e: + logger.warning("JSON parse error in LLM response: %s", e) + return None + except Exception as e: + logger.error("LLM call failed: %s", e) + return None + + @staticmethod + def _clean_content(content: str) -> str: + """Remove markdown artifacts from content.""" + content = re.sub(r"```.*?```", "", content, flags=re.DOTALL) + return content.strip() diff --git a/tests/daemon/test_knowledge_extractor.py b/tests/daemon/test_knowledge_extractor.py new file mode 100644 index 0000000000..56391929d2 --- /dev/null +++ b/tests/daemon/test_knowledge_extractor.py @@ -0,0 +1,123 @@ +"""Tests for KnowledgeExtractor.""" +import json +from dataclasses import dataclass +from typing 
import Optional + +import pytest + +from openviking.daemon.knowledge_extractor import KnowledgeExtractor +from openviking.daemon.models import ConversationTurn, ExtractedKnowledge + + +@dataclass +class MockLLMResponse: + text: str + + +class MockLLMService: + """Mock LLM service that returns canned responses.""" + + def __init__(self, response_text: str): + self.response_text = response_text + + async def complete(self, **kwargs): + return MockLLMResponse(text=self.response_text) + + +def _make_turn(user="How to configure PostgreSQL?", assistant="Edit postgresql.conf"): + return ConversationTurn( + user_prompt=user, + assistant_response=assistant, + timestamp="2026-06-15T10:00:00Z", + ) + + +@pytest.mark.asyncio +async def test_extract_valid_knowledge(): + llm_response = json.dumps({ + "status": "EXTRACTED", + "category": "skills", + "confidence": 0.9, + "title": "PostgreSQL Config", + "content": "Configure PostgreSQL by editing postgresql.conf", + "project_name": "my-project", + "entity_links": ["PostgreSQL"], + "actionable_steps": ["Edit postgresql.conf"], + }) + extractor = KnowledgeExtractor(llm_service=MockLLMService(llm_response)) + result = await extractor.extract(_make_turn()) + + assert result is not None + assert result.status == "EXTRACTED" + assert result.category == "skills" + assert result.confidence == 0.9 + + +@pytest.mark.asyncio +async def test_extract_ignored_status(): + llm_response = json.dumps({ + "status": "IGNORED", + "category": "memories", + "confidence": 0.3, + "title": "Typo fix", + "content": "Fixed typo", + }) + extractor = KnowledgeExtractor(llm_service=MockLLMService(llm_response)) + result = await extractor.extract(_make_turn()) + + assert result is None + + +@pytest.mark.asyncio +async def test_extract_low_confidence(): + llm_response = json.dumps({ + "status": "EXTRACTED", + "category": "memories", + "confidence": 0.4, + "title": "Low confidence", + "content": "Some content", + }) + extractor = 
KnowledgeExtractor(llm_service=MockLLMService(llm_response)) + result = await extractor.extract(_make_turn()) + + assert result is None + + +@pytest.mark.asyncio +async def test_extract_invalid_json(): + extractor = KnowledgeExtractor(llm_service=MockLLMService("not valid json")) + result = await extractor.extract(_make_turn()) + + assert result is None + + +@pytest.mark.asyncio +async def test_extract_cleans_markdown(): + llm_response = json.dumps({ + "status": "EXTRACTED", + "category": "memories", + "confidence": 0.8, + "title": "Test", + "content": "```python\nsome code\n```\nActual content here", + }) + extractor = KnowledgeExtractor(llm_service=MockLLMService(llm_response)) + result = await extractor.extract(_make_turn()) + + assert result is not None + assert "```" not in result.content + + +@pytest.mark.asyncio +async def test_title_truncated(): + llm_response = json.dumps({ + "status": "EXTRACTED", + "category": "memories", + "confidence": 0.8, + "title": "A" * 100, + "content": "Content", + }) + extractor = KnowledgeExtractor(llm_service=MockLLMService(llm_response)) + result = await extractor.extract(_make_turn()) + + assert result is not None + assert len(result.title) <= 50 From 9cb2bb313d5aff7548c5abdea4349506181c7cb9 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:25:37 +0800 Subject: [PATCH 09/22] feat: add batch ETL pipeline with filtering and deduplication --- openviking/daemon/etl_pipeline.py | 64 +++++++++++++++++++++++++++++++ 1 file changed, 64 insertions(+) create mode 100644 openviking/daemon/etl_pipeline.py diff --git a/openviking/daemon/etl_pipeline.py b/openviking/daemon/etl_pipeline.py new file mode 100644 index 0000000000..f601537fd9 --- /dev/null +++ b/openviking/daemon/etl_pipeline.py @@ -0,0 +1,64 @@ +""" +Batch ETL pipeline for processing conversation events into structured knowledge. 
+Orchestrates: Filter -> Reconstruct -> Extract -> Deduplicate +""" +import asyncio +from typing import Dict, List + +from openviking.daemon.models import ExtractedKnowledge +from openviking.daemon.filters import LowValueFilter +from openviking.daemon.conversation_reconstructor import ConversationReconstructor +from openviking.daemon.knowledge_extractor import KnowledgeExtractor +from openviking.daemon.deduplicator import KnowledgeDeduplicator +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class BatchETLPipeline: + """Orchestrates the full ETL flow from raw events to structured knowledge.""" + + def __init__(self, llm_service=None): + self.filter = LowValueFilter() + self.reconstructor = ConversationReconstructor() + self.extractor = KnowledgeExtractor(llm_service=llm_service) + self.deduplicator = KnowledgeDeduplicator() + + async def process_batch(self, events: List[Dict]) -> List[ExtractedKnowledge]: + """ + Process a batch of raw conversation events. 
+ + Flow: events -> filter -> reconstruct -> extract (parallel) -> deduplicate + """ + logger.info("Processing batch with %d events", len(events)) + + # Step 1: Filter low-value content + filtered_events = self.filter.apply(events) + logger.info("After filtering: %d events", len(filtered_events)) + + if not filtered_events: + return [] + + # Step 2: Reconstruct conversation turns + turns = self.reconstructor.reconstruct(filtered_events) + logger.info("Reconstructed %d conversation turns", len(turns)) + + if not turns: + return [] + + # Step 3: Extract knowledge in parallel + tasks = [self.extractor.extract(turn) for turn in turns] + results = await asyncio.gather(*tasks, return_exceptions=True) + + # Step 4: Filter errors and deduplicate + extracted: List[ExtractedKnowledge] = [] + for result in results: + if isinstance(result, Exception): + logger.error("Extraction failed: %s", result) + continue + + if result is not None and not self.deduplicator.is_duplicate(result): + extracted.append(result) + + logger.info("Extracted %d knowledge items from batch", len(extracted)) + return extracted From 541dbe9cbb8b347acc93161fc40376c9fd251d6c Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:30:48 +0800 Subject: [PATCH 10/22] feat: add knowledge router with viking:// URI mapping --- openviking/daemon/knowledge_router.py | 54 ++++++++++++++++++++++ tests/daemon/test_knowledge_router.py | 65 +++++++++++++++++++++++++++ 2 files changed, 119 insertions(+) create mode 100644 openviking/daemon/knowledge_router.py create mode 100644 tests/daemon/test_knowledge_router.py diff --git a/openviking/daemon/knowledge_router.py b/openviking/daemon/knowledge_router.py new file mode 100644 index 0000000000..d49cebbdc7 --- /dev/null +++ b/openviking/daemon/knowledge_router.py @@ -0,0 +1,54 @@ +""" +Route extracted knowledge to appropriate viking:// URIs based on category and project. 
+""" +import re +from typing import Optional + +from openviking.daemon.models import ExtractedKnowledge +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class KnowledgeRouter: + """Routes knowledge items to viking:// URIs based on category.""" + + def route(self, knowledge: ExtractedKnowledge) -> Optional[str]: + """ + Determine the target URI for a knowledge item. + + Routing rules: + - skills -> viking://skills/claude_code/<title>.md + - memories (with project) -> viking://memories/projects/<project>/decisions.md + - memories (no project) -> viking://memories/global/<title>.md + - resources -> viking://resources/<tech_stack>/<title>.md + """ + category = knowledge.category + project_name = knowledge.project_name + title = self._sanitize_filename(knowledge.title) + + if category == "skills": + return f"viking://skills/claude_code/{title}.md" + + elif category == "memories": + if project_name: + safe_project = self._sanitize_filename(project_name) + return f"viking://memories/projects/{safe_project}/decisions.md" + else: + return f"viking://memories/global/{title}.md" + + elif category == "resources": + entity_links = knowledge.entity_links + tech_stack = self._sanitize_filename(entity_links[0]) if entity_links else "general" + return f"viking://resources/{tech_stack}/{title}.md" + + else: + logger.warning("Unknown category: %s", category) + return None + + @staticmethod + def _sanitize_filename(name: str) -> str: + """Sanitize a string for use as a filename.""" + sanitized = re.sub(r'[<>:"/\\|?*]', '_', name) + sanitized = sanitized.strip().replace(' ', '_') + return sanitized[:50] diff --git a/tests/daemon/test_knowledge_router.py b/tests/daemon/test_knowledge_router.py new file mode 100644 index 0000000000..f60382ea00 --- /dev/null +++ b/tests/daemon/test_knowledge_router.py @@ -0,0 +1,65 @@ +"""Tests for KnowledgeRouter.""" +from openviking.daemon.knowledge_router import KnowledgeRouter +from openviking.daemon.models import
ExtractedKnowledge + + +def _make(**kwargs) -> ExtractedKnowledge: + defaults = { + "status": "EXTRACTED", + "category": "memories", + "title": "Test Title", + "content": "Some content", + } + defaults.update(kwargs) + return ExtractedKnowledge(**defaults) + + +def test_route_skills(): + router = KnowledgeRouter() + k = _make(category="skills", title="PostgreSQL Config") + uri = router.route(k) + assert uri.startswith("viking://skills/claude_code/") + assert uri.endswith(".md") + + +def test_route_memories_with_project(): + router = KnowledgeRouter() + k = _make(category="memories", title="Arch Decision", project_name="my-project") + uri = router.route(k) + assert "my-project" in uri + assert uri.endswith("decisions.md") + + +def test_route_memories_global(): + router = KnowledgeRouter() + k = _make(category="memories", title="Global Memory") + uri = router.route(k) + assert uri.startswith("viking://memories/global/") + + +def test_route_resources(): + router = KnowledgeRouter() + k = _make(category="resources", title="Redis Guide", entity_links=["Redis"]) + uri = router.route(k) + assert "Redis" in uri + assert uri.endswith(".md") + + +def test_route_resources_no_tags(): + router = KnowledgeRouter() + k = _make(category="resources", title="General", entity_links=[]) + uri = router.route(k) + assert "general" in uri + + +def test_route_unknown_category(): + router = KnowledgeRouter() + k = _make(category="unknown") + uri = router.route(k) + assert uri is None + + +def test_sanitize_filename(): + router = KnowledgeRouter() + assert router._sanitize_filename('file<>:name') == 'file___name' + assert router._sanitize_filename("a" * 100) == "a" * 50 From 97efb97d3518c348249d142f8907e680a319f234 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:31:05 +0800 Subject: [PATCH 11/22] feat: add Viking storage adapter with ResourceService integration --- openviking/daemon/storage_adapter.py | 152 +++++++++++++++++++++++++++ 1 file changed, 
152 insertions(+) create mode 100644 openviking/daemon/storage_adapter.py diff --git a/openviking/daemon/storage_adapter.py b/openviking/daemon/storage_adapter.py new file mode 100644 index 0000000000..41d5924bbb --- /dev/null +++ b/openviking/daemon/storage_adapter.py @@ -0,0 +1,152 @@ +""" +Adapter for writing extracted knowledge to OpenViking via ResourceService. +Generates temporary Markdown files and ingests them through the standard resource pipeline. +""" +import os +import tempfile +from datetime import datetime +from typing import Optional + +from openviking.daemon.models import ExtractedKnowledge +from openviking.daemon.knowledge_router import KnowledgeRouter +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class VikingStorageAdapter: + """Writes structured knowledge to OpenViking via ResourceService.add_resource().""" + + def __init__(self, resource_service): + self.resource_service = resource_service + self.router = KnowledgeRouter() + + async def write_knowledge( + self, + knowledge: ExtractedKnowledge, + ctx, + ) -> bool: + """ + Write a knowledge item to viking:// storage. 
+ + Args: + knowledge: The extracted knowledge to write + ctx: OpenViking RequestContext + + Returns: + True if successful, False otherwise + """ + uri = self.router.route(knowledge) + if not uri: + logger.warning("Cannot route knowledge: %s", knowledge.title) + return False + + temp_file = self._generate_temp_markdown(knowledge) + + try: + await self.resource_service.add_resource( + path=temp_file, + ctx=ctx, + to=uri, + build_index=True, + summarize=False, + ) + logger.info("Knowledge ingested: %s", uri) + return True + + except Exception as e: + logger.error("Failed to ingest knowledge to %s: %s", uri, e) + return False + + finally: + if os.path.exists(temp_file): + os.unlink(temp_file) + + def _generate_temp_markdown(self, knowledge: ExtractedKnowledge) -> str: + """Generate a temporary Markdown file for the knowledge item.""" + fd, path = tempfile.mkstemp(suffix=".md") + content = self._format_content(knowledge) + + with os.fdopen(fd, "w", encoding="utf-8") as f: + f.write(content) + + return path + + def _format_content(self, knowledge: ExtractedKnowledge) -> str: + """Format knowledge content as Markdown based on category.""" + timestamp = datetime.now().isoformat() + + if knowledge.category == "skills": + return self._format_skill(knowledge, timestamp) + elif knowledge.category == "memories": + return self._format_memory(knowledge, timestamp) + else: + return self._format_resource(knowledge, timestamp) + + @staticmethod + def _format_skill(knowledge: ExtractedKnowledge, timestamp: str) -> str: + """Format a skill knowledge item.""" + steps = "" + if knowledge.actionable_steps: + steps = "\n## Steps\n\n" + for i, step in enumerate(knowledge.actionable_steps, 1): + steps += f"{i}. 
{step}\n" + else: + steps = "\n## Steps\n\n(No specific steps)\n" + + return f"""# {knowledge.title} + +**Extracted**: {timestamp} +**Source**: Claude Code session +**Confidence**: {knowledge.confidence} + +## Content + +{knowledge.content} +{steps} +--- +*Auto-generated by OpenViking Active Daemon* +""" + + @staticmethod + def _format_memory(knowledge: ExtractedKnowledge, timestamp: str) -> str: + """Format a memory knowledge item (append-friendly).""" + ts = "" + if knowledge.timestamp: + try: + ts = datetime.fromisoformat(knowledge.timestamp).strftime("%Y-%m-%d %H:%M") + except ValueError: + ts = timestamp + else: + ts = timestamp + + tags = ", ".join(knowledge.entity_links) if knowledge.entity_links else "none" + + return f"""## [{ts}] {knowledge.title} + +{knowledge.content} + +**Tags**: {tags} + +--- + +""" + + @staticmethod + def _format_resource(knowledge: ExtractedKnowledge, timestamp: str) -> str: + """Format a resource knowledge item.""" + tech = knowledge.entity_links[0] if knowledge.entity_links else "general" + + return f"""# {knowledge.title} + +**Category**: {tech} +**Extracted**: {timestamp} +**Source**: Claude Code session + +## Content + +{knowledge.content} + +--- +*Auto-generated by OpenViking Active Daemon* +""" From 4d2b8f681c9777b156c4861d13ea71050a56e660 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:38:03 +0800 Subject: [PATCH 12/22] feat: add DaemonService with lifecycle management --- openviking/daemon/__init__.py | 6 +- openviking/daemon/service.py | 160 ++++++++++++++++++++++++++++++++++ 2 files changed, 165 insertions(+), 1 deletion(-) create mode 100644 openviking/daemon/service.py diff --git a/openviking/daemon/__init__.py b/openviking/daemon/__init__.py index b64223e583..ba94f0d050 100644 --- a/openviking/daemon/__init__.py +++ b/openviking/daemon/__init__.py @@ -1,3 +1,7 @@ """ -OpenViking Active Daemon - Monitors Claude Code JSONL logs and extracts knowledge into viking:// storage. 
+OpenViking Active Daemon package. +Monitors AI tool logs and automatically extracts knowledge into viking:// storage. """ +from openviking.daemon.service import DaemonService + +__all__ = ["DaemonService"] diff --git a/openviking/daemon/service.py b/openviking/daemon/service.py new file mode 100644 index 0000000000..d8e14baef8 --- /dev/null +++ b/openviking/daemon/service.py @@ -0,0 +1,160 @@ +""" +OpenViking Active Daemon main service. +Orchestrates file watching, ETL processing, and knowledge storage. +""" +import asyncio +import os +from pathlib import Path +from typing import Optional + +from openviking.daemon.cursor_manager import CursorManager +from openviking.daemon.watchers.claude_code_watcher import ClaudeCodeWatcher +from openviking.daemon.etl_pipeline import BatchETLPipeline +from openviking.daemon.storage_adapter import VikingStorageAdapter +from openviking_cli.utils.logger import get_logger + +logger = get_logger(__name__) + + +class DaemonService: + """ + OpenViking Active Daemon main service. + Monitors Claude Code JSONL files and extracts knowledge into viking:// storage. 
+ """ + + def __init__( + self, + resource_service, + watch_dir: Optional[str] = None, + db_path: Optional[str] = None, + batch_trigger_lines: int = 50, + batch_trigger_seconds: int = 300, + ): + self.resource_service = resource_service + + # Default paths + home = Path.home() + self.watch_dir = watch_dir or str(home / ".claude" / "projects") + self.db_path = db_path or str( + home / ".qoderworkcn" / "openviking" / "daemon_cursors.db" + ) + + self.batch_trigger_lines = batch_trigger_lines + self.batch_trigger_seconds = batch_trigger_seconds + + # Components (initialized in start()) + self.cursor_manager: Optional[CursorManager] = None + self.watcher: Optional[ClaudeCodeWatcher] = None + self.etl_pipeline: Optional[BatchETLPipeline] = None + self.storage_adapter: Optional[VikingStorageAdapter] = None + + self._running = False + self._etl_task: Optional[asyncio.Task] = None + self._batch_queue: asyncio.Queue = asyncio.Queue() + + async def start(self): + """Start the Daemon service.""" + logger.info("Starting OpenViking Active Daemon...") + + # Initialize components + self.cursor_manager = CursorManager(self.db_path) + self.etl_pipeline = BatchETLPipeline() + self.storage_adapter = VikingStorageAdapter(self.resource_service) + + # Ensure watch directory exists + Path(self.watch_dir).mkdir(parents=True, exist_ok=True) + + # Start the ETL processing loop as a background task + self._etl_task = asyncio.create_task(self._etl_loop()) + + # Start the file watcher + # The watcher runs in a separate thread (watchdog), so we pass + # a sync callback that puts events onto the async queue + self.watcher = ClaudeCodeWatcher( + watch_dir=self.watch_dir, + cursor_manager=self.cursor_manager, + batch_callback=self._enqueue_batch, + batch_trigger_lines=self.batch_trigger_lines, + batch_trigger_seconds=self.batch_trigger_seconds, + ) + self.watcher.start() + + self._running = True + logger.info("Daemon started, watching: %s", self.watch_dir) + + async def stop(self): + """Stop the 
Daemon service.""" + logger.info("Stopping OpenViking Active Daemon...") + + self._running = False + + if self.watcher: + self.watcher.stop() + + # Signal the ETL loop to stop + if self._etl_task: + await self._batch_queue.put(None) # sentinel + try: + await asyncio.wait_for(self._etl_task, timeout=10) + except asyncio.TimeoutError: + self._etl_task.cancel() + + logger.info("Daemon stopped") + + def _enqueue_batch(self, events): + """Sync callback from watcher thread - puts events onto async queue.""" + try: + self._batch_queue.put_nowait(events) + except Exception as e: + logger.error("Failed to enqueue batch: %s", e) + + async def _etl_loop(self): + """Background loop that processes batches from the queue.""" + logger.info("ETL processing loop started") + + while self._running: + try: + events = await asyncio.wait_for( + self._batch_queue.get(), timeout=5.0 + ) + except asyncio.TimeoutError: + continue + + if events is None: + break # shutdown sentinel + + try: + extracted = await self.etl_pipeline.process_batch(events) + if not extracted: + logger.info("No knowledge extracted from batch") + continue + + # Write to OpenViking + for knowledge in extracted: + try: + from openviking.server.identity import RequestContext + ctx = RequestContext(user_id="daemon", session_id="daemon-session") + success = await self.storage_adapter.write_knowledge( + knowledge, ctx + ) + if success: + logger.info("Successfully wrote: %s", knowledge.title) + else: + logger.warning("Failed to write: %s", knowledge.title) + except Exception as e: + logger.error("Error writing knowledge: %s", e) + + except Exception as e: + logger.error("Error in ETL processing: %s", e, exc_info=True) + + logger.info("ETL processing loop stopped") + + async def flush(self): + """Force flush any buffered events.""" + if self.watcher: + self.watcher.flush() + logger.info("Manual flush triggered") + + @property + def is_running(self) -> bool: + return self._running From 12596ce0014c612ea59ccacf399dcce9dcac55fe 
Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:38:17 +0800 Subject: [PATCH 13/22] feat: add DaemonConfig with environment variable support --- openviking/server/config.py | 25 +++++++++++++++++++++++++ 1 file changed, 25 insertions(+) diff --git a/openviking/server/config.py b/openviking/server/config.py index 88943f8f4a..05ceeec4f6 100644 --- a/openviking/server/config.py +++ b/openviking/server/config.py @@ -156,6 +156,30 @@ class ToolOutputExternalizationConfig(BaseModel): model_config = {"extra": "forbid"} +class DaemonConfig(BaseModel): + """Configuration for OpenViking Active Daemon.""" + + enabled: bool = False + watch_dir: Optional[str] = None + db_path: Optional[str] = None + batch_trigger_lines: int = Field(50, gt=0) + batch_trigger_seconds: int = Field(300, gt=0) + + model_config = {"extra": "forbid"} + + @classmethod + def from_env(cls) -> "DaemonConfig": + """Load configuration from OV_DAEMON_* environment variables.""" + import os + return cls( + enabled=os.getenv("OV_DAEMON_ENABLED", "false").lower() == "true", + watch_dir=os.getenv("OV_DAEMON_WATCH_DIR"), + db_path=os.getenv("OV_DAEMON_DB_PATH"), + batch_trigger_lines=int(os.getenv("OV_DAEMON_BATCH_LINES", "50")), + batch_trigger_seconds=int(os.getenv("OV_DAEMON_BATCH_SECONDS", "300")), + ) + + class ServerConfig(BaseModel): host: str = "127.0.0.1" port: int = 1933 @@ -180,6 +204,7 @@ class ServerConfig(BaseModel): tool_output_externalization: ToolOutputExternalizationConfig = Field( default_factory=ToolOutputExternalizationConfig ) + daemon: DaemonConfig = Field(default_factory=DaemonConfig) model_config = {"extra": "forbid"} From 6ec1c4415b1d48160572fd9ae08f052f86ea6e93 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:38:33 +0800 Subject: [PATCH 14/22] feat: integrate Active Daemon into server bootstrap --- openviking/server/bootstrap.py | 35 +++++++++++++++++++++++++++++++++- 1 file changed, 34 insertions(+), 1 
deletion(-) diff --git a/openviking/server/bootstrap.py b/openviking/server/bootstrap.py index 7fa0228e25..ae2f4241ed 100644 --- a/openviking/server/bootstrap.py +++ b/openviking/server/bootstrap.py @@ -17,7 +17,7 @@ import uvicorn from openviking.server.app import create_app -from openviking.server.config import load_server_config +from openviking.server.config import DaemonConfig, load_server_config from openviking_cli.utils.config import OPENVIKING_CONFIG_ENV from openviking_cli.utils.config.config_loader import resolve_config_path from openviking_cli.utils.config.consts import ( @@ -271,6 +271,31 @@ def main(): workers_info = f" (workers: {config.workers})" if config.workers > 1 else "" print(f"OpenViking HTTP Server is running on {config.host}:{config.port}{workers_info}") + # Start Active Daemon if enabled + daemon_service = None + daemon_config = config.daemon + if not daemon_config.enabled: + # Also check env var as fallback + daemon_config = DaemonConfig.from_env() + + if daemon_config.enabled: + try: + from openviking.daemon.service import DaemonService + from openviking.server.dependencies import get_service + resource_service = get_service().resource + daemon_service = DaemonService( + resource_service=resource_service, + watch_dir=daemon_config.watch_dir, + db_path=daemon_config.db_path, + batch_trigger_lines=daemon_config.batch_trigger_lines, + batch_trigger_seconds=daemon_config.batch_trigger_seconds, + ) + import asyncio + asyncio.get_event_loop().run_until_complete(daemon_service.start()) + print("Active Daemon started, watching:", daemon_service.watch_dir) + except Exception as e: + print(f"Warning: Failed to start Active Daemon: {e}", file=sys.stderr) + try: workers = config.workers if workers > 1: @@ -289,6 +314,14 @@ def main(): else: uvicorn.run(app, host=config.host, port=config.port, log_config=None) finally: + # Stop Active Daemon on shutdown + if daemon_service is not None: + try: + import asyncio + 
asyncio.get_event_loop().run_until_complete(daemon_service.stop()) + except Exception as e: + print(f"Error stopping Active Daemon: {e}", file=sys.stderr) + # Cleanup vikingbot process on shutdown if bot_process is not None: _stop_vikingbot_gateway(bot_process) From 153947dd16cf6f989612f90ebfcda9bc253d30c2 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:46:20 +0800 Subject: [PATCH 15/22] test: add integration tests for Active Daemon --- tests/daemon/test_integration.py | 189 +++++++++++++++++++++++++++++++ 1 file changed, 189 insertions(+) create mode 100644 tests/daemon/test_integration.py diff --git a/tests/daemon/test_integration.py b/tests/daemon/test_integration.py new file mode 100644 index 0000000000..d03f07e3e4 --- /dev/null +++ b/tests/daemon/test_integration.py @@ -0,0 +1,189 @@ +"""Integration tests for OpenViking Active Daemon.""" +import json +import os +import tempfile +from pathlib import Path +from unittest.mock import AsyncMock, MagicMock + +import pytest + +from openviking.daemon.cursor_manager import CursorManager +from openviking.daemon.deduplicator import KnowledgeDeduplicator +from openviking.daemon.etl_pipeline import BatchETLPipeline +from openviking.daemon.filters import LowValueFilter +from openviking.daemon.conversation_reconstructor import ConversationReconstructor +from openviking.daemon.knowledge_router import KnowledgeRouter +from openviking.daemon.storage_adapter import VikingStorageAdapter +from openviking.daemon.models import ExtractedKnowledge + + +@pytest.fixture +def temp_watch_dir(): + with tempfile.TemporaryDirectory() as tmpdir: + yield Path(tmpdir) + + +@pytest.fixture +def temp_db(): + fd, path = tempfile.mkstemp(suffix=".db") + os.close(fd) + yield path + if os.path.exists(path): + os.unlink(path) + + +def test_jsonl_file_created_and_readable(temp_watch_dir): + """Verify we can create and read JSONL files in the watch directory.""" + jsonl_file = temp_watch_dir / "test_session.jsonl" 
+ + events = [ + { + "timestamp": "2026-06-15T10:00:00Z", + "role": "user", + "content": "How to configure PostgreSQL for high availability?", + "type": "message", + }, + { + "timestamp": "2026-06-15T10:00:01Z", + "role": "assistant", + "content": "Edit postgresql.conf, set max_connections=100 and enable WAL archiving", + "type": "message", + }, + ] + + with open(jsonl_file, "w", encoding="utf-8") as f: + for event in events: + f.write(json.dumps(event) + "\n") + + assert jsonl_file.exists() + lines = jsonl_file.read_text(encoding="utf-8").strip().split("\n") + assert len(lines) == 2 + + parsed = [json.loads(line) for line in lines] + assert parsed[0]["role"] == "user" + assert parsed[1]["role"] == "assistant" + + +def test_cursor_tracks_jsonl_progress(temp_watch_dir, temp_db): + """Verify CursorManager correctly tracks incremental reads.""" + jsonl_file = temp_watch_dir / "session.jsonl" + + # Write first batch + with open(jsonl_file, "w", encoding="utf-8") as f: + f.write(json.dumps({"role": "user", "content": "First question", "type": "message"}) + "\n") + + cursor_mgr = CursorManager(temp_db) + cursor = cursor_mgr.get_cursor(str(jsonl_file)) + assert cursor.last_position == 0 + + # Read the file + with open(jsonl_file, "r", encoding="utf-8") as f: + f.seek(cursor.last_position) + lines = f.readlines() + new_position = f.tell() + + assert len(lines) == 1 + assert new_position > 0 + + cursor_mgr.update_cursor(str(jsonl_file), new_position) + + # Verify cursor persisted + cursor2 = cursor_mgr.get_cursor(str(jsonl_file)) + assert cursor2.last_position == new_position + + # Append more data + with open(jsonl_file, "a", encoding="utf-8") as f: + f.write(json.dumps({"role": "assistant", "content": "Answer", "type": "message"}) + "\n") + + # Read incrementally + with open(jsonl_file, "r", encoding="utf-8") as f: + f.seek(cursor2.last_position) + new_lines = f.readlines() + + assert len(new_lines) == 1 + assert json.loads(new_lines[0])["role"] == "assistant" + + +def 
test_filter_and_reconstruct_pipeline(): + """Verify the filter -> reconstruct pipeline works end-to-end.""" + events = [ + {"role": "user", "content": "Hi", "type": "message", "timestamp": "2026-06-15T10:00:00Z"}, # Too short + {"role": "user", "content": "npm install lodash --save-dev", "type": "message", "timestamp": "2026-06-15T10:00:01Z"}, # Noise + {"role": "user", "content": "How should we handle database migrations in production?", "type": "message", "timestamp": "2026-06-15T10:00:02Z"}, + {"role": "assistant", "content": "Use Alembic with versioned migration scripts and run them in a CI pipeline before deployment", "type": "message", "timestamp": "2026-06-15T10:00:03Z"}, + ] + + # Step 1: Filter + filt = LowValueFilter() + filtered = filt.apply(events) + assert len(filtered) == 2 # "Hi" and "npm install" removed + + # Step 2: Reconstruct + reconstructor = ConversationReconstructor() + turns = reconstructor.reconstruct(filtered) + assert len(turns) == 1 + assert "database migrations" in turns[0].user_prompt + assert "Alembic" in turns[0].assistant_response + + +def test_knowledge_router_all_categories(): + """Verify router handles all categories correctly.""" + router = KnowledgeRouter() + + skill = ExtractedKnowledge(status="EXTRACTED", category="skills", title="PG Config", content="...") + assert "skills/claude_code" in router.route(skill) + + mem_proj = ExtractedKnowledge(status="EXTRACTED", category="memories", title="Arch", content="...", project_name="myapp") + assert "memories/projects/myapp" in router.route(mem_proj) + + mem_global = ExtractedKnowledge(status="EXTRACTED", category="memories", title="General", content="...") + assert "memories/global" in router.route(mem_global) + + resource = ExtractedKnowledge(status="EXTRACTED", category="resources", title="Redis", content="...", entity_links=["Redis"]) + assert "resources/Redis" in router.route(resource) + + +def test_deduplicator_prevents_duplicates(): + """Verify deduplicator blocks duplicate 
knowledge.""" + dedup = KnowledgeDeduplicator() + + k1 = ExtractedKnowledge(status="EXTRACTED", category="memories", title="A", content="Same content here") + k2 = ExtractedKnowledge(status="EXTRACTED", category="memories", title="B", content="Same content here") + k3 = ExtractedKnowledge(status="EXTRACTED", category="memories", title="C", content="Different content") + + assert not dedup.is_duplicate(k1) + assert dedup.is_duplicate(k2) + assert not dedup.is_duplicate(k3) + + +def test_storage_adapter_formats_content(): + """Verify storage adapter generates correct Markdown for each category.""" + mock_service = MagicMock() + adapter = VikingStorageAdapter(mock_service) + + skill = ExtractedKnowledge( + status="EXTRACTED", category="skills", title="Test Skill", + content="Do this thing", confidence=0.9, + actionable_steps=["Step 1", "Step 2"], + ) + content = adapter._format_content(skill) + assert "Test Skill" in content + assert "Step 1" in content + assert "Step 2" in content + + memory = ExtractedKnowledge( + status="EXTRACTED", category="memories", title="Decision", + content="We chose X", entity_links=["tag1"], + timestamp="2026-06-15T10:00:00Z", + ) + content = adapter._format_content(memory) + assert "Decision" in content + assert "tag1" in content + + resource = ExtractedKnowledge( + status="EXTRACTED", category="resources", title="Guide", + content="Reference material", entity_links=["Docker"], + ) + content = adapter._format_content(resource) + assert "Guide" in content + assert "Docker" in content From 4c5500fe07bd5dcaeed12e4679bd7e766bf0a379 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:46:33 +0800 Subject: [PATCH 16/22] docs: add Active Daemon usage and configuration guides --- docs/daemon/README.md | 61 ++++++++++++++++++++++++++++++++++++ docs/daemon/configuration.md | 50 +++++++++++++++++++++++++++++ 2 files changed, 111 insertions(+) create mode 100644 docs/daemon/README.md create mode 100644 
docs/daemon/configuration.md diff --git a/docs/daemon/README.md b/docs/daemon/README.md new file mode 100644 index 0000000000..a7e5173c1f --- /dev/null +++ b/docs/daemon/README.md @@ -0,0 +1,61 @@ +# OpenViking Active Daemon + +自动监听 Claude Code 会话日志,提取知识并写入 OpenViking 知识库。 + +## 快速开始 + +### 启用 Daemon + +设置环境变量: + +```bash +export OV_DAEMON_ENABLED=true +export OV_DAEMON_WATCH_DIR=~/.claude/projects # 监听目录(可选) +export OV_DAEMON_BATCH_LINES=50 # 批处理触发行数(可选) +export OV_DAEMON_BATCH_SECONDS=300 # 批处理触发秒数(可选) +``` + +### 启动 OpenViking Server + +```bash +openviking serve +``` + +Daemon 会在服务器启动时自动运行(如果 `OV_DAEMON_ENABLED=true`)。 + +## 工作原理 + +1. **文件监听** — 监控 `~/.claude/projects/` 下的 `.jsonl` 文件变化 +2. **增量读取** — 文件游标技术,只处理新增内容 +3. **批量处理** — 累积 50 行或 5 分钟后触发 ETL 管道 +4. **知识提取** — 调用 LLM 过滤噪声,提取有价值的知识 +5. **自动存储** — 写入 `viking://skills/`、`viking://memories/`、`viking://resources/` + +## 知识分类 + +| 类型 | 目标路径 | 说明 | +|------|---------|------| +| Skills | `viking://skills/claude_code/<title>.md` | 可复用的操作指南 | +| Memories (有项目) | `viking://memories/projects/<project>/decisions.md` | 项目决策日志 | +| Memories (无项目) | `viking://memories/global/<title>.md` | 全局记忆 | +| Resources | `viking://resources/<tech>/<title>.md` | 参考资源 | + +## 架构 + +``` +Claude Code JSONL → File Watcher → Batch Buffer → Filter → Reconstruct + → LLM Extract → Deduplicate → Route → viking:// Storage +``` + +## 故障排查 + +### Daemon 未启动 +检查日志中是否有 `Active Daemon is disabled` 消息,确认 `OV_DAEMON_ENABLED=true`。 + +### 没有提取到知识 +- 确认 Claude Code 正在写入 JSONL 文件(`~/.claude/projects/` 下有 `.jsonl` 文件) +- 对话内容可能不够有价值(简单问答会被过滤) +- 查看日志中的 ETL 处理信息 + +### 知识写入失败 +检查 OpenViking ResourceService 是否正常运行。 diff --git a/docs/daemon/configuration.md b/docs/daemon/configuration.md new file mode 100644 index 0000000000..169bf341e1 --- /dev/null +++ b/docs/daemon/configuration.md @@ -0,0 +1,50 @@ +# Active Daemon 配置指南 + +## 环境变量 + +| 变量 | 说明 | 默认值 | +|------|------|--------| +| `OV_DAEMON_ENABLED` | 启用 Daemon | `false` | +| `OV_DAEMON_WATCH_DIR` | 
监听目录 | `~/.claude/projects` | +| `OV_DAEMON_DB_PATH` | 游标数据库路径 | `~/.qoderworkcn/openviking/daemon_cursors.db` | +| `OV_DAEMON_BATCH_LINES` | 批处理触发行数 | `50` | +| `OV_DAEMON_BATCH_SECONDS` | 批处理触发秒数 | `300` | + +## JSON 配置 (ov.conf) + +```json +{ + "daemon": { + "enabled": true, + "watch_dir": "~/.claude/projects", + "batch_trigger_lines": 50, + "batch_trigger_seconds": 300 + } +} +``` + +## Docker 部署 + +```yaml +services: + openviking: + image: openviking:latest + environment: + - OV_DAEMON_ENABLED=true + - OV_DAEMON_WATCH_DIR=/data/claude-projects + - OV_DAEMON_DB_PATH=/data/daemon.db + volumes: + - ./claude-projects:/data/claude-projects + - ./daemon-data:/data + ports: + - "1933:1933" +``` + +## 日志 + +Daemon 使用 OpenViking 标准日志系统。关键日志: + +- `Claude Code watcher started on ...` — 监听器启动 +- `Flushing batch with N events` — 批处理触发 +- `Extracted N knowledge items` — 知识提取完成 +- `Knowledge ingested: viking://...` — 知识写入成功 From dfe41ad8e67e9057bbfb3a11db58b9e523302028 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 15:46:45 +0800 Subject: [PATCH 17/22] examples: add daemon startup scripts and docker-compose --- examples/daemon/docker-compose-daemon.yml | 14 ++++++++++++++ examples/daemon/start-daemon.bat | 10 ++++++++++ examples/daemon/start-daemon.sh | 12 ++++++++++++ 3 files changed, 36 insertions(+) create mode 100644 examples/daemon/docker-compose-daemon.yml create mode 100644 examples/daemon/start-daemon.bat create mode 100644 examples/daemon/start-daemon.sh diff --git a/examples/daemon/docker-compose-daemon.yml b/examples/daemon/docker-compose-daemon.yml new file mode 100644 index 0000000000..81dba7604b --- /dev/null +++ b/examples/daemon/docker-compose-daemon.yml @@ -0,0 +1,14 @@ +version: '3.8' + +services: + openviking: + image: openviking:latest + environment: + - OV_DAEMON_ENABLED=true + - OV_DAEMON_WATCH_DIR=/data/claude-projects + - OV_DAEMON_DB_PATH=/data/daemon.db + volumes: + - ./claude-projects:/data/claude-projects + - 
./daemon-data:/data + ports: + - "1933:1933" diff --git a/examples/daemon/start-daemon.bat b/examples/daemon/start-daemon.bat new file mode 100644 index 0000000000..3e06e7e327 --- /dev/null +++ b/examples/daemon/start-daemon.bat @@ -0,0 +1,10 @@ +@echo off +REM Start OpenViking with Active Daemon enabled on Windows + +set OV_DAEMON_ENABLED=true +set OV_DAEMON_WATCH_DIR=%USERPROFILE%\.claude\projects +set OV_DAEMON_BATCH_LINES=50 +set OV_DAEMON_BATCH_SECONDS=300 + +echo Starting OpenViking with Active Daemon... +openviking serve %* diff --git a/examples/daemon/start-daemon.sh b/examples/daemon/start-daemon.sh new file mode 100644 index 0000000000..26129bb4f1 --- /dev/null +++ b/examples/daemon/start-daemon.sh @@ -0,0 +1,12 @@ +#!/bin/bash +# Start OpenViking with Active Daemon enabled +set -e + +echo "Starting OpenViking with Active Daemon..." + +export OV_DAEMON_ENABLED=true +export OV_DAEMON_WATCH_DIR="${OV_DAEMON_WATCH_DIR:-$HOME/.claude/projects}" +export OV_DAEMON_BATCH_LINES="${OV_DAEMON_BATCH_LINES:-50}" +export OV_DAEMON_BATCH_SECONDS="${OV_DAEMON_BATCH_SECONDS:-300}" + +openviking serve "$@" From 7689f36d568621b2e592ac994ade2594ce563ba6 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 21:22:56 +0800 Subject: [PATCH 18/22] feat: add --with-daemon CLI argument --- openviking/server/bootstrap.py | 8 ++++++++ 1 file changed, 8 insertions(+) diff --git a/openviking/server/bootstrap.py b/openviking/server/bootstrap.py index ae2f4241ed..8f9830560f 100644 --- a/openviking/server/bootstrap.py +++ b/openviking/server/bootstrap.py @@ -169,6 +169,12 @@ def main(): dest="with_bot", help="Enable Bot API proxy to Vikingbot (requires Vikingbot running)", ) + parser.add_argument( + "--with-daemon", + action="store_true", + dest="with_daemon", + help="Enable Active Daemon for automatic knowledge extraction from Claude Code logs", + ) parser.add_argument( "--bot-port", type=int, @@ -243,6 +249,8 @@ def main(): config.workers = args.workers 
if args.with_bot: config.with_bot = True + if args.with_daemon: + config.daemon.enabled = True # Configure logging for Uvicorn configure_uvicorn_logging() From 58f27bcc09c32b5d0a8d1532e5c4132db1040da0 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 21:23:06 +0800 Subject: [PATCH 19/22] feat: add daemon status API endpoint --- openviking/server/app.py | 2 ++ openviking/server/routers/__init__.py | 2 ++ openviking/server/routers/daemon.py | 43 +++++++++++++++++++++++++++ 3 files changed, 47 insertions(+) create mode 100644 openviking/server/routers/daemon.py diff --git a/openviking/server/app.py b/openviking/server/app.py index bdea7a5803..737f0318da 100644 --- a/openviking/server/app.py +++ b/openviking/server/app.py @@ -33,6 +33,7 @@ bot_router, console_router, content_router, + daemon_router, debug_router, filesystem_router, metrics_router, @@ -537,6 +538,7 @@ async def general_error_handler(request: Request, exc: Exception): app.include_router(watches_router) app.include_router(webdav_router) app.include_router(bot_router, prefix="/bot/v1") + app.include_router(daemon_router) # OAuth 2.1: when enabled, mount the official MCP SDK auth routes # (DCR / authorize / token / metadata) plus our authorize page + consent / diff --git a/openviking/server/routers/__init__.py b/openviking/server/routers/__init__.py index 1d35086880..83d50291cf 100644 --- a/openviking/server/routers/__init__.py +++ b/openviking/server/routers/__init__.py @@ -6,6 +6,7 @@ from openviking.server.routers.bot import router as bot_router from openviking.server.routers.console import router as console_router from openviking.server.routers.content import router as content_router +from openviking.server.routers.daemon import router as daemon_router from openviking.server.routers.debug import router as debug_router from openviking.server.routers.filesystem import router as filesystem_router from openviking.server.routers.metrics import router as metrics_router @@ 
-26,6 +27,7 @@ __all__ = [ "admin_router", "bot_router", + "daemon_router", "system_router", "resources_router", "filesystem_router", diff --git a/openviking/server/routers/daemon.py b/openviking/server/routers/daemon.py new file mode 100644 index 0000000000..86849b1ddd --- /dev/null +++ b/openviking/server/routers/daemon.py @@ -0,0 +1,43 @@ +# Copyright (c) 2026 Beijing Volcano Engine Technology Co., Ltd. +# SPDX-License-Identifier: AGPL-3.0 +"""Daemon status API endpoints.""" +from typing import Any, Dict + +from fastapi import APIRouter + +router = APIRouter(prefix="/api/v1/daemon", tags=["daemon"]) + + +@router.get("/status") +async def get_daemon_status() -> Dict[str, Any]: + """ + Get the current status of the Active Daemon. + + Returns: + { + "enabled": bool, + "running": bool, + "watch_dir": str | null, + "db_path": str | null, + "batch_trigger_lines": int, + "batch_trigger_seconds": int, + "cursor_count": int, + "last_flush_time": str | null + } + """ + # This is a placeholder — actual implementation needs access to DaemonService instance + # For now, return static config info + from openviking.server.config import DaemonConfig + + daemon_config = DaemonConfig.from_env() + + return { + "enabled": daemon_config.enabled, + "running": False, # Would need to track actual state + "watch_dir": daemon_config.watch_dir, + "db_path": daemon_config.db_path, + "batch_trigger_lines": daemon_config.batch_trigger_lines, + "batch_trigger_seconds": daemon_config.batch_trigger_seconds, + "cursor_count": 0, + "last_flush_time": None, + } From 6a0a3d47e15549af0835f31184cf84febe662ac2 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 21:29:51 +0800 Subject: [PATCH 20/22] feat: add daemon status card to home dashboard - Add i18n translations for daemon status in English and Chinese - Create DaemonStatusCard component with real-time status polling - Integrate daemon status card into home page layout - Card displays enabled/running status, watch 
directory, batch settings, cursor count, and last flush time - Auto-refreshes every 30 seconds with graceful error handling --- web-studio/src/i18n/locales/en.ts | 12 ++ web-studio/src/i18n/locales/zh-CN.ts | 12 ++ .../home/-components/daemon-status-card.tsx | 148 ++++++++++++++++++ web-studio/src/routes/home/route.tsx | 3 + 4 files changed, 175 insertions(+) create mode 100644 web-studio/src/routes/home/-components/daemon-status-card.tsx diff --git a/web-studio/src/i18n/locales/en.ts b/web-studio/src/i18n/locales/en.ts index 7326f436a8..c10333bb15 100644 --- a/web-studio/src/i18n/locales/en.ts +++ b/web-studio/src/i18n/locales/en.ts @@ -301,6 +301,18 @@ const en = { usageDisabled: 'Usage/Audit is not initialized, so live usage stats are unavailable.', }, + daemon: { + status: 'Active Daemon', + enabled: 'Enabled', + disabled: 'Disabled', + running: 'Running', + stopped: 'Stopped', + watchDir: 'Watch Directory', + batchLines: 'Batch Lines', + batchSeconds: 'Batch Seconds', + cursorCount: 'Tracked Files', + lastFlush: 'Last Flush', + }, operations: { page: { placeholder: 'Operations dashboard is under construction.', diff --git a/web-studio/src/i18n/locales/zh-CN.ts b/web-studio/src/i18n/locales/zh-CN.ts index 031b077cb0..a5cae301d0 100644 --- a/web-studio/src/i18n/locales/zh-CN.ts +++ b/web-studio/src/i18n/locales/zh-CN.ts @@ -298,6 +298,18 @@ const zhCN = { }, usageDisabled: 'Usage/Audit 未初始化,暂无实时统计。', }, + daemon: { + status: '主动守护进程', + enabled: '已启用', + disabled: '已禁用', + running: '运行中', + stopped: '已停止', + watchDir: '监听目录', + batchLines: '批处理行数', + batchSeconds: '批处理间隔(秒)', + cursorCount: '追踪文件数', + lastFlush: '上次刷新', + }, operations: { page: { placeholder: '运维面板能力尚未接入。', diff --git a/web-studio/src/routes/home/-components/daemon-status-card.tsx b/web-studio/src/routes/home/-components/daemon-status-card.tsx new file mode 100644 index 0000000000..5bf2a2d93c --- /dev/null +++ b/web-studio/src/routes/home/-components/daemon-status-card.tsx @@ -0,0 +1,148 @@ 
+import { useQuery } from '@tanstack/react-query'; +import { Activity, Clock, Database, FileText, ToggleLeft, ToggleRight } from 'lucide-react'; +import { useTranslation } from 'react-i18next'; +import { Card, CardContent, CardHeader, CardTitle } from '@/components/ui/card'; +import { Badge } from '@/components/ui/badge'; +import { Skeleton } from '@/components/ui/skeleton'; +import { ovClient } from '@/lib/ov-client/client'; + +interface DaemonStatus { + enabled: boolean; + running: boolean; + watch_dir: string | null; + db_path: string | null; + batch_trigger_lines: number; + batch_trigger_seconds: number; + cursor_count: number; + last_flush_time: string | null; +} + +async function fetchDaemonStatus(): Promise<DaemonStatus> { + const response = await ovClient.instance.get('/api/v1/daemon/status'); + return response.data as DaemonStatus; +} + +export function DaemonStatusCard() { + const { t } = useTranslation(); + + const { data, isLoading, error } = useQuery({ + queryKey: ['daemon-status'], + queryFn: fetchDaemonStatus, + refetchInterval: 30000, // Refresh every 30 seconds + }); + + if (isLoading) { + return ( + <Card> + <CardHeader> + <CardTitle className="flex items-center gap-2"> + <Activity className="h-4 w-4" /> + <Skeleton className="h-5 w-32" /> + </CardTitle> + </CardHeader> + <CardContent> + <Skeleton className="h-4 w-full mb-2" /> + <Skeleton className="h-4 w-3/4" /> + </CardContent> + </Card> + ); + } + + if (error || !data) { + return ( + <Card> + <CardHeader> + <CardTitle className="flex items-center gap-2 text-muted-foreground"> + <Activity className="h-4 w-4" /> + {t('daemon.status')} + </CardTitle> + </CardHeader> + <CardContent> + <p className="text-sm text-muted-foreground">Failed to load daemon status</p> + </CardContent> + </Card> + ); + } + + const statusColor = data.enabled && data.running + ? 'bg-green-500' + : data.enabled + ? 'bg-yellow-500' + : 'bg-gray-400'; + + const statusLabel = data.enabled && data.running + ? 
t('daemon.running') + : data.enabled + ? t('daemon.stopped') + : t('daemon.disabled'); + + return ( + <Card> + <CardHeader> + <CardTitle className="flex items-center justify-between"> + <div className="flex items-center gap-2"> + <Activity className="h-4 w-4" /> + {t('daemon.status')} + </div> + <Badge variant={data.enabled ? 'default' : 'secondary'} className={statusColor}> + {statusLabel} + </Badge> + </CardTitle> + </CardHeader> + <CardContent className="space-y-3"> + {/* Enabled/Running Status */} + <div className="flex items-center justify-between text-sm"> + <span className="text-muted-foreground flex items-center gap-1"> + {data.enabled ? <ToggleRight className="h-4 w-4" /> : <ToggleLeft className="h-4 w-4" />} + {t('daemon.enabled')} + </span> + <span className="font-medium">{data.enabled ? 'Yes' : 'No'}</span> + </div> + + {/* Watch Directory */} + {data.watch_dir && ( + <div className="flex items-center justify-between text-sm"> + <span className="text-muted-foreground flex items-center gap-1"> + <Database className="h-4 w-4" /> + {t('daemon.watchDir')} + </span> + <span className="font-mono text-xs truncate max-w-[200px]" title={data.watch_dir}> + {data.watch_dir.split('/').pop() || data.watch_dir} + </span> + </div> + )} + + {/* Batch Settings */} + <div className="grid grid-cols-2 gap-2 text-sm"> + <div className="flex items-center gap-1 text-muted-foreground"> + <FileText className="h-4 w-4" /> + {t('daemon.batchLines')} + </div> + <div className="text-right font-medium">{data.batch_trigger_lines}</div> + + <div className="flex items-center gap-1 text-muted-foreground"> + <Clock className="h-4 w-4" /> + {t('daemon.batchSeconds')} + </div> + <div className="text-right font-medium">{data.batch_trigger_seconds}s</div> + </div> + + {/* Cursor Count */} + <div className="flex items-center justify-between text-sm pt-2 border-t"> + <span className="text-muted-foreground">{t('daemon.cursorCount')}</span> + <span 
className="font-medium">{data.cursor_count}</span> + </div> + + {/* Last Flush Time */} + {data.last_flush_time && ( + <div className="flex items-center justify-between text-sm"> + <span className="text-muted-foreground">{t('daemon.lastFlush')}</span> + <span className="font-medium text-xs"> + {new Date(data.last_flush_time).toLocaleTimeString()} + </span> + </div> + )} + </CardContent> + </Card> + ); +} diff --git a/web-studio/src/routes/home/route.tsx b/web-studio/src/routes/home/route.tsx index a99aefa00d..fae60d7e7d 100644 --- a/web-studio/src/routes/home/route.tsx +++ b/web-studio/src/routes/home/route.tsx @@ -3,6 +3,7 @@ import { useQuery } from '@tanstack/react-query' import { createFileRoute } from '@tanstack/react-router' import { ContextCommitsPanel } from './-components/context-commits-panel' +import { DaemonStatusCard } from './-components/daemon-status-card' import { ContextDataPanel, TodayRetrievalsPanel, @@ -90,6 +91,8 @@ function HomePage() { /> </div> + <DaemonStatusCard /> + <TokenTrendPanel data={tokenSeries.data} isError={tokenSeries.isError} From 4a8b767ae0541d688dcfef1b1f796f46f1892715 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 21:38:03 +0800 Subject: [PATCH 21/22] fix: move daemon lifecycle to FastAPI lifespan for proper service initialization --- openviking/server/app.py | 34 ++++++++++++++++++++++++++++++++++ openviking/server/bootstrap.py | 33 --------------------------------- 2 files changed, 34 insertions(+), 33 deletions(-) diff --git a/openviking/server/app.py b/openviking/server/app.py index 737f0318da..5a1507e10c 100644 --- a/openviking/server/app.py +++ b/openviking/server/app.py @@ -276,6 +276,32 @@ async def _oauth_gc_loop(store) -> None: # noqa: ANN001 task_tracker = get_task_tracker() task_tracker.start_cleanup_loop() + # Start Active Daemon if enabled + daemon_service = None + if config.daemon.enabled: + try: + from openviking.daemon.service import DaemonService + from 
openviking.server.config import DaemonConfig + + # Check env var override + daemon_config = config.daemon + if not daemon_config.enabled: + daemon_config = DaemonConfig.from_env() + + if daemon_config.enabled: + resource_service = service.resource + daemon_service = DaemonService( + resource_service=resource_service, + watch_dir=daemon_config.watch_dir, + db_path=daemon_config.db_path, + batch_trigger_lines=daemon_config.batch_trigger_lines, + batch_trigger_seconds=daemon_config.batch_trigger_seconds, + ) + await daemon_service.start() + logger.info("Active Daemon started, watching: %s", daemon_service.watch_dir) + except Exception as e: + logger.warning("Failed to start Active Daemon: %s", e) + # Initialize tracing and OTLP log export from server.observability. from openviking.telemetry import tracer_module @@ -290,6 +316,14 @@ async def _oauth_gc_loop(store) -> None: # noqa: ANN001 await _initialize_runtime_state(app, service, config) yield + # Stop Active Daemon on shutdown + if daemon_service is not None: + try: + await daemon_service.stop() + logger.info("Active Daemon stopped") + except Exception as e: + logger.warning("Failed to stop Active Daemon: %s", e) + # Cleanup from openviking.metrics.global_api import shutdown_metrics_async from openviking.observability.usage_audit import shutdown_usage_audit diff --git a/openviking/server/bootstrap.py b/openviking/server/bootstrap.py index 8f9830560f..36a61a4256 100644 --- a/openviking/server/bootstrap.py +++ b/openviking/server/bootstrap.py @@ -279,31 +279,6 @@ def main(): workers_info = f" (workers: {config.workers})" if config.workers > 1 else "" print(f"OpenViking HTTP Server is running on {config.host}:{config.port}{workers_info}") - # Start Active Daemon if enabled - daemon_service = None - daemon_config = config.daemon - if not daemon_config.enabled: - # Also check env var as fallback - daemon_config = DaemonConfig.from_env() - - if daemon_config.enabled: - try: - from openviking.daemon.service import 
DaemonService - from openviking.server.dependencies import get_service - resource_service = get_service().resource - daemon_service = DaemonService( - resource_service=resource_service, - watch_dir=daemon_config.watch_dir, - db_path=daemon_config.db_path, - batch_trigger_lines=daemon_config.batch_trigger_lines, - batch_trigger_seconds=daemon_config.batch_trigger_seconds, - ) - import asyncio - asyncio.get_event_loop().run_until_complete(daemon_service.start()) - print("Active Daemon started, watching:", daemon_service.watch_dir) - except Exception as e: - print(f"Warning: Failed to start Active Daemon: {e}", file=sys.stderr) - try: workers = config.workers if workers > 1: @@ -322,14 +297,6 @@ def main(): else: uvicorn.run(app, host=config.host, port=config.port, log_config=None) finally: - # Stop Active Daemon on shutdown - if daemon_service is not None: - try: - import asyncio - asyncio.get_event_loop().run_until_complete(daemon_service.stop()) - except Exception as e: - print(f"Error stopping Active Daemon: {e}", file=sys.stderr) - # Cleanup vikingbot process on shutdown if bot_process is not None: _stop_vikingbot_gateway(bot_process) From 2bf1f800e42e5ca7190dcb1cf75af25dd1598801 Mon Sep 17 00:00:00 2001 From: baobaodae <2014596548@qq.com> Date: Mon, 15 Jun 2026 21:40:05 +0800 Subject: [PATCH 22/22] fix: use correct attribute name service.resources (plural) --- openviking/server/app.py | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/openviking/server/app.py b/openviking/server/app.py index 5a1507e10c..e4e5e2b7df 100644 --- a/openviking/server/app.py +++ b/openviking/server/app.py @@ -289,7 +289,7 @@ async def _oauth_gc_loop(store) -> None: # noqa: ANN001 daemon_config = DaemonConfig.from_env() if daemon_config.enabled: - resource_service = service.resource + resource_service = service.resources daemon_service = DaemonService( resource_service=resource_service, watch_dir=daemon_config.watch_dir,