-
Notifications
You must be signed in to change notification settings - Fork 70
implementation of skills rag #2821
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
base: main
Are you sure you want to change the base?
Changes from all commits
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1 @@ | ||
| """Skills RAG for selecting skills based on query relevance.""" |
| Original file line number | Diff line number | Diff line change |
|---|---|---|
| @@ -0,0 +1,261 @@ | ||
| """Hybrid Skills RAG implementation for skill selection.""" | ||
|
|
||
| import logging | ||
| from collections.abc import Callable | ||
| from dataclasses import dataclass | ||
| from pathlib import Path | ||
|
|
||
| import frontmatter | ||
| from rank_bm25 import BM25Okapi | ||
|
|
||
| from ols.src.tools.tools_rag.hybrid_tools_rag import QdrantStore | ||
|
|
||
| logger = logging.getLogger(__name__) | ||
|
|
||
| _SKILL_MD = "skill.md" | ||
|
|
||
|
|
||
@dataclass(frozen=True, slots=True)
class Skill:
    """A skill artifact: parsed frontmatter metadata plus its directory path."""

    # Skill identifier taken from the "name" frontmatter field.
    name: str
    # Human-readable summary taken from the "description" frontmatter field.
    description: str
    # Path of the skill's directory; files under it are read lazily.
    source_path: str

    def load_content(self) -> str:
        """Read all files in the skill directory tree and concatenate on demand.

        The main skill file body (everything after frontmatter) is returned
        first, followed by the contents of any additional text files found
        anywhere in the skill directory tree, each separated by a header
        showing the relative path. Files that fail UTF-8 decoding are
        silently skipped.

        Returns:
            Combined content of all files in the skill directory tree.

        Raises:
            OSError: If the skill directory or its files cannot be read.
        """
        root = Path(self.source_path)
        main_bodies: list[str] = []
        extra_sections: list[str] = []

        for file_path in sorted(root.rglob("*")):
            if not file_path.is_file():
                continue
            try:
                text = file_path.read_text(encoding="utf-8").strip()
            except (UnicodeDecodeError, ValueError):
                # Binary or non-UTF-8 content: not part of the skill text.
                continue
            if file_path.name.lower() == _SKILL_MD:
                main_bodies.append(frontmatter.loads(text).content.strip())
            else:
                relative = file_path.relative_to(root)
                extra_sections.append(f"## {relative}\n\n{text}")

        # Each main body was originally inserted at the front, so
        # later-sorted skill.md files precede earlier ones.
        return "\n\n".join(list(reversed(main_bodies)) + extra_sections)
|
|
||
|
|
||
def _find_skill_file(directory: Path) -> Path | None:
    """Return the skill definition file in *directory* (case-insensitive), or None."""
    matches = (
        entry
        for entry in directory.iterdir()
        if entry.is_file() and entry.name.lower() == _SKILL_MD
    )
    return next(matches, None)
|
|
||
|
|
||
def load_skills_from_directory(skills_dir: str | Path) -> list[Skill]:
    """Load all skill definitions from a directory of skill subdirectories.

    Each immediate subdirectory of ``skills_dir`` is treated as a skill and
    must contain a ``skill.md`` or ``SKILL.md`` file with YAML frontmatter.
    The subdirectory path is stored as ``source_path`` so that
    ``Skill.load_content`` can read all files in it on demand.

    Args:
        skills_dir: Root directory containing skill subdirectories.

    Returns:
        List of parsed Skill objects (empty when the root is missing).
    """
    root = Path(skills_dir)
    if not root.is_dir():
        logger.warning("Skills directory does not exist: %s", skills_dir)
        return []

    loaded: list[Skill] = []
    for candidate in sorted(root.iterdir()):
        if not candidate.is_dir():
            continue
        definition = _find_skill_file(candidate)
        if definition is None:
            logger.debug("Skipping directory without skill.md: %s", candidate)
            continue
        parsed = _parse_skill_directory(candidate, definition)
        if parsed is not None:
            loaded.append(parsed)

    logger.info("Loaded %d skills from %s", len(loaded), skills_dir)
    return loaded
|
|
||
|
|
||
def _parse_skill_directory(skill_dir: Path, skill_file: Path) -> Skill | None:
    """Build a Skill from the frontmatter of its definition file.

    Args:
        skill_dir: Path to the skill directory (stored as source_path).
        skill_file: Path to the skill definition file within the directory.

    Returns:
        Parsed Skill with source_path pointing to the directory,
        or None if the file is malformed or lacks a name.
    """
    try:
        post = frontmatter.load(str(skill_file))
    except Exception:
        # Any read/parse failure means the skill is skipped, not fatal.
        logger.warning("Cannot read or parse skill file: %s", skill_file)
        return None

    metadata = post.metadata
    skill_name = metadata.get("name")
    if not skill_name:
        logger.warning("Skill file missing 'name' in frontmatter: %s", skill_file)
        return None

    return Skill(
        name=skill_name,
        description=metadata.get("description", ""),
        source_path=str(skill_dir),
    )
