diff --git a/examples/agent_example.py b/examples/agent_example.py index 5f461c8..56d91df 100644 --- a/examples/agent_example.py +++ b/examples/agent_example.py @@ -9,19 +9,15 @@ from langchain_core.prompts import ChatPromptTemplate, MessagesPlaceholder from langchain_openai import ChatOpenAI -from langchain_scrapegraph.tools import ( - GetCreditsTool, - SearchScraperTool, - SmartScraperTool, -) +from langchain_scrapegraph.tools import ExtractTool, GetCreditsTool, SearchTool load_dotenv() # Initialize the tools tools = [ - SmartScraperTool(), + ExtractTool(), GetCreditsTool(), - SearchScraperTool(), + SearchTool(), ] # Create the prompt template diff --git a/examples/agentic_scraper_tool.py b/examples/agentic_scraper_tool.py deleted file mode 100644 index a4e525d..0000000 --- a/examples/agentic_scraper_tool.py +++ /dev/null @@ -1,103 +0,0 @@ -import json - -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import AgenticScraperTool - -sgai_logger.set_logging(level="INFO") - -# Will automatically get SGAI_API_KEY from environment -tool = AgenticScraperTool() - -# Example 1: Basic usage with form filling and navigation -print("=== Example 1: Basic Form Filling ===") -url = "https://example.com/login" -steps = [ - "Type 'user@example.com' in email input box", - "Type 'password123' in password input box", - "Click on login button", -] - -try: - result = tool.invoke({"url": url, "steps": steps, "use_session": True}) - print(json.dumps(result, indent=2)) -except Exception as e: - print(f"Error: {e}") - -print("\n" + "=" * 50 + "\n") - -# Example 2: With AI extraction and structured output -print("=== Example 2: AI Extraction with Schema ===") -dashboard_url = "https://dashboard.example.com" -dashboard_steps = [ - "Navigate to user profile section", - "Click on settings tab", - "Wait for page to load", -] - -# Define the output schema for structured data -output_schema = { - "user_info": { - "type": "object", - "properties": { - "username": {"type": "string"}, - "email": {"type": "string"}, - "dashboard_sections": {"type": "array", "items": {"type": "string"}}, - "available_settings": {"type": "array", "items": {"type": "string"}}, - }, - } -} - -try: - result = tool.invoke( - { - "url": dashboard_url, - "steps": dashboard_steps, - "ai_extraction": True, - "user_prompt": "Extract user profile information and available dashboard sections and settings", - "output_schema": output_schema, - "use_session": True, - } - ) - print(json.dumps(result, indent=2)) -except Exception as e: - print(f"Error: {e}") - -print("\n" + "=" * 50 + "\n") - -# Example 3: E-commerce product search -print("=== Example 3: E-commerce Product Search ===") -ecommerce_url = "https://shop.example.com" -search_steps = [ - "Type 'laptop' in search input box", - "Click on search button", - "Wait for results to load", - "Click on first product", -] - -search_schema = { - "product_info": { - "type": "object", - "properties": { - "product_name": {"type": "string"}, - "price": {"type": "string"}, - "description": {"type": "string"}, - "availability": {"type": "string"}, - }, - } -} - -try: - result = tool.invoke( - { - "url": ecommerce_url, - "steps": search_steps, - "ai_extraction": True, - "user_prompt": "Extract product information including name, price, description, and availability", - "output_schema": search_schema, - "use_session": True, - } - ) - print(json.dumps(result, indent=2)) -except Exception as e: - print(f"Error: {e}") diff --git a/examples/agentic_scraper_tool_schema.py b/examples/agentic_scraper_tool_schema.py deleted file mode 100644 index 0eb1958..0000000 --- a/examples/agentic_scraper_tool_schema.py +++ /dev/null @@ -1,156 +0,0 @@ -import json - -from pydantic import BaseModel, Field -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import AgenticScraperTool - -sgai_logger.set_logging(level="INFO") - - -# Define output schemas for different use cases -class UserProfileInfo(BaseModel): - username: str = Field(description="The user's username") - email: str = Field(description="The user's email address") - dashboard_sections: list[str] = Field(description="Available dashboard sections") - available_settings: list[str] = Field(description="Available user settings") - - -class ProductInfo(BaseModel): - product_name: str = Field(description="The name of the product") - price: str = Field(description="The price of the product") - description: str = Field(description="Product description") - availability: str = Field(description="Product availability status") - rating: float = Field(description="Product rating out of 5") - - -class LoginResult(BaseModel): - success: bool = Field(description="Whether login was successful") - error_message: str = Field(description="Error message if login failed", default="") - redirect_url: str = Field(description="URL to redirect to after login", default="") - - -# Initialize the tool with different schemas for different use cases -print("=== Example 1: User Profile Extraction with Schema ===") -tool_with_profile_schema = AgenticScraperTool(llm_output_schema=UserProfileInfo) - -dashboard_url = "https://dashboard.example.com" -dashboard_steps = [ - "Navigate to user profile section", - "Click on settings tab", - "Wait for page to load", -] - -try: - result = tool_with_profile_schema.invoke( - { - "url": dashboard_url, - "steps": dashboard_steps, - "ai_extraction": True, - "user_prompt": "Extract user profile information and available dashboard sections and settings", - "use_session": True, - } - ) - print("User Profile Result:") - print(json.dumps(result, indent=2)) -except Exception as e: - print(f"Error: {e}") - -print("\n" + "=" * 50 + "\n") - -print("=== Example 2: Product Information Extraction with Schema ===") -tool_with_product_schema = AgenticScraperTool(llm_output_schema=ProductInfo) - -ecommerce_url = "https://shop.example.com" -search_steps = [ - "Type 'laptop' in search input box", - "Click on search button", - "Wait for results to load", - "Click on first product", -] - -try: - result = tool_with_product_schema.invoke( - { - "url": ecommerce_url, - "steps": search_steps, - "ai_extraction": True, - "user_prompt": "Extract product information including name, price, description, availability, and rating", - "use_session": True, - } - ) - print("Product Info Result:") - print(json.dumps(result, indent=2)) -except Exception as e: - print(f"Error: {e}") - -print("\n" + "=" * 50 + "\n") - -print("=== Example 3: Login Process with Schema ===") -tool_with_login_schema = AgenticScraperTool(llm_output_schema=LoginResult) - -login_url = "https://example.com/login" -login_steps = [ - "Type 'user@example.com' in email input box", - "Type 'password123' in password input box", - "Click on login button", - "Wait for response", -] - -try: - result = tool_with_login_schema.invoke( - { - "url": login_url, - "steps": login_steps, - "ai_extraction": True, - "user_prompt": "Determine if login was successful and extract any error messages or redirect URLs", - "use_session": True, - } - ) - print("Login Result:") - print(json.dumps(result, indent=2)) -except Exception as e: - print(f"Error: {e}") - -print("\n" + "=" * 50 + "\n") - -# Example 4: Using dictionary schema instead of Pydantic model -print("=== Example 4: Dictionary Schema ===") -tool_with_dict_schema = AgenticScraperTool() - -# Define schema as a dictionary -news_schema = { - "news_article": { - "type": "object", - "properties": { - "headline": {"type": "string"}, - "author": {"type": "string"}, - "publish_date": {"type": "string"}, - "content_summary": {"type": "string"}, - "tags": {"type": "array", "items": {"type": "string"}}, - }, - } -} - -news_url = "https://news.example.com" -news_steps = [ - "Navigate to latest news section", - "Click on first article", - "Wait for page to load", -] - -try: - result = tool_with_dict_schema.invoke( - { - "url": news_url, - "steps": news_steps, - "ai_extraction": True, - "user_prompt": "Extract article headline, author, publish date, content summary, and tags", - "output_schema": news_schema, - "use_session": True, - } - ) - print("News Article Result:") - print(json.dumps(result, indent=2)) -except Exception as e: - print(f"Error: {e}") diff --git a/examples/crawl_tool.py b/examples/crawl_tool.py new file mode 100644 index 0000000..dbd3c10 --- /dev/null +++ b/examples/crawl_tool.py @@ -0,0 +1,30 @@ +"""Crawl Tool Example - LangChain Tool (v2 API) + +Replaces the old SmartCrawler tool. Uses namespaced crawl operations. +""" + +import time + +from langchain_scrapegraph.tools import CrawlStartTool, CrawlStatusTool + +# Initialize tools +start_tool = CrawlStartTool() +status_tool = CrawlStatusTool() + +# Start a crawl job +result = start_tool.invoke( + { + "url": "https://example.com", + "depth": 2, + "max_pages": 5, + "format": "markdown", + } +) +print("Crawl started:", result) + +# Check status +crawl_id = result.get("id") +if crawl_id: + time.sleep(5) + status = status_tool.invoke({"crawl_id": crawl_id}) + print("Crawl status:", status) diff --git a/examples/extract_tool.py b/examples/extract_tool.py new file mode 100644 index 0000000..827a6b8 --- /dev/null +++ b/examples/extract_tool.py @@ -0,0 +1,18 @@ +"""Extract Tool Example - LangChain Tool (v2 API) + +Replaces the old SmartScraper tool. Extracts structured data from webpages using AI. +""" + +from langchain_scrapegraph.tools import ExtractTool + +# Initialize the tool +tool = ExtractTool() + +# Basic extraction +result = tool.invoke( + { + "url": "https://example.com", + "prompt": "Extract the main heading and first paragraph", + } +) +print(result) diff --git a/examples/extract_tool_schema.py b/examples/extract_tool_schema.py new file mode 100644 index 0000000..b02e0c7 --- /dev/null +++ b/examples/extract_tool_schema.py @@ -0,0 +1,22 @@ +"""Extract Tool with Schema Example - LangChain Tool (v2 API)""" + +from pydantic import BaseModel, Field + +from langchain_scrapegraph.tools import ExtractTool + + +class WebsiteInfo(BaseModel): + title: str = Field(description="The main title of the page") + description: str = Field(description="The main description") + + +# Initialize with output schema +tool = ExtractTool(llm_output_schema=WebsiteInfo) + +result = tool.invoke( + { + "url": "https://example.com", + "prompt": "Extract the title and description", + } +) +print(result) diff --git a/examples/monitor_tool.py b/examples/monitor_tool.py new file mode 100644 index 0000000..f79f138 --- /dev/null +++ b/examples/monitor_tool.py @@ -0,0 +1,25 @@ +"""Monitor Tool Example - LangChain Tool (v2 API) + +Replaces the old Scheduled Jobs tools. Uses the monitor namespace. +""" + +from langchain_scrapegraph.tools import MonitorCreateTool, MonitorListTool + +# Initialize tools +create_tool = MonitorCreateTool() +list_tool = MonitorListTool() + +# Create a monitor +result = create_tool.invoke( + { + "name": "Price Monitor", + "url": "https://example.com/products", + "prompt": "Extract current product prices", + "cron": "0 9 * * *", # Daily at 9 AM + } +) +print("Monitor created:", result) + +# List all monitors +monitors = list_tool.invoke({}) +print("All monitors:", monitors) diff --git a/examples/scheduled_jobs_client.py b/examples/scheduled_jobs_client.py deleted file mode 100644 index 4514e04..0000000 --- a/examples/scheduled_jobs_client.py +++ /dev/null @@ -1,162 +0,0 @@ -#!/usr/bin/env python3 -"""Scheduled Jobs Example - Direct Client Usage""" - -import os - -from scrapegraph_py import Client - - -def main(): - client = Client.from_env() - - print("πŸš€ ScrapeGraph AI Scheduled Jobs Example") - print("=" * 50) - - try: - print("\nπŸ“… Creating a scheduled SmartScraper job...") - - smartscraper_config = { - "website_url": "https://example.com", - "user_prompt": "Extract the main heading and description from the page", - } - - job = client.create_scheduled_job( - job_name="Daily Example Scraping", - service_type="smartscraper", - cron_expression="0 9 * * *", - job_config=smartscraper_config, - is_active=True, - ) - - job_id = job["id"] - print(f"βœ… Created job: {job['job_name']} (ID: {job_id})") - print(f" Next run: {job.get('next_run_at', 'Not scheduled')}") - - print("\nπŸ“… Creating a scheduled SearchScraper job...") - - searchscraper_config = { - "user_prompt": "Find the latest news about artificial intelligence", - "num_results": 5, - } - - search_job = client.create_scheduled_job( - job_name="Weekly AI News Search", - service_type="searchscraper", - cron_expression="0 10 * * 1", - job_config=searchscraper_config, - is_active=True, - ) - - search_job_id = search_job["id"] - print(f"βœ… Created job: {search_job['job_name']} (ID: {search_job_id})") - - print("\nπŸ“‹ Listing all scheduled jobs...") - - jobs_response = client.get_scheduled_jobs(page=1, page_size=10) - jobs = jobs_response["jobs"] - - print(f"Found {jobs_response['total']} total jobs:") - for job in jobs: - status = "🟒 Active" if job["is_active"] else "πŸ”΄ Inactive" - print(f" - {job['job_name']} ({job['service_type']}) - {status}") - print(f" Schedule: {job['cron_expression']}") - if job.get("next_run_at"): - print(f" Next run: {job['next_run_at']}") - - print(f"\nπŸ” Getting details for job {job_id}...") - - job_details = client.get_scheduled_job(job_id) - print(f"Job Name: {job_details['job_name']}") - print(f"Service Type: {job_details['service_type']}") - print(f"Created: {job_details['created_at']}") - print(f"Active: {job_details['is_active']}") - - print("\nπŸ“ Updating job schedule...") - - updated_job = client.update_scheduled_job( - job_id=job_id, - cron_expression="0 8 * * *", - job_name="Daily Example Scraping (Updated)", - ) - - print(f"βœ… Updated job: {updated_job['job_name']}") - print(f" New schedule: {updated_job['cron_expression']}") - - print(f"\n⏸️ Pausing job {job_id}...") - - pause_result = client.pause_scheduled_job(job_id) - print(f"βœ… {pause_result['message']}") - print(f" Job is now: {'Active' if pause_result['is_active'] else 'Paused'}") - - print(f"\n▢️ Resuming job {job_id}...") - - resume_result = client.resume_scheduled_job(job_id) - print(f"βœ… {resume_result['message']}") - print(f" Job is now: {'Active' if resume_result['is_active'] else 'Paused'}") - if resume_result.get("next_run_at"): - print(f" Next run: {resume_result['next_run_at']}") - - print(f"\nπŸš€ Manually triggering job {job_id}...") - - trigger_result = client.trigger_scheduled_job(job_id) - print(f"βœ… {trigger_result['message']}") - print(f" Execution ID: {trigger_result['execution_id']}") - print(f" Triggered at: {trigger_result['triggered_at']}") - - print(f"\nπŸ“Š Getting execution history for job {job_id}...") - - executions_response = client.get_job_executions( - job_id=job_id, page=1, page_size=5 - ) - - executions = executions_response["executions"] - print(f"Found {executions_response['total']} total executions:") - - for execution in executions: - status_emoji = { - "completed": "βœ…", - "failed": "❌", - "running": "πŸ”„", - "pending": "⏳", - }.get(execution["status"], "❓") - - print(f" {status_emoji} {execution['status'].upper()}") - print(f" Started: {execution['started_at']}") - if execution.get("completed_at"): - print(f" Completed: {execution['completed_at']}") - if execution.get("credits_used"): - print(f" Credits used: {execution['credits_used']}") - - print("\nπŸ”§ Filtering jobs by service type (smartscraper)...") - - filtered_jobs = client.get_scheduled_jobs( - service_type="smartscraper", is_active=True - ) - - print(f"Found {filtered_jobs['total']} active SmartScraper jobs:") - for job in filtered_jobs["jobs"]: - print(f" - {job['job_name']} (Schedule: {job['cron_expression']})") - - print("\nπŸ—‘οΈ Cleaning up - deleting created jobs...") - - delete_result1 = client.delete_scheduled_job(job_id) - print(f"βœ… {delete_result1['message']} (Job 1)") - - delete_result2 = client.delete_scheduled_job(search_job_id) - print(f"βœ… {delete_result2['message']} (Job 2)") - - print("\nπŸŽ‰ Scheduled jobs example completed successfully!") - - except Exception as e: - print(f"\n❌ Error: {str(e)}") - raise - - finally: - client.close() - - -if __name__ == "__main__": - if os.getenv("SGAI_MOCK", "0").lower() in ["1", "true", "yes"]: - print("πŸ§ͺ Running in MOCK mode - no real API calls will be made") - - main() diff --git a/examples/scheduled_jobs_tool.py b/examples/scheduled_jobs_tool.py deleted file mode 100644 index 110d904..0000000 --- a/examples/scheduled_jobs_tool.py +++ /dev/null @@ -1,190 +0,0 @@ -#!/usr/bin/env python3 -"""Scheduled Jobs Example - LangChain Tools""" - -import time - -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import ( - CreateScheduledJobTool, - DeleteScheduledJobTool, - GetJobExecutionsTool, - GetScheduledJobsTool, - GetScheduledJobTool, - PauseScheduledJobTool, - ResumeScheduledJobTool, - TriggerScheduledJobTool, - UpdateScheduledJobTool, -) - -sgai_logger.set_logging(level="INFO") - - -def main(): - print("πŸš€ ScrapeGraph AI Scheduled Jobs Example - LangChain Tools") - print("=" * 60) - - # Initialize tools - create_job_tool = CreateScheduledJobTool() - get_jobs_tool = GetScheduledJobsTool() - get_job_tool = GetScheduledJobTool() - update_job_tool = UpdateScheduledJobTool() - pause_job_tool = PauseScheduledJobTool() - resume_job_tool = ResumeScheduledJobTool() - trigger_job_tool = TriggerScheduledJobTool() - get_executions_tool = GetJobExecutionsTool() - delete_job_tool = DeleteScheduledJobTool() - - try: - print("\nπŸ“… Creating a scheduled SmartScraper job...") - - smartscraper_config = { - "website_url": "https://example.com", - "user_prompt": "Extract the main heading and description from the page", - } - - job = create_job_tool.invoke( - { - "job_name": "Daily Example Scraping", - "service_type": "smartscraper", - "cron_expression": "0 9 * * *", - "job_config": smartscraper_config, - "is_active": True, - } - ) - - job_id = job["id"] - print(f"βœ… Created job: {job['job_name']} (ID: {job_id})") - print(f" Next run: {job.get('next_run_at', 'Not scheduled')}") - - print("\nπŸ“… Creating a scheduled SearchScraper job...") - - searchscraper_config = { - "user_prompt": "Find the latest news about artificial intelligence", - "num_results": 5, - } - - search_job = create_job_tool.invoke( - { - "job_name": "Weekly AI News Search", - "service_type": "searchscraper", - "cron_expression": "0 10 * * 1", - "job_config": searchscraper_config, - "is_active": True, - } - ) - - search_job_id = search_job["id"] - print(f"βœ… Created job: {search_job['job_name']} (ID: {search_job_id})") - - print("\nπŸ“‹ Listing all scheduled jobs...") - - jobs_response = get_jobs_tool.invoke({"page": 1, "page_size": 10}) - jobs = jobs_response["jobs"] - - print(f"Found {jobs_response['total']} total jobs:") - for job_item in jobs: - status = "🟒 Active" if job_item["is_active"] else "πŸ”΄ Inactive" - print(f" - {job_item['job_name']} ({job_item['service_type']}) - {status}") - print(f" Schedule: {job_item['cron_expression']}") - if job_item.get("next_run_at"): - print(f" Next run: {job_item['next_run_at']}") - - print(f"\nπŸ” Getting details for job {job_id}...") - - job_details = get_job_tool.invoke({"job_id": job_id}) - print(f"Job Name: {job_details['job_name']}") - print(f"Service Type: {job_details['service_type']}") - print(f"Created: {job_details['created_at']}") - print(f"Active: {job_details['is_active']}") - - print("\nπŸ“ Updating job schedule...") - - updated_job = update_job_tool.invoke( - { - "job_id": job_id, - "cron_expression": "0 8 * * *", - "job_name": "Daily Example Scraping (Updated)", - } - ) - - print(f"βœ… Updated job: {updated_job['job_name']}") - print(f" New schedule: {updated_job['cron_expression']}") - - print(f"\n⏸️ Pausing job {job_id}...") - - pause_result = pause_job_tool.invoke({"job_id": job_id}) - print(f"βœ… {pause_result['message']}") - print(f" Job is now: {'Active' if pause_result['is_active'] else 'Paused'}") - - print(f"\n▢️ Resuming job {job_id}...") - - resume_result = resume_job_tool.invoke({"job_id": job_id}) - print(f"βœ… {resume_result['message']}") - print(f" Job is now: {'Active' if resume_result['is_active'] else 'Paused'}") - if resume_result.get("next_run_at"): - print(f" Next run: {resume_result['next_run_at']}") - - print(f"\nπŸš€ Manually triggering job {job_id}...") - - trigger_result = trigger_job_tool.invoke({"job_id": job_id}) - print(f"βœ… {trigger_result['message']}") - print(f" Execution ID: {trigger_result['execution_id']}") - print(f" Triggered at: {trigger_result['triggered_at']}") - - # Wait a moment for the execution to potentially start - time.sleep(2) - - print(f"\nπŸ“Š Getting execution history for job {job_id}...") - - executions_response = get_executions_tool.invoke( - {"job_id": job_id, "page": 1, "page_size": 5} - ) - - executions = executions_response["executions"] - print(f"Found {executions_response['total']} total executions:") - - for execution in executions: - status_emoji = { - "completed": "βœ…", - "failed": "❌", - "running": "πŸ”„", - "pending": "⏳", - }.get(execution["status"], "❓") - - print(f" {status_emoji} {execution['status'].upper()}") - print(f" Started: {execution['started_at']}") - if execution.get("completed_at"): - print(f" Completed: {execution['completed_at']}") - if execution.get("credits_used"): - print(f" Credits used: {execution['credits_used']}") - - print("\nπŸ”§ Filtering jobs by service type (smartscraper)...") - - filtered_jobs = get_jobs_tool.invoke( - {"service_type": "smartscraper", "is_active": True} - ) - - print(f"Found {filtered_jobs['total']} active SmartScraper jobs:") - for job_item in filtered_jobs["jobs"]: - print( - f" - {job_item['job_name']} (Schedule: {job_item['cron_expression']})" - ) - - print("\nπŸ—‘οΈ Cleaning up - deleting created jobs...") - - delete_result1 = delete_job_tool.invoke({"job_id": job_id}) - print(f"βœ… {delete_result1['message']} (Job 1)") - - delete_result2 = delete_job_tool.invoke({"job_id": search_job_id}) - print(f"βœ… {delete_result2['message']} (Job 2)") - - print("\nπŸŽ‰ Scheduled jobs example completed successfully!") - - except Exception as e: - print(f"\n❌ Error: {str(e)}") - raise - - -if __name__ == "__main__": - main() diff --git a/examples/scrape_client.py b/examples/scrape_client.py deleted file mode 100644 index a4e6444..0000000 --- a/examples/scrape_client.py +++ /dev/null @@ -1,242 +0,0 @@ -#!/usr/bin/env python3 -""" -Example demonstrating how to use the Scrape API with the scrapegraph-py SDK. - -This example shows how to: -1. Set up the client for Scrape -2. Make the API call to get HTML content from a website -3. Handle the response and save the HTML content -4. Demonstrate both regular and heavy JS rendering modes -5. Display the results and metadata - -Requirements: -- Python 3.7+ -- scrapegraph-py -- python-dotenv -- A .env file with your SGAI_API_KEY - -Example .env file: -SGAI_API_KEY=your_api_key_here -""" - -import time -from pathlib import Path -from typing import Optional - -from scrapegraph_py import Client - - -def scrape_website( - client: Client, - website_url: str, - render_heavy_js: bool = False, - headers: Optional[dict[str, str]] = None, -) -> dict: - """ - Get HTML content from a website using the Scrape API. - - Args: - client: The scrapegraph-py client instance - website_url: The URL of the website to get HTML from - render_heavy_js: Whether to render heavy JavaScript (defaults to False) - headers: Optional headers to send with the request - - Returns: - dict: A dictionary containing the HTML content and metadata - - Raises: - Exception: If the API request fails - """ - js_mode = "with heavy JS rendering" if render_heavy_js else "without JS rendering" - print(f"Getting HTML content from: {website_url}") - print(f"Mode: {js_mode}") - - start_time = time.time() - - try: - result = client.scrape( - website_url=website_url, - render_heavy_js=render_heavy_js, - headers=headers, - ) - execution_time = time.time() - start_time - print(f"Execution time: {execution_time:.2f} seconds") - return result - except Exception as e: - print(f"Error: {str(e)}") - raise - - -def save_html_content( - html_content: str, filename: str, output_dir: str = "scrape_output" -): - """ - Save HTML content to a file. - - Args: - html_content: The HTML content to save - filename: The name of the file (without extension) - output_dir: The directory to save the file in - """ - # Create output directory if it doesn't exist - output_path = Path(output_dir) - output_path.mkdir(exist_ok=True) - - # Save HTML file - html_file = output_path / f"{filename}.html" - with open(html_file, "w", encoding="utf-8") as f: - f.write(html_content) - - print(f"HTML content saved to: {html_file}") - return html_file - - -def analyze_html_content(html_content: str) -> dict: - """ - Analyze HTML content and provide basic statistics. - - Args: - html_content: The HTML content to analyze - - Returns: - dict: Basic statistics about the HTML content - """ - stats = { - "total_length": len(html_content), - "lines": len(html_content.splitlines()), - "has_doctype": html_content.strip().startswith(" dict: - """ - Analyze HTML content and provide basic statistics. - - Args: - html_content: The HTML content to analyze - - Returns: - dict: Basic statistics about the HTML content - """ - stats = { - "total_length": len(html_content), - "lines": len(html_content.splitlines()), - "has_doctype": html_content.strip().startswith(" 500 else ''}" - ) - - if "reference_urls" in response: - print(f"\nπŸ”— References: {len(response.get('reference_urls', []))}") - print("\nπŸ”— Reference URLs:") - for i, url in enumerate(response.get("reference_urls", []), 1): - print(f" {i}. {url}") - - return True - - except Exception as e: - print(f"❌ Error: {str(e)}") - return False - - finally: - # Close the client - client.close() - - -if __name__ == "__main__": - success = main() - exit(0 if success else 1) diff --git a/examples/searchscraper_tool.py b/examples/searchscraper_tool.py deleted file mode 100644 index a14d562..0000000 --- a/examples/searchscraper_tool.py +++ /dev/null @@ -1,16 +0,0 @@ -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import SearchScraperTool - -sgai_logger.set_logging(level="INFO") - -# Will automatically get SGAI_API_KEY from environment -tool = SearchScraperTool() - -# Example prompt -user_prompt = "What are the key features and pricing of ChatGPT Plus?" - -# Use the tool -result = tool.invoke({"user_prompt": user_prompt}) - -print("\nResult:", result) diff --git a/examples/searchscraper_tool_schema.py b/examples/searchscraper_tool_schema.py deleted file mode 100644 index 9ada05e..0000000 --- a/examples/searchscraper_tool_schema.py +++ /dev/null @@ -1,41 +0,0 @@ -from typing import Dict, List - -from pydantic import BaseModel, Field -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import SearchScraperTool - - -class Feature(BaseModel): - name: str = Field(description="Name of the feature") - description: str = Field(description="Description of the feature") - - -class PricingPlan(BaseModel): - name: str = Field(description="Name of the pricing plan") - price: Dict[str, str] = Field( - description="Price details including amount, currency, and period" - ) - features: List[str] = Field(description="List of features included in the plan") - - -class ProductInfo(BaseModel): - name: str = Field(description="Name of the product") - description: str = Field(description="Description of the product") - features: List[Feature] = Field(description="List of product features") - pricing: Dict[str, List[PricingPlan]] = Field(description="Pricing information") - reference_urls: List[str] = Field(description="Source URLs for the information") - - -sgai_logger.set_logging(level="INFO") - -# Initialize with Pydantic model class -tool = SearchScraperTool(llm_output_schema=ProductInfo) - -# Example prompt -user_prompt = "What are the key features and pricing of ChatGPT Plus?" - -# Use the tool - output will conform to ProductInfo schema -result = tool.invoke({"user_prompt": user_prompt}) - -print("\nResult:", result) diff --git a/examples/smartcrawler_tool.py b/examples/smartcrawler_tool.py deleted file mode 100644 index 5f7384e..0000000 --- a/examples/smartcrawler_tool.py +++ /dev/null @@ -1,30 +0,0 @@ -import json - -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import SmartCrawlerTool - -sgai_logger.set_logging(level="INFO") - -# Will automatically get SGAI_API_KEY from environment -tool = SmartCrawlerTool() - -# Example based on the provided code snippet -url = "https://scrapegraphai.com/" -prompt = ( - "What does the company do? and I need text content from their privacy and terms" -) - -# Use the tool with crawling parameters -result = tool.invoke( - { - "url": url, - "prompt": prompt, - "cache_website": True, - "depth": 2, - "max_pages": 2, - "same_domain_only": True, - } -) - -print(json.dumps(result, indent=2)) diff --git a/examples/smartcrawler_tool_schema.py b/examples/smartcrawler_tool_schema.py deleted file mode 100644 index 80dda5f..0000000 --- a/examples/smartcrawler_tool_schema.py +++ /dev/null @@ -1,46 +0,0 @@ -import json - -from pydantic import BaseModel, Field -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import SmartCrawlerTool - -sgai_logger.set_logging(level="INFO") - - -# Define the output schema -class CompanyInfo(BaseModel): - company_description: str = Field(description="What the company does") - privacy_policy: str = Field(description="Privacy policy content") - terms_of_service: str = Field(description="Terms of service content") - - -# Initialize the tool with the schema -tool = SmartCrawlerTool(llm_output_schema=CompanyInfo) - -# Example crawling with structured output -url = "https://scrapegraphai.com/" -prompt = ( - "What does the company do? and I need text content from their privacy and terms" -) - -# Use the tool with crawling parameters and structured output -result = tool.invoke( - { - "url": url, - "prompt": prompt, - "cache_website": True, - "depth": 2, - "max_pages": 2, - "same_domain_only": True, - } -) - -print(json.dumps(result, indent=2)) - -# The output will be structured according to the CompanyInfo schema: -# { -# "company_description": "...", -# "privacy_policy": "...", -# "terms_of_service": "..." -# } diff --git a/examples/smartscraper_js_example.py b/examples/smartscraper_js_example.py deleted file mode 100644 index 1dac5c9..0000000 --- a/examples/smartscraper_js_example.py +++ /dev/null @@ -1,86 +0,0 @@ -#!/usr/bin/env python3 -""" -SmartScraper Heavy JavaScript Example - -This example demonstrates how to use SmartScraper with render_heavy_js enabled -for scraping JavaScript-heavy websites that require full browser rendering. - -Features demonstrated: -- SmartScraper with heavy JavaScript rendering -- Basic error handling -- Environment variable configuration -- Simple API usage pattern - -Requirements: -- A .env file with your SGAI_API_KEY - -Example .env file: -SGAI_API_KEY=your_api_key_here -""" - -import os - -from dotenv import load_dotenv -from scrapegraph_py import Client -from scrapegraph_py.logger import sgai_logger - -# Load environment variables from .env file -load_dotenv() - -sgai_logger.set_logging(level="INFO") - - -def main(): - """Run a SmartScraper example with heavy JavaScript rendering.""" - print("🌐 SmartScraper Heavy JavaScript Example") - print("=" * 50) - - # Initialize the client with API key from environment variable - api_key = os.getenv("SGAI_API_KEY") - if not api_key: - print("❌ Error: SGAI_API_KEY environment variable not set") - print("Please either:") - print(" 1. Set environment variable: export SGAI_API_KEY='your-api-key-here'") - print(" 2. Create a .env file with: SGAI_API_KEY=your-api-key-here") - return False - - client = Client(api_key=api_key) - - try: - # Configuration - website_url = "https://example.com" # Replace with your target URL - user_prompt = "Find the CEO of company X and their contact details" - - print(f"πŸ”— Target URL: {website_url}") - print(f"πŸ“ Query: {user_prompt}") - print("πŸ”§ Mode: Heavy JavaScript rendering enabled") - - # SmartScraper request with render_heavy_js enabled - response = client.smartscraper( - website_url=website_url, - user_prompt=user_prompt, - render_heavy_js=True, # Enable heavy JavaScript rendering - ) - - print("\nβœ… SmartScraper completed successfully!") - print(f"πŸ“„ Request ID: {response.get('request_id', 'N/A')}") - - # Display the results - if "result" in response: - print("\nπŸ“ Extracted Information:") - print(response["result"]) - - return True - - except Exception as e: - print(f"❌ Error: {str(e)}") - return False - - finally: - # Close the client - client.close() - - -if __name__ == "__main__": - success = main() - exit(0 if success else 1) diff --git a/examples/smartscraper_tool.py b/examples/smartscraper_tool.py deleted file mode 100644 index 9f31ba1..0000000 --- a/examples/smartscraper_tool.py +++ /dev/null @@ -1,17 +0,0 @@ -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import SmartScraperTool - -sgai_logger.set_logging(level="INFO") - -# Will automatically get SGAI_API_KEY from environment -tool = SmartScraperTool() - -# Example website and prompt -website_url = "https://www.example.com" -user_prompt = "Extract the main heading and first paragraph from this webpage" - -# Use the tool -result = tool.invoke({"website_url": website_url, "user_prompt": user_prompt}) - -print(result) diff --git a/examples/smartscraper_tool_schema.py b/examples/smartscraper_tool_schema.py deleted file mode 100644 index 3220881..0000000 --- a/examples/smartscraper_tool_schema.py +++ /dev/null @@ -1,47 +0,0 @@ -from typing import List - -from pydantic import BaseModel, Field -from scrapegraph_py.logger import sgai_logger - -from langchain_scrapegraph.tools import SmartScraperTool - - -class WebsiteInfo(BaseModel): - title: str = Field(description="The main title of the webpage") - description: str = Field(description="The main description or first paragraph") - urls: List[str] = Field(description="The URLs inside the webpage") - - -sgai_logger.set_logging(level="INFO") - -# Initialize with Pydantic model class -tool = SmartScraperTool(llm_output_schema=WebsiteInfo) - -# Example 1: Using website URL -website_url = "https://www.example.com" -user_prompt = "Extract info about the website" - -# Use the tool with URL -result_url = tool.invoke({"website_url": website_url, "user_prompt": user_prompt}) -print("\nResult from URL:", result_url) - -# Example 2: Using HTML content directly -html_content = """ - - -

