diff --git a/app/erc8004.py b/app/erc8004.py index ad1cf0d..add6412 100644 --- a/app/erc8004.py +++ b/app/erc8004.py @@ -210,19 +210,22 @@ async def resolve_onchain_agent(agent_id: int) -> dict: } -def get_onchain_reputation(agent_id: int, clients: list = None) -> dict: +async def get_onchain_reputation(agent_id: int, clients: list = None) -> dict: """ Fetch on-chain reputation summary for an agent from the ERC-8004 Reputation Registry. """ + import asyncio contract = get_reputation_contract() try: if not clients: - clients = contract.functions.getClients(agent_id).call() + clients = await asyncio.to_thread(contract.functions.getClients(agent_id).call) if not clients: return {"agent_id": agent_id, "count": 0, "summary_value": 0, "decimals": 0, "clients": 0} - count, value, decimals = contract.functions.getSummary(agent_id, clients, "", "").call() + count, value, decimals = await asyncio.to_thread( + contract.functions.getSummary(agent_id, clients, "", "").call + ) return { "agent_id": agent_id, "count": count, @@ -293,7 +296,7 @@ def _get_reputation_write_contract(): return _reputation_write_contract -def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: int) -> dict: +async def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: int) -> dict: """ Post a MolTrust rating as an ERC-8004 feedback signal on-chain. @@ -308,6 +311,8 @@ def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: in Returns: dict with tx_hash on success, or error on failure """ + import asyncio + from app.nonce_manager import get_nonce, reset_nonce try: w3 = _get_w3() contract = _get_reputation_write_contract() @@ -315,8 +320,8 @@ def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: in erc8004_value = score * 20 # 1->20, 2->40, 3->60, 4->80, 5->100 endpoint = f"https://api.moltrust.ch/reputation/query/{moltrust_did}" - nonce = w3.eth.get_transaction_count(_WRITE_ADDR) - gas_price = w3.eth.gas_price + nonce = await get_nonce(w3, _WRITE_ADDR) + gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price) tx = contract.functions.giveFeedback( erc8004_agent_id, @@ -337,13 +342,14 @@ def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: in }) signed = w3.eth.account.sign_transaction(tx, _WRITE_KEY) - tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction) + tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction) hex_hash = w3.to_hex(tx_hash) logger.info(f"ERC-8004 feedback posted: agent={erc8004_agent_id} score={score} tx={hex_hash}") return {"tx_hash": hex_hash, "chain": "base", "basescan": f"https://basescan.org/tx/{hex_hash}"} except Exception as e: + await reset_nonce(_WRITE_ADDR) logger.error(f"ERC-8004 feedback error: {e}") return {"error": str(e)} @@ -385,7 +391,7 @@ def _get_identity_write_contract(): return _identity_write_contract -def register_onchain_agent(agent_did: str) -> dict: +async def register_onchain_agent(agent_did: str) -> dict: """ Register a MolTrust agent on the ERC-8004 IdentityRegistry on Base. @@ -397,14 +403,16 @@ def register_onchain_agent(agent_did: str) -> dict: Returns: dict with agent_id and tx_hash on success, or error on failure """ + import asyncio + from app.nonce_manager import get_nonce, reset_nonce try: w3 = _get_w3() contract = _get_identity_write_contract() agent_uri = f"https://api.moltrust.ch/agents/{agent_did}/erc8004" - nonce = w3.eth.get_transaction_count(_WRITE_ADDR) - gas_price = w3.eth.gas_price + nonce = await get_nonce(w3, _WRITE_ADDR) + gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price) tx = contract.functions.register(agent_uri).build_transaction({ "from": _WRITE_ADDR, @@ -416,8 +424,8 @@ def register_onchain_agent(agent_did: str) -> dict: }) signed = w3.eth.account.sign_transaction(tx, _WRITE_KEY) - tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction) - receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=30) + tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction) + receipt = await asyncio.to_thread(w3.eth.wait_for_transaction_receipt, tx_hash, 30) if receipt.status != 1: return {"error": "Transaction reverted", "tx_hash": w3.to_hex(tx_hash)} @@ -439,5 +447,6 @@ def register_onchain_agent(agent_did: str) -> dict: } except Exception as e: + await reset_nonce(_WRITE_ADDR) logger.error(f"ERC-8004 registration error: {e}") return {"error": str(e)} diff --git a/app/main.py b/app/main.py index 4e6c1b9..9608623 100644 --- a/app/main.py +++ b/app/main.py @@ -763,7 +763,7 @@ async def register_agent(request: Request, body: RegisterRequest, api_key: str = erc8004_result = None if body.erc8004: from app.erc8004 import register_onchain_agent - erc8004_result = register_onchain_agent(agent_did) + erc8004_result = await register_onchain_agent(agent_did) if erc8004_result.get("agent_id") and db_pool: async with db_pool.acquire() as conn: await conn.execute( @@ -871,7 +871,7 @@ async def rate_agent(request: Request, body: RateRequest, api_key: str = Depends row = await conn.fetchrow("SELECT erc8004_agent_id FROM agents WHERE did = $1", body.to_did) if row and row["erc8004_agent_id"] is not None: from app.erc8004 import post_reputation_feedback - result = post_reputation_feedback(row["erc8004_agent_id"], body.to_did, body.score) + result = await post_reputation_feedback(row["erc8004_agent_id"], body.to_did, body.score) if "tx_hash" in result: erc8004_tx = result["tx_hash"] return {"status": "rated", "from": body.from_did, "to": body.to_did, "score": body.score, "erc8004_tx": erc8004_tx} @@ -2046,12 +2046,15 @@ async def load_api_keys(): BASE_ADDR = Account.from_key(BASE_KEY).address if BASE_KEY else None async def anchor_to_base(agent_did: str, timestamp: str) -> str: + from app.nonce_manager import get_nonce, reset_nonce try: w3 = Web3(Web3.HTTPProvider(BASE_RPC)) - if not w3.is_connected(): + connected = await asyncio.to_thread(w3.is_connected) + if not connected: return None data = _hashlib.sha256(f"{agent_did}:{timestamp}".encode()).hexdigest() - nonce = w3.eth.get_transaction_count(BASE_ADDR) + nonce = await get_nonce(w3, BASE_ADDR) + gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price) tx = { "from": BASE_ADDR, "to": BASE_ADDR, @@ -2060,13 +2063,14 @@ async def anchor_to_base(agent_did: str, timestamp: str) -> str: "nonce": nonce, "chainId": 8453, "gas": 25000, - "maxFeePerGas": w3.eth.gas_price + w3.to_wei(0.001, "gwei"), + "maxFeePerGas": gas_price + w3.to_wei(0.001, "gwei"), "maxPriorityFeePerGas": w3.to_wei(0.001, "gwei"), } signed = w3.eth.account.sign_transaction(tx, BASE_KEY) - tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction) + tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction) return w3.to_hex(tx_hash) except Exception as e: + await reset_nonce(BASE_ADDR) print(f"Base anchor error: {e}") return None @@ -2456,7 +2460,7 @@ async def erc8004_resolve(request: Request, agent_id: int = Path(ge=0)): result["moltrust_profile"] = f"https://api.moltrust.ch/identity/resolve/{row["did"]}" # Fetch on-chain reputation - result["onchain_reputation"] = get_onchain_reputation(agent_id) + result["onchain_reputation"] = await get_onchain_reputation(agent_id) return result @app.get("/.well-known/agent-registration.json") @@ -2520,7 +2524,7 @@ async def erc8004_dual_register(request: Request, body: ERC8004RegisterRequest, }) from app.erc8004 import register_onchain_agent - erc8004_result = register_onchain_agent(agent_did) + erc8004_result = await register_onchain_agent(agent_did) erc8004_agent_id = erc8004_result.get("agent_id") if erc8004_agent_id: async with db_pool.acquire() as conn: @@ -2614,7 +2618,7 @@ async def erc8004_validate(request: Request, body: ERC8004ValidateRequest, api_k vc = issue_credential(subject_did, "AgentValidationCredential", claims) from app.erc8004 import post_reputation_feedback - feedback_result = post_reputation_feedback(body.erc8004_agent_id, subject_did, trust_score) + feedback_result = await post_reputation_feedback(body.erc8004_agent_id, subject_did, trust_score) return { "validated": True, @@ -3673,40 +3677,45 @@ def _build_music_vc(row) -> dict: async def _anchor_music_vc(track_hash: str, credential_id: str): - """Anchor music VC on Base L2 in background.""" + """Anchor music VC on Base L2 in background using web3.py.""" + from app.nonce_manager import get_nonce, reset_nonce base_key = os.environ.get("BASE_WRITE_KEY", "") if not base_key: return try: + from eth_account import Account as _MusicAccount + write_addr = _MusicAccount.from_key(base_key).address message = "MolTrust/MusicVC/1 SHA256:" + track_hash hex_data = message.encode("utf-8").hex() - env = os.environ.copy() - env["ETH_PRIVATE_KEY"] = base_key - cmd = [ - os.path.expanduser("~/.foundry/bin/cast"), "send", - "--rpc-url", "https://mainnet.base.org", - "0x0000000000000000000000000000000000000000", - "--value", "0", - "--", "0x" + hex_data, - ] - proc = await asyncio.create_subprocess_exec( - *cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE, - env=env, - ) - stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30) - output = stdout.decode() - import re - tx_match = re.search(r"transactionHash\s+(0x[0-9a-fA-F]+)", output) - block_match = re.search(r"blockNumber\s+(\d+)", output) - if tx_match and block_match: - tx, block = tx_match.group(1), block_match.group(1) + + w3 = Web3(Web3.HTTPProvider("https://mainnet.base.org")) + nonce = await get_nonce(w3, write_addr) + gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price) + tx = { + "from": write_addr, + "to": "0x0000000000000000000000000000000000000000", + "value": 0, + "data": w3.to_bytes(hexstr="0x" + hex_data), + "nonce": nonce, + "chainId": 8453, + "gas": 25000, + "maxFeePerGas": gas_price + w3.to_wei(0.001, "gwei"), + "maxPriorityFeePerGas": w3.to_wei(0.001, "gwei"), + } + signed = w3.eth.account.sign_transaction(tx, base_key) + tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction) + hex_hash = w3.to_hex(tx_hash) + receipt = await asyncio.to_thread(w3.eth.wait_for_transaction_receipt, tx_hash, 30) + + if receipt.blockNumber: async with db_pool.acquire() as conn: await conn.execute( "UPDATE music_credentials SET anchor_tx = $1, anchor_block = $2 WHERE id = $3", - tx, block, credential_id, + hex_hash, str(receipt.blockNumber), credential_id, ) - print(f"Music VC anchored: {tx} block {block}") + print(f"Music VC anchored: {hex_hash} block {receipt.blockNumber}") except Exception as e: + await reset_nonce(write_addr) print(f"Music anchor failed: {e}") diff --git a/app/nonce_manager.py b/app/nonce_manager.py new file mode 100644 index 0000000..fa1b4cd --- /dev/null +++ b/app/nonce_manager.py @@ -0,0 +1,47 @@ +"""Thread-safe nonce manager for Base L2 transactions. + +Prevents nonce collisions when multiple async handlers submit +transactions concurrently from the same wallet address. +""" +import asyncio +import logging +from web3 import Web3 + +logger = logging.getLogger("moltrust.nonce") + +_locks: dict[str, asyncio.Lock] = {} +_nonces: dict[str, int] = {} + + +def _get_lock(address: str) -> asyncio.Lock: + """Get or create a per-address lock.""" + if address not in _locks: + _locks[address] = asyncio.Lock() + return _locks[address] + + +async def get_nonce(w3: Web3, address: str) -> int: + """Get the next nonce for an address, serialized via asyncio.Lock. + + On first call (or after a reset), fetches the pending nonce from the + chain. Subsequent calls increment locally to avoid collisions when + multiple transactions are submitted before the first is mined. + """ + lock = _get_lock(address) + async with lock: + if address not in _nonces: + _nonces[address] = await asyncio.to_thread( + w3.eth.get_transaction_count, address, "pending" + ) + else: + _nonces[address] += 1 + nonce = _nonces[address] + logger.debug(f"Nonce for {address}: {nonce}") + return nonce + + +async def reset_nonce(address: str) -> None: + """Reset cached nonce after a known failure, forcing a re-fetch.""" + lock = _get_lock(address) + async with lock: + _nonces.pop(address, None) diff --git a/app/provenance/anchor.py b/app/provenance/anchor.py index 7276ca8..782d6ce 100644 --- a/app/provenance/anchor.py +++ b/app/provenance/anchor.py @@ -168,9 +168,10 @@ async def anchor_batch(conn, anchor_fn) -> dict: block_number = None try: from web3 import Web3 + import asyncio import os w3 = Web3(Web3.HTTPProvider(os.getenv("BASE_RPC", "https://mainnet.base.org"))) - receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=30) + receipt = await asyncio.to_thread(w3.eth.wait_for_transaction_receipt, tx_hash, 30) block_number = receipt.blockNumber except Exception: pass @@ -203,7 +204,9 @@ async def anchor_single_calldata(calldata: str) -> Optional[str]: """ try: from web3 import Web3 + import asyncio import os + from app.nonce_manager import get_nonce, reset_nonce BASE_RPC = os.getenv("BASE_RPC", "https://mainnet.base.org") BASE_ADDR = os.getenv("BASE_ADDR", "") @@ -214,10 +217,12 @@ async def anchor_single_calldata(calldata: str) -> Optional[str]: return None w3 = Web3(Web3.HTTPProvider(BASE_RPC)) - if not w3.is_connected(): + connected = await asyncio.to_thread(w3.is_connected) + if not connected: return None - nonce = w3.eth.get_transaction_count(BASE_ADDR) + nonce = await get_nonce(w3, BASE_ADDR) + gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price) tx = { "from": BASE_ADDR, "to": BASE_ADDR, @@ -226,12 +231,13 @@ async def anchor_single_calldata(calldata: str) -> Optional[str]: "nonce": nonce, "chainId": 8453, "gas": 30000, - "maxFeePerGas": w3.eth.gas_price + w3.to_wei(0.001, "gwei"), + "maxFeePerGas": gas_price + w3.to_wei(0.001, "gwei"), "maxPriorityFeePerGas": w3.to_wei(0.001, "gwei"), } signed = w3.eth.account.sign_transaction(tx, BASE_KEY) - tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction) + tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction) return w3.to_hex(tx_hash) except Exception as e: + await reset_nonce(BASE_ADDR) print(f"IPR anchor error: {e}") return None