Skip to content
Closed
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
33 changes: 21 additions & 12 deletions app/erc8004.py
Original file line number Diff line number Diff line change
Expand Up @@ -210,19 +210,22 @@ async def resolve_onchain_agent(agent_id: int) -> dict:
}


def get_onchain_reputation(agent_id: int, clients: list = None) -> dict:
async def get_onchain_reputation(agent_id: int, clients: list = None) -> dict:
"""
Fetch on-chain reputation summary for an agent from the ERC-8004 Reputation Registry.
"""
import asyncio
contract = get_reputation_contract()

try:
if not clients:
clients = contract.functions.getClients(agent_id).call()
clients = await asyncio.to_thread(contract.functions.getClients(agent_id).call)
if not clients:
return {"agent_id": agent_id, "count": 0, "summary_value": 0, "decimals": 0, "clients": 0}

count, value, decimals = contract.functions.getSummary(agent_id, clients, "", "").call()
count, value, decimals = await asyncio.to_thread(
contract.functions.getSummary(agent_id, clients, "", "").call
)
return {
"agent_id": agent_id,
"count": count,
Expand Down Expand Up @@ -293,7 +296,7 @@ def _get_reputation_write_contract():
return _reputation_write_contract


def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: int) -> dict:
async def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: int) -> dict:
"""
Post a MolTrust rating as an ERC-8004 feedback signal on-chain.

Expand All @@ -308,15 +311,17 @@ def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: in
Returns:
dict with tx_hash on success, or error on failure
"""
import asyncio
from app.nonce_manager import get_nonce, reset_nonce
try:
w3 = _get_w3()
contract = _get_reputation_write_contract()

erc8004_value = score * 20 # 1->20, 2->40, 3->60, 4->80, 5->100
endpoint = f"https://api.moltrust.ch/reputation/query/{moltrust_did}"

nonce = w3.eth.get_transaction_count(_WRITE_ADDR)
gas_price = w3.eth.gas_price
nonce = await get_nonce(w3, _WRITE_ADDR)
gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price)

tx = contract.functions.giveFeedback(
erc8004_agent_id,
Expand All @@ -337,13 +342,14 @@ def post_reputation_feedback(erc8004_agent_id: int, moltrust_did: str, score: in
})

signed = w3.eth.account.sign_transaction(tx, _WRITE_KEY)
tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction)
tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction)
hex_hash = w3.to_hex(tx_hash)

logger.info(f"ERC-8004 feedback posted: agent={erc8004_agent_id} score={score} tx={hex_hash}")
return {"tx_hash": hex_hash, "chain": "base", "basescan": f"https://basescan.org/tx/{hex_hash}"}

except Exception as e:
await reset_nonce(_WRITE_ADDR)
logger.error(f"ERC-8004 feedback error: {e}")
return {"error": str(e)}

Expand Down Expand Up @@ -385,7 +391,7 @@ def _get_identity_write_contract():
return _identity_write_contract


def register_onchain_agent(agent_did: str) -> dict:
async def register_onchain_agent(agent_did: str) -> dict:
"""
Register a MolTrust agent on the ERC-8004 IdentityRegistry on Base.

Expand All @@ -397,14 +403,16 @@ def register_onchain_agent(agent_did: str) -> dict:
Returns:
dict with agent_id and tx_hash on success, or error on failure
"""
import asyncio
from app.nonce_manager import get_nonce, reset_nonce
try:
w3 = _get_w3()
contract = _get_identity_write_contract()

agent_uri = f"https://api.moltrust.ch/agents/{agent_did}/erc8004"

nonce = w3.eth.get_transaction_count(_WRITE_ADDR)
gas_price = w3.eth.gas_price
nonce = await get_nonce(w3, _WRITE_ADDR)
gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price)

tx = contract.functions.register(agent_uri).build_transaction({
"from": _WRITE_ADDR,
Expand All @@ -416,8 +424,8 @@ def register_onchain_agent(agent_did: str) -> dict:
})