Example Domain

-

This domain is for use in illustrative examples.

- More information... - - -""" - -# Use the tool with HTML content -result_html = tool.invoke( - { - "website_url": website_url, # Still required but will be overridden - "website_html": html_content, - "user_prompt": user_prompt, - } -) -print("\nResult from HTML:", result_html) diff --git a/langchain_scrapegraph/__init__.py b/langchain_scrapegraph/__init__.py index 30767d1..f45870f 100644 --- a/langchain_scrapegraph/__init__.py +++ b/langchain_scrapegraph/__init__.py @@ -1,3 +1,3 @@ """langchain-scrapegraph package.""" -__version__ = "1.10.0" +__version__ = "2.0.0" diff --git a/langchain_scrapegraph/tools/__init__.py b/langchain_scrapegraph/tools/__init__.py index edc9e53..f956e8b 100644 --- a/langchain_scrapegraph/tools/__init__.py +++ b/langchain_scrapegraph/tools/__init__.py @@ -1,37 +1,34 @@ -from .agentic_scraper import AgenticScraperTool +from .crawl import CrawlResumeTool, CrawlStartTool, CrawlStatusTool, CrawlStopTool from .credits import GetCreditsTool +from .extract import ExtractTool +from .history import HistoryTool from .markdownify import MarkdownifyTool -from .scheduled_jobs import ( - CreateScheduledJobTool, - DeleteScheduledJobTool, - GetJobExecutionsTool, - GetScheduledJobsTool, - GetScheduledJobTool, - PauseScheduledJobTool, - ResumeScheduledJobTool, - TriggerScheduledJobTool, - UpdateScheduledJobTool, +from .monitor import ( + MonitorCreateTool, + MonitorDeleteTool, + MonitorGetTool, + MonitorListTool, + MonitorPauseTool, + MonitorResumeTool, ) from .scrape import ScrapeTool -from .searchscraper import SearchScraperTool -from .smartcrawler import SmartCrawlerTool -from .smartscraper import SmartScraperTool +from .search import SearchTool __all__ = [ - "AgenticScraperTool", - "CreateScheduledJobTool", - "DeleteScheduledJobTool", + "CrawlResumeTool", + "CrawlStartTool", + "CrawlStatusTool", + "CrawlStopTool", + "ExtractTool", "GetCreditsTool", - "GetJobExecutionsTool", - "GetScheduledJobsTool", - "GetScheduledJobTool", + "HistoryTool", "MarkdownifyTool", - "PauseScheduledJobTool", - "ResumeScheduledJobTool", + "MonitorCreateTool", + "MonitorDeleteTool", + "MonitorGetTool", + "MonitorListTool", + "MonitorPauseTool", + "MonitorResumeTool", "ScrapeTool", - "SearchScraperTool", - "SmartCrawlerTool", - "SmartScraperTool", - "TriggerScheduledJobTool", - "UpdateScheduledJobTool", + "SearchTool", ] diff --git a/langchain_scrapegraph/tools/agentic_scraper.py b/langchain_scrapegraph/tools/agentic_scraper.py deleted file mode 100644 index 397dec2..0000000 --- a/langchain_scrapegraph/tools/agentic_scraper.py +++ /dev/null @@ -1,256 +0,0 @@ -# Models for agentic scraper endpoint - -from typing import Any, Dict, List, Optional, Type -from uuid import UUID - -from langchain_core.callbacks import ( - AsyncCallbackManagerForToolRun, - CallbackManagerForToolRun, -) -from langchain_core.tools import BaseTool -from langchain_core.utils import get_from_dict_or_env -from pydantic import BaseModel, Field, model_validator -from scrapegraph_py import Client - - -class AgenticScraperRequest(BaseModel): - url: str = Field( - ..., - example="https://dashboard.scrapegraphai.com/", - description="The URL to scrape", - ) - use_session: bool = Field( - default=True, description="Whether to use session for the scraping" - ) - steps: List[str] = Field( - ..., - example=[ - "Type email@gmail.com in email input box", - "Type test-password@123 in password inputbox", - "click on login", - ], - description="List of steps to perform on the webpage", - ) - user_prompt: Optional[str] = Field( - default=None, - example="Extract user information and available dashboard sections", - description="Prompt for AI extraction (only used when ai_extraction=True)", - ) - output_schema: Optional[Dict[str, Any]] = Field( - default=None, - example={ - "user_info": { - "type": "object", - "properties": { - "username": {"type": "string"}, - "email": {"type": "string"}, - "dashboard_sections": { - "type": "array", - "items": {"type": "string"}, - }, - }, - } - }, - description="Schema for structured data extraction (only used when ai_extraction=True)", - ) - ai_extraction: bool = Field( - default=False, - description="Whether to use AI for data extraction from the scraped content", - ) - - @model_validator(mode="after") - def validate_url(self) -> "AgenticScraperRequest": - if not self.url.strip(): - raise ValueError("URL cannot be empty") - if not (self.url.startswith("http://") or self.url.startswith("https://")): - raise ValueError("Invalid URL - must start with http:// or https://") - return self - - @model_validator(mode="after") - def validate_steps(self) -> "AgenticScraperRequest": - if not self.steps: - raise ValueError("Steps cannot be empty") - if any(not step.strip() for step in self.steps): - raise ValueError("All steps must contain valid instructions") - return self - - @model_validator(mode="after") - def validate_ai_extraction(self) -> "AgenticScraperRequest": - if self.ai_extraction: - if not self.user_prompt or not self.user_prompt.strip(): - raise ValueError("user_prompt is required when ai_extraction=True") - return self - - def model_dump(self, *args, **kwargs) -> dict: - # Set exclude_none=True to exclude None values from serialization - kwargs.setdefault("exclude_none", True) - return super().model_dump(*args, **kwargs) - - -class GetAgenticScraperRequest(BaseModel): - """Request model for get_agenticscraper endpoint""" - - request_id: str = Field(..., example="123e4567-e89b-12d3-a456-426614174000") - - @model_validator(mode="after") - def validate_request_id(self) -> "GetAgenticScraperRequest": - try: - # Validate the request_id is a valid UUID - UUID(self.request_id) - except ValueError: - raise ValueError("request_id must be a valid UUID") - return self - - -class AgenticScraperTool(BaseTool): - """Tool for performing agentic web scraping using ScrapeGraph AI. - - This tool allows you to define a series of steps to perform on a webpage, - such as filling forms, clicking buttons, and extracting data. - - Setup: - Install ``langchain-scrapegraph`` python package: - - .. code-block:: bash - - pip install langchain-scrapegraph - - Get your API key from ScrapeGraph AI (https://scrapegraphai.com) - and set it as an environment variable: - - .. code-block:: bash - - export SGAI_API_KEY="your-api-key" - - Key init args: - api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. - client: Optional pre-configured ScrapeGraph client instance. - llm_output_schema: Optional Pydantic model or dictionary schema to structure the output. - If provided, the tool will ensure the output conforms to this schema. - - Instantiate: - .. code-block:: python - - from langchain_scrapegraph.tools import AgenticScraperTool - - # Will automatically get SGAI_API_KEY from environment - tool = AgenticScraperTool() - - # Or provide API key directly - tool = AgenticScraperTool(api_key="your-api-key") - - Use the tool: - .. code-block:: python - - # Basic usage with steps - result = tool.invoke({ - "url": "https://example.com/login", - "steps": [ - "Type 'user@example.com' in email input box", - "Type 'password123' in password input box", - "Click on login button" - ] - }) - - # With AI extraction - result = tool.invoke({ - "url": "https://dashboard.example.com", - "steps": [ - "Navigate to user profile section", - "Click on settings tab" - ], - "ai_extraction": True, - "user_prompt": "Extract user profile information and available settings", - "output_schema": { - "user_info": { - "type": "object", - "properties": { - "username": {"type": "string"}, - "email": {"type": "string"}, - "settings": {"type": "array", "items": {"type": "string"}} - } - } - } - }) - - """ - - name: str = "agentic_scraper" - description: str = ( - "Perform agentic web scraping by executing a series of steps on a webpage. " - "Supports form filling, button clicking, navigation, and AI-powered data extraction." - ) - args_schema: Type[BaseModel] = AgenticScraperRequest - return_direct: bool = False - - api_key: Optional[str] = Field(default=None, description="ScrapeGraph AI API key") - client: Optional[Client] = Field( - default=None, description="ScrapeGraph client instance" - ) - llm_output_schema: Optional[Type[BaseModel]] = Field( - default=None, description="Optional Pydantic model to structure the output" - ) - - def __init__(self, **kwargs): - super().__init__(**kwargs) - self.api_key = get_from_dict_or_env(kwargs, "api_key", "SGAI_API_KEY") - if not self.client: - self.client = Client(api_key=self.api_key) - - def _run( - self, - url: str, - steps: List[str], - use_session: bool = True, - user_prompt: Optional[str] = None, - output_schema: Optional[Dict[str, Any]] = None, - ai_extraction: bool = False, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> Dict[str, Any]: - """Run the agentic scraper tool.""" - try: - # Prepare the request payload - payload = { - "url": url, - "use_session": use_session, - "steps": steps, - "ai_extraction": ai_extraction, - } - - if ai_extraction and user_prompt: - payload["user_prompt"] = user_prompt - if output_schema: - payload["output_schema"] = output_schema - - # Call the ScrapeGraph API - response = self.client.agentic_scraper(**payload) - - return response - - except Exception as e: - if run_manager: - run_manager.on_tool_error(e, tool_name=self.name) - raise e - - async def _arun( - self, - url: str, - steps: List[str], - use_session: bool = True, - user_prompt: Optional[str] = None, - output_schema: Optional[Dict[str, Any]] = None, - ai_extraction: bool = False, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> Dict[str, Any]: - """Run the agentic scraper tool asynchronously.""" - # For now, just call the sync version - # In a real implementation, you might want to use async HTTP client - return self._run( - url=url, - steps=steps, - use_session=use_session, - user_prompt=user_prompt, - output_schema=output_schema, - ai_extraction=ai_extraction, - run_manager=run_manager, - ) diff --git a/langchain_scrapegraph/tools/crawl.py b/langchain_scrapegraph/tools/crawl.py new file mode 100644 index 0000000..b9698e5 --- /dev/null +++ b/langchain_scrapegraph/tools/crawl.py @@ -0,0 +1,216 @@ +from typing import Any, Dict, List, Optional, Type + +from langchain_core.callbacks import ( + AsyncCallbackManagerForToolRun, + CallbackManagerForToolRun, +) +from langchain_core.tools import BaseTool +from langchain_core.utils import get_from_dict_or_env +from pydantic import BaseModel, Field, model_validator +from scrapegraph_py import Client + + +class CrawlStartInput(BaseModel): + url: str = Field(description="The starting URL for the crawl") + depth: int = Field(default=2, description="Maximum crawl depth (1-10)") + max_pages: int = Field(default=10, description="Maximum pages to crawl (1-100)") + format: str = Field( + default="markdown", description="Output format: 'markdown' or 'html'" + ) + include_patterns: Optional[List[str]] = Field( + default=None, description="URL patterns to include" + ) + exclude_patterns: Optional[List[str]] = Field( + default=None, description="URL patterns to exclude" + ) + + +class CrawlIdInput(BaseModel): + crawl_id: str = Field(description="The crawl job ID") + + +class CrawlStartTool(BaseTool): + """Tool for starting a crawl job using ScrapeGraph AI v2 API.""" + + name: str = "CrawlStart" + description: str = ( + "Start a crawl job that will crawl multiple pages from a starting URL. " + "Returns a job ID that can be used to check status." + ) + args_schema: Type[BaseModel] = CrawlStartInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + url: str, + depth: int = 2, + max_pages: int = 10, + format: str = "markdown", + include_patterns: Optional[List[str]] = None, + exclude_patterns: Optional[List[str]] = None, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + + response = self.client.crawl.start( + url=url, + depth=depth, + max_pages=max_pages, + format=format, + include_patterns=include_patterns, + exclude_patterns=exclude_patterns, + ) + return response + + async def _arun( + self, + url: str, + depth: int = 2, + max_pages: int = 10, + format: str = "markdown", + include_patterns: Optional[List[str]] = None, + exclude_patterns: Optional[List[str]] = None, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + url=url, + depth=depth, + max_pages=max_pages, + format=format, + include_patterns=include_patterns, + exclude_patterns=exclude_patterns, + run_manager=run_manager.get_sync() if run_manager else None, + ) + + +class CrawlStatusTool(BaseTool): + """Tool for getting the status of a crawl job.""" + + name: str = "CrawlStatus" + description: str = "Get the status and results of a crawl job by its ID." + args_schema: Type[BaseModel] = CrawlIdInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + crawl_id: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.crawl.status(crawl_id) + + async def _arun( + self, + crawl_id: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + crawl_id=crawl_id, + run_manager=run_manager.get_sync() if run_manager else None, + ) + + +class CrawlStopTool(BaseTool): + """Tool for stopping a running crawl job.""" + + name: str = "CrawlStop" + description: str = "Stop a running crawl job by its ID." + args_schema: Type[BaseModel] = CrawlIdInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + crawl_id: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.crawl.stop(crawl_id) + + async def _arun( + self, + crawl_id: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + crawl_id=crawl_id, + run_manager=run_manager.get_sync() if run_manager else None, + ) + + +class CrawlResumeTool(BaseTool): + """Tool for resuming a stopped crawl job.""" + + name: str = "CrawlResume" + description: str = "Resume a stopped crawl job by its ID." + args_schema: Type[BaseModel] = CrawlIdInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + crawl_id: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.crawl.resume(crawl_id) + + async def _arun( + self, + crawl_id: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + crawl_id=crawl_id, + run_manager=run_manager.get_sync() if run_manager else None, + ) diff --git a/langchain_scrapegraph/tools/credits.py b/langchain_scrapegraph/tools/credits.py index d4ea94e..d6aec2c 100644 --- a/langchain_scrapegraph/tools/credits.py +++ b/langchain_scrapegraph/tools/credits.py @@ -27,32 +27,18 @@ class GetCreditsTool(BaseTool): export SGAI_API_KEY="your-api-key" - Key init args: - api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. - client: Optional pre-configured ScrapeGraph client instance. - Instantiate: .. code-block:: python from langchain_scrapegraph.tools import GetCreditsTool - # Will automatically get SGAI_API_KEY from environment tool = GetCreditsTool() - # Or provide API key directly - tool = GetCreditsTool(api_key="your-api-key") - Use the tool: .. code-block:: python result = tool.invoke({}) - print(result) - # { - # "remaining_credits": 100, - # "total_credits_used": 50 - # } - Async usage: .. code-block:: python @@ -70,7 +56,6 @@ class GetCreditsTool(BaseTool): @model_validator(mode="before") @classmethod def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") values["client"] = Client(api_key=values["api_key"]) return values @@ -79,14 +64,12 @@ def __init__(self, **data: Any): super().__init__(**data) def _run(self, run_manager: Optional[CallbackManagerForToolRun] = None) -> dict: - """Get the available credits.""" if not self.client: raise ValueError("Client not initialized") - return self.client.get_credits() + return self.client.credits() async def _arun( self, run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> int: - """Get the available credits asynchronously.""" + ) -> dict: return self._run(run_manager=run_manager.get_sync() if run_manager else None) diff --git a/langchain_scrapegraph/tools/extract.py b/langchain_scrapegraph/tools/extract.py new file mode 100644 index 0000000..8b426fe --- /dev/null +++ b/langchain_scrapegraph/tools/extract.py @@ -0,0 +1,123 @@ +from typing import Any, Dict, Optional, Type, TypeVar + +from langchain_core.callbacks import ( + AsyncCallbackManagerForToolRun, + CallbackManagerForToolRun, +) +from langchain_core.tools import BaseTool +from langchain_core.utils import get_from_dict_or_env +from pydantic import BaseModel, Field, model_validator +from scrapegraph_py import Client + +T = TypeVar("T", bound=BaseModel) + + +class ExtractInput(BaseModel): + url: str = Field(description="URL of the webpage to extract data from") + prompt: str = Field( + description="Prompt describing what to extract from the webpage and how to structure the output" + ) + + +class ExtractTool(BaseTool): + """Tool for extracting structured data from websites using ScrapeGraph AI v2 API. + + Setup: + Install ``langchain-scrapegraph`` python package: + + .. code-block:: bash + + pip install langchain-scrapegraph + + Get your API key from ScrapeGraph AI (https://scrapegraphai.com) + and set it as an environment variable: + + .. code-block:: bash + + export SGAI_API_KEY="your-api-key" + + Key init args: + api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. + client: Optional pre-configured ScrapeGraph client instance. + llm_output_schema: Optional Pydantic model class to structure the output. + + Instantiate: + .. code-block:: python + + from langchain_scrapegraph.tools import ExtractTool + + tool = ExtractTool() + + # With output schema: + from pydantic import BaseModel, Field + + class WebsiteInfo(BaseModel): + title: str = Field(description="The main title") + description: str = Field(description="The main description") + + tool_with_schema = ExtractTool(llm_output_schema=WebsiteInfo) + + Use the tool: + .. code-block:: python + + result = tool.invoke({ + "url": "https://example.com", + "prompt": "Extract the main heading and first paragraph" + }) + + Async usage: + .. code-block:: python + + result = await tool.ainvoke({ + "url": "https://example.com", + "prompt": "Extract the main heading" + }) + """ + + name: str = "Extract" + description: str = ( + "Extract structured data from a webpage using AI, by providing a URL and an extraction prompt" + ) + args_schema: Type[BaseModel] = ExtractInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + llm_output_schema: Optional[Type[BaseModel]] = None + + @model_validator(mode="before") + @classmethod + def validate_environment(cls: Type[T], values: Dict[str, Any]) -> Dict[str, Any]: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + url: str, + prompt: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> Dict[str, Any]: + if not self.client: + raise ValueError("Client not initialized") + + response = self.client.extract( + url=url, + prompt=prompt, + output_schema=self.llm_output_schema, + ) + return response + + async def _arun( + self, + url: str, + prompt: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> Dict[str, Any]: + return self._run( + url, + prompt, + run_manager=run_manager.get_sync() if run_manager else None, + ) diff --git a/langchain_scrapegraph/tools/history.py b/langchain_scrapegraph/tools/history.py new file mode 100644 index 0000000..9acdf38 --- /dev/null +++ b/langchain_scrapegraph/tools/history.py @@ -0,0 +1,79 @@ +from typing import Any, Dict, Optional, Type + +from langchain_core.callbacks import ( + AsyncCallbackManagerForToolRun, + CallbackManagerForToolRun, +) +from langchain_core.tools import BaseTool +from langchain_core.utils import get_from_dict_or_env +from pydantic import BaseModel, Field, model_validator +from scrapegraph_py import Client + + +class HistoryInput(BaseModel): + endpoint: Optional[str] = Field( + default=None, + description="Filter by endpoint name (e.g. 'scrape', 'extract', 'search')", + ) + status: Optional[str] = Field(default=None, description="Filter by request status") + limit: Optional[int] = Field( + default=None, description="Maximum number of results (1-100)" + ) + offset: Optional[int] = Field(default=None, description="Number of results to skip") + + +class HistoryTool(BaseTool): + """Tool for retrieving request history from ScrapeGraph AI v2 API.""" + + name: str = "History" + description: str = ( + "Retrieve your ScrapeGraph AI request history with optional filters." + ) + args_schema: Type[BaseModel] = HistoryInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + endpoint: Optional[str] = None, + status: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + + return self.client.history( + endpoint=endpoint, + status=status, + limit=limit, + offset=offset, + ) + + async def _arun( + self, + endpoint: Optional[str] = None, + status: Optional[str] = None, + limit: Optional[int] = None, + offset: Optional[int] = None, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + endpoint=endpoint, + status=status, + limit=limit, + offset=offset, + run_manager=run_manager.get_sync() if run_manager else None, + ) diff --git a/langchain_scrapegraph/tools/markdownify.py b/langchain_scrapegraph/tools/markdownify.py index 4750f5b..fa4041d 100644 --- a/langchain_scrapegraph/tools/markdownify.py +++ b/langchain_scrapegraph/tools/markdownify.py @@ -17,6 +17,8 @@ class MarkdownifyInput(BaseModel): class MarkdownifyTool(BaseTool): """Tool for converting webpages to Markdown format using ScrapeGraph AI. + In v2, this uses the scrape endpoint with format='markdown'. + Setup: Install ``langchain-scrapegraph`` python package: @@ -31,21 +33,13 @@ class MarkdownifyTool(BaseTool): export SGAI_API_KEY="your-api-key" - Key init args: - api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. - client: Optional pre-configured ScrapeGraph client instance. - Instantiate: .. code-block:: python from langchain_scrapegraph.tools import MarkdownifyTool - # Will automatically get SGAI_API_KEY from environment tool = MarkdownifyTool() - # Or provide API key directly - tool = MarkdownifyTool(api_key="your-api-key") - Use the tool: .. code-block:: python @@ -53,11 +47,6 @@ class MarkdownifyTool(BaseTool): "website_url": "https://example.com" }) - print(result) - # # Example Domain - # - # This domain is for use in illustrative examples... - Async usage: .. code-block:: python @@ -78,7 +67,6 @@ class MarkdownifyTool(BaseTool): @model_validator(mode="before") @classmethod def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") values["client"] = Client(api_key=values["api_key"]) return values @@ -91,18 +79,16 @@ def _run( website_url: str, run_manager: Optional[CallbackManagerForToolRun] = None, ) -> dict: - """Use the tool to extract data from a website.""" if not self.client: raise ValueError("Client not initialized") - response = self.client.markdownify(website_url=website_url) - return response["result"] + response = self.client.scrape(url=website_url, format="markdown") + return response async def _arun( self, website_url: str, run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> str: - """Use the tool asynchronously.""" + ) -> dict: return self._run( website_url, run_manager=run_manager.get_sync() if run_manager else None, diff --git a/langchain_scrapegraph/tools/monitor.py b/langchain_scrapegraph/tools/monitor.py new file mode 100644 index 0000000..0f3ef84 --- /dev/null +++ b/langchain_scrapegraph/tools/monitor.py @@ -0,0 +1,282 @@ +from typing import Any, Dict, Optional, Type + +from langchain_core.callbacks import ( + AsyncCallbackManagerForToolRun, + CallbackManagerForToolRun, +) +from langchain_core.tools import BaseTool +from langchain_core.utils import get_from_dict_or_env +from pydantic import BaseModel, Field, model_validator +from scrapegraph_py import Client + + +class MonitorCreateInput(BaseModel): + name: str = Field(description="Name of the monitor") + url: str = Field(description="URL to monitor") + prompt: str = Field(description="Prompt for AI extraction") + cron: str = Field( + description="Cron expression for scheduling (e.g., '0 9 * * *' for daily at 9 AM)" + ) + output_schema: Optional[Dict[str, Any]] = Field( + default=None, description="Optional JSON Schema for structured output" + ) + + +class MonitorIdInput(BaseModel): + monitor_id: str = Field(description="The monitor ID") + + +class MonitorCreateTool(BaseTool): + """Tool for creating a monitor using ScrapeGraph AI v2 API.""" + + name: str = "MonitorCreate" + description: str = ( + "Create a new monitor that automatically extracts data from a URL on a schedule." + ) + args_schema: Type[BaseModel] = MonitorCreateInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + name: str, + url: str, + prompt: str, + cron: str, + output_schema: Optional[Dict[str, Any]] = None, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + + response = self.client.monitor.create( + name=name, + url=url, + prompt=prompt, + cron=cron, + output_schema=output_schema, + ) + return response + + async def _arun( + self, + name: str, + url: str, + prompt: str, + cron: str, + output_schema: Optional[Dict[str, Any]] = None, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + name=name, + url=url, + prompt=prompt, + cron=cron, + output_schema=output_schema, + run_manager=run_manager.get_sync() if run_manager else None, + ) + + +class MonitorListTool(BaseTool): + """Tool for listing all monitors.""" + + name: str = "MonitorList" + description: str = "List all monitors in your ScrapeGraph AI account." + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.monitor.list() + + async def _arun( + self, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run(run_manager=run_manager.get_sync() if run_manager else None) + + +class MonitorGetTool(BaseTool): + """Tool for getting a specific monitor.""" + + name: str = "MonitorGet" + description: str = "Get details of a specific monitor by its ID." + args_schema: Type[BaseModel] = MonitorIdInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + monitor_id: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.monitor.get(monitor_id) + + async def _arun( + self, + monitor_id: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + monitor_id=monitor_id, + run_manager=run_manager.get_sync() if run_manager else None, + ) + + +class MonitorPauseTool(BaseTool): + """Tool for pausing a monitor.""" + + name: str = "MonitorPause" + description: str = "Pause a monitor so it stops running until resumed." + args_schema: Type[BaseModel] = MonitorIdInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + monitor_id: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.monitor.pause(monitor_id) + + async def _arun( + self, + monitor_id: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + monitor_id=monitor_id, + run_manager=run_manager.get_sync() if run_manager else None, + ) + + +class MonitorResumeTool(BaseTool): + """Tool for resuming a paused monitor.""" + + name: str = "MonitorResume" + description: str = "Resume a paused monitor so it starts running again." + args_schema: Type[BaseModel] = MonitorIdInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + monitor_id: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.monitor.resume(monitor_id) + + async def _arun( + self, + monitor_id: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + monitor_id=monitor_id, + run_manager=run_manager.get_sync() if run_manager else None, + ) + + +class MonitorDeleteTool(BaseTool): + """Tool for deleting a monitor.""" + + name: str = "MonitorDelete" + description: str = "Delete a monitor permanently." + args_schema: Type[BaseModel] = MonitorIdInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + + @model_validator(mode="before") + @classmethod + def validate_environment(cls, values: Dict) -> Dict: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + monitor_id: str, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> dict: + if not self.client: + raise ValueError("Client not initialized") + return self.client.monitor.delete(monitor_id) + + async def _arun( + self, + monitor_id: str, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> dict: + return self._run( + monitor_id=monitor_id, + run_manager=run_manager.get_sync() if run_manager else None, + ) diff --git a/langchain_scrapegraph/tools/scheduled_jobs.py b/langchain_scrapegraph/tools/scheduled_jobs.py deleted file mode 100644 index bf36fa7..0000000 --- a/langchain_scrapegraph/tools/scheduled_jobs.py +++ /dev/null @@ -1,520 +0,0 @@ -from typing import Any, Dict, Optional, Type - -from langchain_core.callbacks import ( - AsyncCallbackManagerForToolRun, - CallbackManagerForToolRun, -) -from langchain_core.tools import BaseTool -from langchain_core.utils import get_from_dict_or_env -from pydantic import BaseModel, Field, model_validator -from scrapegraph_py import Client - - -class ServiceType: - """Service types for scheduled jobs.""" - - SMARTSCRAPER = "smartscraper" - SEARCHSCRAPER = "searchscraper" - SMARTCRAWLER = "smartcrawler" - MARKDOWNIFY = "markdownify" - - -class CreateScheduledJobInput(BaseModel): - job_name: str = Field(description="Name of the scheduled job") - service_type: str = Field( - description="Type of service to run (smartscraper, searchscraper, smartcrawler, markdownify)" - ) - cron_expression: str = Field( - description="Cron expression for scheduling (e.g., '0 9 * * *' for daily at 9 AM)" - ) - job_config: Dict[str, Any] = Field( - description="Configuration dictionary for the job (varies by service type)" - ) - is_active: bool = Field( - default=True, description="Whether the job should be active" - ) - - -class GetScheduledJobsInput(BaseModel): - page: int = Field(default=1, description="Page number for pagination") - page_size: int = Field(default=10, description="Number of jobs per page") - service_type: Optional[str] = Field( - default=None, - description="Filter by service type (smartscraper, searchscraper, etc.)", - ) - is_active: Optional[bool] = Field( - default=None, description="Filter by active status" - ) - - -class GetScheduledJobInput(BaseModel): - job_id: str = Field(description="ID of the scheduled job to retrieve") - - -class UpdateScheduledJobInput(BaseModel): - job_id: str = Field(description="ID of the scheduled job to update") - job_name: Optional[str] = Field(default=None, description="New job name") - cron_expression: Optional[str] = Field( - default=None, description="New cron expression" - ) - job_config: Optional[Dict[str, Any]] = Field( - default=None, description="New job configuration" - ) - is_active: Optional[bool] = Field(default=None, description="New active status") - - -class JobControlInput(BaseModel): - job_id: str = Field(description="ID of the scheduled job") - - -class GetJobExecutionsInput(BaseModel): - job_id: str = Field(description="ID of the scheduled job") - page: int = Field(default=1, description="Page number for pagination") - page_size: int = Field(default=10, description="Number of executions per page") - - -class CreateScheduledJobTool(BaseTool): - """Tool for creating scheduled jobs with ScrapeGraph AI. - - This tool allows you to create recurring jobs that will automatically - run at specified intervals using cron expressions. - """ - - name: str = "CreateScheduledJob" - description: str = ( - "Create a new scheduled job that will run automatically at specified intervals. " - "Supports SmartScraper, SearchScraper, SmartCrawler, and Markdownify services." - ) - args_schema: Type[BaseModel] = CreateScheduledJobInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_name: str, - service_type: str, - cron_expression: str, - job_config: Dict[str, Any], - is_active: bool = True, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Create a scheduled job.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.create_scheduled_job( - job_name=job_name, - service_type=service_type, - cron_expression=cron_expression, - job_config=job_config, - is_active=is_active, - ) - return response - - async def _arun( - self, - job_name: str, - service_type: str, - cron_expression: str, - job_config: Dict[str, Any], - is_active: bool = True, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Create a scheduled job asynchronously.""" - return self._run( - job_name=job_name, - service_type=service_type, - cron_expression=cron_expression, - job_config=job_config, - is_active=is_active, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class GetScheduledJobsTool(BaseTool): - """Tool for retrieving scheduled jobs from ScrapeGraph AI.""" - - name: str = "GetScheduledJobs" - description: str = ( - "Retrieve a list of scheduled jobs with optional filtering by service type and active status." - ) - args_schema: Type[BaseModel] = GetScheduledJobsInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - page: int = 1, - page_size: int = 10, - service_type: Optional[str] = None, - is_active: Optional[bool] = None, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Get scheduled jobs.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.get_scheduled_jobs( - page=page, - page_size=page_size, - service_type=service_type, - is_active=is_active, - ) - return response - - async def _arun( - self, - page: int = 1, - page_size: int = 10, - service_type: Optional[str] = None, - is_active: Optional[bool] = None, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Get scheduled jobs asynchronously.""" - return self._run( - page=page, - page_size=page_size, - service_type=service_type, - is_active=is_active, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class GetScheduledJobTool(BaseTool): - """Tool for retrieving a specific scheduled job by ID.""" - - name: str = "GetScheduledJob" - description: str = "Retrieve details of a specific scheduled job by its ID." - args_schema: Type[BaseModel] = GetScheduledJobInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_id: str, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Get a specific scheduled job.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.get_scheduled_job(job_id) - return response - - async def _arun( - self, - job_id: str, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Get a specific scheduled job asynchronously.""" - return self._run( - job_id=job_id, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class UpdateScheduledJobTool(BaseTool): - """Tool for updating a scheduled job.""" - - name: str = "UpdateScheduledJob" - description: str = "Update properties of an existing scheduled job." - args_schema: Type[BaseModel] = UpdateScheduledJobInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_id: str, - job_name: Optional[str] = None, - cron_expression: Optional[str] = None, - job_config: Optional[Dict[str, Any]] = None, - is_active: Optional[bool] = None, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Update a scheduled job.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.update_scheduled_job( - job_id=job_id, - job_name=job_name, - cron_expression=cron_expression, - job_config=job_config, - is_active=is_active, - ) - return response - - async def _arun( - self, - job_id: str, - job_name: Optional[str] = None, - cron_expression: Optional[str] = None, - job_config: Optional[Dict[str, Any]] = None, - is_active: Optional[bool] = None, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Update a scheduled job asynchronously.""" - return self._run( - job_id=job_id, - job_name=job_name, - cron_expression=cron_expression, - job_config=job_config, - is_active=is_active, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class PauseScheduledJobTool(BaseTool): - """Tool for pausing a scheduled job.""" - - name: str = "PauseScheduledJob" - description: str = "Pause a scheduled job so it won't run until resumed." - args_schema: Type[BaseModel] = JobControlInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_id: str, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Pause a scheduled job.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.pause_scheduled_job(job_id) - return response - - async def _arun( - self, - job_id: str, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Pause a scheduled job asynchronously.""" - return self._run( - job_id=job_id, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class ResumeScheduledJobTool(BaseTool): - """Tool for resuming a paused scheduled job.""" - - name: str = "ResumeScheduledJob" - description: str = "Resume a paused scheduled job so it will start running again." - args_schema: Type[BaseModel] = JobControlInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_id: str, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Resume a scheduled job.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.resume_scheduled_job(job_id) - return response - - async def _arun( - self, - job_id: str, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Resume a scheduled job asynchronously.""" - return self._run( - job_id=job_id, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class TriggerScheduledJobTool(BaseTool): - """Tool for manually triggering a scheduled job.""" - - name: str = "TriggerScheduledJob" - description: str = "Manually trigger a scheduled job to run immediately." - args_schema: Type[BaseModel] = JobControlInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_id: str, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Trigger a scheduled job.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.trigger_scheduled_job(job_id) - return response - - async def _arun( - self, - job_id: str, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Trigger a scheduled job asynchronously.""" - return self._run( - job_id=job_id, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class GetJobExecutionsTool(BaseTool): - """Tool for getting execution history of a scheduled job.""" - - name: str = "GetJobExecutions" - description: str = "Retrieve execution history for a scheduled job." - args_schema: Type[BaseModel] = GetJobExecutionsInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_id: str, - page: int = 1, - page_size: int = 10, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Get job executions.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.get_job_executions( - job_id=job_id, - page=page, - page_size=page_size, - ) - return response - - async def _arun( - self, - job_id: str, - page: int = 1, - page_size: int = 10, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Get job executions asynchronously.""" - return self._run( - job_id=job_id, - page=page, - page_size=page_size, - run_manager=run_manager.get_sync() if run_manager else None, - ) - - -class DeleteScheduledJobTool(BaseTool): - """Tool for deleting a scheduled job.""" - - name: str = "DeleteScheduledJob" - description: str = "Delete a scheduled job permanently." - args_schema: Type[BaseModel] = JobControlInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def _run( - self, - job_id: str, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Delete a scheduled job.""" - if not self.client: - raise ValueError("Client not initialized") - - response = self.client.delete_scheduled_job(job_id) - return response - - async def _arun( - self, - job_id: str, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Delete a scheduled job asynchronously.""" - return self._run( - job_id=job_id, - run_manager=run_manager.get_sync() if run_manager else None, - ) diff --git a/langchain_scrapegraph/tools/scrape.py b/langchain_scrapegraph/tools/scrape.py index 5821459..526f088 100644 --- a/langchain_scrapegraph/tools/scrape.py +++ b/langchain_scrapegraph/tools/scrape.py @@ -11,18 +11,17 @@ class ScrapeInput(BaseModel): - website_url: str = Field(description="URL of the website to scrape") - render_heavy_js: bool = Field( - default=False, - description="Whether to render heavy JavaScript content (slower but more complete)", - ) - headers: Optional[Dict[str, str]] = Field( - default=None, description="Optional headers to send with the request" + url: str = Field(description="URL of the website to scrape") + format: str = Field( + default="markdown", + description="Output format: 'markdown', 'html', 'screenshot', or 'branding'", ) class ScrapeTool(BaseTool): - """Tool for getting HTML content from websites using ScrapeGraph AI. + """Tool for scraping web pages using ScrapeGraph AI v2 API. + + Returns content in the specified format (markdown, html, screenshot, or branding). Setup: Install ``langchain-scrapegraph`` python package: @@ -47,55 +46,34 @@ class ScrapeTool(BaseTool): from langchain_scrapegraph.tools import ScrapeTool - # Will automatically get SGAI_API_KEY from environment tool = ScrapeTool() - # Or provide API key directly - tool = ScrapeTool(api_key="your-api-key") - Use the tool: .. code-block:: python - # Basic scraping + # Get markdown content result = tool.invoke({ - "website_url": "https://example.com" + "url": "https://example.com" }) - # With heavy JavaScript rendering + # Get HTML content result = tool.invoke({ - "website_url": "https://example.com", - "render_heavy_js": True + "url": "https://example.com", + "format": "html" }) - # With custom headers - result = tool.invoke({ - "website_url": "https://example.com", - "headers": { - "User-Agent": "Custom Bot 1.0", - "Accept": "text/html" - } - }) - - print(result) - # { - # "html": "...", - # "scrape_request_id": "req_123", - # "status": "success", - # "error": None - # } - Async usage: .. code-block:: python result = await tool.ainvoke({ - "website_url": "https://example.com" + "url": "https://example.com" }) """ name: str = "Scrape" description: str = ( - "Get HTML content from a website. Useful when you need to retrieve the raw HTML " - "content of a webpage, with optional heavy JavaScript rendering and custom headers." + "Scrape a web page and return its content in the specified format " + "(markdown, html, screenshot, or branding)." ) args_schema: Type[BaseModel] = ScrapeInput return_direct: bool = True @@ -105,7 +83,6 @@ class ScrapeTool(BaseTool): @model_validator(mode="before") @classmethod def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") values["client"] = Client(api_key=values["api_key"]) return values @@ -115,34 +92,24 @@ def __init__(self, **data: Any): def _run( self, - website_url: str, - render_heavy_js: bool = False, - headers: Optional[Dict[str, str]] = None, + url: str, + format: str = "markdown", run_manager: Optional[CallbackManagerForToolRun] = None, ) -> dict: - """Use the tool to scrape HTML content from a website.""" if not self.client: raise ValueError("Client not initialized") - response = self.client.scrape( - website_url=website_url, - render_heavy_js=render_heavy_js, - headers=headers, - ) - + response = self.client.scrape(url=url, format=format) return response async def _arun( self, - website_url: str, - render_heavy_js: bool = False, - headers: Optional[Dict[str, str]] = None, + url: str, + format: str = "markdown", run_manager: Optional[AsyncCallbackManagerForToolRun] = None, ) -> dict: - """Use the tool asynchronously.""" return self._run( - website_url=website_url, - render_heavy_js=render_heavy_js, - headers=headers, + url=url, + format=format, run_manager=run_manager.get_sync() if run_manager else None, ) diff --git a/langchain_scrapegraph/tools/search.py b/langchain_scrapegraph/tools/search.py new file mode 100644 index 0000000..b53f132 --- /dev/null +++ b/langchain_scrapegraph/tools/search.py @@ -0,0 +1,115 @@ +from typing import Any, Dict, Optional, Type, TypeVar + +from langchain_core.callbacks import ( + AsyncCallbackManagerForToolRun, + CallbackManagerForToolRun, +) +from langchain_core.tools import BaseTool +from langchain_core.utils import get_from_dict_or_env +from pydantic import BaseModel, Field, model_validator +from scrapegraph_py import Client + +T = TypeVar("T", bound=BaseModel) + + +class SearchInput(BaseModel): + query: str = Field( + description="The search query describing what information to find on the web" + ) + num_results: int = Field( + default=5, + description="Number of search results to return (3-20)", + ) + + +class SearchTool(BaseTool): + """Tool for searching the web and extracting structured data using ScrapeGraph AI v2 API. + + Setup: + Install ``langchain-scrapegraph`` python package: + + .. code-block:: bash + + pip install langchain-scrapegraph + + Get your API key from ScrapeGraph AI (https://scrapegraphai.com) + and set it as an environment variable: + + .. code-block:: bash + + export SGAI_API_KEY="your-api-key" + + Key init args: + api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. + client: Optional pre-configured ScrapeGraph client instance. + llm_output_schema: Optional Pydantic model class to structure the output. + + Instantiate: + .. code-block:: python + + from langchain_scrapegraph.tools import SearchTool + + tool = SearchTool() + + Use the tool: + .. code-block:: python + + result = tool.invoke({ + "query": "What are the key features of ChatGPT Plus?" + }) + + Async usage: + .. code-block:: python + + result = await tool.ainvoke({ + "query": "What are the key features of Product X?" + }) + """ + + name: str = "Search" + description: str = ( + "Search the web and extract structured information about a specific topic or query" + ) + args_schema: Type[BaseModel] = SearchInput + return_direct: bool = True + client: Optional[Client] = None + api_key: str + llm_output_schema: Optional[Type[BaseModel]] = None + + @model_validator(mode="before") + @classmethod + def validate_environment(cls: Type[T], values: Dict[str, Any]) -> Dict[str, Any]: + values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") + values["client"] = Client(api_key=values["api_key"]) + return values + + def __init__(self, **data: Any): + super().__init__(**data) + + def _run( + self, + query: str, + num_results: int = 5, + run_manager: Optional[CallbackManagerForToolRun] = None, + ) -> Dict[str, Any]: + if not self.client: + raise ValueError("Client not initialized") + + response = self.client.search( + query=query, + num_results=num_results, + output_schema=self.llm_output_schema, + ) + return response + + async def _arun( + self, + query: str, + num_results: int = 5, + run_manager: Optional[AsyncCallbackManagerForToolRun] = None, + ) -> Dict[str, Any]: + return self._run( + query, + num_results=num_results, + run_manager=run_manager.get_sync() if run_manager else None, + ) diff --git a/langchain_scrapegraph/tools/searchscraper.py b/langchain_scrapegraph/tools/searchscraper.py deleted file mode 100644 index 9b80ef5..0000000 --- a/langchain_scrapegraph/tools/searchscraper.py +++ /dev/null @@ -1,181 +0,0 @@ -from typing import Any, Dict, Optional, Type, TypeVar - -from langchain_core.callbacks import ( - AsyncCallbackManagerForToolRun, - CallbackManagerForToolRun, -) -from langchain_core.tools import BaseTool -from langchain_core.utils import get_from_dict_or_env -from pydantic import BaseModel, Field, model_validator -from scrapegraph_py import Client - -T = TypeVar("T", bound=BaseModel) - - -class SearchScraperInput(BaseModel): - user_prompt: str = Field( - description="Prompt describing what information to search for and extract from the web" - ) - extraction_mode: bool = Field( - default=True, - description="If True, use AI extraction mode (10 credits/page). If False, use markdown mode (2 credits/page).", - ) - - -class SearchScraperTool(BaseTool): - """Tool for searching and extracting structured data from the web using ScrapeGraph AI. - - Setup: - Install ``langchain-scrapegraph`` python package: - - .. code-block:: bash - - pip install langchain-scrapegraph - - Get your API key from ScrapeGraph AI (https://scrapegraphai.com) - and set it as an environment variable: - - .. code-block:: bash - - export SGAI_API_KEY="your-api-key" - - Key init args: - api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. - client: Optional pre-configured ScrapeGraph client instance. - llm_output_schema: Optional Pydantic model or dictionary schema to structure the output. - If provided, the tool will ensure the output conforms to this schema. - - Instantiate: - .. code-block:: python - - from langchain_scrapegraph.tools import SearchScraperTool - - # Will automatically get SGAI_API_KEY from environment - tool = SearchScraperTool() - - # Or provide API key directly - tool = SearchScraperTool(api_key="your-api-key") - - # Optionally, you can provide an output schema: - from pydantic import BaseModel, Field - from typing import List - - class ProductInfo(BaseModel): - name: str = Field(description="Product name") - features: List[str] = Field(description="List of product features") - pricing: Dict[str, Any] = Field(description="Pricing information") - - tool_with_schema = SearchScraperTool(llm_output_schema=ProductInfo) - - Use the tool: - .. code-block:: python - - result = tool.invoke({ - "user_prompt": "What are the key features and pricing of ChatGPT Plus?" - }) - - print(result) - # { - # "product": { - # "name": "ChatGPT Plus", - # "description": "Premium version of ChatGPT...", - # ... - # }, - # "features": [...], - # "pricing": {...}, - # "reference_urls": [ - # "https://openai.com/chatgpt", - # ... - # ] - # } - - Async usage: - .. code-block:: python - - result = await tool.ainvoke({ - "user_prompt": "What are the key features of Product X?" - }) - """ - - name: str = "SearchScraper" - description: str = ( - "Useful when you need to search and extract structured information from the web about a specific topic or query" - ) - args_schema: Type[BaseModel] = SearchScraperInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - llm_output_schema: Optional[Type[BaseModel]] = None - - @model_validator(mode="before") - @classmethod - def validate_environment(cls: Type[T], values: Dict[str, Any]) -> Dict[str, Any]: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def __init__(self, **data: Any): - super().__init__(**data) - - def _run( - self, - user_prompt: str, - extraction_mode: bool = True, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> Dict[str, Any]: - """Use the tool to search and extract data from the web. - - Args: - user_prompt: What information to search for and extract - extraction_mode: If True, use AI extraction (10 credits/page). If False, use markdown mode (2 credits/page). - run_manager: Optional callback manager - - Returns: - dict: In extraction mode, returns structured data. In markdown mode, returns markdown content. - """ - if not self.client: - raise ValueError("Client not initialized") - - # In markdown mode, we ignore the output schema since we're returning raw markdown - if not extraction_mode: - response = self.client.searchscraper( - user_prompt=user_prompt, - extraction_mode=False, - ) - return { - "markdown_content": response.get("markdown_content", ""), - "reference_urls": response.get("reference_urls", []), - } - - # In extraction mode, we can use the output schema if provided - if self.llm_output_schema is None: - response = self.client.searchscraper( - user_prompt=user_prompt, - extraction_mode=True, - ) - elif isinstance(self.llm_output_schema, type) and issubclass( - self.llm_output_schema, BaseModel - ): - response = self.client.searchscraper( - user_prompt=user_prompt, - extraction_mode=True, - output_schema=self.llm_output_schema, - ) - else: - raise ValueError("llm_output_schema must be a Pydantic model class") - - return response["result"] - - async def _arun( - self, - user_prompt: str, - extraction_mode: bool = True, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> Dict[str, Any]: - """Use the tool asynchronously.""" - return self._run( - user_prompt, - extraction_mode=extraction_mode, - run_manager=run_manager.get_sync() if run_manager else None, - ) diff --git a/langchain_scrapegraph/tools/smartcrawler.py b/langchain_scrapegraph/tools/smartcrawler.py deleted file mode 100644 index c626850..0000000 --- a/langchain_scrapegraph/tools/smartcrawler.py +++ /dev/null @@ -1,188 +0,0 @@ -from typing import Any, Dict, Optional, Type - -from langchain_core.callbacks import ( - AsyncCallbackManagerForToolRun, - CallbackManagerForToolRun, -) -from langchain_core.tools import BaseTool -from langchain_core.utils import get_from_dict_or_env -from pydantic import BaseModel, Field, model_validator -from scrapegraph_py import Client - - -class SmartCrawlerInput(BaseModel): - prompt: str = Field( - description="Prompt describing what to extract from the websites and how to structure the output" - ) - url: str = Field(description="URL of the website to start crawling from") - cache_website: bool = Field( - default=True, - description="Whether to cache the website content for faster subsequent requests", - ) - depth: int = Field( - default=2, description="Maximum depth to crawl from the starting URL" - ) - max_pages: int = Field(default=2, description="Maximum number of pages to crawl") - same_domain_only: bool = Field( - default=True, - description="Whether to only crawl pages from the same domain as the starting URL", - ) - - -class SmartCrawlerTool(BaseTool): - """Tool for crawling and extracting structured data from multiple related webpages using ScrapeGraph AI. - - Setup: - Install ``langchain-scrapegraph`` python package: - - .. code-block:: bash - - pip install langchain-scrapegraph - - Get your API key from ScrapeGraph AI (https://scrapegraphai.com) - and set it as an environment variable: - - .. code-block:: bash - - export SGAI_API_KEY="your-api-key" - - Key init args: - api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. - client: Optional pre-configured ScrapeGraph client instance. - llm_output_schema: Optional Pydantic model or dictionary schema to structure the output. - If provided, the tool will ensure the output conforms to this schema. - - Instantiate: - .. code-block:: python - - from langchain_scrapegraph.tools import SmartCrawlerTool - - # Will automatically get SGAI_API_KEY from environment - tool = SmartCrawlerTool() - - # Or provide API key directly - tool = SmartCrawlerTool(api_key="your-api-key") - - # Optionally, you can provide an output schema: - from pydantic import BaseModel, Field - - class CompanyInfo(BaseModel): - company_description: str = Field(description="What the company does") - privacy_policy: str = Field(description="Privacy policy content") - terms_of_service: str = Field(description="Terms of service content") - - tool_with_schema = SmartCrawlerTool(llm_output_schema=CompanyInfo) - - Use the tool: - .. code-block:: python - - # Basic crawling - result = tool.invoke({ - "prompt": "What does the company do? Extract privacy and terms content", - "url": "https://scrapegraphai.com/", - "depth": 2, - "max_pages": 5 - }) - - # Crawling with custom parameters - result = tool.invoke({ - "prompt": "Extract product information and pricing", - "url": "https://example.com/products", - "cache_website": False, - "depth": 3, - "max_pages": 10, - "same_domain_only": False - }) - - print(result) - - Async usage: - .. code-block:: python - - result = await tool.ainvoke({ - "prompt": "Extract company information", - "url": "https://example.com" - }) - """ - - name: str = "SmartCrawler" - description: str = ( - "Useful when you need to extract structured data from multiple related webpages by crawling through a website, applying LLM reasoning across pages, by providing a starting URL and extraction prompt" - ) - args_schema: Type[BaseModel] = SmartCrawlerInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - llm_output_schema: Optional[Type[BaseModel]] = None - - @model_validator(mode="before") - @classmethod - def validate_environment(cls, values: Dict) -> Dict: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def __init__(self, **data: Any): - super().__init__(**data) - - def _run( - self, - prompt: str, - url: str, - cache_website: bool = True, - depth: int = 2, - max_pages: int = 2, - same_domain_only: bool = True, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> dict: - """Use the tool to crawl and extract data from multiple webpages.""" - if not self.client: - raise ValueError("Client not initialized") - - if self.llm_output_schema is None: - response = self.client.crawl( - url=url, - prompt=prompt, - cache_website=cache_website, - depth=depth, - max_pages=max_pages, - same_domain_only=same_domain_only, - ) - elif isinstance(self.llm_output_schema, type) and issubclass( - self.llm_output_schema, BaseModel - ): - response = self.client.crawl( - url=url, - prompt=prompt, - cache_website=cache_website, - depth=depth, - max_pages=max_pages, - same_domain_only=same_domain_only, - output_schema=self.llm_output_schema, - ) - else: - raise ValueError("llm_output_schema must be a Pydantic model class") - - return response - - async def _arun( - self, - prompt: str, - url: str, - cache_website: bool = True, - depth: int = 2, - max_pages: int = 2, - same_domain_only: bool = True, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> dict: - """Use the tool asynchronously.""" - return self._run( - prompt, - url, - cache_website=cache_website, - depth=depth, - max_pages=max_pages, - same_domain_only=same_domain_only, - run_manager=run_manager.get_sync() if run_manager else None, - ) diff --git a/langchain_scrapegraph/tools/smartscraper.py b/langchain_scrapegraph/tools/smartscraper.py deleted file mode 100644 index 11334a6..0000000 --- a/langchain_scrapegraph/tools/smartscraper.py +++ /dev/null @@ -1,198 +0,0 @@ -from typing import Any, Dict, Optional, Type, TypeVar - -from langchain_core.callbacks import ( - AsyncCallbackManagerForToolRun, - CallbackManagerForToolRun, -) -from langchain_core.tools import BaseTool -from langchain_core.utils import get_from_dict_or_env -from pydantic import BaseModel, Field, model_validator -from scrapegraph_py import Client - -T = TypeVar("T", bound=BaseModel) - - -class SmartScraperInput(BaseModel): - user_prompt: str = Field( - description="Prompt describing what to extract from the webpage and how to structure the output" - ) - website_url: str = Field(description="Url of the webpage to extract data from") - website_html: Optional[str] = Field( - default=None, - description="Optional HTML content to process instead of fetching from website_url", - ) - render_heavy_js: bool = Field( - default=False, - description="If True, enables full browser rendering for JavaScript-heavy websites", - ) - - -class SmartScraperTool(BaseTool): - """Tool for extracting structured data from websites using ScrapeGraph AI. - - Setup: - Install ``langchain-scrapegraph`` python package: - - .. code-block:: bash - - pip install langchain-scrapegraph - - Get your API key from ScrapeGraph AI (https://scrapegraphai.com) - and set it as an environment variable: - - .. code-block:: bash - - export SGAI_API_KEY="your-api-key" - - Key init args: - api_key: Your ScrapeGraph AI API key. If not provided, will look for SGAI_API_KEY env var. - client: Optional pre-configured ScrapeGraph client instance. - llm_output_schema: Optional Pydantic model or dictionary schema to structure the output. - If provided, the tool will ensure the output conforms to this schema. - - Instantiate: - .. code-block:: python - - from langchain_scrapegraph.tools import SmartScraperTool - - # Will automatically get SGAI_API_KEY from environment - tool = SmartScraperTool() - - # Or provide API key directly - tool = SmartScraperTool(api_key="your-api-key") - - # Optionally, you can provide an output schema: - from pydantic import BaseModel, Field - - class WebsiteInfo(BaseModel): - title: str = Field(description="The main title") - description: str = Field(description="The main description") - - tool_with_schema = SmartScraperTool(llm_output_schema=WebsiteInfo) - - Use the tool: - .. code-block:: python - - # Using website URL - result = tool.invoke({ - "user_prompt": "Extract the main heading and first paragraph", - "website_url": "https://example.com" - }) - - # Using HTML content directly - html_content = ''' - - -

Example Domain

-

This domain is for use in illustrative examples...

- - - ''' - result = tool.invoke({ - "user_prompt": "Extract the main heading and first paragraph", - "website_url": "https://example.com", - "website_html": html_content # This will override website_url - }) - - print(result) - # Without schema: - # { - # "main_heading": "Example Domain", - # "first_paragraph": "This domain is for use in illustrative examples..." - # } - # - # With WebsiteInfo schema: - # { - # "title": "Example Domain", - # "description": "This domain is for use in illustrative examples..." - # } - - Async usage: - .. code-block:: python - - result = await tool.ainvoke({ - "user_prompt": "Extract the main heading", - "website_url": "https://example.com" - }) - """ - - name: str = "SmartScraper" - description: str = ( - "Useful when you need to extract structured data from a webpage, applying also some reasoning using LLM, by providing a webpage URL and an extraction prompt" - ) - args_schema: Type[BaseModel] = SmartScraperInput - return_direct: bool = True - client: Optional[Client] = None - api_key: str - llm_output_schema: Optional[Type[BaseModel]] = None - - @model_validator(mode="before") - @classmethod - def validate_environment(cls: Type[T], values: Dict[str, Any]) -> Dict[str, Any]: - """Validate that api key exists in environment.""" - values["api_key"] = get_from_dict_or_env(values, "api_key", "SGAI_API_KEY") - values["client"] = Client(api_key=values["api_key"]) - return values - - def __init__(self, **data: Any): - super().__init__(**data) - - def _run( - self, - user_prompt: str, - website_url: str, - website_html: Optional[str] = None, - render_heavy_js: bool = False, - run_manager: Optional[CallbackManagerForToolRun] = None, - ) -> Dict[str, Any]: - """Use the tool to extract data from a website. - - Args: - user_prompt: What to extract from the webpage - website_url: URL to scrape - website_html: Optional HTML content to process instead of fetching from URL - render_heavy_js: If True, enables full browser rendering for JavaScript-heavy sites - run_manager: Optional callback manager - - Returns: - dict: Extracted data in the requested format - """ - if not self.client: - raise ValueError("Client not initialized") - - if self.llm_output_schema is None: - response = self.client.smartscraper( - website_url=website_url, - user_prompt=user_prompt, - website_html=website_html, - ) - elif isinstance(self.llm_output_schema, type) and issubclass( - self.llm_output_schema, BaseModel - ): - response = self.client.smartscraper( - website_url=website_url, - user_prompt=user_prompt, - website_html=website_html, - output_schema=self.llm_output_schema, - ) - else: - raise ValueError("llm_output_schema must be a Pydantic model class") - - return response["result"] - - async def _arun( - self, - user_prompt: str, - website_url: str, - website_html: Optional[str] = None, - render_heavy_js: bool = False, - run_manager: Optional[AsyncCallbackManagerForToolRun] = None, - ) -> Dict[str, Any]: - """Use the tool asynchronously.""" - return self._run( - user_prompt, - website_url, - website_html=website_html, - render_heavy_js=render_heavy_js, - run_manager=run_manager.get_sync() if run_manager else None, - ) diff --git a/pyproject.toml b/pyproject.toml index 3199205..cf277fc 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -1,6 +1,6 @@ [tool.poetry] name = "langchain-scrapegraph" -version = "1.11.0" +version = "2.0.0" description = "Library for extracting structured data from websites using ScrapeGraphAI" authors = ["Marco Vinciguerra ", "Lorenzo Padoan "] maintainers = ["Marco Vinciguerra "] @@ -38,7 +38,7 @@ classifiers = [ [tool.poetry.dependencies] python = ">=3.10,<4.0" langchain-core = "^0.3.21" -scrapegraph-py = "1.31.0" +scrapegraph-py = ">=1.31.0" [tool.poetry.group.test.dependencies] pytest = "^8.3.4" diff --git a/tests/integration_tests/test_tools.py b/tests/integration_tests/test_tools.py index 9ed2a29..42b511e 100644 --- a/tests/integration_tests/test_tools.py +++ b/tests/integration_tests/test_tools.py @@ -13,20 +13,16 @@ from dotenv import load_dotenv from langchain_tests.integration_tests import ToolsIntegrationTests -from langchain_scrapegraph.tools import ( - GetCreditsTool, - MarkdownifyTool, - SmartScraperTool, -) +from langchain_scrapegraph.tools import ExtractTool, GetCreditsTool, MarkdownifyTool # Load environment variables from .env file load_dotenv() -class TestSmartScraperToolIntegration(ToolsIntegrationTests): +class TestExtractToolIntegration(ToolsIntegrationTests): @property - def tool_constructor(self) -> Type[SmartScraperTool]: - return SmartScraperTool + def tool_constructor(self) -> Type[ExtractTool]: + return ExtractTool @property def tool_constructor_params(self) -> dict: @@ -38,8 +34,8 @@ def tool_constructor_params(self) -> dict: @property def tool_invoke_params_example(self) -> dict: return { - "user_prompt": "Extract the main heading.", - "website_url": "https://example.com", + "url": "https://example.com", + "prompt": "Extract the main heading.", } diff --git a/tests/unit_tests/mocks.py b/tests/unit_tests/mocks.py index 16af24c..c1c628d 100644 --- a/tests/unit_tests/mocks.py +++ b/tests/unit_tests/mocks.py @@ -1,249 +1,187 @@ -from typing import Any, Dict, Optional, Type +from typing import Any, Dict, List, Optional, Type from langchain_core.tools import BaseTool from pydantic import BaseModel, Field -class MockClient: - def __init__(self, api_key: str = None, *args, **kwargs): - """Initialize with mock methods that return proper response structures""" - self._api_key = api_key +class MockCrawlNamespace: + def __init__(self, client): + self._client = client - def smartscraper( - self, website_url: str, user_prompt: str, website_html: str = None + def start( + self, + url: str, + depth: int = 2, + max_pages: int = 10, + format: str = "markdown", + include_patterns: Optional[List[str]] = None, + exclude_patterns: Optional[List[str]] = None, + fetch_config=None, ) -> dict: - """Mock smartscraper method""" - # If website_html is provided, use it to determine the response - if website_html and "

Test

" in website_html: - return { - "request_id": "test-id", - "status": "completed", - "website_url": website_url, - "user_prompt": user_prompt, - "result": { - "main_heading": "Test", - "first_paragraph": "Test paragraph", - }, - "error": "", - } - - # Default response for URL-based requests return { - "request_id": "test-id", - "status": "completed", - "website_url": website_url, - "user_prompt": user_prompt, - "result": { - "main_heading": "Example Domain", - "first_paragraph": "Test paragraph", - }, - "error": "", + "id": "crawl-job-123", + "status": "running", + "url": url, + "depth": depth, + "max_pages": max_pages, } - def searchscraper(self, user_prompt: str) -> dict: - """Mock searchscraper method""" + def status(self, crawl_id: str) -> dict: return { - "request_id": "test-id", + "id": crawl_id, "status": "completed", - "user_prompt": user_prompt, - "result": { - "product": {"name": "Test Product", "description": "Test description"}, - "features": [{"name": "Feature 1", "description": "Description 1"}], - "pricing": { - "plans": [ - { - "name": "Basic Plan", - "price": { - "amount": "10", - "currency": "USD", - "period": "monthly", - }, - } - ] - }, - }, - "reference_urls": ["https://example.com/test"], - "error": "", + "pages_crawled": 5, + "results": [{"url": "https://example.com", "content": "# Example"}], } - def get_credits(self) -> dict: - """Mock get_credits method""" - return {"remaining_credits": 50, "total_credits_used": 543} + def stop(self, crawl_id: str) -> dict: + return {"id": crawl_id, "status": "stopped"} + + def resume(self, crawl_id: str) -> dict: + return {"id": crawl_id, "status": "running"} + + +class MockMonitorNamespace: + def __init__(self, client): + self._client = client - def markdownify(self, website_url: str) -> dict: - """Mock markdownify method""" + def create( + self, + name: str, + url: str, + prompt: str, + cron: str, + output_schema=None, + fetch_config=None, + llm_config=None, + ) -> dict: return { - "request_id": "test-id", - "status": "completed", - "website_url": website_url, - "result": "# Example Domain\n\nTest paragraph", - "error": "", + "id": "monitor-123", + "name": name, + "url": url, + "prompt": prompt, + "cron": cron, + "status": "active", } - def scrape( - self, website_url: str, render_heavy_js: bool = False, headers: dict = None - ) -> dict: - """Mock scrape method""" + def list(self) -> dict: return { - "scrape_request_id": "test-scrape-id", - "status": "success", - "html": "

Example Domain

Test content

", - "error": None, + "monitors": [ + { + "id": "monitor-123", + "name": "Test Monitor", + "url": "https://example.com", + "status": "active", + } + ] } - def create_scheduled_job( - self, - job_name: str, - service_type: str, - cron_expression: str, - job_config: dict, - is_active: bool = True, - ) -> dict: - """Mock create_scheduled_job method""" + def get(self, monitor_id: str) -> dict: return { - "id": "test-job-id-123", - "job_name": job_name, - "service_type": service_type, - "cron_expression": cron_expression, - "job_config": job_config, - "is_active": is_active, - "created_at": "2024-01-01T00:00:00Z", - "next_run_at": "2024-01-02T09:00:00Z", + "id": monitor_id, + "name": "Test Monitor", + "url": "https://example.com", + "status": "active", } - def get_scheduled_jobs( - self, - page: int = 1, - page_size: int = 10, - service_type: str = None, - is_active: bool = None, - ) -> dict: - """Mock get_scheduled_jobs method""" - jobs = [ - { - "id": "test-job-1", - "job_name": "Test Job 1", - "service_type": "smartscraper", - "cron_expression": "0 9 * * *", - "is_active": True, - "created_at": "2024-01-01T00:00:00Z", - "next_run_at": "2024-01-02T09:00:00Z", - } - ] + def pause(self, monitor_id: str) -> dict: + return {"id": monitor_id, "status": "paused"} - # Apply filters - if service_type: - jobs = [job for job in jobs if job["service_type"] == service_type] - if is_active is not None: - jobs = [job for job in jobs if job["is_active"] == is_active] + def resume(self, monitor_id: str) -> dict: + return {"id": monitor_id, "status": "active"} - return {"jobs": jobs, "total": len(jobs), "page": page, "page_size": page_size} + def delete(self, monitor_id: str) -> dict: + return {"message": "Monitor deleted successfully", "id": monitor_id} - def get_scheduled_job(self, job_id: str) -> dict: - """Mock get_scheduled_job method""" - return { - "id": job_id, - "job_name": "Test Job", - "service_type": "smartscraper", - "cron_expression": "0 9 * * *", - "job_config": {"website_url": "https://example.com", "user_prompt": "test"}, - "is_active": True, - "created_at": "2024-01-01T00:00:00Z", - "updated_at": "2024-01-01T00:00:00Z", - "next_run_at": "2024-01-02T09:00:00Z", - } - def update_scheduled_job( +class MockClient: + def __init__(self, api_key: str = None, *args, **kwargs): + self._api_key = api_key + self.crawl = MockCrawlNamespace(self) + self.monitor = MockMonitorNamespace(self) + + def extract( self, - job_id: str, - job_name: str = None, - cron_expression: str = None, - job_config: dict = None, - is_active: bool = None, + url: str, + prompt: str, + output_schema=None, + fetch_config=None, + llm_config=None, ) -> dict: - """Mock update_scheduled_job method""" return { - "id": job_id, - "job_name": job_name or "Updated Test Job", - "service_type": "smartscraper", - "cron_expression": cron_expression or "0 8 * * *", - "is_active": is_active if is_active is not None else True, - "updated_at": "2024-01-01T00:00:00Z", + "id": "test-id", + "data": { + "main_heading": "Example Domain", + "first_paragraph": "Test paragraph", + }, } - def pause_scheduled_job(self, job_id: str) -> dict: - """Mock pause_scheduled_job method""" + def search( + self, + query: str, + num_results: int = 5, + output_schema=None, + llm_config=None, + ) -> dict: return { - "message": "Job paused successfully", - "job_id": job_id, - "is_active": False, + "id": "test-id", + "results": [ + { + "title": "Test Result", + "url": "https://example.com", + "content": "Test content", + } + ], } - def resume_scheduled_job(self, job_id: str) -> dict: - """Mock resume_scheduled_job method""" + def scrape(self, url: str, format: str = "markdown", fetch_config=None) -> dict: + if format == "html": + return { + "id": "test-id", + "format": "html", + "content": ["

Example Domain

"], + } return { - "message": "Job resumed successfully", - "job_id": job_id, - "is_active": True, - "next_run_at": "2024-01-02T09:00:00Z", + "id": "test-id", + "format": "markdown", + "content": ["# Example Domain\n\nTest paragraph"], } - def trigger_scheduled_job(self, job_id: str) -> dict: - """Mock trigger_scheduled_job method""" - return { - "message": "Job triggered successfully", - "job_id": job_id, - "execution_id": "exec-123", - "triggered_at": "2024-01-01T12:00:00Z", - } + def credits(self) -> dict: + return {"remaining_credits": 50, "total_credits_used": 543} - def get_job_executions( - self, job_id: str, page: int = 1, page_size: int = 10 + def history( + self, + endpoint: str = None, + status: str = None, + limit: int = None, + offset: int = None, ) -> dict: - """Mock get_job_executions method""" - executions = [ - { - "id": "exec-1", - "job_id": job_id, - "status": "completed", - "started_at": "2024-01-01T09:00:00Z", - "completed_at": "2024-01-01T09:01:00Z", - "credits_used": 1, - } - ] return { - "executions": executions, - "total": len(executions), - "page": page, - "page_size": page_size, + "requests": [ + { + "id": "req-123", + "endpoint": "scrape", + "status": "completed", + "created_at": "2024-01-01T00:00:00Z", + } + ], + "total": 1, } - def delete_scheduled_job(self, job_id: str) -> dict: - """Mock delete_scheduled_job method""" - return {"message": "Job deleted successfully", "job_id": job_id} - def close(self) -> None: - """Mock close method""" pass -class MockSmartScraperInput(BaseModel): - user_prompt: str = Field(description="Test prompt") - website_url: str = Field(description="Test URL") - - -class MockSearchScraperInput(BaseModel): - user_prompt: str = Field(description="Test prompt") - +class MockExtractInput(BaseModel): + url: str = Field(description="Test URL") + prompt: str = Field(description="Test prompt") -class MockMarkdownifyInput(BaseModel): - website_url: str = Field(description="Test URL") - -class MockSmartScraperTool(BaseTool): - name: str = "SmartScraper" +class MockExtractTool(BaseTool): + name: str = "Extract" description: str = "Test description" - args_schema: type[BaseModel] = MockSmartScraperInput + args_schema: type[BaseModel] = MockExtractInput client: Optional[MockClient] = None api_key: str @@ -251,19 +189,28 @@ def _run(self, **kwargs: Any) -> Dict: return {"main_heading": "Test", "first_paragraph": "Test"} -class MockSearchScraperTool(BaseTool): - name: str = "SearchScraper" +class MockSearchInput(BaseModel): + query: str = Field(description="Test query") + num_results: int = Field(default=5, description="Number of results") + + +class MockSearchTool(BaseTool): + name: str = "Search" description: str = "Test description" - args_schema: type[BaseModel] = MockSearchScraperInput + args_schema: type[BaseModel] = MockSearchInput client: Optional[MockClient] = None api_key: str llm_output_schema: Optional[Type[BaseModel]] = None def _run(self, **kwargs: Any) -> Dict: return { - "product": {"name": "Test Product", "description": "Test description"}, - "features": [{"name": "Feature 1", "description": "Description 1"}], - "reference_urls": ["https://example.com/test"], + "results": [ + { + "title": "Test Result", + "url": "https://example.com", + "content": "Test", + } + ], } @@ -277,6 +224,10 @@ def _run(self, **kwargs: Any) -> Dict: return {"remaining_credits": 50, "total_credits_used": 543} +class MockMarkdownifyInput(BaseModel): + website_url: str = Field(description="Test URL") + + class MockMarkdownifyTool(BaseTool): name: str = "Markdownify" description: str = "Test description" @@ -284,14 +235,17 @@ class MockMarkdownifyTool(BaseTool): client: Optional[MockClient] = None api_key: str - def _run(self, **kwargs: Any) -> str: - return "# Example Domain\n\nTest paragraph" + def _run(self, **kwargs: Any) -> Dict: + return { + "id": "test-id", + "format": "markdown", + "content": ["# Example Domain\n\nTest paragraph"], + } class MockScrapeInput(BaseModel): - website_url: str = Field(description="Test URL") - render_heavy_js: bool = Field(default=False, description="Test JS rendering") - headers: Optional[Dict[str, str]] = Field(default=None, description="Test headers") + url: str = Field(description="Test URL") + format: str = Field(default="markdown", description="Output format") class MockScrapeTool(BaseTool): @@ -303,65 +257,52 @@ class MockScrapeTool(BaseTool): def _run(self, **kwargs: Any) -> Dict: return { - "scrape_request_id": "test-scrape-id", - "status": "success", - "html": "

Example Domain

Test content

", - "error": None, + "id": "test-id", + "format": "markdown", + "content": ["# Example Domain\n\nTest paragraph"], } -class MockCreateScheduledJobInput(BaseModel): - job_name: str = Field(description="Test job name") - service_type: str = Field(description="Test service type") - cron_expression: str = Field(description="Test cron expression") - job_config: Dict[str, Any] = Field(description="Test job config") - is_active: bool = Field(default=True, description="Test active status") +class MockCrawlStartInput(BaseModel): + url: str = Field(description="Test URL") + depth: int = Field(default=2, description="Crawl depth") + max_pages: int = Field(default=10, description="Max pages") + format: str = Field(default="markdown", description="Output format") -class MockCreateScheduledJobTool(BaseTool): - name: str = "CreateScheduledJob" +class MockCrawlStartTool(BaseTool): + name: str = "CrawlStart" description: str = "Test description" - args_schema: type[BaseModel] = MockCreateScheduledJobInput + args_schema: type[BaseModel] = MockCrawlStartInput client: Optional[MockClient] = None api_key: str def _run(self, **kwargs: Any) -> Dict: return { - "id": "test-job-id-123", - "job_name": kwargs.get("job_name", "Test Job"), - "service_type": kwargs.get("service_type", "smartscraper"), - "cron_expression": kwargs.get("cron_expression", "0 9 * * *"), - "job_config": kwargs.get("job_config", {}), - "is_active": kwargs.get("is_active", True), - "created_at": "2024-01-01T00:00:00Z", - "next_run_at": "2024-01-02T09:00:00Z", + "id": "crawl-job-123", + "status": "running", + "url": kwargs.get("url", "https://example.com"), } -class MockGetScheduledJobsInput(BaseModel): - page: int = Field(default=1, description="Test page") - page_size: int = Field(default=10, description="Test page size") - service_type: Optional[str] = Field(default=None, description="Test service type") - is_active: Optional[bool] = Field(default=None, description="Test active status") +class MockMonitorCreateInput(BaseModel): + name: str = Field(description="Monitor name") + url: str = Field(description="Monitor URL") + prompt: str = Field(description="Monitor prompt") + cron: str = Field(description="Cron expression") -class MockGetScheduledJobsTool(BaseTool): - name: str = "GetScheduledJobs" +class MockMonitorCreateTool(BaseTool): + name: str = "MonitorCreate" description: str = "Test description" - args_schema: type[BaseModel] = MockGetScheduledJobsInput + args_schema: type[BaseModel] = MockMonitorCreateInput client: Optional[MockClient] = None api_key: str def _run(self, **kwargs: Any) -> Dict: - jobs = [ - { - "id": "test-job-1", - "job_name": "Test Job 1", - "service_type": "smartscraper", - "cron_expression": "0 9 * * *", - "is_active": True, - "created_at": "2024-01-01T00:00:00Z", - "next_run_at": "2024-01-02T09:00:00Z", - } - ] - return {"jobs": jobs, "total": len(jobs), "page": 1, "page_size": 10} + return { + "id": "monitor-123", + "name": kwargs.get("name", "Test Monitor"), + "url": kwargs.get("url", "https://example.com"), + "status": "active", + } diff --git a/tests/unit_tests/test_tools.py b/tests/unit_tests/test_tools.py index 55f7d6c..0feb911 100644 --- a/tests/unit_tests/test_tools.py +++ b/tests/unit_tests/test_tools.py @@ -4,101 +4,82 @@ from langchain_tests.unit_tests import ToolsUnitTests from langchain_scrapegraph.tools import ( - CreateScheduledJobTool, + CrawlStartTool, + ExtractTool, GetCreditsTool, - GetScheduledJobsTool, MarkdownifyTool, + MonitorCreateTool, ScrapeTool, - SearchScraperTool, - SmartScraperTool, + SearchTool, ) from tests.unit_tests.mocks import ( MockClient, - MockCreateScheduledJobTool, + MockCrawlStartTool, + MockExtractTool, MockGetCreditsTool, - MockGetScheduledJobsTool, MockMarkdownifyTool, + MockMonitorCreateTool, MockScrapeTool, - MockSearchScraperTool, - MockSmartScraperTool, + MockSearchTool, ) -class TestSmartScraperToolUnit(ToolsUnitTests): +class TestExtractToolUnit(ToolsUnitTests): @property - def tool_constructor(self) -> Type[SmartScraperTool]: - return MockSmartScraperTool + def tool_constructor(self) -> Type[ExtractTool]: + return MockExtractTool @property def tool_constructor_params(self) -> dict: - with patch("langchain_scrapegraph.tools.smartscraper.Client", MockClient): + with patch("langchain_scrapegraph.tools.extract.Client", MockClient): return {"api_key": "sgai-test-api-key"} @property def tool_invoke_params_example(self) -> dict: return { - "user_prompt": "Extract the main heading", - "website_url": "https://example.com", + "url": "https://example.com", + "prompt": "Extract the main heading", } -class TestSmartScraperToolCustom: - def test_invoke_with_html(self): - """Test invoking the tool with HTML content.""" - with patch("langchain_scrapegraph.tools.smartscraper.Client", MockClient): - tool = MockSmartScraperTool(api_key="sgai-test-api-key") +class TestExtractToolCustom: + def test_invoke_basic(self): + with patch("langchain_scrapegraph.tools.extract.Client", MockClient): + tool = MockExtractTool(api_key="sgai-test-api-key") result = tool.invoke( { - "user_prompt": "Extract the main heading", - "website_url": "https://example.com", - "website_html": "

Test

", + "url": "https://example.com", + "prompt": "Extract the main heading", } ) assert isinstance(result, dict) assert "main_heading" in result - assert result["main_heading"] == "Test" -class TestSearchScraperToolUnit(ToolsUnitTests): +class TestSearchToolUnit(ToolsUnitTests): @property - def tool_constructor(self) -> Type[SearchScraperTool]: - return MockSearchScraperTool + def tool_constructor(self) -> Type[SearchTool]: + return MockSearchTool @property def tool_constructor_params(self) -> dict: - with patch("langchain_scrapegraph.tools.searchscraper.Client", MockClient): + with patch("langchain_scrapegraph.tools.search.Client", MockClient): return {"api_key": "sgai-test-api-key"} @property def tool_invoke_params_example(self) -> dict: return { - "user_prompt": "What are the key features of Product X?", + "query": "What are the key features of Product X?", } -class TestSearchScraperToolCustom: - def test_invoke_with_schema(self): - """Test invoking the tool with a schema.""" - from typing import List - - from pydantic import BaseModel, Field - - class TestSchema(BaseModel): - product: dict = Field(description="Product information") - features: List[dict] = Field(description="List of features") - reference_urls: List[str] = Field(description="Reference URLs") - - with patch("langchain_scrapegraph.tools.searchscraper.Client", MockClient): - tool = MockSearchScraperTool(api_key="sgai-test-api-key") - tool.llm_output_schema = TestSchema - result = tool.invoke( - {"user_prompt": "What are the key features of Product X?"} - ) +class TestSearchToolCustom: + def test_invoke_with_num_results(self): + with patch("langchain_scrapegraph.tools.search.Client", MockClient): + tool = MockSearchTool(api_key="sgai-test-api-key") + result = tool.invoke({"query": "AI news", "num_results": 10}) assert isinstance(result, dict) - assert "product" in result - assert "features" in result - assert "reference_urls" in result - assert isinstance(result["reference_urls"], list) + assert "results" in result class TestGetCreditsToolUnit(ToolsUnitTests): @@ -143,131 +124,81 @@ def tool_constructor_params(self) -> dict: @property def tool_invoke_params_example(self) -> dict: - return {"website_url": "https://example.com"} + return {"url": "https://example.com"} class TestScrapeToolCustom: - def test_invoke_with_js_rendering(self): - """Test invoking the scrape tool with JavaScript rendering.""" + def test_invoke_with_html_format(self): with patch("langchain_scrapegraph.tools.scrape.Client", MockClient): tool = MockScrapeTool(api_key="sgai-test-api-key") - result = tool.invoke( - {"website_url": "https://example.com", "render_heavy_js": True} - ) + result = tool.invoke({"url": "https://example.com", "format": "html"}) assert isinstance(result, dict) - assert "html" in result - assert "scrape_request_id" in result - assert result["status"] == "success" - - def test_invoke_with_headers(self): - """Test invoking the scrape tool with custom headers.""" - with patch("langchain_scrapegraph.tools.scrape.Client", MockClient): - tool = MockScrapeTool(api_key="sgai-test-api-key") - result = tool.invoke( - { - "website_url": "https://example.com", - "headers": {"User-Agent": "Test Bot 1.0"}, - } - ) - assert isinstance(result, dict) - assert "html" in result - assert result["status"] == "success" + assert "content" in result -class TestCreateScheduledJobToolUnit(ToolsUnitTests): +class TestCrawlStartToolUnit(ToolsUnitTests): @property - def tool_constructor(self) -> Type[CreateScheduledJobTool]: - return MockCreateScheduledJobTool + def tool_constructor(self) -> Type[CrawlStartTool]: + return MockCrawlStartTool @property def tool_constructor_params(self) -> dict: - with patch("langchain_scrapegraph.tools.scheduled_jobs.Client", MockClient): + with patch("langchain_scrapegraph.tools.crawl.Client", MockClient): return {"api_key": "sgai-test-api-key"} @property def tool_invoke_params_example(self) -> dict: return { - "job_name": "Test Job", - "service_type": "smartscraper", - "cron_expression": "0 9 * * *", - "job_config": {"website_url": "https://example.com", "user_prompt": "test"}, - "is_active": True, + "url": "https://example.com", + "depth": 2, + "max_pages": 10, } -class TestCreateScheduledJobToolCustom: - def test_create_smartscraper_job(self): - """Test creating a SmartScraper scheduled job.""" - with patch("langchain_scrapegraph.tools.scheduled_jobs.Client", MockClient): - tool = MockCreateScheduledJobTool(api_key="sgai-test-api-key") +class TestCrawlStartToolCustom: + def test_invoke_basic(self): + with patch("langchain_scrapegraph.tools.crawl.Client", MockClient): + tool = MockCrawlStartTool(api_key="sgai-test-api-key") result = tool.invoke( - { - "job_name": "Daily Scraping Job", - "service_type": "smartscraper", - "cron_expression": "0 9 * * *", - "job_config": { - "website_url": "https://example.com", - "user_prompt": "Extract the main heading", - }, - "is_active": True, - } + {"url": "https://example.com", "depth": 3, "max_pages": 5} ) assert isinstance(result, dict) - assert result["job_name"] == "Daily Scraping Job" - assert result["service_type"] == "smartscraper" - assert result["is_active"] is True assert "id" in result - - def test_create_searchscraper_job(self): - """Test creating a SearchScraper scheduled job.""" - with patch("langchain_scrapegraph.tools.scheduled_jobs.Client", MockClient): - tool = MockCreateScheduledJobTool(api_key="sgai-test-api-key") - result = tool.invoke( - { - "job_name": "Weekly Search Job", - "service_type": "searchscraper", - "cron_expression": "0 10 * * 1", - "job_config": { - "user_prompt": "Find latest AI news", - "num_results": 5, - }, - "is_active": True, - } - ) - assert isinstance(result, dict) - assert result["job_name"] == "Weekly Search Job" - assert result["service_type"] == "searchscraper" + assert result["status"] == "running" -class TestGetScheduledJobsToolUnit(ToolsUnitTests): +class TestMonitorCreateToolUnit(ToolsUnitTests): @property - def tool_constructor(self) -> Type[GetScheduledJobsTool]: - return MockGetScheduledJobsTool + def tool_constructor(self) -> Type[MonitorCreateTool]: + return MockMonitorCreateTool @property def tool_constructor_params(self) -> dict: - with patch("langchain_scrapegraph.tools.scheduled_jobs.Client", MockClient): + with patch("langchain_scrapegraph.tools.monitor.Client", MockClient): return {"api_key": "sgai-test-api-key"} @property def tool_invoke_params_example(self) -> dict: - return {"page": 1, "page_size": 10} + return { + "name": "Test Monitor", + "url": "https://example.com", + "prompt": "Extract product prices", + "cron": "0 9 * * *", + } -class TestGetScheduledJobsToolCustom: - def test_get_jobs_with_filters(self): - """Test getting scheduled jobs with filters.""" - with patch("langchain_scrapegraph.tools.scheduled_jobs.Client", MockClient): - tool = MockGetScheduledJobsTool(api_key="sgai-test-api-key") +class TestMonitorCreateToolCustom: + def test_create_monitor(self): + with patch("langchain_scrapegraph.tools.monitor.Client", MockClient): + tool = MockMonitorCreateTool(api_key="sgai-test-api-key") result = tool.invoke( { - "page": 1, - "page_size": 10, - "service_type": "smartscraper", - "is_active": True, + "name": "Price Monitor", + "url": "https://example.com", + "prompt": "Extract product prices", + "cron": "0 9 * * *", } ) assert isinstance(result, dict) - assert "jobs" in result - assert "total" in result - assert isinstance(result["jobs"], list) + assert "id" in result + assert result["status"] == "active"