Skip to content

Latest commit

 

History

History
483 lines (375 loc) · 15.5 KB

File metadata and controls

483 lines (375 loc) · 15.5 KB

Nexus Python SDK

PyPI License Python CI

Official Python SDK for Nexus graph database.

Installation

pip install hivehub-nexus-sdk

Quick Start (RPC — default)

import asyncio
from nexus_sdk import NexusClient

async def main():
    # Defaults to nexus://127.0.0.1:15475 (binary RPC).
    async with NexusClient() as client:
        result = await client.execute_cypher("RETURN 1 AS one")
        print(f"{result.rows[0]}  (transport: {client.endpoint_description()})")

asyncio.run(main())

Transports

URL form Transport Default port Use case
nexus://host[:port] Binary RPC (MessagePack) 15475 Default. Lowest latency.
http://host[:port] HTTP/JSON (httpx) 15474 Browser proxies, firewalls.
https://host[:port] HTTPS/JSON 443 Public-internet HTTP with TLS.
resp3://host[:port] RESP3 (reserved) 15476 Not yet shipped — raises.

Precedence: URL scheme > NEXUS_SDK_TRANSPORT env var > transport kwarg > default (nexus).

# HTTP fallback
client = NexusClient(base_url="http://localhost:15474", api_key="nexus_sk_...")

# Transport hint on a bare URL
client = NexusClient(base_url="host:15474", transport="http")

# Env override (`NEXUS_SDK_TRANSPORT=http`) honoured automatically

Full cross-SDK spec: docs/specs/sdk-transport.md.

Usage

Basic Example

import asyncio
from nexus_sdk import NexusClient

async def main():
    async with NexusClient() as client:
        # Cypher (routes through the active transport)
        result = await client.execute_cypher("MATCH (n) RETURN n LIMIT 10")
        print(f"Found {len(result.rows)} rows")

        # Convenience helpers call into Cypher under the hood
        create_response = await client.create_node(
            labels=["Person"],
            properties={"name": "Alice"},
        )
        print(f"Created node with ID: {create_response.node_id}")

asyncio.run(main())

With Authentication

# Using API key
client = NexusClient(
    "http://localhost:15474",
    api_key="your-api-key"
)

# Or using username/password
client = NexusClient(
    "http://localhost:15474",
    username="user",
    password="pass"
)

Schema Management

# Create a label
response = await client.create_label("Person")

# List all labels. Each entry is a `LabelInfo(name, id)` — the
# `id` is the catalog id allocated by the engine, not a count.
# (Renamed from a JSON tuple `["Person", 0]` in nexus-server 1.15+,
# see https://github.com/hivellm/nexus/issues/2.)
labels = await client.list_labels()
for label in labels.labels:
    print(f"  {label.name} (id={label.id})")

# Create a relationship type
response = await client.create_rel_type("KNOWS")

# List all relationship types. Each entry is a `RelTypeInfo`.
types = await client.list_rel_types()
for rel_type in types.types:
    print(f"  {rel_type.name} (id={rel_type.id})")

Query Builder

from nexus_sdk import QueryBuilder

# Build queries type-safely
query = (
    QueryBuilder()
    .match_("(n:Person)")
    .where_("n.age > $min_age")
    .return_("n.name, n.age")
    .order_by("n.age DESC")
    .limit(10)
    .param("min_age", 25)
    .build()
)

result = await client.execute_cypher(query.query, query.params)

Batch Operations

# Batch create nodes
nodes = [
    {"labels": ["Person"], "properties": {"name": f"Person{i}", "age": 20 + i}}
    for i in range(10)
]
batch_response = await client.batch_create_nodes(nodes)
print(f"Created {len(batch_response.node_ids)} nodes")

# Batch create relationships
relationships = [
    {
        "source_id": node_ids[i],
        "target_id": node_ids[i + 1],
        "rel_type": "KNOWS",
        "properties": {"since": 2020},
    }
    for i in range(len(node_ids) - 1)
]
rel_batch = await client.batch_create_relationships(relationships)

Performance Monitoring

# Get query statistics
stats = await client.get_query_statistics()
print(f"Total queries: {stats.statistics.total_queries}")
print(f"Average time: {stats.statistics.average_execution_time_ms}ms")