signed = w3.eth.account.sign_transaction(tx, _WRITE_KEY)
tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction)
receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=30)
tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction)
receipt = await asyncio.to_thread(w3.eth.wait_for_transaction_receipt, tx_hash, 30)

if receipt.status != 1:
return {"error": "Transaction reverted", "tx_hash": w3.to_hex(tx_hash)}
Expand All @@ -439,5 +447,6 @@ def register_onchain_agent(agent_did: str) -> dict:
}

except Exception as e:
await reset_nonce(_WRITE_ADDR)
logger.error(f"ERC-8004 registration error: {e}")
return {"error": str(e)}
73 changes: 41 additions & 32 deletions app/main.py
Original file line number Diff line number Diff line change
Expand Up @@ -763,7 +763,7 @@ async def register_agent(request: Request, body: RegisterRequest, api_key: str =
erc8004_result = None
if body.erc8004:
from app.erc8004 import register_onchain_agent
erc8004_result = register_onchain_agent(agent_did)
erc8004_result = await register_onchain_agent(agent_did)
if erc8004_result.get("agent_id") and db_pool:
async with db_pool.acquire() as conn:
await conn.execute(
Expand Down Expand Up @@ -871,7 +871,7 @@ async def rate_agent(request: Request, body: RateRequest, api_key: str = Depends
row = await conn.fetchrow("SELECT erc8004_agent_id FROM agents WHERE did = $1", body.to_did)
if row and row["erc8004_agent_id"] is not None:
from app.erc8004 import post_reputation_feedback
result = post_reputation_feedback(row["erc8004_agent_id"], body.to_did, body.score)
result = await post_reputation_feedback(row["erc8004_agent_id"], body.to_did, body.score)
if "tx_hash" in result:
erc8004_tx = result["tx_hash"]
return {"status": "rated", "from": body.from_did, "to": body.to_did, "score": body.score, "erc8004_tx": erc8004_tx}
Expand Down Expand Up @@ -2046,12 +2046,15 @@ async def load_api_keys():
BASE_ADDR = Account.from_key(BASE_KEY).address if BASE_KEY else None

async def anchor_to_base(agent_did: str, timestamp: str) -> str:
from app.nonce_manager import get_nonce, reset_nonce
try:
w3 = Web3(Web3.HTTPProvider(BASE_RPC))
if not w3.is_connected():
connected = await asyncio.to_thread(w3.is_connected)
if not connected:
return None
data = _hashlib.sha256(f"{agent_did}:{timestamp}".encode()).hexdigest()
nonce = w3.eth.get_transaction_count(BASE_ADDR)
nonce = await get_nonce(w3, BASE_ADDR)
gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price)
tx = {
"from": BASE_ADDR,
"to": BASE_ADDR,
Expand All @@ -2060,13 +2063,14 @@ async def anchor_to_base(agent_did: str, timestamp: str) -> str:
"nonce": nonce,
"chainId": 8453,
"gas": 25000,
"maxFeePerGas": w3.eth.gas_price + w3.to_wei(0.001, "gwei"),
"maxFeePerGas": gas_price + w3.to_wei(0.001, "gwei"),
"maxPriorityFeePerGas": w3.to_wei(0.001, "gwei"),
}
signed = w3.eth.account.sign_transaction(tx, BASE_KEY)
tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction)
tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction)
return w3.to_hex(tx_hash)
except Exception as e:
await reset_nonce(BASE_ADDR)
print(f"Base anchor error: {e}")
return None

Expand Down Expand Up @@ -2456,7 +2460,7 @@ async def erc8004_resolve(request: Request, agent_id: int = Path(ge=0)):
result["moltrust_profile"] = f"https://api.moltrust.ch/identity/resolve/{row["did"]}"

# Fetch on-chain reputation
result["onchain_reputation"] = get_onchain_reputation(agent_id)
result["onchain_reputation"] = await get_onchain_reputation(agent_id)
return result