|
|
||
|
|
||
| class SkillsRAG: | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. Can you explore the option to reuse ToolsRAG? Because this seems like a reimplementation of it. StructuredTool.from_function(
name="skill_pod_failure_diagnosis",
description="Diagnose pods stuck in CrashLoopBackOff, ImagePullBackOff, Pending...",
func=lambda: skill.load_content(),
)Basically:
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. We never considered straight reuse. The conversation was about reusing the approach. The reason SkillsRAG is its own class (~80 lines) rather than a ToolsRAG instance is that ToolsRAG carries a lot of tool-specific concerns that don't apply to skills: server filtering in both dense and sparse paths, tool_json metadata, _convert_langchain_tool_to_dict, server-grouped return format, and remove_tools for dynamic MCP updates. We'd have to bypass or stub out most of that.
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. My point here is that if we wrap each skill as a
Contributor
Author
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more. A single index mixing skills and tools will require post-filtering to separate them. The shared infrastructure (QdrantStore, BM25Okapi) is already reused; SkillsRAG only adds ~30 lines of domain-specific wiring that doesn't fit ToolsRAG's server-centric model (server filtering, tool_json metadata, grouped-by-server return format). |
||
| """Hybrid RAG system for skill selection using dense and sparse retrieval.""" | ||
|
|
||
| def __init__( | ||
| self, | ||
| encode_fn: Callable[[str], list[float]], | ||
| alpha: float = 0.8, | ||
| top_k: int = 3, | ||
| threshold: float = 0.3, | ||
| ) -> None: | ||
| """Initialize the SkillsRAG system. | ||
|
|
||
| Args: | ||
| encode_fn: Function that encodes text into an embedding vector. | ||
| alpha: Weight for dense vs sparse (1.0 = full dense, 0.0 = full sparse). | ||
| top_k: Number of candidate skills to consider during retrieval. | ||
| threshold: Minimum fused score to accept a skill match. | ||
| """ | ||
| self.alpha = alpha | ||
| self.top_k = top_k | ||
| self.threshold = threshold | ||
| self._encode = encode_fn | ||
| self.bm25: BM25Okapi | None = None | ||
| self.store = QdrantStore() | ||
| self._skills: dict[str, Skill] = {} | ||
|
|
||
| def populate_skills(self, skills: list[Skill]) -> None: | ||
| """Index skills for hybrid retrieval. | ||
|
|
||
| Args: | ||
| skills: List of Skill objects to index. | ||
| """ | ||
| ids: list[str] = [] | ||
| docs: list[str] = [] | ||
| vectors: list[list[float]] = [] | ||
|
|
||
| for skill in skills: | ||
| text = f"{skill.name} {skill.description}" | ||
| vector = self._encode(text) | ||
|
|
||
| ids.append(skill.source_path) | ||
| docs.append(text) | ||
| vectors.append(vector) | ||
| self._skills[skill.source_path] = skill | ||
|
|
||
| self.store.upsert(ids, docs, vectors) | ||
| self._rebuild_bm25() | ||
| logger.info("Indexed %d skills for retrieval", len(skills)) | ||
|
|
||
| def retrieve_skill(self, query: str) -> Skill | None: | ||
| """Retrieve the best matching skill for a query. | ||
|
|
||
| Args: | ||
| query: User query to match against indexed skills. | ||
|
|
||
| Returns: | ||
| The best matching Skill, or None if no skill exceeds the threshold | ||
| (caller should fall back to default behavior). | ||
| """ | ||
| if not self._skills: | ||
| return None | ||
|
|
||
| q_vec = self._encode(query) | ||
|
|
||
| dense_ids, _, _ = self.store.search_with_scores(q_vec, self.top_k) | ||
| dense_scores = {t: 1.0 - i / self.top_k for i, t in enumerate(dense_ids)} | ||
|
|
||
| sparse_scores = self._retrieve_sparse_scores(query) | ||
| sparse_ids = sorted(sparse_scores, key=sparse_scores.get, reverse=True)[ | ||
| : self.top_k | ||
| ] | ||
|
|
||
| fused: dict[str, float] = {} | ||
| for t in set(list(dense_ids) + sparse_ids): | ||
| d = dense_scores.get(t, 0) | ||
| s = sparse_scores.get(t, 0) | ||
| fused[t] = self.alpha * d + (1 - self.alpha) * s | ||
|
|
||
| if not fused: | ||
| return None | ||
|
|
||
| best_id = max(fused, key=fused.get) # type: ignore[arg-type] | ||
| best_score = fused[best_id] | ||
|
|
||
| if best_score < self.threshold: | ||
| logger.debug( | ||
| "Best skill '%s' scored %.3f, below threshold %.3f", | ||
| best_id, | ||
| best_score, | ||
| self.threshold, | ||
| ) | ||
| return None | ||
|
|
||
| logger.info( | ||
| "Selected skill '%s' with score %.3f for query: %s", | ||
| best_id, | ||
| best_score, | ||
| query[:80], | ||
| ) | ||
| return self._skills.get(best_id) | ||
|
|
||
| def _rebuild_bm25(self) -> None: | ||
| """Rebuild BM25 index from stored documents.""" | ||
| all_data = self.store.get_all() | ||
| if not all_data["documents"]: | ||
| self.bm25 = None | ||
| return | ||
| sparse_docs = [doc.split() for doc in all_data["documents"]] | ||
| self.bm25 = BM25Okapi(sparse_docs) | ||
|
|
||
| def _retrieve_sparse_scores(self, query: str) -> dict[str, float]: | ||
| """Retrieve BM25 scores normalized to 0-1 range. | ||
|
|
||
| Args: | ||
| query: The query string. | ||
|
|
||
| Returns: | ||
| Dictionary mapping skill IDs to normalized BM25 scores. | ||
| """ | ||
| if self.bm25 is None: | ||
| return {} | ||
|
|
||
| all_data = self.store.get_all() | ||
| skill_ids = all_data["ids"] | ||
|
|
||
| raw_scores = self.bm25.get_scores(query.split()) | ||
| max_score = max(raw_scores) if max(raw_scores) > 0 else 1 | ||
|
|
||
| return {sid: score / max_score for sid, score in zip(skill_ids, raw_scores)} | ||
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
Should this be configurable? We need to use the same embedding as we have for RAG/ToolsRAG.
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
We are doing exactly what we are doing in tools Rag. Its local configuration only (per Ashutosh request). Not exposed in CR