From 8252275334f3685721d3dece286a2696d32cc21c Mon Sep 17 00:00:00 2001 From: plagtech Date: Tue, 23 Jun 2026 18:53:44 -0700 Subject: [PATCH] Add Batch Payments Agent use case (Spraay x402 + AgentCore Payments) --- 02-use-cases/batch-payments-agent/.gitignore | 8 + 02-use-cases/batch-payments-agent/README.md | 350 ++++++++++++++ .../batch-payments-agent/agent/__init__.py | 1 + .../batch-payments-agent/agent/config.py | 110 +++++ .../batch-payments-agent/agent/main.py | 191 ++++++++ .../batch-payments-agent/agent/tools.py | 440 ++++++++++++++++++ .../batch-payments-agent/requirements.txt | 11 + .../scripts/cleanup_payments.py | 81 ++++ .../scripts/invoke_agent.py | 54 +++ .../scripts/setup_payments.py | 204 ++++++++ .../batch-payments-agent/tests/conftest.py | 73 +++ .../tests/test_402_handling.py | 132 ++++++ .../batch-payments-agent/tests/test_tools.py | 131 ++++++ 13 files changed, 1786 insertions(+) create mode 100644 02-use-cases/batch-payments-agent/.gitignore create mode 100644 02-use-cases/batch-payments-agent/README.md create mode 100644 02-use-cases/batch-payments-agent/agent/__init__.py create mode 100644 02-use-cases/batch-payments-agent/agent/config.py create mode 100644 02-use-cases/batch-payments-agent/agent/main.py create mode 100644 02-use-cases/batch-payments-agent/agent/tools.py create mode 100644 02-use-cases/batch-payments-agent/requirements.txt create mode 100644 02-use-cases/batch-payments-agent/scripts/cleanup_payments.py create mode 100644 02-use-cases/batch-payments-agent/scripts/invoke_agent.py create mode 100644 02-use-cases/batch-payments-agent/scripts/setup_payments.py create mode 100644 02-use-cases/batch-payments-agent/tests/conftest.py create mode 100644 02-use-cases/batch-payments-agent/tests/test_402_handling.py create mode 100644 02-use-cases/batch-payments-agent/tests/test_tools.py diff --git a/02-use-cases/batch-payments-agent/.gitignore b/02-use-cases/batch-payments-agent/.gitignore new file mode 100644 index 000000000..39288b210 --- /dev/null +++ b/02-use-cases/batch-payments-agent/.gitignore @@ -0,0 +1,8 @@ +__pycache__/ +*.pyc +.venv/ +.env +*.egg-info/ +dist/ +build/ +.pytest_cache/ diff --git a/02-use-cases/batch-payments-agent/README.md b/02-use-cases/batch-payments-agent/README.md new file mode 100644 index 000000000..f97246396 --- /dev/null +++ b/02-use-cases/batch-payments-agent/README.md @@ -0,0 +1,350 @@ +# Batch Payments Agent with Spraay x402 Gateway + +A production-ready agent that uses **Amazon Bedrock AgentCore Payments** to autonomously execute **multi-recipient batch payments** — paying N wallets in a single blockchain transaction through the [Spraay x402 gateway](https://spraay.app). + +Traditional on-chain transfers require one transaction per recipient. Spraay's batch payment infrastructure collapses that into a single atomic operation: one agent call, one x402 micropayment, N recipients funded. This is the primitive that payroll agents, airdrop agents, grant distribution agents, and refund agents all need — and it doesn't exist in AgentCore samples today. + +## Why Batch Payments Matter for Agents + +| Scenario | Without Batch | With Spraay Batch | +|---|---|---| +| **Payroll agent** pays 50 contractors | 50 transactions, 50 gas fees, 50 confirmations | 1 transaction, 1 gas fee, 1 confirmation | +| **Airdrop agent** rewards 200 users | 200 tx, minutes of wait, partial failure risk | 1 atomic tx, all-or-nothing settlement | +| **Refund agent** processes 30 returns | 30 individual sends, manual tracking | 1 batch call, single tx hash for audit | +| **Grant agent** distributes to 10 teams | 10 separate approvals and sends | 1 batch with per-recipient amounts | + +The agent pays a single x402 micropayment ($0.01–$0.05 USDC) to Spraay, which handles the on-chain batching, gas optimization, and atomic execution across 16 supported chains. + +## What This Demonstrates + +| Capability | How It's Used | +|---|---| +| **AgentCore Payments** | Autonomous x402 micropayments via Coinbase CDP or Stripe/Privy wallet | +| **AgentCore Gateway** | MCP tool discovery for Spraay's batch payment endpoints | +| **AgentCore Runtime** | Serverless agent hosting with session isolation | +| **AgentCore Observability** | End-to-end payment tracing via CloudWatch | +| **x402 Protocol** | HTTP 402 → payment → settlement → batch execution loop | +| **Coinbase x402 Bazaar** | Agent-driven service discovery of Spraay endpoints | + +## Architecture + +``` +┌─────────────────────────────────────────────────────────────┐ +│ Amazon Bedrock AgentCore │ +│ ┌──────────────┐ ┌──────────────┐ ┌──────────────────┐ │ +│ │ Gateway │ │ Runtime │ │ Payments │ │ +│ │ (MCP Tools) │──│ (Strands) │──│ (ProcessPayment)│ │ +│ └──────┬───────┘ └──────┬───────┘ └────────┬─────────┘ │ +│ │ │ │ │ +│ │ ┌──────┴───────┐ │ │ +│ │ │ Strands Agent │ │ │ +│ │ │ Claude 4.5 │ │ │ +│ │ └──────┬───────┘ │ │ +└─────────┼─────────────────┼────────────────────┼─────────────┘ + │ │ │ + │ ┌──────────▼──────────┐ │ + │ │ Spraay Gateway │ │ + │ │ gateway.spraay.app │ │ + │ │ │ │ + │ │ ┌───────────────┐ │ │ + │ │ │ 402 Response │◄─┼─────────┘ + │ │ │ + x402 header │ │ (USDC micropayment on Base) + │ │ └───────┬───────┘ │ + │ │ │ │ + │ │ ┌───────▼───────┐ │ + │ │ │ Batch Execute │ │ + │ │ │ (N recipients │ │ + │ │ │ in 1 tx) │ │ + │ │ └───────────────┘ │ + │ └─────────────────────┘ + │ + ┌──────▼──────────────┐ + │ Coinbase x402 │ + │ Bazaar (Discovery) │ + └─────────────────────┘ +``` + +### Payment Flow + +1. User tells agent to pay multiple recipients (e.g., "Pay these 5 wallets 0.01 ETH each on Base") +2. Agent calls Spraay's batch payment endpoint with recipient list +3. Spraay returns HTTP 402 with x402 payment header ($0.01–$0.05 USDC service fee) +4. AgentCore Payments checks session budget, signs USDC transaction on Base +5. Agent retries with payment proof header +6. Spraay verifies payment, executes the batch transfer as a single on-chain transaction +7. Agent receives the transaction hash and per-recipient confirmation + +### Batch Payment Contract + +Spraay's batch contract on Base (`0x1646452F98E36A3c9Cfc3eDD8868221E207B5eEC`) handles: +- Multi-recipient ETH and ERC-20 transfers in one atomic transaction +- Gas optimization (one base fee instead of N) +- All-or-nothing execution (no partial failures) +- On-chain verifiability via single transaction hash + +## Prerequisites + +- **AWS Account** with Amazon Bedrock AgentCore access (preview regions: us-east-1, us-west-2, eu-central-1, ap-southeast-2) +- **Coinbase Developer Platform** API keys — [portal.cdp.coinbase.com](https://portal.cdp.coinbase.com/) +- **Python 3.12+** +- **AWS CLI** configured (`aws configure`) +- **AgentCore CLI** installed (`npm install -g @aws/agentcore`) +- **Docker** (for AgentCore Runtime deployment) +- **USDC on Base Sepolia** — fund via [faucet.circle.com](https://faucet.circle.com/) + +## Quick Start + +### 1. Clone and Install + +```bash +git clone https://github.com/awslabs/amazon-bedrock-agentcore-samples.git +cd amazon-bedrock-agentcore-samples/02-use-cases/batch-payments-agent + +python -m venv .venv +source .venv/bin/activate # Windows: .venv\Scripts\activate +pip install -r requirements.txt +``` + +### 2. Configure Environment + +```bash +cp .env.example .env +# Edit .env with your configuration +``` + +Required variables: + +| Variable | Description | +|---|---| +| `AWS_REGION` | AWS region (e.g., `us-east-1`) | +| `PAYMENT_MANAGER_ARN` | ARN from CreatePaymentManager | +| `PAYMENT_CONNECTOR_ID` | ID from CreatePaymentConnector | +| `PAYMENT_INSTRUMENT_ID` | ID from CreatePaymentInstrument | +| `SPRAAY_GATEWAY_URL` | `https://gateway.spraay.app` | +| `MODEL_ID` | Bedrock model (default: `us.anthropic.claude-sonnet-4-5-20250514-v1:0`) | + +### 3. Set Up AgentCore Payments + +Create the payment resources (one-time setup): + +```bash +python scripts/setup_payments.py +``` + +This script creates: +- PaymentManager with IAM authorizer +- PaymentConnector for Coinbase CDP +- PaymentInstrument (agent wallet) +- PaymentSession with spending limits + +### 4. Run Locally + +```bash +# Interactive mode +python -m agent.main + +# Single prompt +python -m agent.main --prompt "Send 0.001 ETH to 3 wallets on Base" + +# AgentCore CLI local dev +agentcore dev +``` + +### 5. Deploy to AgentCore Runtime + +```bash +agentcore deploy +``` + +## Sample Prompts + +### Batch Payments (Core Use Case) +``` +Send 0.001 ETH to each of these three wallets on Base: +0xAbc...123, 0xDef...456, 0x789...abc +``` + +``` +Distribute 100 USDC equally among these 5 team wallets on Ethereum. +``` + +``` +Process this payroll batch: Alice 0.5 ETH, Bob 0.3 ETH, Carol 0.2 ETH on Base. +``` + +### Cost Estimation +``` +How much would it cost to batch-pay 20 recipients on Base? +``` + +### Discovery +``` +What batch payment services are available? What chains are supported? +``` + +### Supporting Operations +``` +What's the current price of ETH? +Check the USDC balance of 0xAbc...123 on Base. +``` + +## Project Structure + +``` +batch-payments-agent/ +├── agent/ +│ ├── __init__.py +│ ├── main.py # Agent entry point with AgentCore Payments plugin +│ ├── tools.py # Spraay batch payment and supporting tools +│ └── config.py # Configuration management +├── scripts/ +│ ├── setup_payments.py # One-time AgentCore Payments setup +│ ├── invoke_agent.py # Test deployed agent +│ └── cleanup_payments.py # Tear down payment resources +├── tests/ +│ ├── test_402_handling.py # x402 payment flow tests +│ ├── test_tools.py # Tool unit tests +│ └── conftest.py # Test fixtures +├── .env.example # Environment template +├── requirements.txt # Python dependencies +├── Dockerfile # Container for AgentCore Runtime +└── README.md # This file +``` + +## Spraay x402 Gateway + +[Spraay](https://spraay.app) is a live x402 payment gateway whose primary capability is **batch payments — pay N recipients in one transaction**. It provides 170+ paid endpoints across 37 categories and 16 blockchain networks, and is indexed on the [Coinbase x402 Bazaar](https://docs.cdp.coinbase.com/x402/bazaar) for autonomous agent discovery. + +### Batch Payment Pricing + +| Operation | Price (USDC) | What You Get | +|---|---|---| +| Batch ETH transfer (N recipients) | $0.01–$0.05 | Single atomic tx, all recipients funded | +| Batch ERC-20 transfer (N recipients) | $0.01–$0.05 | Single atomic tx with token approvals | +| Batch payroll (N recipients, mixed amounts) | $0.05–$0.25 | Payroll-optimized with receipt data | +| Escrow batch | $0.05–$0.25 | Escrow creation for multiple parties | + +### Supported Chains + +Primary: **Base, Ethereum, Solana** + +Also: Arbitrum, Optimism, Polygon, Avalanche, BSC, Fantom, Gnosis, Celo, Linea, Scroll, zkSync, Canton Network, and more (16 total). + +## Key Implementation Details + +### AgentCore Payments Plugin (Strands) + +The agent uses the built-in `AgentCorePaymentsPlugin` for seamless x402 payment handling: + +```python +from strands import Agent +from strands.models import BedrockModel +from bedrock_agentcore.payments.integrations.strands.plugin import ( + AgentCorePaymentsPlugin, +) +from bedrock_agentcore.payments.integrations.config import ( + AgentCorePaymentsPluginConfig, +) + +config = AgentCorePaymentsPluginConfig( + payment_manager_arn=PAYMENT_MANAGER_ARN, + user_id="batch-agent-user", + payment_instrument_id=PAYMENT_INSTRUMENT_ID, + payment_session_id=PAYMENT_SESSION_ID, + region=AWS_REGION, +) + +plugin = AgentCorePaymentsPlugin(config) + +agent = Agent( + model=BedrockModel(model_id=MODEL_ID, region=AWS_REGION), + tools=[batch_transfer, estimate_batch_cost, get_supported_chains], + plugins=[plugin], + system_prompt=SYSTEM_PROMPT, +) +``` + +### The x402 Loop for Batch Payments + +When the agent calls Spraay's batch endpoint: + +1. Spraay returns `HTTP 402` with `PAYMENT-REQUIRED` header (base64 JSON) +2. Header contains: scheme, network (`eip155:8453` for Base), asset (USDC), recipient (Spraay), amount ($0.01–$0.05), timeout +3. `AgentCorePaymentsPlugin` intercepts, calls `ProcessPayment` via AgentCore +4. AgentCore signs a USDC transfer on Base — agent never touches private keys +5. Agent retries with `X-PAYMENT` proof header +6. Spraay verifies via x402 facilitator, then executes the batch transfer on-chain +7. Agent receives: transaction hash, per-recipient status, gas used + +### Budget Controls + +```python +session = payments_client.create_payment_session( + paymentManagerId=manager_id, + paymentInstrumentId=instrument_id, + maxSpendAmount="1.00", # $1.00 USDC max per session + currency="USDC", + expiresAt=expiry_time, +) +``` + +At $0.01–$0.05 per batch call, a $1.00 session budget supports 20–100 batch operations. + +## Testing + +```bash +# Unit tests +pytest tests/ -v + +# Test 402 handling +pytest tests/test_402_handling.py -v + +# Test with live Spraay endpoints (requires funded wallet) +pytest tests/ -v -m integration +``` + +## Clean Up + +```bash +# Delete AgentCore Runtime +agentcore runtime delete --agent-name batch-payments-agent + +# Delete payment resources +python scripts/cleanup_payments.py + +# Or manually via AWS CLI: +aws bedrock-agentcore delete-payment-session \ + --payment-manager-id \ + --payment-session-id + +aws bedrock-agentcore delete-payment-instrument \ + --payment-manager-id \ + --payment-instrument-id + +aws bedrock-agentcore delete-payment-connector \ + --payment-manager-id \ + --payment-connector-id + +aws bedrock-agentcore delete-payment-manager \ + --payment-manager-id +``` + +Also delete any CloudWatch log groups created during testing. + +## References + +- [Amazon Bedrock AgentCore Payments Documentation](https://docs.aws.amazon.com/bedrock-agentcore/latest/devguide/payments.html) +- [AgentCore Payments Technical Deep Dive](https://aws.amazon.com/blogs/machine-learning/technical-deep-dive-agentcore-payments-and-innovation-in-agentic-commerce/) +- [x402 Protocol Specification](https://github.com/coinbase/x402/tree/main/specs) +- [Coinbase x402 Bazaar](https://docs.cdp.coinbase.com/x402/bazaar) +- [Spraay Documentation](https://docs.spraay.app) +- [Spraay BPA 1.0 Specification](https://docs.spraay.app/bpa/1.0/) +- [Strands Agents Documentation](https://strandsagents.com/latest/documentation/docs/) + +## Security + +See [CONTRIBUTING](../../CONTRIBUTING.md) for more information. + +## License + +This library is licensed under the MIT-0 License. See the LICENSE file. diff --git a/02-use-cases/batch-payments-agent/agent/__init__.py b/02-use-cases/batch-payments-agent/agent/__init__.py new file mode 100644 index 000000000..b375056af --- /dev/null +++ b/02-use-cases/batch-payments-agent/agent/__init__.py @@ -0,0 +1 @@ +"""DeFi Payments Agent — AgentCore Payments + Spraay x402 Gateway.""" diff --git a/02-use-cases/batch-payments-agent/agent/config.py b/02-use-cases/batch-payments-agent/agent/config.py new file mode 100644 index 000000000..8cb2ef254 --- /dev/null +++ b/02-use-cases/batch-payments-agent/agent/config.py @@ -0,0 +1,110 @@ +"""Configuration management for the DeFi Payments Agent.""" + +import os +from dataclasses import dataclass, field +from dotenv import load_dotenv + +load_dotenv() + + +@dataclass +class AgentConfig: + """Agent configuration loaded from environment variables.""" + + # AWS + aws_region: str = field( + default_factory=lambda: os.getenv("AWS_REGION", "us-east-1") + ) + + # Bedrock Model + model_id: str = field( + default_factory=lambda: os.getenv( + "MODEL_ID", "us.anthropic.claude-sonnet-4-5-20250514-v1:0" + ) + ) + + # AgentCore Payments + payment_manager_arn: str = field( + default_factory=lambda: os.getenv("PAYMENT_MANAGER_ARN", "") + ) + payment_connector_id: str = field( + default_factory=lambda: os.getenv("PAYMENT_CONNECTOR_ID", "") + ) + payment_instrument_id: str = field( + default_factory=lambda: os.getenv("PAYMENT_INSTRUMENT_ID", "") + ) + payment_session_id: str = field( + default_factory=lambda: os.getenv("PAYMENT_SESSION_ID", "") + ) + payment_user_id: str = field( + default_factory=lambda: os.getenv("PAYMENT_USER_ID", "batch-agent-user") + ) + + # Spraay Gateway + spraay_gateway_url: str = field( + default_factory=lambda: os.getenv( + "SPRAAY_GATEWAY_URL", "https://gateway.spraay.app" + ) + ) + + # Session budget + max_spend_amount: str = field( + default_factory=lambda: os.getenv("MAX_SPEND_AMOUNT", "1.00") + ) + spend_currency: str = field( + default_factory=lambda: os.getenv("SPEND_CURRENCY", "USDC") + ) + + # Debug + debug: bool = field( + default_factory=lambda: os.getenv("DEBUG", "false").lower() == "true" + ) + + def validate(self) -> None: + """Validate that required configuration is present.""" + required = { + "PAYMENT_MANAGER_ARN": self.payment_manager_arn, + "PAYMENT_INSTRUMENT_ID": self.payment_instrument_id, + } + missing = [k for k, v in required.items() if not v] + if missing: + raise ValueError( + f"Missing required environment variables: {', '.join(missing)}. " + f"Run 'python scripts/setup_payments.py' to configure." + ) + + +# System prompt for the Batch Payments Agent +SYSTEM_PROMPT = """You are a Batch Payments Agent powered by Amazon Bedrock AgentCore. + +Your primary capability is executing multi-recipient batch payments — sending +tokens to N wallets in a single atomic blockchain transaction through the +Spraay x402 gateway. This is your core value: where a normal transfer requires +one transaction per recipient, you collapse it into one. + +Use cases you excel at: +- **Payroll**: Pay 50 contractors in one transaction instead of 50 +- **Airdrops**: Distribute tokens to hundreds of wallets atomically +- **Refunds**: Process batch refunds with a single tx hash for audit +- **Grants**: Distribute funds to multiple teams with per-recipient amounts +- **Rewards**: Send incentive payments to a list of participants + +Supporting capabilities (also via Spraay x402): +- Token pricing: real-time price feeds for any token +- Wallet queries: balance checks, nonce lookups +- Chain info: supported networks and their capabilities + +Spraay's batch contract executes all recipients atomically — all succeed or all +fail, no partial transfers. Supported on Base, Ethereum, Solana, and 13 other +chains. + +When you call a paid endpoint and receive an HTTP 402 response: +1. Extract the x402 payment details from the response +2. Use AgentCore Payments to sign and submit the USDC micropayment +3. Retry the request with the payment proof +4. Return the batch transaction result to the user + +Always tell the user the estimated service fee before executing. Confirm +recipient addresses and amounts before submitting a batch. Report the +transaction hash when complete. +""" diff --git a/02-use-cases/batch-payments-agent/agent/main.py b/02-use-cases/batch-payments-agent/agent/main.py new file mode 100644 index 000000000..b64c76253 --- /dev/null +++ b/02-use-cases/batch-payments-agent/agent/main.py @@ -0,0 +1,191 @@ +"""Batch Payments Agent — main entry point. + +A Strands agent that uses Amazon Bedrock AgentCore Payments to autonomously +execute multi-recipient batch payments through the Spraay x402 gateway. +Pay N wallets in one atomic transaction instead of N separate transactions. + +Usage: + # Local interactive mode + python -m agent.main + + # Single prompt + python -m agent.main --prompt "Send 0.001 ETH to 3 wallets on Base" + + # Deploy to AgentCore Runtime + agentcore deploy +""" + +import argparse +import logging +import sys + +from strands import Agent +from strands.models import BedrockModel + +from agent.config import AgentConfig, SYSTEM_PROMPT +from agent.tools import ( + # Primary — batch payments + batch_transfer, + batch_transfer_with_payment, + estimate_batch_cost, + # Supporting — discovery, pricing, chains + discover_spraay_services, + request_spraay_endpoint, + request_spraay_endpoint_with_payment, + get_supported_chains, + estimate_spraay_cost, +) + +logger = logging.getLogger(__name__) + + +def create_agent(config: AgentConfig | None = None) -> Agent: + """Create and configure the DeFi Payments Agent. + + Args: + config: Agent configuration. If None, loads from environment. + + Returns: + Configured Strands Agent with AgentCore Payments plugin. + """ + if config is None: + config = AgentConfig() + config.validate() + + # Import AgentCore Payments plugin + from bedrock_agentcore.payments.integrations.strands.plugin import ( + AgentCorePaymentsPlugin, + ) + from bedrock_agentcore.payments.integrations.config import ( + AgentCorePaymentsPluginConfig, + ) + + # Configure the payments plugin + payments_config = AgentCorePaymentsPluginConfig( + payment_manager_arn=config.payment_manager_arn, + user_id=config.payment_user_id, + payment_instrument_id=config.payment_instrument_id, + payment_session_id=config.payment_session_id, + region=config.aws_region, + ) + payments_plugin = AgentCorePaymentsPlugin(payments_config) + + # Configure the Bedrock model + model = BedrockModel( + model_id=config.model_id, + region=config.aws_region, + ) + + # Create the agent with batch payment tools + supporting tools + agent = Agent( + model=model, + tools=[ + # Primary — batch payments + batch_transfer, + batch_transfer_with_payment, + estimate_batch_cost, + # Supporting — discovery, pricing, chains + discover_spraay_services, + request_spraay_endpoint, + request_spraay_endpoint_with_payment, + get_supported_chains, + estimate_spraay_cost, + ], + plugins=[payments_plugin], + system_prompt=SYSTEM_PROMPT, + ) + + logger.info( + "Batch Payments Agent created — model=%s, region=%s, gateway=%s", + config.model_id, + config.aws_region, + config.spraay_gateway_url, + ) + + return agent + + +def run_interactive(agent: Agent) -> None: + """Run the agent in interactive mode.""" + print("\n💧 Batch Payments Agent (Spraay x402 + AgentCore Payments)") + print("=" * 60) + print("Pay N recipients in one transaction. Type 'quit' to exit.\n") + + while True: + try: + user_input = input("You: ").strip() + if not user_input: + continue + if user_input.lower() in ("quit", "exit", "q"): + print("\nGoodbye!") + break + + response = agent(user_input) + print(f"\nAgent: {response}\n") + + except KeyboardInterrupt: + print("\n\nInterrupted. Goodbye!") + break + except Exception as e: + logger.error("Agent error: %s", e, exc_info=True) + print(f"\nError: {e}\n") + + +def main() -> None: + """Entry point for the DeFi Payments Agent.""" + parser = argparse.ArgumentParser( + description="Batch Payments Agent — AgentCore Payments + Spraay x402" + ) + parser.add_argument( + "--prompt", + type=str, + help="Single prompt to run (non-interactive mode)", + ) + parser.add_argument( + "--debug", + action="store_true", + help="Enable debug logging", + ) + args = parser.parse_args() + + # Configure logging + log_level = logging.DEBUG if args.debug else logging.INFO + logging.basicConfig( + level=log_level, + format="%(asctime)s [%(levelname)s] %(name)s: %(message)s", + ) + + # Create the agent + config = AgentConfig() + if args.debug: + config.debug = True + + agent = create_agent(config) + + # Run in single-prompt or interactive mode + if args.prompt: + response = agent(args.prompt) + print(response) + else: + run_interactive(agent) + + +# AgentCore Runtime entry point +def handler(event: dict, context: dict) -> dict: + """AWS Lambda / AgentCore Runtime handler. + + This is the entry point when deployed to AgentCore Runtime. + """ + config = AgentConfig() + agent = create_agent(config) + + prompt = event.get("prompt", event.get("input", "")) + if not prompt: + return {"error": "No prompt provided"} + + response = agent(prompt) + return {"response": str(response)} + + +if __name__ == "__main__": + main() diff --git a/02-use-cases/batch-payments-agent/agent/tools.py b/02-use-cases/batch-payments-agent/agent/tools.py new file mode 100644 index 000000000..e26cb617b --- /dev/null +++ b/02-use-cases/batch-payments-agent/agent/tools.py @@ -0,0 +1,440 @@ +"""Spraay x402 batch payment tools for AgentCore agents. + +Primary tools for multi-recipient batch payments — the core capability. +Supporting tools for discovery, pricing, and chain info. + +x402 payment flow: +1. Call a paid endpoint → receive HTTP 402 with payment details +2. AgentCore Payments plugin handles the micropayment automatically +3. Retry with payment proof → receive the batch transaction result +""" + +import json +import logging +from typing import Any + +import httpx +from strands.types.tools import tool + +from agent.config import AgentConfig + +logger = logging.getLogger(__name__) +config = AgentConfig() + + +# --------------------------------------------------------------------------- +# PRIMARY TOOLS — Batch Payments +# --------------------------------------------------------------------------- + + +@tool +def batch_transfer( + chain: str, + token: str, + recipients: str, +) -> dict[str, Any]: + """Execute a multi-recipient batch payment in a single blockchain transaction. + + This is the core capability: pay N wallets atomically in one tx instead of N + separate transactions. All recipients succeed or all fail — no partial transfers. + + Spraay's batch contract handles gas optimization and atomic execution. + The agent pays a single x402 micropayment ($0.01–$0.05 USDC) as a service fee. + + Args: + chain: Target blockchain (e.g., 'base', 'ethereum', 'arbitrum'). + token: Token to send ('ETH', 'USDC', or any ERC-20 contract address). + recipients: JSON array of recipient objects, each with 'address' and 'amount'. + Example: '[{"address": "0xAbc...", "amount": "0.001"}, ...]' + + Returns: + On 402: x402 payment details for AgentCore Payments to process. + On success: transaction hash, per-recipient status, gas used. + """ + url = f"{config.spraay_gateway_url}/api/v1/batch/transfer" + headers = {"Content-Type": "application/json"} + + parsed_recipients = json.loads(recipients) if isinstance(recipients, str) else recipients + + body = { + "chain": chain.lower(), + "token": token, + "recipients": parsed_recipients, + } + + try: + response = httpx.post(url, json=body, headers=headers, timeout=60) + + if response.status_code == 402: + payment_header = response.headers.get( + "PAYMENT-REQUIRED", + response.headers.get("X-Payment-Required", ""), + ) + recipient_count = len(parsed_recipients) + return { + "status": "payment_required", + "http_status": 402, + "x402_payload": payment_header, + "url": url, + "recipient_count": recipient_count, + "message": ( + f"Batch payment for {recipient_count} recipients requires an " + f"x402 micropayment. Use AgentCore Payments to complete the " + f"transaction, then retry with the payment proof header." + ), + } + + if response.status_code == 200: + return {"status": "success", "data": response.json()} + + return { + "status": "error", + "http_status": response.status_code, + "message": response.text, + } + + except httpx.RequestError as e: + return {"status": "error", "message": str(e)} + + +@tool +def batch_transfer_with_payment( + chain: str, + token: str, + recipients: str, + payment_proof: str, +) -> dict[str, Any]: + """Retry a batch transfer with x402 payment proof after payment is complete. + + Call this after AgentCore Payments has processed the micropayment from + the initial batch_transfer call. + + Args: + chain: Target blockchain (e.g., 'base', 'ethereum'). + token: Token to send ('ETH', 'USDC', etc.). + recipients: JSON array of recipient objects (same as batch_transfer). + payment_proof: Payment proof string from AgentCore Payments. + + Returns: + Transaction hash, per-recipient confirmation, gas used. + """ + url = f"{config.spraay_gateway_url}/api/v1/batch/transfer" + headers = { + "Content-Type": "application/json", + "X-PAYMENT": payment_proof, + } + + parsed_recipients = json.loads(recipients) if isinstance(recipients, str) else recipients + body = { + "chain": chain.lower(), + "token": token, + "recipients": parsed_recipients, + } + + try: + response = httpx.post(url, json=body, headers=headers, timeout=60) + + if response.status_code == 200: + return {"status": "success", "data": response.json()} + + return { + "status": "error", + "http_status": response.status_code, + "message": response.text, + } + + except httpx.RequestError as e: + return {"status": "error", "message": str(e)} + + +@tool +def estimate_batch_cost( + recipient_count: int, + operation: str = "transfer", +) -> dict[str, Any]: + """Estimate the x402 service fee for a batch operation. + + This is the Spraay service fee paid via x402, not the on-chain gas cost. + Gas is included in the batch execution and optimized by Spraay's contract. + + Args: + recipient_count: Number of recipients in the batch. + operation: Type of batch ('transfer', 'payroll', 'escrow'). + + Returns: + Estimated service fee in USDC and per-recipient breakdown. + """ + pricing = { + "transfer": {"base": 0.01, "per_recipient": 0.001, "max": 0.05}, + "payroll": {"base": 0.05, "per_recipient": 0.002, "max": 0.25}, + "escrow": {"base": 0.05, "per_recipient": 0.003, "max": 0.25}, + } + + op = operation.lower() + if op not in pricing: + return { + "status": "error", + "message": f"Unknown operation: {operation}", + "available_operations": list(pricing.keys()), + } + + tier = pricing[op] + estimated = min( + tier["base"] + (tier["per_recipient"] * recipient_count), + tier["max"], + ) + + return { + "status": "success", + "operation": operation, + "recipient_count": recipient_count, + "estimated_fee": { + "amount": round(estimated, 4), + "currency": "USDC", + "note": "x402 service fee — on-chain gas is included by Spraay", + }, + "comparison": { + "individual_txs": recipient_count, + "batch_txs": 1, + "savings": f"{recipient_count - 1} fewer transactions", + }, + } + + +# --------------------------------------------------------------------------- +# SUPPORTING TOOLS — Discovery, Pricing, Chain Info +# --------------------------------------------------------------------------- + + +@tool +def discover_spraay_services() -> dict[str, Any]: + """Discover available paid services from the Spraay x402 gateway. + + Returns a list of available endpoint categories, pricing, and supported + blockchain networks. Use this to find out what services are available + before calling specific endpoints. + """ + try: + response = httpx.get( + f"{config.spraay_gateway_url}/api/v1/categories", + timeout=30, + ) + if response.status_code == 200: + return { + "status": "success", + "categories": response.json(), + "gateway_url": config.spraay_gateway_url, + "note": "Use request_spraay_endpoint to call specific endpoints.", + } + return { + "status": "error", + "code": response.status_code, + "message": response.text, + } + except httpx.RequestError as e: + return {"status": "error", "message": str(e)} + + +@tool +def request_spraay_endpoint( + method: str, + path: str, + body: str = "", +) -> dict[str, Any]: + """Call a Spraay x402 gateway endpoint. Handles the x402 payment flow. + + If the endpoint returns HTTP 402, the response includes the x402 payment + details needed by AgentCore Payments to complete the transaction. The + AgentCore Payments plugin will automatically handle payment and retry. + + Args: + method: HTTP method (GET or POST). + path: API path (e.g., '/api/v1/batch/transfer'). + body: JSON request body for POST requests. + + Returns: + The endpoint response, or x402 payment details if payment is required. + """ + url = f"{config.spraay_gateway_url}{path}" + headers = {"Content-Type": "application/json"} + + try: + if method.upper() == "POST" and body: + parsed_body = json.loads(body) if isinstance(body, str) else body + response = httpx.post(url, json=parsed_body, headers=headers, timeout=60) + else: + response = httpx.get(url, headers=headers, timeout=60) + + # x402 Payment Required — return details for AgentCore Payments + if response.status_code == 402: + payment_header = response.headers.get( + "PAYMENT-REQUIRED", + response.headers.get("X-Payment-Required", ""), + ) + return { + "status": "payment_required", + "http_status": 402, + "x402_payload": payment_header, + "url": url, + "message": ( + "This endpoint requires an x402 micropayment. " + "Use AgentCore Payments to complete the transaction, " + "then retry with the payment proof header." + ), + } + + # Successful response + if response.status_code == 200: + try: + return { + "status": "success", + "data": response.json(), + } + except json.JSONDecodeError: + return { + "status": "success", + "data": response.text, + } + + return { + "status": "error", + "http_status": response.status_code, + "message": response.text, + } + + except httpx.RequestError as e: + return {"status": "error", "message": str(e)} + + +@tool +def request_spraay_endpoint_with_payment( + method: str, + path: str, + payment_proof: str, + body: str = "", +) -> dict[str, Any]: + """Retry a Spraay endpoint with x402 payment proof after payment is complete. + + Args: + method: HTTP method (GET or POST). + path: API path (e.g., '/api/v1/batch/transfer'). + payment_proof: The payment proof string from AgentCore Payments. + body: JSON request body for POST requests. + + Returns: + The endpoint response after payment verification. + """ + url = f"{config.spraay_gateway_url}{path}" + headers = { + "Content-Type": "application/json", + "X-PAYMENT": payment_proof, + } + + try: + if method.upper() == "POST" and body: + parsed_body = json.loads(body) if isinstance(body, str) else body + response = httpx.post(url, json=parsed_body, headers=headers, timeout=60) + else: + response = httpx.get(url, headers=headers, timeout=60) + + if response.status_code == 200: + try: + return {"status": "success", "data": response.json()} + except json.JSONDecodeError: + return {"status": "success", "data": response.text} + + return { + "status": "error", + "http_status": response.status_code, + "message": response.text, + } + + except httpx.RequestError as e: + return {"status": "error", "message": str(e)} + + +@tool +def get_supported_chains() -> dict[str, Any]: + """Get the list of blockchain networks supported by Spraay. + + Returns supported chains with their chain IDs and capabilities. + """ + # Spraay's known supported chains — primary + secondary + chains = { + "primary": [ + {"name": "Base", "chain_id": 8453, "native_token": "ETH"}, + {"name": "Ethereum", "chain_id": 1, "native_token": "ETH"}, + {"name": "Solana", "chain_id": "solana", "native_token": "SOL"}, + ], + "secondary": [ + {"name": "Arbitrum", "chain_id": 42161, "native_token": "ETH"}, + {"name": "Optimism", "chain_id": 10, "native_token": "ETH"}, + {"name": "Polygon", "chain_id": 137, "native_token": "MATIC"}, + {"name": "Avalanche", "chain_id": 43114, "native_token": "AVAX"}, + {"name": "BSC", "chain_id": 56, "native_token": "BNB"}, + {"name": "Fantom", "chain_id": 250, "native_token": "FTM"}, + {"name": "Gnosis", "chain_id": 100, "native_token": "xDAI"}, + {"name": "Celo", "chain_id": 42220, "native_token": "CELO"}, + {"name": "Linea", "chain_id": 59144, "native_token": "ETH"}, + {"name": "Scroll", "chain_id": 534352, "native_token": "ETH"}, + {"name": "zkSync", "chain_id": 324, "native_token": "ETH"}, + {"name": "Canton Network", "chain_id": "canton", "native_token": "Canton"}, + ], + "total_chains": 16, + "total_endpoints": 170, + "payment_network": "Base (USDC via x402)", + } + return {"status": "success", "chains": chains} + + +@tool +def estimate_spraay_cost( + endpoint_category: str, + num_calls: int = 1, +) -> dict[str, Any]: + """Estimate the cost of calling Spraay endpoints. + + Args: + endpoint_category: Category name (e.g., 'batch_payments', 'pricing', + 'wallet', 'defi', 'research', 'rpc'). + num_calls: Number of calls to estimate for. + + Returns: + Estimated cost in USDC. + """ + # Spraay's pricing tiers + pricing = { + "batch_payments": {"min": 0.01, "max": 0.05, "unit": "per batch tx"}, + "escrow": {"min": 0.05, "max": 0.25, "unit": "per escrow op"}, + "bridge": {"min": 0.05, "max": 0.25, "unit": "per bridge tx"}, + "payroll": {"min": 0.05, "max": 0.25, "unit": "per payroll batch"}, + "pricing": {"min": 0.001, "max": 0.005, "unit": "per price query"}, + "wallet": {"min": 0.001, "max": 0.005, "unit": "per wallet query"}, + "defi": {"min": 0.005, "max": 0.01, "unit": "per defi query"}, + "research": {"min": 0.005, "max": 0.01, "unit": "per search"}, + "rpc": {"min": 0.001, "max": 0.005, "unit": "per rpc call"}, + "oracle": {"min": 0.005, "max": 0.01, "unit": "per oracle query"}, + "ai_inference": {"min": 0.03, "max": 0.05, "unit": "per inference"}, + "compute_futures": {"min": 0.01, "max": 0.05, "unit": "per contract"}, + } + + category = endpoint_category.lower().replace(" ", "_") + if category not in pricing: + return { + "status": "error", + "message": f"Unknown category: {endpoint_category}", + "available_categories": list(pricing.keys()), + } + + tier = pricing[category] + return { + "status": "success", + "category": endpoint_category, + "price_range": f"${tier['min']:.3f} – ${tier['max']:.3f} {tier['unit']}", + "estimated_total": { + "min": round(tier["min"] * num_calls, 4), + "max": round(tier["max"] * num_calls, 4), + "currency": "USDC", + "num_calls": num_calls, + }, + } diff --git a/02-use-cases/batch-payments-agent/requirements.txt b/02-use-cases/batch-payments-agent/requirements.txt new file mode 100644 index 000000000..f30452a74 --- /dev/null +++ b/02-use-cases/batch-payments-agent/requirements.txt @@ -0,0 +1,11 @@ +strands-agents>=0.3.0 +strands-agents-bedrock>=0.1.0 +bedrock-agentcore>=0.1.0 +httpx>=0.27.0 +python-dotenv>=1.0.0 +boto3>=1.35.0 + +# Development +pytest>=8.0.0 +pytest-asyncio>=0.23.0 +moto>=5.0.0 diff --git a/02-use-cases/batch-payments-agent/scripts/cleanup_payments.py b/02-use-cases/batch-payments-agent/scripts/cleanup_payments.py new file mode 100644 index 000000000..8e97b663b --- /dev/null +++ b/02-use-cases/batch-payments-agent/scripts/cleanup_payments.py @@ -0,0 +1,81 @@ +"""Clean up AgentCore Payments resources. + +Deletes all payment resources created by setup_payments.py. + +Usage: + python scripts/cleanup_payments.py +""" + +import os +import sys + +import boto3 +from dotenv import load_dotenv + +load_dotenv() + +REGION = os.getenv("AWS_REGION", "us-east-1") + + +def main(): + manager_arn = os.getenv("PAYMENT_MANAGER_ARN", "") + connector_id = os.getenv("PAYMENT_CONNECTOR_ID", "") + instrument_id = os.getenv("PAYMENT_INSTRUMENT_ID", "") + session_id = os.getenv("PAYMENT_SESSION_ID", "") + + if not manager_arn: + print("Error: PAYMENT_MANAGER_ARN not set in .env") + sys.exit(1) + + # Extract manager ID from ARN + manager_id = manager_arn.split("/")[-1] + + print("Cleaning up AgentCore Payments resources...") + print(f" Manager: {manager_id}") + + client = boto3.client("bedrock-agentcore", region_name=REGION) + + # Delete in reverse order: session → instrument → connector → manager + resources = [ + ("PaymentSession", "delete_payment_session", { + "paymentManagerId": manager_id, + "paymentSessionId": session_id, + }), + ("PaymentInstrument", "delete_payment_instrument", { + "paymentManagerId": manager_id, + "paymentInstrumentId": instrument_id, + }), + ("PaymentConnector", "delete_payment_connector", { + "paymentManagerId": manager_id, + "paymentConnectorId": connector_id, + }), + ("PaymentManager", "delete_payment_manager", { + "paymentManagerId": manager_id, + }), + ] + + for name, method, kwargs in resources: + # Skip if ID is empty + if any(not v for v in kwargs.values()): + print(f" Skipping {name} (ID not set)") + continue + try: + getattr(client, method)(**kwargs) + print(f" Deleted {name}: {list(kwargs.values())[-1]}") + except client.exceptions.ResourceNotFoundException: + print(f" {name} not found (already deleted)") + except Exception as e: + print(f" Error deleting {name}: {e}") + + print("\nCleanup complete.") + print("Remember to also delete:") + print(" - AgentCore Runtime: agentcore runtime delete --agent-name batch-payments-agent") + print(" - CloudWatch log groups") + + +if __name__ == "__main__": + confirm = input("Delete all payment resources? [y/N]: ").strip().lower() + if confirm == "y": + main() + else: + print("Aborted.") diff --git a/02-use-cases/batch-payments-agent/scripts/invoke_agent.py b/02-use-cases/batch-payments-agent/scripts/invoke_agent.py new file mode 100644 index 000000000..3a67e5534 --- /dev/null +++ b/02-use-cases/batch-payments-agent/scripts/invoke_agent.py @@ -0,0 +1,54 @@ +"""Invoke the deployed DeFi Payments Agent on AgentCore Runtime. + +Usage: + python scripts/invoke_agent.py "What's the price of ETH?" + python scripts/invoke_agent.py --runtime-arn "Send batch payment" +""" + +import argparse +import json +import uuid + +import boto3 + + +def invoke(prompt: str, runtime_arn: str | None = None, region: str = "us-east-1"): + """Invoke the agent on AgentCore Runtime.""" + client = boto3.client("bedrock-agentcore-runtime", region_name=region) + + if not runtime_arn: + sts = boto3.client("sts") + account_id = sts.get_caller_identity()["Account"] + runtime_arn = ( + f"arn:aws:bedrock-agentcore:{region}:{account_id}" + f":runtime/batch-payments-agent" + ) + + session_id = str(uuid.uuid4()) + + print(f"Runtime: {runtime_arn}") + print(f"Session: {session_id}") + print(f"Prompt: {prompt}\n") + + response = client.invoke_agent_runtime( + agentRuntimeArn=runtime_arn, + sessionId=session_id, + input={"prompt": prompt}, + ) + + # Process streaming response + for event in response.get("output", {}).get("events", []): + if "message" in event: + print(event["message"].get("text", "")) + + print(f"\nSession ID: {session_id}") + + +if __name__ == "__main__": + parser = argparse.ArgumentParser(description="Invoke the DeFi Payments Agent") + parser.add_argument("prompt", type=str, help="Prompt to send to the agent") + parser.add_argument("--runtime-arn", type=str, help="AgentCore Runtime ARN") + parser.add_argument("--region", type=str, default="us-east-1") + args = parser.parse_args() + + invoke(args.prompt, args.runtime_arn, args.region) diff --git a/02-use-cases/batch-payments-agent/scripts/setup_payments.py b/02-use-cases/batch-payments-agent/scripts/setup_payments.py new file mode 100644 index 000000000..ec2fc3d42 --- /dev/null +++ b/02-use-cases/batch-payments-agent/scripts/setup_payments.py @@ -0,0 +1,204 @@ +"""One-time setup for AgentCore Payments resources. + +Creates the payment infrastructure needed for the DeFi Payments Agent: +1. PaymentCredentialProvider (stores Coinbase CDP API keys in Identity) +2. PaymentManager (top-level payment coordination) +3. PaymentConnector (links manager to Coinbase CDP wallet) +4. PaymentInstrument (the agent's wallet) +5. PaymentSession (spending limits and expiry) + +Prerequisites: +- AWS credentials configured (aws configure) +- Coinbase CDP API key and wallet secret from portal.cdp.coinbase.com +- IAM role with BedrockAgentCoreFullAccess policy + +Usage: + python scripts/setup_payments.py +""" + +import json +import os +import sys +import time +from datetime import datetime, timedelta, timezone +from getpass import getpass + +import boto3 +from dotenv import load_dotenv + +load_dotenv() + +REGION = os.getenv("AWS_REGION", "us-east-1") + + +def wait_for_ready(client, get_fn, id_key, id_value, resource_name, timeout=120): + """Poll a resource until it reaches READY state.""" + print(f" Waiting for {resource_name} to be ready...", end="", flush=True) + start = time.time() + while time.time() - start < timeout: + try: + resp = get_fn(**{id_key: id_value}) + status = resp.get("status", resp.get("state", "UNKNOWN")) + if status == "READY": + print(" READY") + return resp + if "FAILED" in status: + print(f" FAILED: {status}") + sys.exit(1) + except Exception: + pass + print(".", end="", flush=True) + time.sleep(5) + print(f" TIMEOUT after {timeout}s") + sys.exit(1) + + +def main(): + print("=" * 60) + print(" AgentCore Payments Setup — Batch Payments Agent") + print("=" * 60) + print(f"\n Region: {REGION}\n") + + # Collect Coinbase CDP credentials + print("Step 0: Coinbase CDP Credentials") + print(" Get these from https://portal.cdp.coinbase.com/\n") + cdp_api_key = input(" CDP API Key ID: ").strip() + cdp_api_secret = getpass(" CDP API Key Secret: ").strip() + + if not cdp_api_key or not cdp_api_secret: + print("Error: CDP credentials are required.") + sys.exit(1) + + # Create clients + identity_client = boto3.client("bedrock-agentcore-identity", region_name=REGION) + payments_client = boto3.client("bedrock-agentcore", region_name=REGION) + + # Step 1: Create credential provider + print("\nStep 1: Creating PaymentCredentialProvider...") + try: + cred_resp = identity_client.create_payment_credential_provider( + name="spraay-batch-agent-coinbase", + providerType="COINBASE_CDP", + credentials={ + "coinbaseCdp": { + "apiKeyId": cdp_api_key, + "apiKeySecret": cdp_api_secret, + } + }, + ) + cred_provider_id = cred_resp["providerIdentifier"] + print(f" Created: {cred_provider_id}") + except Exception as e: + print(f" Error: {e}") + sys.exit(1) + + # Step 2: Create payment manager + print("\nStep 2: Creating PaymentManager...") + try: + manager_resp = payments_client.create_payment_manager( + name="spraay-batch-payments", + authorizerType="IAM", + ) + manager_id = manager_resp["paymentManagerId"] + manager_arn = manager_resp["paymentManagerArn"] + print(f" Created: {manager_id}") + print(f" ARN: {manager_arn}") + except Exception as e: + print(f" Error: {e}") + sys.exit(1) + + wait_for_ready( + payments_client, + payments_client.get_payment_manager, + "paymentManagerId", + manager_id, + "PaymentManager", + ) + + # Step 3: Create payment connector + print("\nStep 3: Creating PaymentConnector (Coinbase CDP)...") + try: + connector_resp = payments_client.create_payment_connector( + paymentManagerId=manager_id, + name="coinbase-cdp-connector", + connectorType="COINBASE_CDP", + credentialProviderIdentifier=cred_provider_id, + ) + connector_id = connector_resp["paymentConnectorId"] + print(f" Created: {connector_id}") + except Exception as e: + print(f" Error: {e}") + sys.exit(1) + + # Step 4: Create payment instrument (agent wallet) + print("\nStep 4: Creating PaymentInstrument (agent wallet)...") + try: + instrument_resp = payments_client.create_payment_instrument( + paymentManagerId=manager_id, + paymentConnectorId=connector_id, + userId="batch-agent-user", + ) + instrument_id = instrument_resp["paymentInstrumentId"] + redirect_url = instrument_resp.get("paymentInstrumentDetails", {}).get( + "redirectUrl", "N/A" + ) + print(f" Created: {instrument_id}") + print(f" Wallet Hub URL: {redirect_url}") + print(" → Open this URL to fund the wallet with USDC on Base Sepolia") + print(" → Use https://faucet.circle.com/ for testnet USDC") + except Exception as e: + print(f" Error: {e}") + sys.exit(1) + + # Step 5: Create payment session + print("\nStep 5: Creating PaymentSession...") + max_spend = os.getenv("MAX_SPEND_AMOUNT", "1.00") + expiry = datetime.now(timezone.utc) + timedelta(hours=24) + + try: + session_resp = payments_client.create_payment_session( + paymentManagerId=manager_id, + paymentInstrumentId=instrument_id, + maxSpendAmount=max_spend, + currency="USDC", + expiresAt=expiry.isoformat(), + ) + session_id = session_resp["paymentSessionId"] + print(f" Created: {session_id}") + print(f" Budget: {max_spend} USDC") + print(f" Expires: {expiry.isoformat()}") + except Exception as e: + print(f" Error: {e}") + sys.exit(1) + + # Output .env values + print("\n" + "=" * 60) + print(" Setup complete! Add these to your .env file:") + print("=" * 60) + env_values = f""" +PAYMENT_MANAGER_ARN={manager_arn} +PAYMENT_CONNECTOR_ID={connector_id} +PAYMENT_INSTRUMENT_ID={instrument_id} +PAYMENT_SESSION_ID={session_id} +PAYMENT_USER_ID=batch-agent-user +""" + print(env_values) + + # Write to .env if it exists + env_path = os.path.join(os.path.dirname(__file__), "..", ".env") + if os.path.exists(env_path): + update = input("Update .env file with these values? [y/N]: ").strip().lower() + if update == "y": + with open(env_path, "a") as f: + f.write("\n# AgentCore Payments — auto-generated by setup_payments.py\n") + f.write(env_values) + print(" .env updated!") + + print("\nNext steps:") + print(" 1. Fund the agent wallet via the Wallet Hub URL above") + print(" 2. Run: python -m agent.main") + print(" 3. Try: 'What DeFi services are available through Spraay?'") + + +if __name__ == "__main__": + main() diff --git a/02-use-cases/batch-payments-agent/tests/conftest.py b/02-use-cases/batch-payments-agent/tests/conftest.py new file mode 100644 index 000000000..9457c35a4 --- /dev/null +++ b/02-use-cases/batch-payments-agent/tests/conftest.py @@ -0,0 +1,73 @@ +"""Shared test fixtures for the Batch Payments Agent.""" + +import pytest + + +@pytest.fixture +def mock_402_response(): + """Mock x402 payment-required response from Spraay batch endpoint.""" + return { + "status_code": 402, + "headers": { + "PAYMENT-REQUIRED": ( + "eyJzY2hlbWUiOiJleGFjdCIsIm5ldHdvcmsiOiJlaXAxNTU6ODQ1MyIs" + "ImFzc2V0IjoiMHg4MzZlZmNlNmYzNjViOGUzMzliZjc3OTE2NjI1ZGIx" + "IiwiYW1vdW50IjoiMTAwMDAiLCJyZWNpcGllbnQiOiIweDFhMmIzYzRk" + "NWU2ZjdhOGI5YzBkMWUyZjNhNGI1YzZkN2U4ZjkiLCJ0aW1lb3V0Ij" + "oxNzE5MDAwMDAwfQ==" + ), + }, + "body": {"error": "Payment required", "amount": "0.01 USDC"}, + } + + +@pytest.fixture +def mock_batch_success_response(): + """Mock successful batch transfer response after payment.""" + return { + "status_code": 200, + "body": { + "status": "success", + "transaction_hash": "0xabc123def456789...", + "chain": "base", + "batch_contract": "0x1646452F98E36A3c9Cfc3eDD8868221E207B5eEC", + "recipients_count": 3, + "recipients": [ + {"address": "0xAbc123...", "amount": "0.001", "status": "confirmed"}, + {"address": "0xDef456...", "amount": "0.001", "status": "confirmed"}, + {"address": "0x789abc...", "amount": "0.001", "status": "confirmed"}, + ], + "total_amount": "0.003 ETH", + "gas_used": 85000, + }, + } + + +@pytest.fixture +def sample_batch_request(): + """Sample batch payment request body.""" + return { + "chain": "base", + "token": "ETH", + "recipients": [ + {"address": "0xAbc123...", "amount": "0.001"}, + {"address": "0xDef456...", "amount": "0.001"}, + {"address": "0x789abc...", "amount": "0.001"}, + ], + } + + +@pytest.fixture +def sample_payroll_request(): + """Sample payroll batch with mixed amounts.""" + return { + "chain": "base", + "token": "USDC", + "recipients": [ + {"address": "0xAlice...", "amount": "500.00"}, + {"address": "0xBob...", "amount": "300.00"}, + {"address": "0xCarol...", "amount": "200.00"}, + {"address": "0xDave...", "amount": "450.00"}, + {"address": "0xEve...", "amount": "350.00"}, + ], + } diff --git a/02-use-cases/batch-payments-agent/tests/test_402_handling.py b/02-use-cases/batch-payments-agent/tests/test_402_handling.py new file mode 100644 index 000000000..218a7ce25 --- /dev/null +++ b/02-use-cases/batch-payments-agent/tests/test_402_handling.py @@ -0,0 +1,132 @@ +"""Tests for x402 batch payment flow handling. + +Validates that the agent correctly handles the HTTP 402 → payment → retry flow +when executing batch payments through Spraay x402 endpoints. +""" + +import json +from unittest.mock import MagicMock, patch + +import pytest + + +class TestX402BatchPaymentFlow: + """Test the x402 payment flow for batch transfers.""" + + def test_402_response_detected(self, mock_402_response): + """Agent correctly identifies an HTTP 402 from the batch endpoint.""" + assert mock_402_response["status_code"] == 402 + assert "PAYMENT-REQUIRED" in mock_402_response["headers"] + + def test_x402_payload_extraction(self, mock_402_response): + """Agent extracts x402 payment payload for AgentCore Payments.""" + import base64 + + payload_b64 = mock_402_response["headers"]["PAYMENT-REQUIRED"] + payload = json.loads(base64.b64decode(payload_b64)) + + assert payload["scheme"] == "exact" + assert payload["network"] == "eip155:8453" # Base + assert "amount" in payload + assert "recipient" in payload + assert "timeout" in payload + + def test_batch_success_after_payment(self, mock_batch_success_response): + """Agent receives batch result with per-recipient confirmation.""" + assert mock_batch_success_response["status_code"] == 200 + body = mock_batch_success_response["body"] + assert body["status"] == "success" + assert "transaction_hash" in body + assert body["recipients_count"] == 3 + assert all(r["status"] == "confirmed" for r in body["recipients"]) + + def test_batch_contract_address(self, mock_batch_success_response): + """Response references the correct Spraay batch contract.""" + body = mock_batch_success_response["body"] + assert body["batch_contract"] == "0x1646452F98E36A3c9Cfc3eDD8868221E207B5eEC" + + def test_batch_cost_estimation(self): + """estimate_batch_cost returns correct fee structure.""" + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=10, operation="transfer" + ) + assert result["status"] == "success" + assert result["estimated_fee"]["currency"] == "USDC" + assert result["comparison"]["individual_txs"] == 10 + assert result["comparison"]["batch_txs"] == 1 + + def test_batch_cost_capped_at_max(self): + """estimate_batch_cost caps at max fee for large batches.""" + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=1000, operation="transfer" + ) + assert result["estimated_fee"]["amount"] <= 0.05 # max for transfer + + def test_payroll_batch_cost(self): + """estimate_batch_cost returns higher fees for payroll operations.""" + from agent.tools import estimate_batch_cost + + transfer = estimate_batch_cost( + tool_use_id="test", recipient_count=5, operation="transfer" + ) + payroll = estimate_batch_cost( + tool_use_id="test", recipient_count=5, operation="payroll" + ) + assert payroll["estimated_fee"]["amount"] > transfer["estimated_fee"]["amount"] + + def test_unknown_operation_error(self): + """estimate_batch_cost returns error for unknown operations.""" + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=5, operation="unknown" + ) + assert result["status"] == "error" + assert "available_operations" in result + + def test_supported_chains(self): + """get_supported_chains returns correct chain info.""" + from agent.tools import get_supported_chains + + result = get_supported_chains(tool_use_id="test") + assert result["status"] == "success" + chains = result["chains"] + assert chains["total_chains"] == 16 + + primary_names = [c["name"] for c in chains["primary"]] + assert "Base" in primary_names + assert "Ethereum" in primary_names + assert "Solana" in primary_names + + +class TestBatchPaymentRequest: + """Test batch payment request construction.""" + + def test_batch_request_structure(self, sample_batch_request): + """Batch payment request has correct structure.""" + assert sample_batch_request["chain"] == "base" + assert sample_batch_request["token"] == "ETH" + assert len(sample_batch_request["recipients"]) == 3 + + def test_batch_request_recipients(self, sample_batch_request): + """Each recipient has address and amount.""" + for recipient in sample_batch_request["recipients"]: + assert "address" in recipient + assert "amount" in recipient + assert recipient["address"].startswith("0x") + + def test_payroll_request_mixed_amounts(self, sample_payroll_request): + """Payroll batch supports different amounts per recipient.""" + amounts = [float(r["amount"]) for r in sample_payroll_request["recipients"]] + assert len(set(amounts)) > 1 # Not all the same amount + assert sample_payroll_request["token"] == "USDC" + assert len(sample_payroll_request["recipients"]) == 5 + + def test_payroll_total(self, sample_payroll_request): + """Payroll batch amounts sum correctly.""" + total = sum(float(r["amount"]) for r in sample_payroll_request["recipients"]) + assert total == 1800.00 # 500 + 300 + 200 + 450 + 350 diff --git a/02-use-cases/batch-payments-agent/tests/test_tools.py b/02-use-cases/batch-payments-agent/tests/test_tools.py new file mode 100644 index 000000000..4270a44f2 --- /dev/null +++ b/02-use-cases/batch-payments-agent/tests/test_tools.py @@ -0,0 +1,131 @@ +"""Unit tests for Spraay x402 batch payment and supporting tools.""" + +from unittest.mock import MagicMock, patch + +import pytest + + +class TestEstimateBatchCost: + """Tests for the batch-specific cost estimator.""" + + def test_small_batch_transfer(self): + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=3, operation="transfer" + ) + assert result["status"] == "success" + # base 0.01 + (3 * 0.001) = 0.013 + assert result["estimated_fee"]["amount"] == 0.013 + assert result["comparison"]["batch_txs"] == 1 + assert result["comparison"]["individual_txs"] == 3 + + def test_large_batch_capped(self): + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=500, operation="transfer" + ) + assert result["estimated_fee"]["amount"] == 0.05 # capped at max + + def test_payroll_operation(self): + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=10, operation="payroll" + ) + assert result["status"] == "success" + # base 0.05 + (10 * 0.002) = 0.07 + assert result["estimated_fee"]["amount"] == 0.07 + + def test_escrow_operation(self): + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=5, operation="escrow" + ) + assert result["status"] == "success" + # base 0.05 + (5 * 0.003) = 0.065 + assert result["estimated_fee"]["amount"] == 0.065 + + def test_single_recipient_batch(self): + """Even a single-recipient batch works (edge case).""" + from agent.tools import estimate_batch_cost + + result = estimate_batch_cost( + tool_use_id="test", recipient_count=1, operation="transfer" + ) + assert result["status"] == "success" + assert result["comparison"]["savings"] == "0 fewer transactions" + + +class TestEstimateSprayCost: + """Tests for the general endpoint cost estimator.""" + + def test_pricing_category(self): + from agent.tools import estimate_spraay_cost + + result = estimate_spraay_cost( + tool_use_id="test", endpoint_category="pricing", num_calls=10 + ) + assert result["status"] == "success" + assert result["estimated_total"]["min"] == 0.01 + assert result["estimated_total"]["max"] == 0.05 + + def test_defi_category(self): + from agent.tools import estimate_spraay_cost + + result = estimate_spraay_cost( + tool_use_id="test", endpoint_category="defi", num_calls=1 + ) + assert result["status"] == "success" + assert result["estimated_total"]["min"] == 0.005 + + def test_rpc_category(self): + from agent.tools import estimate_spraay_cost + + result = estimate_spraay_cost( + tool_use_id="test", endpoint_category="rpc", num_calls=100 + ) + assert result["status"] == "success" + assert result["estimated_total"]["min"] == 0.1 # 100 * 0.001 + + def test_all_categories_valid(self): + from agent.tools import estimate_spraay_cost + + categories = [ + "batch_payments", "escrow", "bridge", "payroll", + "pricing", "wallet", "defi", "research", "rpc", + "oracle", "ai_inference", "compute_futures", + ] + for cat in categories: + result = estimate_spraay_cost( + tool_use_id="test", endpoint_category=cat, num_calls=1 + ) + assert result["status"] == "success", f"Failed for category: {cat}" + + +class TestGetSupportedChains: + """Tests for the get_supported_chains tool.""" + + def test_returns_primary_and_secondary(self): + from agent.tools import get_supported_chains + + result = get_supported_chains(tool_use_id="test") + chains = result["chains"] + assert len(chains["primary"]) == 3 + assert len(chains["secondary"]) >= 10 + + def test_base_is_primary(self): + from agent.tools import get_supported_chains + + result = get_supported_chains(tool_use_id="test") + primary = result["chains"]["primary"] + base = next(c for c in primary if c["name"] == "Base") + assert base["chain_id"] == 8453 + + def test_payment_network_is_base(self): + from agent.tools import get_supported_chains + + result = get_supported_chains(tool_use_id="test") + assert "Base" in result["chains"]["payment_network"]