@app.get("/.well-known/agent-registration.json")
Expand Down Expand Up @@ -2520,7 +2524,7 @@ async def erc8004_dual_register(request: Request, body: ERC8004RegisterRequest,
})

from app.erc8004 import register_onchain_agent
erc8004_result = register_onchain_agent(agent_did)
erc8004_result = await register_onchain_agent(agent_did)
erc8004_agent_id = erc8004_result.get("agent_id")
if erc8004_agent_id:
async with db_pool.acquire() as conn:
Expand Down Expand Up @@ -2614,7 +2618,7 @@ async def erc8004_validate(request: Request, body: ERC8004ValidateRequest, api_k
vc = issue_credential(subject_did, "AgentValidationCredential", claims)

from app.erc8004 import post_reputation_feedback
feedback_result = post_reputation_feedback(body.erc8004_agent_id, subject_did, trust_score)
feedback_result = await post_reputation_feedback(body.erc8004_agent_id, subject_did, trust_score)

return {
"validated": True,
Expand Down Expand Up @@ -3673,40 +3677,45 @@ def _build_music_vc(row) -> dict:


async def _anchor_music_vc(track_hash: str, credential_id: str):
"""Anchor music VC on Base L2 in background."""
"""Anchor music VC on Base L2 in background using web3.py."""
from app.nonce_manager import get_nonce, reset_nonce
base_key = os.environ.get("BASE_WRITE_KEY", "")
if not base_key:
return
try:
from eth_account import Account as _MusicAccount
write_addr = _MusicAccount.from_key(base_key).address
message = "MolTrust/MusicVC/1 SHA256:" + track_hash
hex_data = message.encode("utf-8").hex()
env = os.environ.copy()
env["ETH_PRIVATE_KEY"] = base_key
cmd = [
os.path.expanduser("~/.foundry/bin/cast"), "send",
"--rpc-url", "https://mainnet.base.org",
"0x0000000000000000000000000000000000000000",
"--value", "0",
"--", "0x" + hex_data,
]
proc = await asyncio.create_subprocess_exec(
*cmd, stdout=asyncio.subprocess.PIPE, stderr=asyncio.subprocess.PIPE,
env=env,
)
stdout, _ = await asyncio.wait_for(proc.communicate(), timeout=30)
output = stdout.decode()
import re
tx_match = re.search(r"transactionHash\s+(0x[0-9a-fA-F]+)", output)
block_match = re.search(r"blockNumber\s+(\d+)", output)
if tx_match and block_match:
tx, block = tx_match.group(1), block_match.group(1)

w3 = Web3(Web3.HTTPProvider("https://mainnet.base.org"))
nonce = await get_nonce(w3, write_addr)
gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price)
tx = {
"from": write_addr,
"to": "0x0000000000000000000000000000000000000000",
"value": 0,
"data": w3.to_bytes(hexstr="0x" + hex_data),
"nonce": nonce,
"chainId": 8453,
"gas": 25000,
"maxFeePerGas": gas_price + w3.to_wei(0.001, "gwei"),
"maxPriorityFeePerGas": w3.to_wei(0.001, "gwei"),
}
signed = w3.eth.account.sign_transaction(tx, base_key)
tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction)
hex_hash = w3.to_hex(tx_hash)
receipt = await asyncio.to_thread(w3.eth.wait_for_transaction_receipt, tx_hash, 30)

if receipt.blockNumber:
async with db_pool.acquire() as conn:
await conn.execute(
"UPDATE music_credentials SET anchor_tx = $1, anchor_block = $2 WHERE id = $3",
tx, block, credential_id,
hex_hash, str(receipt.blockNumber), credential_id,
)
print(f"Music VC anchored: {tx} block {block}")
print(f"Music VC anchored: {hex_hash} block {receipt.blockNumber}")
except Exception as e:
await reset_nonce(write_addr)
print(f"Music anchor failed: {e}")


