Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
43 changes: 43 additions & 0 deletions ols/app/models/config.py
Original file line number Diff line number Diff line change
Expand Up @@ -657,6 +657,44 @@ class ToolFilteringConfig(BaseModel):
)


class SkillsConfig(BaseModel):
    """Configuration for skill selection using hybrid RAG retrieval.

    If this config is present, skill selection is enabled. If absent, no skills are used.
    """

    # Root directory whose immediate subdirectories each define one skill.
    skills_dir: str = Field(
        default="skills",
        description="Path to directory containing skill subdirectories",
    )

    # NOTE(review): local-only configuration, not exposed in the CR. Expected
    # to use the same embedding model as RAG/ToolsRAG — confirm at deployment.
    embed_model_path: Optional[str] = Field(
        default=None,
        description="Path to sentence transformer model for embeddings",
    )

    # Fusion weight between dense (vector) and sparse (BM25) retrieval.
    alpha: float = Field(
        default=0.8,
        ge=0.0,
        le=1.0,
        description="Weight for dense vs sparse retrieval (1.0 = full dense, 0.0 = full sparse)",
    )

    # Candidate pool size for each retrieval pass.
    top_k: int = Field(
        default=3,
        ge=1,
        le=20,
        description="Number of candidate skills to consider during retrieval",
    )

    # Fused score below which no skill is selected (caller falls back).
    threshold: float = Field(
        default=0.3,
        ge=0.0,
        le=1.0,
        description="Minimum similarity score to accept a skill match",
    )


class ApprovalType(StrEnum):
"""Approval strategy for tool execution."""

Expand Down Expand Up @@ -1117,6 +1155,8 @@ class OLSConfig(BaseModel):

tools_approval: Optional[ToolsApprovalConfig] = None

skills: Optional[SkillsConfig] = None

def __init__(
self, data: Optional[dict] = None, ignore_missing_certs: bool = False
) -> None:
Expand Down Expand Up @@ -1173,6 +1213,8 @@ def __init__(
self.tool_filtering = ToolFilteringConfig(**data.get("tool_filtering"))
if data.get("tools_approval", None) is not None:
self.tools_approval = ToolsApprovalConfig(**data.get("tools_approval"))
if data.get("skills", None) is not None:
self.skills = SkillsConfig(**data.get("skills"))

def __eq__(self, other: object) -> bool:
"""Compare two objects for equality."""
Expand All @@ -1198,6 +1240,7 @@ def __eq__(self, other: object) -> bool:
and self.proxy_config == other.proxy_config
and self.tool_filtering == other.tool_filtering
and self.tools_approval == other.tools_approval
and self.skills == other.skills
)
return False

Expand Down
1 change: 1 addition & 0 deletions ols/src/skills/__init__.py
Original file line number Diff line number Diff line change
@@ -0,0 +1 @@
"""Skills RAG for selecting skills based on query relevance."""
261 changes: 261 additions & 0 deletions ols/src/skills/skills_rag.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,261 @@
"""Hybrid Skills RAG implementation for skill selection."""

import logging
from collections.abc import Callable
from dataclasses import dataclass
from pathlib import Path

import frontmatter
from rank_bm25 import BM25Okapi

from ols.src.tools.tools_rag.hybrid_tools_rag import QdrantStore

logger = logging.getLogger(__name__)

_SKILL_MD = "skill.md"


@dataclass(frozen=True, slots=True)
class Skill:
"""A loaded skill artifact with parsed metadata and directory path."""

name: str
description: str
source_path: str

def load_content(self) -> str:
"""Read all files in the skill directory tree and concatenate on demand.

The main skill file body (everything after frontmatter) is returned
first, followed by the contents of any additional text files found
anywhere in the skill directory tree, each separated by a header
showing the relative path.

Returns:
Combined content of all files in the skill directory tree.

Raises:
OSError: If the skill directory or its files cannot be read.
"""
skill_dir = Path(self.source_path)
parts: list[str] = []

for entry in sorted(skill_dir.rglob("*")):
if not entry.is_file():
continue
try:
raw = entry.read_text(encoding="utf-8").strip()
except (UnicodeDecodeError, ValueError):
continue
if entry.name.lower() == _SKILL_MD:
parts.insert(0, frontmatter.loads(raw).content.strip())
else:
rel = entry.relative_to(skill_dir)
parts.append(f"## {rel}\n\n{raw}")

return "\n\n".join(parts)


def _find_skill_file(directory: Path) -> Path | None:
"""Find the skill definition file in a directory (case-insensitive)."""
for child in directory.iterdir():
if child.is_file() and child.name.lower() == _SKILL_MD:
return child
return None


def load_skills_from_directory(skills_dir: str | Path) -> list[Skill]:
    """Load all skill definitions from a directory of skill subdirectories.

    Each immediate subdirectory of ``skills_dir`` is treated as a skill.
    The subdirectory must contain a ``skill.md`` or ``SKILL.md`` file with
    YAML frontmatter.
    The subdirectory path is stored as ``source_path`` so that ``load_content``
    can read all files in it on demand.

    Args:
        skills_dir: Root directory containing skill subdirectories.

    Returns:
        List of parsed Skill objects.
    """
    root = Path(skills_dir)
    if not root.is_dir():
        logger.warning("Skills directory does not exist: %s", skills_dir)
        return []

    loaded: list[Skill] = []
    for candidate in sorted(root.iterdir()):
        if not candidate.is_dir():
            continue
        definition = _find_skill_file(candidate)
        if definition is None:
            logger.debug("Skipping directory without skill.md: %s", candidate)
            continue
        parsed = _parse_skill_directory(candidate, definition)
        if parsed is not None:
            loaded.append(parsed)

    logger.info("Loaded %d skills from %s", len(loaded), skills_dir)
    return loaded


