From c8f8853b13ee1f3f3d40054bf9bb6ebf33cc2d03 Mon Sep 17 00:00:00 2001 From: Pranjal Ashok Jain Date: Fri, 13 Feb 2026 09:47:38 -0800 Subject: [PATCH] Add Hugging Face sentiment analysis example --- .../02_sentiment_analysis/README.md | 260 ++++++++++++++++++ 02_ml_inference/02_sentiment_analysis/main.py | 64 +++++ .../02_sentiment_analysis/mothership.py | 55 ++++ .../02_sentiment_analysis/pyproject.toml | 58 ++++ .../02_sentiment_analysis/requirements.txt | 4 + .../02_sentiment_analysis/sentiment.py | 28 ++ .../02_sentiment_analysis/workers/__init__.py | 0 .../workers/cpu/__init__.py | 19 ++ .../workers/cpu/endpoint.py | 36 +++ .../workers/gpu/__init__.py | 19 ++ .../workers/gpu/endpoint.py | 61 ++++ 11 files changed, 604 insertions(+) create mode 100644 02_ml_inference/02_sentiment_analysis/README.md create mode 100644 02_ml_inference/02_sentiment_analysis/main.py create mode 100644 02_ml_inference/02_sentiment_analysis/mothership.py create mode 100644 02_ml_inference/02_sentiment_analysis/pyproject.toml create mode 100644 02_ml_inference/02_sentiment_analysis/requirements.txt create mode 100644 02_ml_inference/02_sentiment_analysis/sentiment.py create mode 100644 02_ml_inference/02_sentiment_analysis/workers/__init__.py create mode 100644 02_ml_inference/02_sentiment_analysis/workers/cpu/__init__.py create mode 100644 02_ml_inference/02_sentiment_analysis/workers/cpu/endpoint.py create mode 100644 02_ml_inference/02_sentiment_analysis/workers/gpu/__init__.py create mode 100644 02_ml_inference/02_sentiment_analysis/workers/gpu/endpoint.py diff --git a/02_ml_inference/02_sentiment_analysis/README.md b/02_ml_inference/02_sentiment_analysis/README.md new file mode 100644 index 0000000..409a03e --- /dev/null +++ b/02_ml_inference/02_sentiment_analysis/README.md @@ -0,0 +1,260 @@ +# flash-sentiment + +Flash application demonstrating distributed GPU and CPU computing on Runpod's serverless infrastructure. 
+ +## About This Template + +This project was generated using `flash init`. The `flash-sentiment` placeholder is automatically replaced with your actual project name during initialization. + +## Quick Start + +### 1. Install Dependencies + +```bash +pip install -r requirements.txt +``` + +### 2. Configure Environment + +Create `.env` file: + +```bash +RUNPOD_API_KEY=your_api_key_here +``` + +Get your API key from [Runpod Settings](https://www.runpod.io/console/user/settings). + +### 3. Run Locally + +```bash +# Standard run +flash run + +# Faster development: pre-provision endpoints (eliminates cold-start delays) +flash run --auto-provision +``` + +Server starts at **http://localhost:8000** + +With `--auto-provision`, all serverless endpoints deploy before testing begins. This is much faster for development because endpoints are cached and reused across server restarts. Subsequent runs skip deployment and start immediately. + +### 4. Test the API + +```bash +# Health check +curl http://localhost:8000/ping + +# GPU worker +curl -X POST http://localhost:8000/gpu/hello \ + -H "Content-Type: application/json" \ + -d '{"message": "Hello GPU!"}' + +# CPU worker +curl -X POST http://localhost:8000/cpu/hello \ + -H "Content-Type: application/json" \ + -d '{"message": "Hello CPU!"}' +``` + +Visit **http://localhost:8000/docs** for interactive API documentation. 
+ +## What This Demonstrates + +### GPU Worker (`workers/gpu/`) +Simple GPU-based serverless function: +- Remote execution with `@remote` decorator +- GPU resource configuration +- Automatic scaling (0-3 workers) +- No external dependencies required + +```python +@remote( + resource_config=LiveServerless( + name="gpu_worker", + gpus=[GpuGroup.ADA_24], # RTX 4090 + workersMin=0, + workersMax=3, + ) +) +async def gpu_hello(input_data: dict) -> dict: + # Your GPU code here + return {"status": "success", "message": "Hello from GPU!"} +``` + +### CPU Worker (`workers/cpu/`) +Simple CPU-based serverless function: +- CPU-only execution (no GPU overhead) +- CpuLiveServerless configuration +- Efficient for API endpoints +- Automatic scaling (0-5 workers) + +```python +@remote( + resource_config=CpuLiveServerless( + name="cpu_worker", + instanceIds=[CpuInstanceType.CPU3G_2_8], # 2 vCPU, 8GB RAM + workersMin=0, + workersMax=5, + ) +) +async def cpu_hello(input_data: dict) -> dict: + # Your CPU code here + return {"status": "success", "message": "Hello from CPU!"} +``` + +## Project Structure + +``` +flash-sentiment/ +├── main.py # FastAPI application +├── workers/ +│ ├── gpu/ # GPU worker +│ │ ├── __init__.py # FastAPI router +│ │ └── endpoint.py # @remote decorated function +│ └── cpu/ # CPU worker +│ ├── __init__.py # FastAPI router +│ └── endpoint.py # @remote decorated function +├── .env # Environment variables +├── requirements.txt # Dependencies +└── README.md # This file +``` + +## Key Concepts + +### Remote Execution +The `@remote` decorator transparently executes functions on serverless infrastructure: +- Code runs locally during development +- Automatically deploys to Runpod when configured +- Handles serialization, dependencies, and resource management + +### Resource Scaling +Both workers scale to zero when idle to minimize costs: +- **idleTimeout**: Minutes before scaling down (default: 5) +- **workersMin**: 0 = completely scales to zero +- **workersMax**: 
Maximum concurrent workers
+
+### GPU Types
+Available GPU options for `LiveServerless`:
+- `GpuGroup.ADA_24` - RTX 4090 (24GB)
+- `GpuGroup.ADA_48_PRO` - RTX 6000 Ada, L40 (48GB)
+- `GpuGroup.AMPERE_80` - A100 (80GB)
+- `GpuGroup.ANY` - Any available GPU
+
+### CPU Types
+Available CPU options for `CpuLiveServerless`:
+- `CpuInstanceType.CPU3G_2_8` - 2 vCPU, 8GB RAM (General Purpose)
+- `CpuInstanceType.CPU3C_4_8` - 4 vCPU, 8GB RAM (Compute Optimized)
+- `CpuInstanceType.CPU5G_4_16` - 4 vCPU, 16GB RAM (Latest Gen)
+- `CpuInstanceType.ANY` - Any available CPU
+
+## Development Workflow
+
+### Test Workers Locally
+```bash
+# Test GPU worker
+python -m workers.gpu.endpoint
+
+# Test CPU worker
+python -m workers.cpu.endpoint
+```
+
+### Run the Application
+```bash
+flash run
+```
+
+### Deploy to Production
+```bash
+# Build and deploy in one step
+flash deploy
+
+# Or deploy to a specific environment
+flash deploy --env production
+```
+
+## Adding New Workers
+
+### Add a GPU Worker
+
+1. Create `workers/my_worker/endpoint.py`:
+```python
+from runpod_flash import remote, LiveServerless
+
+config = LiveServerless(name="my_worker")
+
+@remote(resource_config=config, dependencies=["torch"])
+async def my_function(data: dict) -> dict:
+    import torch
+    # Your code here
+    return {"result": "success"}
+```
+
+2. Create `workers/my_worker/__init__.py`:
+```python
+from fastapi import APIRouter
+from .endpoint import my_function
+
+router = APIRouter()
+
+@router.post("/process")
+async def handler(data: dict):
+    return await my_function(data)
+```
+
+3. 
Add to `main.py`: +```python +from workers.my_worker import router as my_router +app.include_router(my_router, prefix="/my_worker") +``` + +### Add a CPU Worker + +Same pattern but use `CpuLiveServerless`: +```python +from runpod_flash import remote, CpuLiveServerless, CpuInstanceType + +config = CpuLiveServerless( + name="my_cpu_worker", + instanceIds=[CpuInstanceType.CPU3G_2_8] +) + +@remote(resource_config=config, dependencies=["requests"]) +async def fetch_data(url: str) -> dict: + import requests + return requests.get(url).json() +``` + +## Adding Dependencies + +Specify dependencies in the `@remote` decorator: +```python +@remote( + resource_config=config, + dependencies=["torch>=2.0.0", "transformers"], # Python packages + system_dependencies=["ffmpeg"] # System packages +) +async def my_function(data: dict) -> dict: + # Dependencies are automatically installed + import torch + import transformers +``` + +## Environment Variables + +```bash +# Required +RUNPOD_API_KEY=your_api_key + +# Optional +FLASH_HOST=localhost # Host to bind the server to (default: localhost) +FLASH_PORT=8888 # Port to bind the server to (default: 8888) +LOG_LEVEL=INFO # Logging level (default: INFO) +``` + +## Next Steps + +- Add your ML models or processing logic +- Configure GPU/CPU resources based on your needs +- Add authentication to your endpoints +- Implement error handling and retries +- Add monitoring and logging +- Deploy to production with `flash deploy` diff --git a/02_ml_inference/02_sentiment_analysis/main.py b/02_ml_inference/02_sentiment_analysis/main.py new file mode 100644 index 0000000..fd96b92 --- /dev/null +++ b/02_ml_inference/02_sentiment_analysis/main.py @@ -0,0 +1,64 @@ +import logging +import os +import sentiment # noqa: F401 +from sentiment import classify + + +from fastapi import FastAPI + +from workers.cpu import cpu_router +from workers.gpu import gpu_router + +logger = logging.getLogger(__name__) + + +app = FastAPI( + title="Flash Application", + 
description="Distributed GPU and CPU computing with Runpod Flash",
+    version="0.1.0",
+)
+
+# Include routers
+app.include_router(gpu_router, prefix="/gpu", tags=["GPU Workers"])
+app.include_router(cpu_router, prefix="/cpu", tags=["CPU Workers"])
+
+
+@app.get("/")
+def home():
+    return {
+        "message": "Flash Application",
+        "docs": "/docs",
+        "endpoints": {"gpu_hello": "/gpu/hello", "cpu_hello": "/cpu/hello"},
+    }
+
+
+@app.get("/ping")
+def ping():
+    return {"status": "healthy"}
+
+from pydantic import BaseModel
+
+class ClassifyRequest(BaseModel):
+    text: str
+
+@app.post("/classify", tags=["AI"])
+async def classify_endpoint(req: ClassifyRequest):
+    # classify() is a Flash remote function, so you must await it
+    return await classify(req.text)
+
+if __name__ == "__main__":
+    import uvicorn
+
+    host = os.getenv("FLASH_HOST", "localhost")
+    port = int(os.getenv("FLASH_PORT", 8888))
+    logger.info(f"Starting Flash server on {host}:{port}")
+
+    uvicorn.run(app, host=host, port=port)
+
+# NOTE(review): a duplicate resource config was removed from here. It
+# re-declared the "flash-ai-sentiment" endpoint already configured in
+# sentiment.py, and it used LiveServerless with CPU instanceIds, which
+# contradicts the README (CPU instance types belong to CpuLiveServerless).
+# Keeping the single authoritative config in sentiment.py avoids the two
+# definitions drifting apart. Also note: module-level code placed after
+# the __main__ guard still executes on import, so dead configuration
+# here was not free.
\ No newline at end of file
diff --git a/02_ml_inference/02_sentiment_analysis/mothership.py b/02_ml_inference/02_sentiment_analysis/mothership.py
new file mode 100644
index 0000000..06ccb23
--- /dev/null
+++ b/02_ml_inference/02_sentiment_analysis/mothership.py
@@ -0,0 +1,55 @@
+"""
+Mothership Endpoint Configuration
+
+The mothership endpoint serves your FastAPI application routes.
+It is automatically deployed as a CPU-optimized load-balanced endpoint. 
+ +To customize this configuration: +- Modify worker scaling: change workersMin and workersMax values +- Use GPU load balancer: import LiveLoadBalancer instead of CpuLiveLoadBalancer +- Change endpoint name: update the 'name' parameter + +To disable mothership deployment: +- Delete this file, or +- Comment out the 'mothership' variable below + +Documentation: https://docs.runpod.io/flash/mothership +""" + +from runpod_flash import CpuLiveLoadBalancer + +# Mothership endpoint configuration +# This serves your FastAPI app routes from main.py +mothership = CpuLiveLoadBalancer( + name="mothership", + workersMin=1, + workersMax=3, +) + +# Examples of customization: + +# Increase scaling for high traffic +# mothership = CpuLiveLoadBalancer( +# name="mothership", +# workersMin=2, +# workersMax=10, +# ) + +# Use GPU-based load balancer instead of CPU +# (requires importing LiveLoadBalancer) +# from runpod_flash import LiveLoadBalancer +# mothership = LiveLoadBalancer( +# name="mothership", +# gpus=[GpuGroup.ANY], +# ) + +# Custom endpoint name +# mothership = CpuLiveLoadBalancer( +# name="my-api-gateway", +# workersMin=1, +# workersMax=3, +# ) + +# To disable mothership: +# - Delete this entire file, or +# - Comment out the 'mothership' variable above diff --git a/02_ml_inference/02_sentiment_analysis/pyproject.toml b/02_ml_inference/02_sentiment_analysis/pyproject.toml new file mode 100644 index 0000000..8ccb0df --- /dev/null +++ b/02_ml_inference/02_sentiment_analysis/pyproject.toml @@ -0,0 +1,58 @@ +[build-system] +requires = ["setuptools>=61.0"] +build-backend = "setuptools.build_meta" + +[project] +name = "flash-sentiment" +version = "0.1.0" +description = "Flash serverless application" +readme = "README.md" +requires-python = ">=3.11" +dependencies = [ + "runpod-flash", + "fastapi>=0.104.0", + "uvicorn>=0.24.0", +] + +[project.optional-dependencies] +dev = [ + "pytest>=7.0", + "pytest-asyncio>=0.21", + "pytest-cov>=4.0", + "ruff>=0.1", + "mypy>=1.0", +] + 
+[tool.ruff]
+line-length = 100
+target-version = "py311"
+
+[tool.ruff.lint]
+select = ["E", "F", "I", "N", "W"]
+ignore = ["E501"]
+
+[tool.pytest.ini_options]
+testpaths = ["tests"]
+python_files = ["test_*.py", "*_test.py"]
+python_classes = ["Test*"]
+python_functions = ["test_*"]
+asyncio_mode = "auto"
+
+[tool.mypy]
+python_version = "3.11"
+warn_return_any = false
+warn_unused_configs = true
+disallow_untyped_defs = false
+
+[tool.coverage.run]
+source = ["."]
+omit = ["*/tests/*"]
+
+[tool.coverage.report]
+exclude_lines = [
+    "pragma: no cover",
+    "def __repr__",
+    "raise AssertionError",
+    "raise NotImplementedError",
+    "if __name__ == .__main__.:",
+]
diff --git a/02_ml_inference/02_sentiment_analysis/requirements.txt b/02_ml_inference/02_sentiment_analysis/requirements.txt
new file mode 100644
index 0000000..83ad37c
--- /dev/null
+++ b/02_ml_inference/02_sentiment_analysis/requirements.txt
@@ -0,0 +1,4 @@
+numpy
+scipy
+scikit-learn
+runpod-flash
diff --git a/02_ml_inference/02_sentiment_analysis/sentiment.py b/02_ml_inference/02_sentiment_analysis/sentiment.py
new file mode 100644
index 0000000..2e6035a
--- /dev/null
+++ b/02_ml_inference/02_sentiment_analysis/sentiment.py
@@ -0,0 +1,28 @@
+from runpod_flash import remote, CpuLiveServerless, CpuInstanceType
+
+cpu_config = CpuLiveServerless(
+    name="flash-ai-sentiment",
+    instanceIds=[CpuInstanceType.CPU3G_2_8],
+    workersMax=1,
+)
+
+@remote(
+    resource_config=cpu_config,
+    dependencies=[
+        "transformers",
+        "torch",
+        "safetensors",
+        "huggingface_hub",
+    ],
+)
+def classify(text: str) -> dict:
+    from transformers import pipeline
+
+    clf = pipeline("sentiment-analysis")  # defaults to a reasonable pretrained model
+    out = clf(text)[0]  # e.g. 
{"label": "POSITIVE", "score": 0.999...} + + return { + "input": text, + "label": out["label"], + "score": float(out["score"]), + } diff --git a/02_ml_inference/02_sentiment_analysis/workers/__init__.py b/02_ml_inference/02_sentiment_analysis/workers/__init__.py new file mode 100644 index 0000000..e69de29 diff --git a/02_ml_inference/02_sentiment_analysis/workers/cpu/__init__.py b/02_ml_inference/02_sentiment_analysis/workers/cpu/__init__.py new file mode 100644 index 0000000..aef10a1 --- /dev/null +++ b/02_ml_inference/02_sentiment_analysis/workers/cpu/__init__.py @@ -0,0 +1,19 @@ +from fastapi import APIRouter +from pydantic import BaseModel + +from .endpoint import cpu_hello + +cpu_router = APIRouter() + + +class MessageRequest(BaseModel): + """Request model for CPU worker.""" + + message: str = "Hello from CPU!" + + +@cpu_router.post("/hello") +async def hello(request: MessageRequest): + """Simple CPU worker endpoint.""" + result = await cpu_hello({"message": request.message}) + return result diff --git a/02_ml_inference/02_sentiment_analysis/workers/cpu/endpoint.py b/02_ml_inference/02_sentiment_analysis/workers/cpu/endpoint.py new file mode 100644 index 0000000..762e8f7 --- /dev/null +++ b/02_ml_inference/02_sentiment_analysis/workers/cpu/endpoint.py @@ -0,0 +1,36 @@ +from runpod_flash import CpuLiveServerless, remote + +cpu_config = CpuLiveServerless( + name="cpu_worker", + workersMin=0, + workersMax=5, + idleTimeout=5, +) + + +@remote(resource_config=cpu_config) +async def cpu_hello(input_data: dict) -> dict: + """Simple CPU worker example.""" + import platform + from datetime import datetime + + message = input_data.get("message", "Hello from CPU worker!") + + return { + "status": "success", + "message": message, + "worker_type": "CPU", + "timestamp": datetime.now().isoformat(), + "platform": platform.system(), + "python_version": platform.python_version(), + } + + +# Test locally with: python -m workers.cpu.endpoint +if __name__ == "__main__": + import 
asyncio + + test_payload = {"message": "Testing CPU worker"} + print(f"Testing CPU worker with payload: {test_payload}") + result = asyncio.run(cpu_hello(test_payload)) + print(f"Result: {result}") diff --git a/02_ml_inference/02_sentiment_analysis/workers/gpu/__init__.py b/02_ml_inference/02_sentiment_analysis/workers/gpu/__init__.py new file mode 100644 index 0000000..a6a3caa --- /dev/null +++ b/02_ml_inference/02_sentiment_analysis/workers/gpu/__init__.py @@ -0,0 +1,19 @@ +from fastapi import APIRouter +from pydantic import BaseModel + +from .endpoint import gpu_hello + +gpu_router = APIRouter() + + +class MessageRequest(BaseModel): + """Request model for GPU worker.""" + + message: str = "Hello from GPU!" + + +@gpu_router.post("/hello") +async def hello(request: MessageRequest): + """Simple GPU worker endpoint.""" + result = await gpu_hello({"message": request.message}) + return result diff --git a/02_ml_inference/02_sentiment_analysis/workers/gpu/endpoint.py b/02_ml_inference/02_sentiment_analysis/workers/gpu/endpoint.py new file mode 100644 index 0000000..1d125c1 --- /dev/null +++ b/02_ml_inference/02_sentiment_analysis/workers/gpu/endpoint.py @@ -0,0 +1,61 @@ +from runpod_flash import GpuGroup, LiveServerless, remote + +gpu_config = LiveServerless( + name="gpu_worker", + gpus=[GpuGroup.ANY], + workersMin=0, + workersMax=3, + idleTimeout=5, +) + + +@remote(resource_config=gpu_config, dependencies=["torch"]) +async def gpu_hello(input_data: dict) -> dict: + """Simple GPU worker example with GPU detection.""" + import platform + from datetime import datetime + + try: + import torch + + gpu_available = torch.cuda.is_available() + if gpu_available: + gpu_name = torch.cuda.get_device_name(0) + gpu_count = torch.cuda.device_count() + gpu_memory = torch.cuda.get_device_properties(0).total_memory / (1024**3) + else: + gpu_name = "No GPU detected" + gpu_count = 0 + gpu_memory = 0 + except Exception as e: + gpu_available = False + gpu_name = f"Error detecting GPU: 
{str(e)}" + gpu_count = 0 + gpu_memory = 0 + + message = input_data.get("message", "Hello from GPU worker!") + + return { + "status": "success", + "message": message, + "worker_type": "GPU", + "gpu_info": { + "available": gpu_available, + "name": gpu_name, + "count": gpu_count, + "memory_gb": round(gpu_memory, 2) if gpu_memory else 0, + }, + "timestamp": datetime.now().isoformat(), + "platform": platform.system(), + "python_version": platform.python_version(), + } + + +# Test locally with: python -m workers.gpu.endpoint +if __name__ == "__main__": + import asyncio + + test_payload = {"message": "Testing GPU worker"} + print(f"Testing GPU worker with payload: {test_payload}") + result = asyncio.run(gpu_hello(test_payload)) + print(f"Result: {result}")