Expand Down
47 changes: 47 additions & 0 deletions app/nonce_manager.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,47 @@
"""Thread-safe nonce manager for Base L2 transactions.

Prevents nonce collisions when multiple async handlers submit
transactions concurrently from the same wallet address.
"""
import asyncio
import logging
from web3 import Web3

logger = logging.getLogger("moltrust.nonce")

_locks: dict[str, asyncio.Lock] = {}
_nonces: dict[str, int] = {}


def _get_lock(address: str) -> asyncio.Lock:
"""Get or create a per-address lock."""
if address not in _locks:
_locks[address] = asyncio.Lock()
return _locks[address]


async def get_nonce(w3: Web3, address: str) -> int:
"""Get the next nonce for an address, serialized via asyncio.Lock.

On first call (or after a reset), fetches the pending nonce from the
chain. Subsequent calls increment locally to avoid collisions when
multiple transactions are submitted before the first is mined.
"""
lock = _get_lock(address)
async with lock:
if address not in _nonces:
_nonces[address] = await asyncio.to_thread(
w3.eth.get_transaction_count, address, "pending"
)
else:
_nonces[address] += 1
nonce = _nonces[address]
logger.debug(f"Nonce for {address}: {nonce}")
return nonce


async def reset_nonce(address: str) -> None:
"""Reset cached nonce after a known failure, forcing a re-fetch."""
lock = _get_lock(address)
async with lock:
_nonces.pop(address, None)
16 changes: 11 additions & 5 deletions app/provenance/anchor.py
Original file line number Diff line number Diff line change
Expand Up @@ -168,9 +168,10 @@ async def anchor_batch(conn, anchor_fn) -> dict:
block_number = None
try:
from web3 import Web3
import asyncio
import os
w3 = Web3(Web3.HTTPProvider(os.getenv("BASE_RPC", "https://mainnet.base.org")))
receipt = w3.eth.wait_for_transaction_receipt(tx_hash, timeout=30)
receipt = await asyncio.to_thread(w3.eth.wait_for_transaction_receipt, tx_hash, 30)
block_number = receipt.blockNumber
except Exception:
pass
Expand Down Expand Up @@ -203,7 +204,9 @@ async def anchor_single_calldata(calldata: str) -> Optional[str]:
"""
try:
from web3 import Web3
import asyncio
import os
from app.nonce_manager import get_nonce, reset_nonce

BASE_RPC = os.getenv("BASE_RPC", "https://mainnet.base.org")
BASE_ADDR = os.getenv("BASE_ADDR", "")
Expand All @@ -214,10 +217,12 @@ async def anchor_single_calldata(calldata: str) -> Optional[str]:
return None

w3 = Web3(Web3.HTTPProvider(BASE_RPC))
if not w3.is_connected():
connected = await asyncio.to_thread(w3.is_connected)
if not connected:
return None

nonce = w3.eth.get_transaction_count(BASE_ADDR)
nonce = await get_nonce(w3, BASE_ADDR)
gas_price = await asyncio.to_thread(lambda: w3.eth.gas_price)
tx = {
"from": BASE_ADDR,
"to": BASE_ADDR,
Expand All @@ -226,12 +231,13 @@ async def anchor_single_calldata(calldata: str) -> Optional[str]:
"nonce": nonce,
"chainId": 8453,
"gas": 30000,
"maxFeePerGas": w3.eth.gas_price + w3.to_wei(0.001, "gwei"),
"maxFeePerGas": gas_price + w3.to_wei(0.001, "gwei"),
"maxPriorityFeePerGas": w3.to_wei(0.001, "gwei"),
}
signed = w3.eth.account.sign_transaction(tx, BASE_KEY)
tx_hash = w3.eth.send_raw_transaction(signed.raw_transaction)
tx_hash = await asyncio.to_thread(w3.eth.send_raw_transaction, signed.raw_transaction)
return w3.to_hex(tx_hash)
except Exception as e:
await reset_nonce(BASE_ADDR)
print(f"IPR anchor error: {e}")
return None