From 9fa86f7d39d1fa5a5c93bd115da72aa41dd3cc11 Mon Sep 17 00:00:00 2001 From: arahangua Date: Mon, 11 Aug 2025 13:09:34 +0900 Subject: [PATCH 1/3] fixed: readme subsection title links --- README.md | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/README.md b/README.md index 3d7596a..274b59c 100644 --- a/README.md +++ b/README.md @@ -11,7 +11,7 @@ **Official Python SDK for CZero Engine** *Personal AI Interface: full local AI suite with document processing, semantic search, and RAG system with AI personas* -[Installation](#-installation) • [Quick Start](#-quick-start) • [Examples](#-examples) • [API Docs](#-api-reference) • [Contributing](CONTRIBUTING.md) +[Installation](#-installation) • [Quick Start](#-quick-start) • [Examples](#-examples) • [API Docs](https://docs.czero.cc) • [Contributing](CONTRIBUTING.md) From c4e5f2e16221881b1edf25a1c7bbccc55b605456 Mon Sep 17 00:00:00 2001 From: arahangua Date: Mon, 11 Aug 2025 18:34:42 +0900 Subject: [PATCH 2/3] fixed:remarks on non-existent personas and handling of chatresponse --- examples/02_rag_system.py | 63 +++++++++++-------- src/czero_engine/client.py | 23 ++++++- src/czero_engine/models.py | 17 +++++ .../workflows/persona_workflow.py | 17 +---- 4 files changed, 75 insertions(+), 45 deletions(-) diff --git a/examples/02_rag_system.py b/examples/02_rag_system.py index b359bc4..761aeff 100644 --- a/examples/02_rag_system.py +++ b/examples/02_rag_system.py @@ -20,6 +20,8 @@ async def rag_example(): print("\n🚀 RAG System Example") print("=" * 50) + workspace_id = None + # Step 1: Create knowledge base from documents print("\n1. Creating Knowledge Base") print("-" * 30) @@ -70,41 +72,47 @@ async def rag_example(): chunk_overlap=50 ) + workspace_id = result['workspace']['id'] print(f"✅ Created workspace: {result['workspace']['name']}") - print(f" ID: {result['workspace']['id']}") + print(f" ID: {workspace_id}") print(f" Processed {result['files_processed']} files") print(f" Created {result['chunks_created']} chunks") + + # Important: Give the system time to index the documents + print("\n⏳ Waiting for indexing to complete...") + await asyncio.sleep(5) # Wait 5 seconds for indexing - # Step 2: Demonstrate hierarchical search - print("\n2. Hierarchical Semantic Search") + # Step 2: Verify documents are indexed by searching + print("\n2. Verifying Document Indexing") print("-" * 30) async with CZeroEngineClient() as client: - # Search with hierarchy support - results = await client.semantic_search( - query="How does AI and machine learning work?", + # Search for content we just indexed, using workspace filter + test_results = await client.semantic_search( + query="artificial intelligence", limit=3, - include_hierarchy=True, - hierarchy_level=None # Search all levels + similarity_threshold=0.3, # Low threshold to ensure we find something + workspace_filter=workspace_id # Search only in our workspace ) - print(f"Found {len(results.results)} results with hierarchy:") - for i, res in enumerate(results.results, 1): - print(f"\n {i}. Score: {res.similarity:.3f}") - print(f" {res.content[:100]}...") - if res.parent_chunk: - print(f" ↳ Has parent context") + if test_results.results: + print(f"✅ Found {len(test_results.results)} indexed documents") + for i, res in enumerate(test_results.results, 1): + print(f" {i}. Score: {res.similarity:.3f}") + print(f" {res.content[:100]}...") + else: + print("⚠️ Documents may still be indexing. Continuing with example...") # Step 3: Use RAG for Q&A print("\n3. RAG-Enhanced Q&A") print("-" * 30) + print("Note: RAG searches across all workspaces, not just the one we created.") async with RAGWorkflow() as rag_workflow: - # Ask questions with RAG + # Ask questions that match our indexed documents questions = [ - "What is CZero Engine and what are its main features?", + "What is artificial intelligence and machine learning?", "How does semantic search work?", - "What's the difference between AI, machine learning, and deep learning?", - "Does CZero Engine support GPU acceleration?" + "What features does CZero Engine provide?" ] for i, question in enumerate(questions, 1): @@ -112,7 +120,7 @@ async def rag_example(): response = await rag_workflow.ask( question=question, chunk_limit=3, - similarity_threshold=0.5 + similarity_threshold=0.3 # Lower threshold to be more inclusive ) print(f"💡 A{i}: {response.response[:250]}...") @@ -124,7 +132,7 @@ async def rag_example(): # Step 4: Compare with and without RAG print("\n4. RAG vs Non-RAG Comparison") print("-" * 30) - comparison_q = "What document processing features does CZero Engine provide?" + comparison_q = "What is machine learning and how does it relate to AI?" async with RAGWorkflow() as rag_workflow: comparison = await rag_workflow.compare_with_without_rag( @@ -133,28 +141,31 @@ async def rag_example(): print(f"\n🤔 Question: {comparison_q}") print("\n❌ Without RAG (generic response):") - print(f" {comparison['without_rag'][:200]}...") + print(f" {comparison['without_rag'].response[:200]}...") print("\n✅ With RAG (context-aware):") - print(f" {comparison['with_rag'][:200]}...") + print(f" {comparison['with_rag'].response[:200]}...") print(f"\n📊 Statistics:") - print(f" Context chunks used: {comparison['chunks_used']}") + chunks_used = len(comparison['with_rag'].context_used) if comparison['with_rag'].context_used else 0 + print(f" Context chunks used: {chunks_used}") print(f" Improvement: More specific and accurate with RAG") # Step 5: Find similar content print("\n5. Similarity Search") print("-" * 30) async with CZeroEngineClient() as client: - # Get all chunks first + # Search in our workspace for semantic search content search_res = await client.semantic_search( query="semantic search", - limit=1 + limit=1, + workspace_filter=workspace_id ) if search_res.results: chunk_id = search_res.results[0].chunk_id similar = await client.similarity_search( chunk_id=chunk_id, - limit=3 + limit=3, + similarity_threshold=0.3 # Lower threshold ) print(f"Content similar to chunk '{chunk_id[:20]}...':\n") diff --git a/src/czero_engine/client.py b/src/czero_engine/client.py index 6017dcd..6599f62 100644 --- a/src/czero_engine/client.py +++ b/src/czero_engine/client.py @@ -14,7 +14,7 @@ SimilaritySearchRequest, RecommendationsRequest, DocumentsResponse, DocumentMetadata, EmbeddingRequest, EmbeddingResponse, - WorkspaceCreateRequest, WorkspaceResponse, + WorkspaceCreateRequest, WorkspaceResponse, WorkspaceListResponse, WorkspaceInfo, ProcessFilesRequest, ProcessFilesResponse, ProcessingConfig, PersonaListResponse, PersonaChatRequest, PersonaChatResponse, HealthResponse, @@ -99,7 +99,8 @@ async def chat( temperature: float = 0.7, similarity_threshold: float = 0.7, chunk_limit: int = 5, - use_web_search: bool = False + use_web_search: bool = False, + workspace_filter: Optional[str] = None ) -> ChatResponse: """ Send a chat message to CZero Engine LLM with optional RAG. @@ -117,6 +118,7 @@ async def chat( similarity_threshold: Minimum similarity for RAG chunks chunk_limit: Maximum number of context chunks to retrieve use_web_search: Whether to enable web search (if available) + workspace_filter: Optional workspace ID to filter RAG context Returns: ChatResponse with generated text and optional context chunks @@ -132,7 +134,8 @@ async def chat( use_web_search=use_web_search, system_prompt=system_prompt, max_tokens=max_tokens, - temperature=temperature + temperature=temperature, + workspace_filter=workspace_filter ) self._log(f"Sending chat request (RAG: {use_rag})...") @@ -341,6 +344,20 @@ async def create_workspace( response.raise_for_status() return WorkspaceResponse(**response.json()) + async def list_workspaces(self) -> WorkspaceListResponse: + """ + List all available workspaces. + + Returns a list of workspaces with their IDs, names, paths, and status. + + Returns: + WorkspaceListResponse containing list of WorkspaceInfo objects + """ + self._log("Listing workspaces...") + response = await self.client.get(f"{self.base_url}/api/workspaces") + response.raise_for_status() + return WorkspaceListResponse(**response.json()) + async def process_files( self, workspace_id: str, diff --git a/src/czero_engine/models.py b/src/czero_engine/models.py index aba4025..9bd2167 100644 --- a/src/czero_engine/models.py +++ b/src/czero_engine/models.py @@ -22,6 +22,7 @@ class ChatRequest(BaseModel): system_prompt: Optional[str] = None max_tokens: Optional[int] = 1024 temperature: Optional[float] = 0.7 + workspace_filter: Optional[str] = None class ContextChunk(BaseModel): @@ -131,6 +132,22 @@ class WorkspaceResponse(BaseModel): created_at: str +class WorkspaceInfo(BaseModel): + """Information about a workspace.""" + id: str + name: str + path: str + description: Optional[str] = None + status: str + created_at: str + updated_at: str + + +class WorkspaceListResponse(BaseModel): + """Response model for /api/workspaces endpoint.""" + workspaces: List[WorkspaceInfo] + + class ProcessingConfig(BaseModel): """Configuration for file processing.""" chunk_size: Optional[int] = 1000 diff --git a/src/czero_engine/workflows/persona_workflow.py b/src/czero_engine/workflows/persona_workflow.py index c257ce5..4518b17 100644 --- a/src/czero_engine/workflows/persona_workflow.py +++ b/src/czero_engine/workflows/persona_workflow.py @@ -28,9 +28,7 @@ class PersonaWorkflow: Workflow for interacting with AI personas in CZero Engine. Personas provide specialized interaction styles and expertise: - - Gestalt: Adaptive general assistant - - Sage: Research and analysis expert - - Pioneer: Innovation and creative solutions + - Gestalt: General AI assistant (default persona) Each persona maintains conversation context for coherent dialogue. """ @@ -409,19 +407,6 @@ async def example_personas(): "Can you help me understand semantic search?" ) - # Switch to Sage persona - await workflow.select_persona("sage") - response = await workflow.chat( - "What are the philosophical implications of AI?" - ) - - # Multi-persona discussion - discussion = await workflow.multi_persona_discussion( - topic="The future of human-AI collaboration", - persona_ids=["gestalt-default", "sage", "pioneer"], - rounds=2 - ) - # Compare persona responses comparison = await workflow.persona_comparison( "How should we approach learning new technologies?" From d6e1a89b8001537501b3cac4b2eddf08511d94f0 Mon Sep 17 00:00:00 2001 From: arahangua Date: Mon, 25 Aug 2025 14:10:51 +0900 Subject: [PATCH 3/3] updated workflow template according to new consistency os refactor --- README.md | 130 ++++--- examples/02_rag_system.py | 168 +++++++-- examples/04_document_processing.py | 20 +- src/czero_engine/client.py | 199 +++++++++- src/czero_engine/models.py | 103 +++++- src/czero_engine/workflows/knowledge_base.py | 20 +- tests/test_all_endpoints.py | 361 +++++++++++++++++++ tests/test_processing_pipeline.py | 194 ++++++++++ 8 files changed, 1079 insertions(+), 116 deletions(-) create mode 100644 tests/test_all_endpoints.py create mode 100644 tests/test_processing_pipeline.py diff --git a/README.md b/README.md index 274b59c..da777d1 100644 --- a/README.md +++ b/README.md @@ -75,29 +75,33 @@ from czero_engine.workflows import KnowledgeBaseWorkflow async with KnowledgeBaseWorkflow() as kb: result = await kb.create_knowledge_base( name="Technical Docs", - directory_path="./documents", - chunk_size=1000, - chunk_overlap=200 + directory_path="./documents" ) - print(f"Processed {result['files_processed']} chunks") # Hierarchical chunking creates multiple chunks per file + print(f"Processed {result['files_processed']} files, created {result['chunks_created']} chunks") # Hierarchical chunking creates multiple chunks per file ``` ### 2. RAG-Enhanced Q&A ```python -from czero_engine.workflows import RAGWorkflow - -async with RAGWorkflow() as rag: +# Use direct client for RAG-enhanced chat +async with CZeroEngineClient() as client: # Ask with document context - answer = await rag.ask( - question="What are the key features?", + response = await client.chat( + message="What are the key features?", + use_rag=True, chunk_limit=5, - similarity_threshold=0.7 + similarity_threshold=0.3 # Lower threshold for better recall ) # Compare with/without RAG - comparison = await rag.compare_with_without_rag( - question="Explain semantic search" + response_no_rag = await client.chat( + message="Explain semantic search", + use_rag=False + ) + response_with_rag = await client.chat( + message="Explain semantic search", + use_rag=True, + similarity_threshold=0.3 ) ``` @@ -115,19 +119,18 @@ results = await client.semantic_search( ### 4. AI Persona Interactions ```python -from czero_engine.workflows import PersonaWorkflow - -async with PersonaWorkflow() as personas: +# Use direct client for persona interactions +async with CZeroEngineClient() as client: # Chat with default Gestalt persona - await personas.select_persona("gestalt-default") # Adaptive Intelligence - response = await personas.chat( - "Analyze the implications of AGI" + response = await client.chat_with_persona( + persona_id="gestalt-default", # Adaptive Intelligence + message="Analyze the implications of AGI" ) - # Or chat directly without selecting - response = await personas.chat( - "What are the key features of CZero Engine?", - persona_id="gestalt-default" + # Or use regular chat (defaults to Gestalt if no persona specified) + response = await client.chat( + message="What are the key features of CZero Engine?", + use_rag=True ) ``` @@ -204,12 +207,10 @@ async with CZeroEngineClient( path="./papers" ) - # Process documents (uses SmallToBig hierarchical chunking by default) + # Process documents (uses SmallToBig hierarchical chunking automatically) result = await client.process_files( workspace_id=workspace.id, - files=["paper1.pdf", "paper2.md"], - chunk_size=500, - chunk_overlap=100 + files=["paper1.pdf", "paper2.md"] ) # Semantic search @@ -231,8 +232,8 @@ async with CZeroEngineClient( # Check system health uv run czero health -# Create knowledge base -uv run czero create-kb ./docs --name "My KB" --chunk-size 1000 +# Create knowledge base (uses hierarchical chunking automatically) +uv run czero create-kb ./docs --name "My KB" # Search documents uv run czero search "query text" --limit 10 --threshold 0.7 @@ -318,19 +319,33 @@ from czero_engine.models import ( ```python async def build_qa_system(docs_dir: str): - # 1. Create knowledge base - async with KnowledgeBaseWorkflow() as kb: - await kb.create_knowledge_base("QA KB", docs_dir) - workspace_id = kb.workspace_id - - # 2. Interactive Q&A - async with RAGWorkflow() as rag: + async with CZeroEngineClient() as client: + # 1. Create workspace and process documents + workspace = await client.create_workspace( + name="QA KB", + path=docs_dir + ) + + # Process documents in the directory + files = list(Path(docs_dir).glob("**/*")) + file_paths = [str(f) for f in files if f.is_file()] + await client.process_files( + workspace_id=workspace.id, + files=file_paths + ) + + # 2. Interactive Q&A while True: q = input("Question: ") if q == 'quit': break - answer = await rag.ask(q, workspace_filter=workspace_id) - print(f"Answer: {answer.response}\n") + response = await client.chat( + message=q, + use_rag=True, + workspace_filter=workspace.id, + similarity_threshold=0.3 + ) + print(f"Answer: {response.response}\n") ``` ### Document Similarity Analysis @@ -353,20 +368,33 @@ async def analyze_similarity(doc1: str, doc2: str): ### Batch Processing with Progress ```python -async with DocumentProcessingWorkflow(verbose=True) as processor: - files = processor.discover_files("./docs", patterns=["*.pdf"]) - - stats = await processor.process_documents( - files=files, - workspace_name="Batch Process", - batch_size=10, # Process 10 files at a time - chunk_size=800 +async with CZeroEngineClient() as client: + # Create workspace + workspace = await client.create_workspace( + name="Batch Process", + path="./docs" ) - print(f"Files submitted: {stats.total_files}") - print(f"Chunks created: {stats.total_chunks}") # Hierarchical chunks - print(f"Est. Success rate: {stats.success_rate:.1f}%") - print(f"Throughput: {stats.total_chunks/stats.processing_time:.1f} chunks/s") + # Discover files + files = list(Path("./docs").glob("**/*.pdf")) + file_paths = [str(f) for f in files] + + # Process in batches of 10 + import math + batch_size = 10 + total_chunks = 0 + + for i in range(0, len(file_paths), batch_size): + batch = file_paths[i:i+batch_size] + result = await client.process_files( + workspace_id=workspace.id, + files=batch + ) + total_chunks += result.chunks_created + print(f"Batch {i//batch_size + 1}: {result.chunks_created} chunks") + + print(f"Files submitted: {len(file_paths)}") + print(f"Total chunks created: {total_chunks}") # Hierarchical chunks ``` ## 🌐 Cloud AI Integration @@ -438,9 +466,11 @@ CZERO_VERBOSE=false ## 📊 Performance Tips 1. **Batch Processing**: Process multiple files in parallel -2. **Chunk Size**: 500-1000 tokens for general documents +2. **Hierarchical Chunking**: System automatically uses SmallToBig chunking for optimal retrieval 3. **Hierarchy**: Use hierarchical search for structured documents 4. **Models**: Ensure LLM and embedding models are pre-loaded +5. **Local LLM Performance**: Expect 10-30 second response times for chat operations +6. **Similarity Thresholds**: Use 0.3 or lower for broader results with E5 models ## 🤝 Contributing diff --git a/examples/02_rag_system.py b/examples/02_rag_system.py index 761aeff..3eab8f2 100644 --- a/examples/02_rag_system.py +++ b/examples/02_rag_system.py @@ -6,6 +6,9 @@ - Chat with RAG context - Similarity-based recommendations - Comparing responses with and without RAG + +NOTE: Local LLMs may take longer to generate responses compared to cloud APIs. +Expect 10-30 seconds for complex queries depending on your hardware. """ import asyncio @@ -63,13 +66,11 @@ async def rag_example(): even when exact keywords don't match. """) - # Create knowledge base + # Create knowledge base using hierarchical chunking result = await kb_workflow.create_knowledge_base( name="AI Documentation", directory_path=str(docs_dir), - file_patterns=["*.txt", "*.md"], - chunk_size=500, - chunk_overlap=50 + file_patterns=["*.txt", "*.md"] ) workspace_id = result['workspace']['id'] @@ -105,10 +106,10 @@ async def rag_example(): # Step 3: Use RAG for Q&A print("\n3. RAG-Enhanced Q&A") print("-" * 30) - print("Note: RAG searches across all workspaces, not just the one we created.") - async with RAGWorkflow() as rag_workflow: - - # Ask questions that match our indexed documents + print("Note: Local LLMs may take 10-30 seconds per response.") + + async with CZeroEngineClient() as client: + # Ask questions using the actual chat API with RAG questions = [ "What is artificial intelligence and machine learning?", "How does semantic search work?", @@ -117,37 +118,75 @@ async def rag_example(): for i, question in enumerate(questions, 1): print(f"\n📝 Q{i}: {question}") - response = await rag_workflow.ask( - question=question, - chunk_limit=3, - similarity_threshold=0.3 # Lower threshold to be more inclusive - ) - print(f"💡 A{i}: {response.response[:250]}...") + print(" Generating response (this may take a moment with local LLM)...") - if response.context_used: - print(f" 📚 Used {len(response.context_used)} context chunks") - for j, ctx in enumerate(response.context_used[:2], 1): - print(f" {j}. {ctx.content[:60]}...") + try: + # Use the actual chat API with RAG config + response = await client.chat( + message=question, + use_rag=True, + workspace_filter=workspace_id, # Filter to our workspace + similarity_threshold=0.3, # Lower threshold for better recall + chunk_limit=5, + max_tokens=200 # Limit response length for faster generation + ) + + # Show the response + print(f"💡 A{i}: {response.response[:250]}...") + + # Show context if available + if response.context_used: + print(f" 📚 Used {len(response.context_used)} context chunks") + for j, ctx in enumerate(response.context_used[:2], 1): + print(f" {j}. Score: {ctx.similarity:.3f}") + print(f" {ctx.content[:60]}...") + + except asyncio.TimeoutError: + print(" ⚠️ Response generation timed out. Local LLMs can be slow.") + except Exception as e: + print(f" ❌ Error: {e}") # Step 4: Compare with and without RAG print("\n4. RAG vs Non-RAG Comparison") print("-" * 30) comparison_q = "What is machine learning and how does it relate to AI?" - async with RAGWorkflow() as rag_workflow: - comparison = await rag_workflow.compare_with_without_rag( - question=comparison_q - ) - + async with CZeroEngineClient() as client: print(f"\n🤔 Question: {comparison_q}") + + # Without RAG print("\n❌ Without RAG (generic response):") - print(f" {comparison['without_rag'].response[:200]}...") + print(" Generating...") + try: + response_no_rag = await client.chat( + message=comparison_q, + use_rag=False, + max_tokens=150 + ) + print(f" {response_no_rag.response[:200]}...") + except Exception as e: + print(f" Error: {e}") + + # With RAG print("\n✅ With RAG (context-aware):") - print(f" {comparison['with_rag'].response[:200]}...") - print(f"\n📊 Statistics:") - chunks_used = len(comparison['with_rag'].context_used) if comparison['with_rag'].context_used else 0 - print(f" Context chunks used: {chunks_used}") - print(f" Improvement: More specific and accurate with RAG") + print(" Generating...") + try: + response_with_rag = await client.chat( + message=comparison_q, + use_rag=True, + workspace_filter=workspace_id, + similarity_threshold=0.3, + chunk_limit=5, + max_tokens=150 + ) + print(f" {response_with_rag.response[:200]}...") + + print(f"\n📊 Statistics:") + chunks_used = len(response_with_rag.context_used) if response_with_rag.context_used else 0 + print(f" Context chunks used: {chunks_used}") + print(f" Improvement: RAG provides context from your documents") + except Exception as e: + print(f" Error: {e}") # Step 5: Find similar content print("\n5. Similarity Search") @@ -162,10 +201,9 @@ async def rag_example(): if search_res.results: chunk_id = search_res.results[0].chunk_id - similar = await client.similarity_search( + similar = await client.find_similar_chunks( chunk_id=chunk_id, - limit=3, - similarity_threshold=0.3 # Lower threshold + limit=3 ) print(f"Content similar to chunk '{chunk_id[:20]}...':\n") @@ -174,10 +212,75 @@ async def rag_example(): print(f" {res.content[:80]}...") +async def advanced_rag_example(): + """Demonstrate hierarchical retrieval for better context.""" + print("\n6. Advanced Hierarchical Retrieval") + print("-" * 30) + + async with CZeroEngineClient() as client: + # First create a workspace with some documents + workspace_id = await quick_setup_workspace(client) + + if workspace_id: + # Use hierarchical retrieval endpoint for better context + print("\nUsing hierarchical retrieval for enhanced context...") + try: + response = await client.hierarchical_retrieve( + query="machine learning and AI", + workspace_id=workspace_id, + limit=3, + similarity_threshold=0.3, + include_document_info=True + ) + + print(f"Found {len(response.results)} hierarchical results:") + for i, result in enumerate(response.results[:2], 1): + print(f"\n Result {i}:") + print(f" Small chunk (precise): {result.small_chunk.content[:100]}...") + print(f" Similarity: {result.small_chunk.similarity_score:.3f}") + if result.big_chunk: + print(f" Big chunk (context): {result.big_chunk.content[:100]}...") + if result.document_info: + print(f" Source: {result.document_info.document_name}") + + except Exception as e: + print(f"Error with hierarchical retrieval: {e}") + + +async def quick_setup_workspace(client): + """Quick helper to set up a test workspace.""" + try: + import tempfile + from pathlib import Path + + with tempfile.TemporaryDirectory() as temp_dir: + # Create a test document + doc_path = Path(temp_dir) / "test.txt" + doc_path.write_text("Machine learning is a subset of AI that enables systems to learn from data.") + + # Create workspace + workspace = await client.create_workspace( + name="Hierarchical Test", + path=temp_dir + ) + + # Process file + await client.process_files( + workspace_id=workspace.id, + files=[str(doc_path)] + ) + + await asyncio.sleep(1) # Brief wait for indexing + return workspace.id + except: + return None + + async def main(): """Run RAG examples with error handling.""" try: await rag_example() + await advanced_rag_example() print("\n✅ RAG examples completed successfully!") except Exception as e: print(f"\n❌ Error: {e}") @@ -186,6 +289,7 @@ async def main(): print("2. Check that API server is active") print("3. Verify embedding models are loaded") print("4. Confirm vector database is initialized") + print("5. Note: Local LLMs can be slow (10-30s per response)") if __name__ == "__main__": diff --git a/examples/04_document_processing.py b/examples/04_document_processing.py index 52ea2f5..2ccb2bc 100644 --- a/examples/04_document_processing.py +++ b/examples/04_document_processing.py @@ -129,9 +129,7 @@ def create_chunks(text, size): files = [str(tech_doc), str(business_doc), str(code_doc)] result = await client.process_files( workspace_id=workspace.id, - files=files, - chunk_size=200, # Smaller chunks for demo - chunk_overlap=50 + files=files ) print(f"✅ Processing complete:") @@ -268,9 +266,7 @@ def process(self, document): if md_files: stats = await workflow.process_documents( files=md_files, - workspace_name="Documentation", - chunk_size=300, - chunk_overlap=50 + workspace_name="Documentation" ) print(f"\n📝 Markdown files:") print(f" Files submitted: {stats.total_files}") @@ -282,9 +278,7 @@ def process(self, document): if py_files: stats = await workflow.process_documents( files=py_files, - workspace_name="Source Code", - chunk_size=200, - chunk_overlap=30 + workspace_name="Source Code" ) print(f"\n🐍 Python files:") print(f" Files submitted: {stats.total_files}") @@ -316,9 +310,7 @@ def process(self, document): stats = await workflow.process_documents( files=batch_files, workspace_name="Batch Demo", - batch_size=3, # Process 3 files at a time - chunk_size=100, - chunk_overlap=20 + batch_size=3 # Process 3 files at a time ) print(f"\n⚡ Batch processing results:") @@ -374,9 +366,7 @@ async def hierarchical_processing(): result = await client.process_files( workspace_id=workspace.id, - files=[str(doc_path)], - chunk_size=100, - chunk_overlap=20 + files=[str(doc_path)] ) print(f"✅ Hierarchical processing complete:") diff --git a/src/czero_engine/client.py b/src/czero_engine/client.py index 6599f62..d77d2ee 100644 --- a/src/czero_engine/client.py +++ b/src/czero_engine/client.py @@ -12,12 +12,15 @@ ChatRequest, ChatResponse, SemanticSearchRequest, SemanticSearchResponse, SimilaritySearchRequest, RecommendationsRequest, - DocumentsResponse, DocumentMetadata, + DocumentsResponse, DocumentMetadata, DocumentFullTextResponse, EmbeddingRequest, EmbeddingResponse, WorkspaceCreateRequest, WorkspaceResponse, WorkspaceListResponse, WorkspaceInfo, ProcessFilesRequest, ProcessFilesResponse, ProcessingConfig, PersonaListResponse, PersonaChatRequest, PersonaChatResponse, + PersonaCreateRequest, PersonaCreateResponse, HealthResponse, + AddDialogueRequest, AddDialogueResponse, + HierarchicalRetrievalRequest, HierarchicalRetrievalResponse, ) console = Console() @@ -275,6 +278,36 @@ async def list_documents(self) -> DocumentsResponse: response.raise_for_status() return DocumentsResponse(**response.json()) + async def get_document(self, document_id: str) -> DocumentMetadata: + """ + Get metadata for a specific document. + + Args: + document_id: ID of the document + + Returns: + DocumentMetadata for the document + """ + self._log(f"Fetching document: {document_id}") + response = await self.client.get(f"{self.base_url}/api/documents/{document_id}") + response.raise_for_status() + return DocumentMetadata(**response.json()) + + async def get_document_full_text(self, document_id: str) -> DocumentFullTextResponse: + """ + Get the full text content of a document. + + Args: + document_id: ID of the document + + Returns: + DocumentFullTextResponse with full document content + """ + self._log(f"Fetching full text for document: {document_id}") + response = await self.client.get(f"{self.base_url}/api/documents/{document_id}/full-text") + response.raise_for_status() + return DocumentFullTextResponse(**response.json()) + # ==================== Embedding Generation ==================== async def generate_embedding( @@ -405,6 +438,55 @@ async def process_files( response.raise_for_status() return ProcessFilesResponse(**response.json()) + async def delete_workspace(self, workspace_id: str) -> Dict[str, Any]: + """ + Delete a workspace by ID. + + Args: + workspace_id: ID of the workspace to delete + + Returns: + Success status and message + """ + self._log(f"Deleting workspace: {workspace_id}") + response = await self.client.delete(f"{self.base_url}/api/workspaces/{workspace_id}") + response.raise_for_status() + return response.json() + + async def add_dialogue_to_workspace( + self, + workspace_id: str, + dialogue_text: str, + character_name: str = "Unknown Character" + ) -> AddDialogueResponse: + """ + Add dialogue text to a workspace as a new document. + + This is useful for adding conversation history or character dialogues + to your knowledge base. + + Args: + workspace_id: ID of the workspace + dialogue_text: The dialogue text to add + character_name: Name of the character/speaker + + Returns: + AddDialogueResponse with processing results + """ + request = AddDialogueRequest( + workspace_id=workspace_id, + dialogue_text=dialogue_text, + character_name=character_name + ) + + self._log(f"Adding dialogue to workspace {workspace_id}") + response = await self.client.post( + f"{self.base_url}/api/workspaces/add-dialogue", + json=request.model_dump() + ) + response.raise_for_status() + return AddDialogueResponse(**response.json()) + # ==================== Persona Endpoints ==================== async def list_personas(self) -> PersonaListResponse: @@ -466,6 +548,121 @@ async def persona_chat( response.raise_for_status() return PersonaChatResponse(**response.json()) + async def create_persona( + self, + id: str, + name: str, + specialty: str, + system_prompt_template: str, + tagline: Optional[str] = None, + description: Optional[str] = None, + background: Optional[str] = None, + traits: Optional[List[str]] = None, + greeting_message: Optional[str] = None, + workspace_id: Optional[str] = None + ) -> PersonaCreateResponse: + """ + Create a new AI persona. + + Args: + id: Unique ID for the persona + name: Display name of the persona + specialty: The persona's area of expertise + system_prompt_template: System prompt that defines the persona's behavior + tagline: Short tagline for the persona + description: Detailed description + background: Background story + traits: List of personality traits + greeting_message: Initial greeting message + workspace_id: Optional workspace to associate with + + Returns: + PersonaCreateResponse with creation status + """ + request = PersonaCreateRequest( + id=id, + name=name, + tagline=tagline, + description=description, + background=background, + traits=traits, + specialty=specialty, + greeting_message=greeting_message, + system_prompt_template=system_prompt_template, + workspace_id=workspace_id + ) + + self._log(f"Creating persona: {name}") + response = await self.client.post( + f"{self.base_url}/api/personas/create", + json=request.model_dump(exclude_none=True) + ) + response.raise_for_status() + return PersonaCreateResponse(**response.json()) + + async def delete_persona(self, persona_id: str) -> Dict[str, Any]: + """ + Delete a persona by ID. + + Args: + persona_id: ID of the persona to delete + + Returns: + Success status and message + """ + self._log(f"Deleting persona: {persona_id}") + response = await self.client.delete(f"{self.base_url}/api/personas/{persona_id}") + response.raise_for_status() + return response.json() + + # ==================== Hierarchical Retrieval ==================== + + async def hierarchical_retrieve( + self, + query: str, + workspace_id: str, + limit: int = 5, + similarity_threshold: float = 0.3, + include_kg_triples: bool = False, + include_document_info: bool = False + ) -> HierarchicalRetrievalResponse: + """ + Perform hierarchical retrieval with enhanced context. + + This advanced retrieval method provides: + - Small chunks for precision + - Big chunks for context + - Optional knowledge graph triples + - Optional document information + + Args: + query: Search query + workspace_id: Workspace to search in (required) + limit: Maximum number of results + similarity_threshold: Minimum similarity score + include_kg_triples: Include knowledge graph relationships + include_document_info: Include document metadata + + Returns: + HierarchicalRetrievalResponse with structured results + """ + request = HierarchicalRetrievalRequest( + query=query, + workspace_id=workspace_id, + limit=limit, + similarity_threshold=similarity_threshold, + include_kg_triples=include_kg_triples, + include_document_info=include_document_info + ) + + self._log(f"Performing hierarchical retrieval in workspace {workspace_id}") + response = await self.client.post( + f"{self.base_url}/api/retrieve", + json=request.model_dump() + ) + response.raise_for_status() + return HierarchicalRetrievalResponse(**response.json()) + # ==================== Utility Methods ==================== def print_search_results(self, response: SemanticSearchResponse): diff --git a/src/czero_engine/models.py b/src/czero_engine/models.py index 9bd2167..5139cd4 100644 --- a/src/czero_engine/models.py +++ b/src/czero_engine/models.py @@ -221,4 +221,105 @@ class HealthResponse(BaseModel): status: str service: str version: str - timestamp: str \ No newline at end of file + timestamp: str + + +# Persona Creation Models +class PersonaCreateRequest(BaseModel): + """Request model for /api/personas/create endpoint.""" + id: str + name: str + tagline: Optional[str] = None + description: Optional[str] = None + background: Optional[str] = None + traits: Optional[List[str]] = None + specialty: str + greeting_message: Optional[str] = None + system_prompt_template: str + workspace_id: Optional[str] = None + + +class PersonaCreateResponse(BaseModel): + """Response model for persona creation.""" + success: bool + persona_id: str + message: str + + +# Workspace Add Dialogue Models +class AddDialogueRequest(BaseModel): + """Request model for /api/workspaces/add-dialogue endpoint.""" + workspace_id: str + dialogue_text: str + character_name: str = "Unknown Character" + + +class AddDialogueResponse(BaseModel): + """Response model for adding dialogue to workspace.""" + success: bool + chunks_created: int + file_path: str + message: str + + +# Document Full Text Model +class DocumentFullTextResponse(BaseModel): + """Response model for /api/documents/:id/full-text endpoint.""" + document_id: str + title: str + file_path: str + content: str + length: int + + +# Hierarchical Retrieval Models +class ChunkWithScore(BaseModel): + """Chunk with similarity score for hierarchical retrieval.""" + chunk_id: str + content: str + similarity_score: float + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class ChunkInfo(BaseModel): + """Basic chunk information for hierarchical retrieval.""" + chunk_id: str + content: str + metadata: Dict[str, Any] = Field(default_factory=dict) + + +class KGTriple(BaseModel): + """Knowledge graph triple.""" + subject: str + predicate: str + object: str + + +class DocumentInfo(BaseModel): + """Document information for hierarchical retrieval.""" + document_id: str + document_name: str + total_chunks: int + + +class HierarchicalResult(BaseModel): + """Single hierarchical retrieval result.""" + small_chunk: ChunkWithScore + big_chunk: Optional[ChunkInfo] = None + kg_triples: Optional[List[KGTriple]] = None + document_info: Optional[DocumentInfo] = None + + +class HierarchicalRetrievalRequest(BaseModel): + """Request model for /api/retrieve endpoint.""" + query: str + workspace_id: str + limit: int = 5 + similarity_threshold: float = 0.3 + include_kg_triples: bool = False + include_document_info: bool = False + + +class HierarchicalRetrievalResponse(BaseModel): + """Response model for hierarchical retrieval.""" + results: List[HierarchicalResult] \ No newline at end of file diff --git a/src/czero_engine/workflows/knowledge_base.py b/src/czero_engine/workflows/knowledge_base.py index 7827e09..30edc9d 100644 --- a/src/czero_engine/workflows/knowledge_base.py +++ b/src/czero_engine/workflows/knowledge_base.py @@ -56,8 +56,6 @@ async def create_knowledge_base( name: str, directory_path: str, file_patterns: Optional[List[str]] = None, - chunk_size: int = 1000, - chunk_overlap: int = 200, description: Optional[str] = None ) -> Dict[str, Any]: """ @@ -67,8 +65,6 @@ async def create_knowledge_base( name: Name for the knowledge base/workspace directory_path: Path to directory containing documents file_patterns: Optional file patterns to include (e.g., ["*.pdf", "*.txt"]) - chunk_size: Size of text chunks for processing - chunk_overlap: Overlap between chunks description: Optional description for the workspace Returns: @@ -140,12 +136,10 @@ async def create_knowledge_base( batch_paths = [str(f.absolute()) for f in batch] try: - # Process batch + # Process batch using hierarchical chunking result = await self.client.process_files( workspace_id=workspace.id, - files=batch_paths, - chunk_size=chunk_size, - chunk_overlap=chunk_overlap + files=batch_paths ) total_processed += result.files_processed @@ -174,8 +168,6 @@ async def create_knowledge_base( summary.add_row("Files Processed", str(total_processed)) summary.add_row("Files Failed", str(failed_files)) summary.add_row("Chunks Created", str(total_chunks)) - summary.add_row("Chunk Size", f"{chunk_size} chars") - summary.add_row("Chunk Overlap", f"{chunk_overlap} chars") console.print(summary) @@ -183,11 +175,7 @@ async def create_knowledge_base( "workspace": workspace.model_dump(), "files_processed": total_processed, "files_failed": failed_files, - "chunks_created": total_chunks, - "processing_config": { - "chunk_size": chunk_size, - "chunk_overlap": chunk_overlap - } + "chunks_created": total_chunks } async def query( @@ -311,8 +299,6 @@ async def example_knowledge_base(): name="Technical Documentation", directory_path="./docs", file_patterns=["*.md", "*.txt", "*.pdf"], - chunk_size=1000, - chunk_overlap=200, description="Technical documentation and guides" ) diff --git a/tests/test_all_endpoints.py b/tests/test_all_endpoints.py new file mode 100644 index 0000000..4c42392 --- /dev/null +++ b/tests/test_all_endpoints.py @@ -0,0 +1,361 @@ +"""Comprehensive API endpoint testing script for CZero Engine. + +This script tests all API endpoints to ensure they work correctly. +Run this after starting the CZero Engine app with API server enabled. +""" + +import asyncio +import json +import os +import tempfile +from typing import Dict, Any, Optional +from pathlib import Path +import httpx +from rich.console import Console +from rich.table import Table +from rich.panel import Panel +from rich.progress import Progress, SpinnerColumn, TextColumn +import uuid + +console = Console() + +class EndpointTester: + def __init__(self, base_url: str = "http://localhost:1421"): + self.base_url = base_url + self.client = httpx.AsyncClient(timeout=60.0) + self.test_results = [] + self.workspace_id = None + self.document_id = None + self.persona_id = None + self.chunk_id = None + + async def __aenter__(self): + return self + + async def __aexit__(self, exc_type, exc_val, exc_tb): + await self.client.aclose() + + def record_result(self, endpoint: str, method: str, success: bool, message: str, details: Any = None): + """Record test result for reporting.""" + self.test_results.append({ + "endpoint": endpoint, + "method": method, + "success": success, + "message": message, + "details": details + }) + + async def test_endpoint(self, name: str, method: str, path: str, json_data: Optional[Dict] = None) -> Dict: + """Test a single endpoint and record results.""" + console.print(f"\n[cyan]Testing:[/cyan] {method} {path}") + + try: + if method == "GET": + response = await self.client.get(f"{self.base_url}{path}") + elif method == "POST": + response = await self.client.post(f"{self.base_url}{path}", json=json_data) + elif method == "DELETE": + response = await self.client.delete(f"{self.base_url}{path}") + else: + raise ValueError(f"Unsupported method: {method}") + + response.raise_for_status() + data = response.json() + + self.record_result(path, method, True, f"✅ {name} successful", data) + console.print(f" [green]✅ Success[/green] - Status: {response.status_code}") + + # Print sample response + if isinstance(data, dict): + for key in list(data.keys())[:3]: # Show first 3 fields + value = str(data[key])[:50] + "..." if len(str(data[key])) > 50 else str(data[key]) + console.print(f" {key}: {value}") + + return data + + except httpx.HTTPStatusError as e: + error_msg = f"HTTP {e.response.status_code}" + try: + error_data = e.response.json() + error_msg += f" - {error_data.get('error', 'Unknown error')}" + except: + pass + + self.record_result(path, method, False, f"❌ {name} failed: {error_msg}") + console.print(f" [red]❌ Failed[/red] - {error_msg}") + return {} + + except Exception as e: + self.record_result(path, method, False, f"❌ {name} error: {str(e)}") + console.print(f" [red]❌ Error[/red] - {str(e)}") + return {} + + async def run_all_tests(self): + """Run all endpoint tests in sequence.""" + console.print(Panel.fit("🚀 CZero Engine API Endpoint Testing", style="bold cyan")) + + # 1. Health Check + console.print("\n[bold yellow]═══ Health & Status ═══[/bold yellow]") + await self.test_endpoint("Health Check", "GET", "/api/health") + + # 2. Workspace Management (create workspace first for other tests) + console.print("\n[bold yellow]═══ Workspace Management ═══[/bold yellow]") + + with tempfile.TemporaryDirectory() as temp_dir: + # Create workspace + workspace_data = await self.test_endpoint( + "Create Workspace", "POST", "/api/workspaces/create", + { + "name": f"Test Workspace {uuid.uuid4().hex[:8]}", + "path": temp_dir, + "description": "API test workspace" + } + ) + if workspace_data: + self.workspace_id = workspace_data.get("id") + + # List workspaces + await self.test_endpoint("List Workspaces", "GET", "/api/workspaces") + + # Create test files for processing + if self.workspace_id: + test_file = Path(temp_dir) / "test_document.txt" + test_file.write_text("This is a test document for CZero Engine API testing. " * 50) + + # Process files + await self.test_endpoint( + "Process Files", "POST", "/api/workspaces/process", + { + "workspace_id": self.workspace_id, + "files": [str(test_file)], + "config": { + "chunk_size": 500, + "chunk_overlap": 100 + } + } + ) + + # Add dialogue + await self.test_endpoint( + "Add Dialogue", "POST", "/api/workspaces/add-dialogue", + { + "workspace_id": self.workspace_id, + "dialogue_text": "User: Hello AI!\nAI: Hello! How can I help you today?", + "character_name": "Test Character" + } + ) + + # 3. Document Management + console.print("\n[bold yellow]═══ Document Management ═══[/bold yellow]") + docs_data = await self.test_endpoint("List Documents", "GET", "/api/documents") + + if docs_data and docs_data.get("documents"): + self.document_id = docs_data["documents"][0]["id"] + + # Get single document + if self.document_id: + await self.test_endpoint( + "Get Document", "GET", f"/api/documents/{self.document_id}" + ) + + # Get document full text + await self.test_endpoint( + "Get Document Full Text", "GET", f"/api/documents/{self.document_id}/full-text" + ) + + # 4. Embedding Generation + console.print("\n[bold yellow]═══ Embedding Generation ═══[/bold yellow]") + embedding_data = await self.test_endpoint( + "Generate Embedding", "POST", "/api/embeddings/generate", + { + "text": "Test text for embedding generation" + } + ) + + # 5. Chat Endpoints + console.print("\n[bold yellow]═══ Chat/LLM Endpoints ═══[/bold yellow]") + + # Basic chat without RAG + await self.test_endpoint( + "Chat (No RAG)", "POST", "/api/chat/send", + { + "message": "What is 2+2?", + "use_rag": False, + "max_tokens": 50 + } + ) + + # Chat with RAG (if workspace available) + if self.workspace_id: + await self.test_endpoint( + "Chat with RAG", "POST", "/api/chat/send", + { + "message": "What is in the test document?", + "use_rag": True, + "workspace_filter": self.workspace_id, + "rag_config": { + "similarity_threshold": 0.3, + "chunk_limit": 5 + } + } + ) + + # 6. Vector Search + console.print("\n[bold yellow]═══ Vector Search ═══[/bold yellow]") + + # Semantic search + search_data = await self.test_endpoint( + "Semantic Search", "POST", "/api/vector/search/semantic", + { + "query": "test document", + "limit": 5, + "similarity_threshold": 0.3, + "workspace_filter": self.workspace_id if self.workspace_id else None + } + ) + + # Extract chunk_id for similarity search + if search_data and search_data.get("results"): + self.chunk_id = search_data["results"][0]["chunk_id"] + + # Similarity search + await self.test_endpoint( + "Similarity Search", "POST", "/api/vector/search/similarity", + { + "chunk_id": self.chunk_id, + "limit": 3 + } + ) + + # Recommendations + await self.test_endpoint( + "Get Recommendations", "POST", "/api/vector/recommendations", + { + "positive_chunk_ids": [self.chunk_id], + "limit": 5 + } + ) + + # 7. Hierarchical Retrieval + console.print("\n[bold yellow]═══ Hierarchical Retrieval ═══[/bold yellow]") + if self.workspace_id: + await self.test_endpoint( + "Hierarchical Retrieve", "POST", "/api/retrieve", + { + "query": "test document", + "workspace_id": self.workspace_id, + "limit": 5, + "similarity_threshold": 0.3, + "include_kg_triples": False, + "include_document_info": True + } + ) + + # 8. Persona Management + console.print("\n[bold yellow]═══ Persona Management ═══[/bold yellow]") + + # List personas + personas_data = await self.test_endpoint("List Personas", "GET", "/api/personas/list") + + # Create persona + persona_data = await self.test_endpoint( + "Create Persona", "POST", "/api/personas/create", + { + "id": f"test-persona-{uuid.uuid4().hex[:8]}", + "name": "Test Persona", + "tagline": "A test AI persona", + "description": "This persona is for testing", + "specialty": "testing", + "system_prompt_template": "You are a helpful test assistant.", + "workspace_id": self.workspace_id + } + ) + + if persona_data and persona_data.get("persona_id"): + self.persona_id = persona_data["persona_id"] + + # Chat with persona + await self.test_endpoint( + "Persona Chat", "POST", "/api/personas/chat", + { + "persona_id": self.persona_id, + "message": "Hello, test persona!", + "max_tokens": 100 + } + ) + + # Delete persona + await self.test_endpoint( + "Delete Persona", "DELETE", f"/api/personas/{self.persona_id}" + ) + + # 9. Cleanup - Delete test workspace + if self.workspace_id: + console.print("\n[bold yellow]═══ Cleanup ═══[/bold yellow]") + await self.test_endpoint( + "Delete Workspace", "DELETE", f"/api/workspaces/{self.workspace_id}" + ) + + # Print summary + self.print_summary() + + def print_summary(self): + """Print test results summary.""" + console.print("\n" + "="*60) + console.print(Panel.fit("📊 Test Results Summary", style="bold cyan")) + + table = Table(show_header=True, header_style="bold magenta") + table.add_column("Endpoint", style="cyan", width=40) + table.add_column("Method", style="yellow", width=10) + table.add_column("Status", width=10) + table.add_column("Message", style="dim", width=40) + + success_count = 0 + total_count = len(self.test_results) + + for result in self.test_results: + status_symbol = "✅" if result["success"] else "❌" + status_color = "green" if result["success"] else "red" + + if result["success"]: + success_count += 1 + + table.add_row( + result["endpoint"], + result["method"], + f"[{status_color}]{status_symbol}[/{status_color}]", + result["message"][:40] + "..." if len(result["message"]) > 40 else result["message"] + ) + + console.print(table) + + # Summary stats + success_rate = (success_count / total_count * 100) if total_count > 0 else 0 + console.print(f"\n[bold]Total Tests:[/bold] {total_count}") + console.print(f"[bold green]Passed:[/bold green] {success_count}") + console.print(f"[bold red]Failed:[/bold red] {total_count - success_count}") + console.print(f"[bold]Success Rate:[/bold] {success_rate:.1f}%") + + if success_rate == 100: + console.print("\n[bold green]🎉 All tests passed![/bold green]") + elif success_rate >= 80: + console.print("\n[bold yellow]⚠️ Most tests passed, but some issues found.[/bold yellow]") + else: + console.print("\n[bold red]❌ Many tests failed. Please check the API server.[/bold red]") + + +async def main(): + """Main entry point.""" + try: + async with EndpointTester() as tester: + await tester.run_all_tests() + except Exception as e: + console.print(f"\n[bold red]Fatal Error:[/bold red] {e}") + console.print("\n[yellow]Make sure:[/yellow]") + console.print("1. CZero Engine app is running") + console.print("2. API server is enabled (port 1421)") + console.print("3. At least one LLM and embedding model are loaded") + + +if __name__ == "__main__": + asyncio.run(main()) \ No newline at end of file diff --git a/tests/test_processing_pipeline.py b/tests/test_processing_pipeline.py new file mode 100644 index 0000000..76de0d4 --- /dev/null +++ b/tests/test_processing_pipeline.py @@ -0,0 +1,194 @@ +"""Test the processing pipeline to ensure it matches UI behavior.""" + +import asyncio +import httpx +import uuid +import tempfile +from pathlib import Path +import json + +async def test_processing_pipeline(): + """Test that API processing matches UI processing behavior.""" + base_url = "http://localhost:1421" + + async with httpx.AsyncClient(timeout=60.0) as client: + print("=" * 60) + print("Testing Processing Pipeline") + print("=" * 60) + + # 1. Create a test workspace with actual files + print("\n1. Creating test workspace with files...") + + with tempfile.TemporaryDirectory() as temp_dir: + workspace_id = None + + try: + # Create test files with content + test_files = [] + + # Create a markdown file + md_file = Path(temp_dir) / "test_document.md" + md_file.write_text("""# Test Document + +## Introduction +This is a test document for hierarchical chunking. + +## Section 1: Background +The document processing pipeline should create hierarchical chunks. +This means larger parent chunks and smaller child chunks. + +## Section 2: Details +Here are some important details: +- Point 1: Hierarchical chunking improves retrieval +- Point 2: It provides better context +- Point 3: It enables multi-level search + +## Conclusion +This test verifies the processing pipeline works correctly. +""") + test_files.append(str(md_file)) + + # Create a text file + txt_file = Path(temp_dir) / "notes.txt" + txt_file.write_text("""Important Notes + +These are some notes to test processing. +The system should chunk this content hierarchically. +Each paragraph might become a chunk. +And larger sections become parent chunks. + +This is another paragraph with different content. +It should be processed separately but maintain relationships. +""") + test_files.append(str(txt_file)) + + print(f" Created {len(test_files)} test files in {temp_dir}") + + # Create workspace + response = await client.post( + f"{base_url}/api/workspaces/create", + json={ + "name": f"Process Test {uuid.uuid4().hex[:8]}", + "path": temp_dir, + "description": "Testing processing pipeline" + } + ) + response.raise_for_status() + workspace_data = response.json() + workspace_id = workspace_data["id"] + print(f" ✅ Created workspace: {workspace_id}") + + # 2. Process the files + print("\n2. Processing workspace files...") + response = await client.post( + f"{base_url}/api/workspaces/process", + json={ + "workspace_id": workspace_id, + "files": test_files, + "config": None # Use default config (hierarchical) + } + ) + + if response.status_code == 200: + process_result = response.json() + print(f" ✅ Processing successful!") + print(f" Files processed: {process_result['files_processed']}") + print(f" Files failed: {process_result['files_failed']}") + print(f" Chunks created: {process_result['chunks_created']}") + print(f" Processing time: {process_result['processing_time']:.2f}s") + print(f" Message: {process_result['message']}") + + # 3. Verify chunks were created by searching + print("\n3. Verifying hierarchical chunks...") + + # Search for content + response = await client.post( + f"{base_url}/api/vector/search/semantic", + json={ + "query": "hierarchical chunking", + "workspace_filter": workspace_id, + "limit": 5, + "include_hierarchy": True + } + ) + + if response.status_code == 200: + search_results = response.json() + if search_results.get("results"): + print(f" ✅ Found {len(search_results['results'])} chunks") + + # Check first result + first_result = search_results["results"][0] + print(f" Chunk ID: {first_result['chunk_id'][:16]}...") + print(f" Content preview: {first_result['content'][:100]}...") + print(f" Similarity: {first_result['similarity']:.3f}") + + # Check for hierarchical structure + if first_result.get('parent_chunk'): + print(f" ✅ Has parent chunk (hierarchical structure confirmed)") + elif first_result.get('hierarchy_path'): + print(f" ✅ Has hierarchy path (hierarchical structure confirmed)") + else: + print(f" ⚠️ No hierarchical structure detected in search results") + else: + print(f" ❌ No chunks found in search") + else: + print(f" ❌ Search failed: {response.status_code}") + + # 4. Test hierarchical retrieval + print("\n4. Testing hierarchical retrieval...") + response = await client.post( + f"{base_url}/api/retrieve", + json={ + "query": "test document", + "workspace_id": workspace_id, + "limit": 3, + "include_document_info": True + } + ) + + if response.status_code == 200: + retrieval_results = response.json() + if retrieval_results.get("results"): + print(f" ✅ Hierarchical retrieval found {len(retrieval_results['results'])} results") + + first = retrieval_results["results"][0] + if first.get("small_chunk") and first.get("big_chunk"): + print(f" ✅ Has both small and big chunks") + print(f" Small chunk: {first['small_chunk']['content'][:50]}...") + print(f" Big chunk: {first['big_chunk']['content'][:50]}...") + + if first.get("document_info"): + doc_info = first["document_info"] + print(f" Document: {doc_info['document_name']}") + print(f" Total chunks: {doc_info['total_chunks']}") + else: + print(f" ❌ No hierarchical results found") + else: + print(f" ❌ Hierarchical retrieval failed: {response.status_code}") + + else: + print(f" ❌ Processing failed: {response.status_code}") + try: + error_data = response.json() + print(f" Error: {error_data.get('error', 'Unknown')}") + print(f" Details: {error_data.get('details', 'None')}") + except: + print(f" Raw response: {response.text}") + + # 5. Cleanup + if workspace_id: + print("\n5. Cleaning up...") + response = await client.delete(f"{base_url}/api/workspaces/{workspace_id}") + if response.status_code == 200: + print(f" ✅ Workspace deleted") + else: + print(f" ⚠️ Failed to delete workspace") + + except Exception as e: + print(f"\n❌ Error: {e}") + import traceback + traceback.print_exc() + +if __name__ == "__main__": + asyncio.run(test_processing_pipeline()) \ No newline at end of file