def _parse_skill_directory(skill_dir: Path, skill_file: Path) -> Skill | None:
    """Parse frontmatter from a skill directory's skill definition file.

    Args:
        skill_dir: Path to the skill directory (stored as source_path).
        skill_file: Path to the skill definition file within the directory.

    Returns:
        Parsed Skill with source_path pointing to the directory,
        or None if the file is malformed.
    """
    # Best-effort: any read/parse failure (I/O, bad YAML) just skips the skill.
    try:
        parsed = frontmatter.load(str(skill_file))
    except Exception:
        logger.warning("Cannot read or parse skill file: %s", skill_file)
        return None

    skill_name = parsed.metadata.get("name")
    if not skill_name:
        logger.warning("Skill file missing 'name' in frontmatter: %s", skill_file)
        return None

    return Skill(
        name=skill_name,
        description=parsed.metadata.get("description", ""),
        source_path=str(skill_dir),
    )


class SkillsRAG:
    """Hybrid RAG system for skill selection using dense and sparse retrieval.

    NOTE(review): deliberately kept as its own small class rather than reusing
    ToolsRAG, which carries tool-specific concerns (server filtering,
    tool_json metadata, server-grouped results) that do not apply to skills.
    The shared infrastructure (QdrantStore, BM25Okapi) is reused directly.
    """

    def __init__(
        self,
        encode_fn: Callable[[str], list[float]],
        alpha: float = 0.8,
        top_k: int = 3,
        threshold: float = 0.3,
    ) -> None:
        """Initialize the SkillsRAG system.

        Args:
            encode_fn: Function that encodes text into an embedding vector.
            alpha: Weight for dense vs sparse (1.0 = full dense, 0.0 = full sparse).
            top_k: Number of candidate skills to consider during retrieval.
            threshold: Minimum fused score to accept a skill match.
        """
        self.alpha = alpha
        self.top_k = top_k
        self.threshold = threshold
        self._encode = encode_fn
        self.bm25: BM25Okapi | None = None
        self.store = QdrantStore()
        # Maps a skill's source_path (used as the index id) to the Skill.
        self._skills: dict[str, Skill] = {}

    def populate_skills(self, skills: list[Skill]) -> None:
        """Index skills for hybrid retrieval.

        Each skill is embedded from its "name description" text; the skill's
        source_path doubles as its unique id in both the dense and sparse
        indexes.

        Args:
            skills: List of Skill objects to index.
        """
        ids: list[str] = []
        docs: list[str] = []
        vectors: list[list[float]] = []

        for skill in skills:
            text = f"{skill.name} {skill.description}"
            ids.append(skill.source_path)
            docs.append(text)
            vectors.append(self._encode(text))
            self._skills[skill.source_path] = skill

        self.store.upsert(ids, docs, vectors)
        self._rebuild_bm25()
        logger.info("Indexed %d skills for retrieval", len(skills))

    def retrieve_skill(self, query: str) -> Skill | None:
        """Retrieve the best matching skill for a query.

        Dense scores are rank-based (1.0 for the top hit, decreasing by
        1/top_k per rank); sparse scores are max-normalized BM25. The two
        are fused as ``alpha * dense + (1 - alpha) * sparse``.

        Args:
            query: User query to match against indexed skills.

        Returns:
            The best matching Skill, or None if no skill exceeds the threshold
            (caller should fall back to default behavior).
        """
        if not self._skills:
            return None

        q_vec = self._encode(query)

        dense_ids, _, _ = self.store.search_with_scores(q_vec, self.top_k)
        dense_scores = {t: 1.0 - i / self.top_k for i, t in enumerate(dense_ids)}

        sparse_scores = self._retrieve_sparse_scores(query)
        sparse_ids = sorted(sparse_scores, key=sparse_scores.get, reverse=True)[
            : self.top_k
        ]

        fused: dict[str, float] = {}
        for sid in set(dense_ids) | set(sparse_ids):
            fused[sid] = self.alpha * dense_scores.get(sid, 0) + (
                1 - self.alpha
            ) * sparse_scores.get(sid, 0)

        if not fused:
            return None

        # Explicit lambda avoids the `# type: ignore` needed for fused.get.
        best_id = max(fused, key=lambda sid: fused[sid])
        best_score = fused[best_id]

        if best_score < self.threshold:
            logger.debug(
                "Best skill '%s' scored %.3f, below threshold %.3f",
                best_id,
                best_score,
                self.threshold,
            )
            return None

        logger.info(
            "Selected skill '%s' with score %.3f for query: %s",
            best_id,
            best_score,
            query[:80],
        )
        return self._skills.get(best_id)

    def _rebuild_bm25(self) -> None:
        """Rebuild the BM25 index from all documents currently in the store."""
        all_data = self.store.get_all()
        if not all_data["documents"]:
            self.bm25 = None
            return
        self.bm25 = BM25Okapi([doc.split() for doc in all_data["documents"]])

    def _retrieve_sparse_scores(self, query: str) -> dict[str, float]:
        """Retrieve BM25 scores normalized to 0-1 range.

        Args:
            query: The query string.

        Returns:
            Dictionary mapping skill IDs to normalized BM25 scores.
        """
        if self.bm25 is None:
            return {}

        skill_ids = self.store.get_all()["ids"]
        raw_scores = self.bm25.get_scores(query.split())
        # Compute the max once; fall back to 1 to avoid division by zero
        # when every score is zero (no query term matches any document).
        top = max(raw_scores)
        max_score = top if top > 0 else 1

        return {sid: score / max_score for sid, score in zip(skill_ids, raw_scores)}
Loading