Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion .github/workflows/test.yml
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,7 @@ jobs:
run: |
# Skip GPU tests as GitHub Actions runners don't have CUDA
# To run GPU tests locally: pytest tests/ -v -m "gpu"
pytest tests/ -v --tb=short -m "not slow and not gpu and not integration"
python -m pytest tests/ -v --tb=short -m "not slow and not gpu and not integration"

lint:
runs-on: ubuntu-latest
Expand Down
4 changes: 4 additions & 0 deletions .gitignore
Original file line number Diff line number Diff line change
Expand Up @@ -8,3 +8,7 @@ dist/
*/.DS_Store
*.DS_Store

# Evaluation/Profiling ignores
*.prof
evaluation/sandbox/results/*
!evaluation/sandbox/results/.gitkeep
33 changes: 27 additions & 6 deletions contextpilot/context_index/compute_distance_cpu.py
Original file line number Diff line number Diff line change
Expand Up @@ -272,7 +272,7 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
start = time.time()
chunk_ids, original_positions, lengths, offsets = prepare_contexts_for_cpu(contexts)
prep_time = time.time() - start
print(f" Prepared in {prep_time:.1f}s")
print(f"+ Prepared in {prep_time:.1f}s")

# Generate batches of pair indices
print(f"\nGenerating pair batches...")
Expand All @@ -290,7 +290,7 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
if current_batch:
batches.append(current_batch)

print(f" Generated {len(batches):,} batches")
print(f"+ Generated {len(batches):,} batches")

# Prepare arguments for workers
worker_args = [
Expand All @@ -306,13 +306,13 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
start_time = time.time()
processed = 0

with Pool(num_workers) as pool:
for batch_results in pool.imap_unordered(compute_batch_worker, worker_args):
if num_workers == 1:
# Bypass multiprocessing Pool entirely to save initialization overhead
for args in worker_args:
batch_results = compute_batch_worker(args)
for i, j, dist in batch_results:
# Convert (i, j) to condensed index
condensed_idx = n * i - i * (i + 1) // 2 + j - i - 1
condensed_distances[condensed_idx] = dist

processed += 1

# Progress update
Expand All @@ -326,6 +326,27 @@ def compute_distance_matrix_cpu_optimized(contexts: List[List[int]],
f"Rate: {rate:,.0f} pairs/sec | "
f"Elapsed: {elapsed:.1f}s | "
f"ETA: {eta:.1f}s ({eta/60:.1f} min)")
else:
with Pool(num_workers) as pool:
for batch_results in pool.imap_unordered(compute_batch_worker, worker_args):
for i, j, dist in batch_results:
# Convert (i, j) to condensed index
condensed_idx = n * i - i * (i + 1) // 2 + j - i - 1
condensed_distances[condensed_idx] = dist

processed += 1

# Progress update
if processed % 100000 == 0 or processed == num_pairs:
elapsed = time.time() - start_time
rate = processed / elapsed if elapsed > 0 else 0
eta = (num_pairs - processed) / rate if rate > 0 else 0
progress_pct = processed / num_pairs * 100

print(f" {processed:,}/{num_pairs:,} ({progress_pct:.1f}%) | "
f"Rate: {rate:,.0f} pairs/sec | "
f"Elapsed: {elapsed:.1f}s | "
f"ETA: {eta:.1f}s ({eta/60:.1f} min)")

compute_time = time.time() - start_time
total_time = compute_time + prep_time
Expand Down
32 changes: 16 additions & 16 deletions contextpilot/server/live_index.py
Original file line number Diff line number Diff line change
Expand Up @@ -172,15 +172,15 @@ def build_and_schedule(self, contexts: List[List[int]],
print("\n1. Building static index...")
self.initial_result = self.fit_transform(contexts)

print(f" Built tree with {self.initial_result.stats['total_nodes']} nodes")
print(f" Leaf nodes: {self.initial_result.stats['leaf_nodes']}")
print(f" + Built tree with {self.initial_result.stats['total_nodes']} nodes")
print(f" + Leaf nodes: {self.initial_result.stats['leaf_nodes']}")

# Step 2: Inter-context scheduling
print("\n2. Scheduling contexts for optimal execution...")
scheduled_reordered, scheduled_originals, final_mapping, groups = \
self.inter_scheduler.schedule_contexts(self.initial_result)

print(f" Created {len(groups)} execution groups")
print(f" + Created {len(groups)} execution groups")

self.scheduled_result = {
'reordered_contexts': scheduled_reordered,
Expand All @@ -197,8 +197,8 @@ def build_and_schedule(self, contexts: List[List[int]],
num_input_contexts=len(contexts)
)

print(f" Initialized {len(self.metadata)} nodes with metadata")
print(f" Auto-assigned {len(request_id_mapping)} request IDs")
print(f" + Initialized {len(self.metadata)} nodes with metadata")
print(f" + Auto-assigned {len(request_id_mapping)} request IDs")

# Add request_id mapping to result (dict and ordered list)
self.scheduled_result['request_id_mapping'] = request_id_mapping
Expand All @@ -208,7 +208,7 @@ def build_and_schedule(self, contexts: List[List[int]],
self.is_live = True

print("\n" + "=" * 80)
print(" INDEX IS NOW LIVE - Ready for dynamic operations")
print("+ INDEX IS NOW LIVE - Ready for dynamic operations")
print("=" * 80 + "\n")

return self.scheduled_result
Expand Down Expand Up @@ -534,8 +534,8 @@ def build_incremental(self, contexts: List[List[int]],
# No match - will build new index for these
unmatched_contexts.append((i, context))

print(f" Found {len(matched_contexts)} contexts with matches")
print(f" Found {len(unmatched_contexts)} contexts without matches")
print(f" + Found {len(matched_contexts)} contexts with matches")
print(f" + Found {len(unmatched_contexts)} contexts without matches")

# Prepare result arrays (will fill in order)
request_ids = [None] * len(contexts)
Expand Down Expand Up @@ -585,7 +585,7 @@ def build_incremental(self, contexts: List[List[int]],
)
temp_result = temp_index.fit_transform(unmatched_only)

print(f" Built temp index with {temp_result.stats['total_nodes']} nodes")
print(f" + Built temp index with {temp_result.stats['total_nodes']} nodes")

# Step 4: Merge temp index into global index
print("\n4. Merging temp index into global index...")
Expand All @@ -606,16 +606,16 @@ def build_incremental(self, contexts: List[List[int]],
context_info.append((orig_idx, merged_request_ids[i], merged_search_paths[i]))

merged_count = len(unmatched_contexts)
print(f" Merged {merged_count} new subtrees under global root")
print(f" + Merged {merged_count} new subtrees under global root")

# Step 5: Schedule execution order
print("\n5. Scheduling execution order for cache reuse...")
scheduled_order = self._schedule_incremental(context_info)
groups = self._group_by_path_prefix(context_info)
print(f" Scheduled {len(scheduled_order)} contexts into {len(groups)} groups")
print(f" + Scheduled {len(scheduled_order)} contexts into {len(groups)} groups")

print("\n" + "=" * 80)
print(f" INCREMENTAL BUILD COMPLETE")
print(f"+ INCREMENTAL BUILD COMPLETE")
print(f" Matched & inserted: {len(matched_contexts)}")
print(f" Built & merged: {merged_count}")
print("=" * 80 + "\n")
Expand Down Expand Up @@ -906,15 +906,15 @@ def schedule_only(self, contexts: List[List[int]]) -> Dict:
print("\n1. Building static index...")
result = self.fit_transform(contexts)

print(f" Built tree with {result.stats['total_nodes']} nodes")
print(f" Leaf nodes: {result.stats['leaf_nodes']}")
print(f" + Built tree with {result.stats['total_nodes']} nodes")
print(f" + Leaf nodes: {result.stats['leaf_nodes']}")

# Step 2: Inter-context scheduling
print("\n2. Scheduling contexts for optimal execution...")
scheduled_reordered, scheduled_originals, final_mapping, groups = \
self.inter_scheduler.schedule_contexts(result)

print(f" Created {len(groups)} execution groups")
print(f" + Created {len(groups)} execution groups")

# Return results without going live (stateless)
scheduled_result = {
Expand All @@ -931,7 +931,7 @@ def schedule_only(self, contexts: List[List[int]]) -> Dict:
}

print("\n" + "=" * 80)
print(" BATCH SCHEDULED (Stateless - no cache tracking)")
print("+ BATCH SCHEDULED (Stateless - no cache tracking)")
print("=" * 80 + "\n")

return scheduled_result
Expand Down
8 changes: 4 additions & 4 deletions docs/guides/multi_turn.md
Original file line number Diff line number Diff line change
Expand Up @@ -256,10 +256,10 @@ print(f"New docs: {result['new_docs']}") # [2]

| Operation | `/reorder` | `/deduplicate` |
|-----------|----------|----------------|
| Index build | | ✗ |
| Clustering | | ✗ |
| Search | | ✗ |
| Deduplication | | |
| Index build | + | ✗ |
| Clustering | + | ✗ |
| Search | + | ✗ |
| Deduplication | + | + |
| **Latency** | ~50-200ms | ~1-5ms |

For multi-turn conversations, Turn 2+ typically doesn't need index operations — just deduplication against conversation history. The `/deduplicate` endpoint is **10-100x faster**.
Expand Down
147 changes: 147 additions & 0 deletions evaluation/benchmarks/run_bigcodebench_elm.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,147 @@
import asyncio
import json
import logging
import os
import re

# pip install datasets
from datasets import load_dataset
from openai import AsyncOpenAI

# Set PYTHONPATH in the environment before running
from refactored_plugins.skill_index import SkillAwareContextPlugin
from refactored_plugins.dedup import ContextDedupPlugin

# Emit INFO-and-above records with a timestamp, level and logger name.
logging.basicConfig(
    format="%(asctime)s [%(levelname)s] %(name)s: %(message)s",
    level=logging.INFO,
)
logger = logging.getLogger(__name__)

# Ten placeholder function-tool schemas, keyed "tool_1" .. "tool_10".
# They are attached to every request so the Skill plugin has tools to act on.
DUMMY_TOOL_REGISTRY = dict(
    (
        tool_name,
        {
            "type": "function",
            "function": {
                "name": tool_name,
                "description": f"Dummy tool number {number}",
            },
        },
    )
    for number, tool_name in ((k, f"tool_{k}") for k in range(1, 11))
)

# First ```python fenced block; DOTALL lets the captured body span lines.
_PYTHON_FENCE_RE = re.compile(r"```python\s*(.*?)\s*```", re.DOTALL)
# Any other fenced block (no language tag, or a different one such as "py").
_ANY_FENCE_RE = re.compile(r"```[^\n`]*\n(.*?)\s*```", re.DOTALL)


def _extract_code(response_content):
    """Return the code carried by an LLM reply, or "" for an empty reply."""
    if not response_content:
        return ""
    # Prefer an explicit python fence, then any fence, so that fence markers
    # never end up inside the saved solution.
    for pattern in (_PYTHON_FENCE_RE, _ANY_FENCE_RE):
        match = pattern.search(response_content)
        if match:
            return match.group(1).strip()
    # No fence at all: treat the whole reply as code.
    return response_content.strip()


async def process_task(
    task,
    skill_plugin,
    dedup_plugin,
    client,
    semaphore,
    output_file,
    turn_1_id,
    *,
    model="gpt-5.5",
):
    """
    Process a single BigCodeBench task through the ContextPilot plugins and the ELM API.

    Args:
        task: Mapping describing one benchmark task; ``task_id`` and a prompt
            field are read from it.
        skill_plugin: Object with an awaitable ``process(request)`` method,
            applied after ``dedup_plugin``.
        dedup_plugin: Object with an awaitable ``process(request)`` method,
            applied first.
        client: OpenAI-compatible async client; ``chat.completions.create``
            is awaited on it.
        semaphore: Async context manager that bounds concurrent executions.
        output_file: Path of the JSONL file the result line is appended to.
        turn_1_id: Value sent as the request's ``parent_id``.
        model: Model name passed to the API. Defaults to the value this
            script has always used.

    Returns:
        None. One ``{"task_id", "solution"}`` JSON line is appended to
        ``output_file``; the solution is an empty string when the API call
        fails or returns no content.
    """
    async with semaphore:
        task_id = task.get("task_id", "unknown_task")
        # BigCodeBench rows carry 'complete_prompt' and 'instruct_prompt';
        # 'instruction' is kept for inputs that use the older key.
        prompt = (
            task.get("complete_prompt")
            or task.get("instruct_prompt")
            or task.get("instruction")
            or "No prompt found."
        )

        # Mock heavy agent request with redundant history and bloated tools
        request = {
            "user_id": "evaluator_1",
            "parent_id": turn_1_id,
            # Require only 3 tools out of the 10 registered ones
            "_required_skills": ["tool_1", "tool_3", "tool_7"],
            "messages": [
                {"role": "system", "content": "You are a senior python developer. Always wrap your code in ```python blocks."},
                {"role": "user", "content": "Please help me write some code."},
                {"role": "assistant", "content": "Of course! I can help you with that."},
                {"role": "user", "content": prompt},
            ],
            "tools": list(DUMMY_TOOL_REGISTRY.values()),
        }

        # Pass through ContextPilot local plugins (dedup first, then skills)
        optimized_request = await dedup_plugin.process(request)
        optimized_request = await skill_plugin.process(optimized_request)

        # Prepare ELM API request (OpenAI-compatible)
        api_kwargs = {
            "model": model,
            "messages": optimized_request.get("messages", []),
        }
        tools = optimized_request.get("tools")
        if tools:
            # Only send 'tools' when the plugins left at least one in place.
            api_kwargs["tools"] = tools

        logger.info("Sending optimized task %s to ELM API...", task_id)
        try:
            response = await client.chat.completions.create(**api_kwargs)
            response_content = response.choices[0].message.content
        except Exception as exc:
            # Best effort: a failed task is recorded with an empty solution
            # rather than aborting the run; the traceback is kept in the log.
            logger.exception("API Error for %s: %s", task_id, exc)
            response_content = ""

        extracted_code = _extract_code(response_content)

        # Append the result immediately so completed tasks survive a later failure.
        with open(output_file, "a", encoding="utf-8") as f:
            f.write(json.dumps({"task_id": task_id, "solution": extracted_code}) + "\n")

        logger.info("Finished %s", task_id)

async def main():
    """
    Run a five-task BigCodeBench smoke test through the plugins and the ELM API.

    Reads ``OPENAI_API_KEY`` and ``BASE_URL`` from the environment, pre-warms
    the dedup plugin with the fixed opening turns, loads the dataset, sends the
    first five tasks through ``process_task`` and writes the solutions to
    ``elm_samples.jsonl`` next to this script. Finally prints both plugins'
    metrics as JSON.

    Raises:
        Exception: whatever ``load_dataset`` raised for the last candidate
            split when none of the candidates could be loaded.
    """
    # Local import so this function carries its own dependency.
    from itertools import islice

    api_key = os.environ.get("OPENAI_API_KEY", "dummy-elm-key")
    base_url = os.environ.get("BASE_URL", "https://api.openai.com/v1")

    skill_plugin = SkillAwareContextPlugin(tool_registry=DUMMY_TOOL_REGISTRY)
    dedup_plugin = ContextDedupPlugin()

    # Pre-warm Dedup plugin with the initial messages to simulate conversation history
    turn_1 = {
        "user_id": "evaluator_1",
        "messages": [
            {"role": "system", "content": "You are a senior python developer. Always wrap your code in ```python blocks."},
            {"role": "user", "content": "Please help me write some code."},
            {"role": "assistant", "content": "Of course! I can help you with that."},
        ],
    }
    turn_1_res = await dedup_plugin.process(turn_1)
    # NOTE(review): this is None when the plugin result has no 'current_id' — confirm.
    turn_1_id = turn_1_res.get("current_id")

    # Load BigCodeBench dataset, trying each candidate split name in order.
    logger.info("Loading BigCodeBench dataset...")
    candidate_splits = ("train", "v0.1.2", "v0.1.0_240822")
    last_error = None
    for split in candidate_splits:
        try:
            dataset = load_dataset("bigcode/bigcodebench", split=split)
        except Exception as exc:
            # Which exception an unknown split raises is not pinned down here,
            # so every failure is logged and the next candidate is tried.
            logger.warning("Failed to load split=%r. Error: %s", split, exc)
            last_error = exc
        else:
            break
    else:
        # Every candidate failed: surface the most recent loader error.
        raise last_error

    # Select first 5 tasks for a smoke test without materialising the whole dataset
    tasks = list(islice(dataset, 5))
    logger.info("Loaded %d tasks for smoke test.", len(tasks))

    output_file = os.path.join(os.path.dirname(__file__), "elm_samples.jsonl")
    # Start from a clean file; process_task appends one line per task.
    try:
        os.remove(output_file)
    except FileNotFoundError:
        pass

    # Use a Semaphore with 1 to process sequentially and avoid early rate limits
    semaphore = asyncio.Semaphore(1)

    client = AsyncOpenAI(api_key=api_key, base_url=base_url)
    try:
        await asyncio.gather(
            *(
                process_task(t, skill_plugin, dedup_plugin, client, semaphore, output_file, turn_1_id)
                for t in tasks
            )
        )
    finally:
        # Release the client's HTTP connections before the event loop shuts down.
        await client.close()

    print("\n=== Phase 2 Dataset Smoke Test Complete ===")
    print(f"Results saved to {output_file}")

    print("\n=== Combined Cost-Savings Telemetry ===")
    metrics = {
        "skill_plugin_metrics": skill_plugin.get_plugin_metrics(),
        "dedup_plugin_metrics": dedup_plugin.get_plugin_metrics(),
    }
    print(json.dumps(metrics, indent=2))


if __name__ == "__main__":
    asyncio.run(main())
Loading