A unified Python library for orchestrating ArangoDB Graph Analytics Engine (GAE) operations across both Arango Managed Platform (AMP) and self-managed deployments. Scope is limited to GAE orchestration (deploy -> load -> analyze -> store -> cleanup).
- Unified Interface - Single API for both AMP and self-managed deployments
- Complete Automation - Full workflow orchestration (deploy -> load -> analyze -> store -> cleanup)
- Multiple Algorithms - Support for PageRank, WCC, SCC, Label Propagation, and more
- Result Management - Index management, validation, comparison, and batch operations
- Query Helpers - Cross-reference results, find top influential vertices, join with vertex details
- Export Utilities - Export results to CSV and JSON formats
- Cost Tracking - Automatic cost calculation for AMP deployments
- Error Handling - Robust retry logic and guaranteed cleanup
- Easy Configuration - Simple `.env` file-based configuration
- Production Ready - Extracted from three production projects
```bash
# Install from source (development)
git clone https://github.com/ArthurKeen/graph-analytics-orchestrator.git
cd graph-analytics-orchestrator
pip install -e .

# Or install from PyPI (when published)
pip install graph-analytics-orchestrator
```

- Copy the example environment file:

```bash
cp .env.example .env
```

- Edit `.env` with your credentials:
For AMP (Arango Managed Platform):
```bash
# ArangoDB Connection
ARANGO_ENDPOINT=https://your-cluster.arangodb.cloud:8529
ARANGO_USER=root
ARANGO_PASSWORD=your-password
ARANGO_DATABASE=your-database

# GAE Configuration
GAE_DEPLOYMENT_MODE=amp
ARANGO_GRAPH_API_KEY_ID=your-api-key-id
ARANGO_GRAPH_API_KEY_SECRET=your-api-key-secret
ARANGO_GAE_PORT=8829
```

For Self-Managed:
```bash
# ArangoDB Connection
ARANGO_ENDPOINT=https://your-endpoint:8529
ARANGO_USER=root
ARANGO_PASSWORD=your-password
ARANGO_DATABASE=your-database
ARANGO_VERIFY_SSL=false

# GAE Configuration
GAE_DEPLOYMENT_MODE=self_managed
# No additional credentials needed
```

```python
from graph_analytics_orchestrator import GAEOrchestrator, AnalysisConfig

# Define your analysis
config = AnalysisConfig(
    name="product_demand",
    description="PageRank analysis of product demand",
    vertex_collections=["users", "products"],
    edge_collections=["clicks"],
    algorithm="pagerank",
    engine_size="e16",  # AMP only, ignored for self-managed
    target_collection="graph_analysis_results"
)

# Run the analysis (fully automated)
orchestrator = GAEOrchestrator()
result = orchestrator.run_analysis(config)

# Check results
print(f"Status: {result.status}")
print(f"Documents updated: {result.documents_updated}")
print(f"Cost: ${result.estimated_cost_usd}")  # AMP only
```

Documentation:

- PRD.md - Product Requirements Document
- Result Management API - API documentation for result management, queries, and export
- Result Management Examples - Usage examples for result operations
- Examples - Code examples below
| Algorithm | Use Case | Parameters |
|---|---|---|
| PageRank | Influence analysis, centrality | damping_factor, maximum_supersteps |
| WCC | Community detection, data quality | None |
| SCC | Cyclic relationships, temporal analysis | None |
| Label Propagation | Community detection, clustering | start_label_attribute, synchronous, random_tiebreak, maximum_supersteps |
PageRank (influence analysis):

```python
from graph_analytics_orchestrator import GAEOrchestrator, AnalysisConfig

config = AnalysisConfig(
    name="user_influence",
    vertex_collections=["users"],
    edge_collections=["follows"],
    algorithm="pagerank",
    algorithm_params={
        "damping_factor": 0.85,
        "maximum_supersteps": 100
    },
    target_collection="users",
    result_field="pagerank_score"
)

orchestrator = GAEOrchestrator()
result = orchestrator.run_analysis(config)
```

Label Propagation (community detection):

```python
config = AnalysisConfig(
    name="product_communities",
    vertex_collections=["products"],
    edge_collections=["co_purchased"],
    algorithm="label_propagation",
    algorithm_params={
        "start_label_attribute": "_key",
        "synchronous": False,
        "maximum_supersteps": 200
    },
    target_collection="products",
    result_field="community_id"
)

orchestrator = GAEOrchestrator()
result = orchestrator.run_analysis(config)
```

WCC (entity resolution):

```python
config = AnalysisConfig(
    name="entity_resolution",
    vertex_collections=["entities"],
    edge_collections=["similarity_edges"],
    algorithm="wcc",
    target_collection="entities",
    result_field="component_id"
)

orchestrator = GAEOrchestrator()
result = orchestrator.run_analysis(config)
```

Batch analyses:

```python
configs = [
    AnalysisConfig(name="analysis1", vertex_collections=["v1"], edge_collections=["e1"], algorithm="pagerank"),
    AnalysisConfig(name="analysis2", vertex_collections=["v2"], edge_collections=["e2"], algorithm="wcc"),
]

orchestrator = GAEOrchestrator()
results = orchestrator.run_batch(configs)
for result in results:
    print(f"{result.config.name}: {result.status}")
```

Package layout:

```
graph_analytics_orchestrator/
|-- __init__.py           # Public API
|-- config.py             # Configuration management
|-- db_connection.py      # ArangoDB connection
|-- gae_connection.py     # GAE connection (AMP & self-managed)
|-- gae_orchestrator.py   # Workflow orchestration
|-- results.py            # Result collection management & batch operations
|-- queries.py            # Result query helpers
|-- export.py             # Export utilities (CSV, JSON)
`-- utils.py              # Utility functions
```
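The CSV/JSON export role described for `export.py` can be illustrated with the standard library. This is a hedged sketch: `results_to_csv` is an illustrative helper, not the library's actual export API, which is not shown in this README.

```python
import csv, io, json

def results_to_csv(docs, fields):
    """Write result documents to CSV text with a header row (illustrative)."""
    buf = io.StringIO()
    writer = csv.DictWriter(buf, fieldnames=fields, extrasaction="ignore")
    writer.writeheader()
    writer.writerows(docs)
    return buf.getvalue()

# Hypothetical result documents, shaped like the PageRank example above:
docs = [
    {"_key": "u1", "pagerank_score": 0.41},
    {"_key": "u2", "pagerank_score": 0.27},
]
csv_text = results_to_csv(docs, ["_key", "pagerank_score"])
json_text = json.dumps(docs)  # JSON export is a one-liner for plain documents
print(csv_text)
```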
Arango Managed Platform (AMP):
- Uses API keys and `oasisctl` for authentication
- Configurable engine sizes (e4, e8, e16, e32, e64, e128)
- Cost tracking based on engine size and runtime
Self-Managed (GenAI Platform):
- Uses JWT tokens from ArangoDB
- Engine size managed by platform
- No cost tracking (on-premises)
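The two modes differ mainly in how credentials are obtained. A minimal sketch of branching on the documented `GAE_DEPLOYMENT_MODE` values (the function itself is illustrative, not the library's internal API):

```python
# Sketch: map a deployment mode to the credential type it requires.
# Mode names come from the README's GAE_DEPLOYMENT_MODE values.

def auth_strategy(mode: str) -> str:
    """Return which credential type a deployment mode requires."""
    strategies = {
        "amp": "api_key",       # API keys + oasisctl-generated token
        "self_managed": "jwt",  # JWT token from ArangoDB itself
    }
    try:
        return strategies[mode]
    except KeyError:
        raise ValueError(f"Unknown GAE_DEPLOYMENT_MODE: {mode!r}")

print(auth_strategy("amp"))           # api_key
print(auth_strategy("self_managed"))  # jwt
```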
The orchestrator automates the complete workflow:
- Engine Deployment - Deploy or start GAE engine
- Graph Loading - Load graph data from ArangoDB collections
- Algorithm Execution - Run the configured algorithm
- Result Storage - Write results back to ArangoDB
- Cleanup - Delete/stop engine to prevent orphaned resources
All steps include error handling, retry logic, and guaranteed cleanup.
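The five steps above can be sketched as a plain try/finally pipeline. This illustrates the guaranteed-cleanup pattern, not the library's actual code; every function name here is a hypothetical stand-in.

```python
# Illustrative workflow skeleton: cleanup always runs, even when a
# middle step raises. The step functions are injected stubs.

def run_workflow(deploy, load, analyze, store, cleanup):
    engine = deploy()                    # 1. Engine Deployment
    try:
        graph = load(engine)             # 2. Graph Loading
        scores = analyze(engine, graph)  # 3. Algorithm Execution
        return store(scores)             # 4. Result Storage
    finally:
        cleanup(engine)                  # 5. Cleanup (guaranteed)

# Tiny demonstration with stub steps:
log = []
result = run_workflow(
    deploy=lambda: "engine-1",
    load=lambda e: {"vertices": 3, "edges": 2},
    analyze=lambda e, g: {"a": 0.5},
    store=lambda s: len(s),
    cleanup=lambda e: log.append(f"stopped {e}"),
)
print(result, log)  # 1 ['stopped engine-1']
```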
The library automatically tracks costs for AMP deployments:
```python
result = orchestrator.run_analysis(config)
print(f"Cost: ${result.estimated_cost_usd}")
print(f"Runtime: {result.engine_runtime_minutes} minutes")
```

Engine costs (approximate, per hour):
- e4: $0.20
- e8: $0.30
- e16: $0.40
- e32: $0.80
- e64: $1.60
- e128: $3.20
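Using the approximate hourly rates above, a back-of-the-envelope estimate looks like this. The rates are the README's approximations; `estimate_cost` is an illustrative helper, not a library function.

```python
# Approximate per-hour engine rates from the table above (USD).
HOURLY_RATES = {
    "e4": 0.20, "e8": 0.30, "e16": 0.40,
    "e32": 0.80, "e64": 1.60, "e128": 3.20,
}

def estimate_cost(engine_size: str, runtime_minutes: float) -> float:
    """Rough cost estimate: hourly rate prorated by runtime."""
    rate = HOURLY_RATES[engine_size]
    return round(rate * runtime_minutes / 60.0, 4)

# An e16 engine running 15 minutes costs roughly $0.10:
print(estimate_cost("e16", 15))  # 0.1
```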
The library includes comprehensive error handling:
- Automatic Retry - Transient errors are automatically retried
- Non-Retryable Errors - Configuration errors are not retried
- Guaranteed Cleanup - Engines are always cleaned up, even on failure
- Safety Checks - Warns about existing engines before deployment
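The retry behavior described above can be illustrated with a small helper that retries transient errors but lets configuration errors propagate immediately. This is a sketch under assumed error classes, not the library's implementation.

```python
import time

class TransientError(Exception):
    """Stand-in for a retryable error (e.g. a network hiccup)."""

class ConfigError(Exception):
    """Stand-in for a non-retryable configuration error."""

def with_retry(fn, attempts=3, delay=0.0):
    """Call fn(), retrying TransientError up to `attempts` times."""
    for attempt in range(1, attempts + 1):
        try:
            return fn()
        except TransientError:
            if attempt == attempts:
                raise             # out of retries
            time.sleep(delay)     # back off before retrying
        # ConfigError (and anything else) propagates immediately

# Succeeds on the third try:
calls = {"n": 0}
def flaky():
    calls["n"] += 1
    if calls["n"] < 3:
        raise TransientError()
    return "ok"

print(with_retry(flaky))  # ok
```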
If you're migrating from prior projects (AMP or self-managed), the library provides a unified interface that simplifies the orchestration logic. See the examples below for common patterns.
- Python 3.8+
- ArangoDB cluster (AMP or self-managed)
- For AMP: `oasisctl` CLI tool (for token generation)

Core dependencies:

```bash
pip install python-arango requests python-dotenv
```

Development dependencies:

```bash
pip install pytest pytest-cov black flake8 mypy
```

Required for All Deployments:
- `ARANGO_ENDPOINT` - ArangoDB endpoint URL
- `ARANGO_USER` - Database username
- `ARANGO_PASSWORD` - Database password
- `ARANGO_DATABASE` - Database name
For AMP Deployments:
- `GAE_DEPLOYMENT_MODE=amp`
- `ARANGO_GRAPH_API_KEY_ID` - API key ID
- `ARANGO_GRAPH_API_KEY_SECRET` - API key secret
- `ARANGO_GRAPH_TOKEN` - (Optional) Pre-generated token
- `ARANGO_GAE_PORT` - (Optional) GAE port (default: 8829)
For Self-Managed Deployments:
- `GAE_DEPLOYMENT_MODE=self_managed`
- No additional GAE credentials needed
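Validating this configuration can be sketched with the standard library's `os.environ`. The variable names and the port default come from the reference above; the `validate_config` helper itself is illustrative, not the library's `config.py` API.

```python
import os

REQUIRED = ["ARANGO_ENDPOINT", "ARANGO_USER", "ARANGO_PASSWORD", "ARANGO_DATABASE"]

def validate_config(env=os.environ):
    """Raise if any required variable is missing; apply the documented port default."""
    missing = [k for k in REQUIRED if not env.get(k)]
    if missing:
        raise RuntimeError(f"Missing required settings: {', '.join(missing)}")
    mode = env.get("GAE_DEPLOYMENT_MODE")       # "amp" or "self_managed"
    port = int(env.get("ARANGO_GAE_PORT", "8829"))  # documented default
    return {"mode": mode, "port": port}

# Example with an in-memory dict standing in for the real environment:
fake_env = {
    "ARANGO_ENDPOINT": "https://example:8529",
    "ARANGO_USER": "root",
    "ARANGO_PASSWORD": "secret",
    "ARANGO_DATABASE": "mydb",
    "GAE_DEPLOYMENT_MODE": "amp",
}
print(validate_config(fake_env))  # {'mode': 'amp', 'port': 8829}
```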
`GAEOrchestrator` - Main orchestrator class for running analyses.
```python
orchestrator = GAEOrchestrator(verbose=True)
result = orchestrator.run_analysis(config)
results = orchestrator.run_batch(configs)
```

`AnalysisConfig` - Configuration for a GAE analysis.
```python
config = AnalysisConfig(
    name="analysis_name",
    vertex_collections=["collection1", "collection2"],
    edge_collections=["edge_collection"],
    algorithm="pagerank",
    engine_size="e16",
    target_collection="results",
    algorithm_params={...}
)
```

Result object from a completed analysis.
```python
result.status              # AnalysisStatus enum
result.vertex_count        # Number of vertices
result.edge_count          # Number of edges
result.documents_updated   # Documents updated
result.estimated_cost_usd  # Cost (AMP only)
result.duration_seconds    # Runtime
```

Contributions are welcome! Please see the contributing guidelines (to be added).
MIT License - see LICENSE file for details.
For issues, questions, or contributions:
- Review the PRD for detailed documentation
- Open an issue on GitHub
- Initial release
- Support for AMP and self-managed deployments
- Complete workflow orchestration
- Cost tracking for AMP
- Comprehensive error handling
- ArangoDB team for the Graph Analytics Engine