# Get slow queries
slow_queries = await client.get_slow_queries()
for query in slow_queries.queries:
    print(f"Slow query: {query.query} ({query.execution_time_ms}ms)")

# Get plan cache statistics
cache_stats = await client.get_plan_cache_statistics()
print(f"Hit rate: {cache_stats.hit_rate:.2%}")

# Clear plan cache
await client.clear_plan_cache()

Advanced Transactions

from nexus_sdk import Transaction

# Begin transaction with Transaction class
tx = await client.begin_transaction()
print(f"Transaction active: {tx.is_active()}")
print(f"Status: {tx.status()}")

# Execute queries within transaction
result = await tx.execute("CREATE (n:Person {name: $name}) RETURN n", {"name": "Alice"})

# Commit or rollback
await tx.commit()  # or tx.rollback()

Multi-Database Support

# List all databases
databases = await client.list_databases()
print(f"Available databases: {databases.databases}")
print(f"Default database: {databases.default_database}")

# Create a new database
create_result = await client.create_database("mydb")
print(f"Created: {create_result.name}")

# Switch to the new database
switch_result = await client.switch_database("mydb")
print(f"Switched to: mydb")

# Get current database
current_db = await client.get_current_database()
print(f"Current database: {current_db}")

# Create data in the current database
result = await client.execute_cypher(
    "CREATE (n:Product {name: $name}) RETURN n",
    {"name": "Laptop"}
)

# Get database information
db_info = await client.get_database("mydb")
print(f"Nodes: {db_info.node_count}, Relationships: {db_info.relationship_count}")

# Drop database (must not be current database)
await client.switch_database("neo4j")  # Switch away first
drop_result = await client.drop_database("mydb")

# Or connect directly to a specific database
async with NexusClient("http://localhost:15474", database="mydb") as client:
    # All operations will use 'mydb'
    result = await client.execute_cypher("MATCH (n) RETURN n LIMIT 10", None)

Using Context Manager

async with NexusClient("http://localhost:15474") as client:
    result = await client.execute_cypher("MATCH (n) RETURN n LIMIT 10", None)
    print(f"Found {len(result.rows)} rows")

High Availability with Replication

Nexus supports master-replica replication for high availability and read scaling. Use the master for all write operations and replicas for read operations.

import asyncio
from nexus_sdk import NexusClient

class NexusCluster:
    """Client for Nexus cluster with master-replica topology."""

    def __init__(self, master_url: str, replica_urls: list[str]):
        """
        Initialize cluster client.

        Args:
            master_url: URL of the master node (for writes)
            replica_urls: List of replica URLs (for reads)
        """
        self.master = NexusClient(master_url)
        self.replicas = [NexusClient(url) for url in replica_urls]
        self._replica_index = 0

    def _get_replica(self) -> NexusClient:
        """Round-robin replica selection."""
        if not self.replicas:
            return self.master
        replica = self.replicas[self._replica_index]
        self._replica_index = (self._replica_index + 1) % len(self.replicas)
        return replica

    async def write(self, query: str, params: dict = None):
        """Execute write query on master."""
        return await self.master.execute_cypher(query, params)

    async def read(self, query: str, params: dict = None):
        """Execute read query on replica (round-robin)."""
        return await self._get_replica().execute_cypher(query, params)

    async def close(self):
        """Close all connections."""
        await self.master.close()
        for replica in self.replicas:
            await replica.close()

async def main():
    # Connect to cluster
    # Master handles all writes, replicas handle reads
    cluster = NexusCluster(
        master_url="http://master:15474",
        replica_urls=[
            "http://replica1:15474",
            "http://replica2:15474",
        ]
    )

    # Write operations go to master
    await cluster.write(
        "CREATE (n:Person {name: $name, age: $age}) RETURN n",
        {"name": "Alice", "age": 30}
    )

    # Read operations are distributed across replicas
    result = await cluster.read(
        "MATCH (n:Person) WHERE n.age > $min_age RETURN n",
        {"min_age": 25}
    )
    print(f"Found {len(result.rows)} persons")

    # High-volume reads are load-balanced
    for i in range(100):
        result = await cluster.read("MATCH (n) RETURN count(n) as total", None)

    await cluster.close()

asyncio.run(main())

Replication Architecture

┌─────────────────────────────────────────────────────────────┐
│                      Application                             │
│   ┌─────────────────────────────────────────────────────┐   │
│   │              NexusCluster Client                     │   │
│   │   write() ──────────┐     read() ───────────────┐   │   │
│   └─────────────────────┼───────────────────────────┼───┘   │
└─────────────────────────┼───────────────────────────┼───────┘
                          │                           │
                          ▼                           ▼
              ┌───────────────────┐     ┌─────────────────────┐
              │      MASTER       │     │      REPLICAS       │
              │   (writes only)   │────▶│   (reads only)      │
              │                   │ WAL │  ┌───────────────┐  │
              │ • CREATE          │────▶│  │   Replica 1   │  │
              │ • UPDATE          │     │  └───────────────┘  │
              │ • DELETE          │     │  ┌───────────────┐  │
              │ • MERGE           │────▶│  │   Replica 2   │  │
              └───────────────────┘     │  └───────────────┘  │
                                        └─────────────────────┘

Starting a Nexus Cluster

# Start master node
NEXUS_REPLICATION_ROLE=master \
NEXUS_REPLICATION_BIND_ADDR=0.0.0.0:15475 \
./nexus-server

# Start replica 1
NEXUS_REPLICATION_ROLE=replica \
NEXUS_REPLICATION_MASTER_ADDR=master:15475 \
./nexus-server

# Start replica 2
NEXUS_REPLICATION_ROLE=replica \
NEXUS_REPLICATION_MASTER_ADDR=master:15475 \
./nexus-server

Monitoring Replication Status

import httpx

async def check_replication_status(master_url: str):
    async with httpx.AsyncClient() as client:
        # Check master status
        response = await client.get(f"{master_url}/replication/status")
        status = response.json()
        print(f"Role: {status['role']}")
        print(f"Running: {status['running']}")
        print(f"Connected replicas: {status.get('replica_count', 0)}")

        # Get master stats
        response = await client.get(f"{master_url}/replication/master/stats")
        stats = response.json()
        print(f"Entries replicated: {stats['entries_replicated']}")
        print(f"Connected replicas: {stats['connected_replicas']}")

        # List replicas
        response = await client.get(f"{master_url}/replication/replicas")
        replicas = response.json()
        for replica in replicas['replicas']:
            print(f"  - {replica['id']}: lag={replica['lag']}, healthy={replica['healthy']}")

Features

  • ✅ Cypher query execution
  • ✅ Database statistics
  • ✅ Health check
  • ✅ Node CRUD operations (Create, Read, Update, Delete)
  • ✅ Relationship CRUD operations (Create, Update, Delete)
  • ✅ Schema management (Labels, Relationship Types)
  • ✅ Transaction support (BEGIN, COMMIT, ROLLBACK)
  • Batch operations (batch create nodes/relationships)
  • Performance monitoring (query statistics, slow queries, plan cache)
  • Query Builder (type-safe Cypher query construction)
  • Advanced Transaction (Transaction class with state management)
  • Multi-database support (create, list, switch, drop databases)
  • ✅ Proper error handling
  • ✅ Type-safe models with Pydantic
  • ✅ Async/await support

Dependencies

Install from PyPI:

pip install hivehub-nexus-sdk

Or install from source:

pip install -r requirements.txt

Core Dependencies

  • httpx>=0.24.0 - Modern HTTP client
  • pydantic>=2.0.0 - Data validation

Development Dependencies

pip install -r requirements-dev.txt

Examples

See the examples/ directory for complete examples:

  • basic_usage.py - Basic operations with nodes, relationships, and schema
  • with_auth.py - Authentication examples
  • transactions.py - Advanced transaction management with Transaction class
  • query_builder.py - Query builder usage examples
  • batch_operations.py - Batch create operations
  • performance_monitoring.py - Performance monitoring examples
  • multi_database.py - Multi-database support examples

Run examples with:

python examples/basic_usage.py
python examples/with_auth.py
python examples/transactions.py
python examples/query_builder.py
python examples/batch_operations.py
python examples/performance_monitoring.py
python examples/multi_database.py

Testing

Run tests with:

# Install development dependencies
pip install -e ".[dev]"

# Run unit tests
pytest

# Run with coverage
pytest --cov=nexus_sdk --cov-report=html

License

Licensed under the Apache License, Version 2.0.

See LICENSE for details.