diff --git a/contracts/scripts/publish.sh b/contracts/scripts/publish.sh index cee5caa35..ae9c37adb 100755 --- a/contracts/scripts/publish.sh +++ b/contracts/scripts/publish.sh @@ -1,7 +1,8 @@ #!/usr/bin/env bash set -euxo pipefail -PUBLISHER_PROFILE=default +# Use PUBLISHER_PROFILE from env for devnet/mainnet (e.g. PUBLISHER_PROFILE=devnet); default is default (local). +PUBLISHER_PROFILE=${PUBLISHER_PROFILE:-default} PUBLISHER_ADDR=0x$(aptos config show-profiles --profile=$PUBLISHER_PROFILE | grep 'account' | sed -n 's/.*"account": \"\(.*\)\".*/\1/p') # deploy platform forwarder diff --git a/contracts/scripts/set_config.sh b/contracts/scripts/set_config.sh index 5f1d3c85f..44a6a91fb 100755 --- a/contracts/scripts/set_config.sh +++ b/contracts/scripts/set_config.sh @@ -1,7 +1,15 @@ #!/usr/bin/env bash set -euxo pipefail -PLATFORM_FORWARDER_ADDR=$(cat platform/contract_address.txt) +# Use PLATFORM_FORWARDER_ADDR from env or from platform/contract_address.txt (after publish.sh). +# Use PUBLISHER_PROFILE from env for devnet (e.g. PUBLISHER_PROFILE=devnet). +PLATFORM_FORWARDER_ADDR=${PLATFORM_FORWARDER_ADDR:-$(cat platform/contract_address.txt)} +PUBLISHER_PROFILE=${PUBLISHER_PROFILE:-default} -# forwarder::set_config -aptos move run --function-id "$PLATFORM_FORWARDER_ADDR::forwarder::set_config" --assume-yes --args u32:1 u32:1 u8:1 "hex:[$ORACLE_PUBKEYS]" +if [ -z "$ORACLE_PUBKEYS" ]; then + echo "ORACLE_PUBKEYS is required (comma-separated quoted hex keys, e.g. \"0xabc\",\"0xdef\")" + exit 1 +fi + +# forwarder::set_config(don_id, config_version, f, oracles) +aptos move run --function-id "$PLATFORM_FORWARDER_ADDR::forwarder::set_config" --profile "$PUBLISHER_PROFILE" --assume-yes --args u32:1 u32:1 u8:1 "hex:[$ORACLE_PUBKEYS]" diff --git a/go.mod b/go.mod index ebeccde94..754681e6f 100644 --- a/go.mod +++ b/go.mod @@ -16,8 +16,8 @@ require ( github.com/prometheus/client_golang v1.23.2 github.com/shopspring/decimal v1.4.0 github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 - github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7 - github.com/smartcontractkit/chainlink-protos/cre/go v1.0.0-beta + github.com/smartcontractkit/chainlink-common v0.10.1-0.20260226191628-ecce52da56c9 + github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f github.com/stretchr/testify v1.11.1 github.com/valyala/fastjson v1.6.10 go.opentelemetry.io/otel v1.40.0 @@ -105,7 +105,7 @@ require ( github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b // indirect github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e // indirect github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 // indirect - github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d // indirect + github.com/smartcontractkit/libocr v0.0.0-20251027221354-bdc84e1ed858 // indirect github.com/stretchr/objx v0.5.2 // indirect github.com/wk8/go-ordered-map/v2 v2.1.8 // indirect github.com/x448/float16 v0.8.4 // indirect diff --git a/go.sum b/go.sum index 3eefe4408..1a5c0d8a5 100644 --- a/go.sum +++ b/go.sum @@ -324,12 +324,12 @@ github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82 h1:6C8qej6f github.com/smacker/go-tree-sitter v0.0.0-20240827094217-dd81d9e9be82/go.mod h1:xe4pgH49k4SsmkQq5OT8abwhWmnzkhpgnXeekbx2efw= github.com/smartcontractkit/chain-selectors v1.0.89 h1:L9oWZGqQXWyTPnC6ODXgu3b0DFyLmJ9eHv+uJrE9IZY= github.com/smartcontractkit/chain-selectors v1.0.89/go.mod h1:qy7whtgG5g+7z0jt0nRyii9bLND9m15NZTzuQPkMZ5w= -github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7 h1:h5cmgzKpKn5N5ItpEDFhRcrtqs36nu9r/dciJub1hos= -github.com/smartcontractkit/chainlink-common v0.10.1-0.20260217160002-b56cb5356cc7/go.mod h1:HXgSKzmZ/bhSx8nHU7hHW6dR+BHSXkdcpFv2T8qJcS8= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260226191628-ecce52da56c9 h1:9rcOyYENSbzKzRFn1jeemsxMgUjrL0tfGo/SW3oN3bs= +github.com/smartcontractkit/chainlink-common v0.10.1-0.20260226191628-ecce52da56c9/go.mod h1:HXgSKzmZ/bhSx8nHU7hHW6dR+BHSXkdcpFv2T8qJcS8= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10 h1:FJAFgXS9oqASnkS03RE1HQwYQQxrO4l46O5JSzxqLgg= github.com/smartcontractkit/chainlink-common/pkg/chipingress v0.0.10/go.mod h1:oiDa54M0FwxevWwyAX773lwdWvFYYlYHHQV1LQ5HpWY= -github.com/smartcontractkit/chainlink-protos/cre/go v1.0.0-beta h1:gwVxckLPTCPlzQS3aJBzYP21j6JZDI42odrzqzWIpJA= -github.com/smartcontractkit/chainlink-protos/cre/go v1.0.0-beta/go.mod h1:jUC52kZzEnWF9tddHh85zolKybmLpbQ1oNA4FjOHt1Q= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f h1:MHlgzqiDPyDV397bZkzS9TtWXb3FR9Pb8FR9cP9h0As= +github.com/smartcontractkit/chainlink-protos/cre/go v0.0.0-20260217043601-5cc966896c4f/go.mod h1:Jqt53s27Tr0jDl8mdBXg1xhu6F8Fci8JOuq43tgHOM8= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b h1:QuI6SmQFK/zyUlVWEf0GMkiUYBPY4lssn26nKSd/bOM= github.com/smartcontractkit/chainlink-protos/linking-service/go v0.0.0-20251002192024-d2ad9222409b/go.mod h1:qSTSwX3cBP3FKQwQacdjArqv0g6QnukjV4XuzO6UyoY= github.com/smartcontractkit/chainlink-protos/node-platform v0.0.0-20260205130626-db2a2aab956b h1:36knUpKHHAZ86K4FGWXtx8i/EQftGdk2bqCoEu/Cha8= @@ -338,8 +338,8 @@ github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e h1:Hv9 github.com/smartcontractkit/freeport v0.1.3-0.20250716200817-cb5dfd0e369e/go.mod h1:T4zH9R8R8lVWKfU7tUvYz2o2jMv1OpGCdpY2j2QZXzU= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7 h1:12ijqMM9tvYVEm+nR826WsrNi6zCKpwBhuApq127wHs= github.com/smartcontractkit/grpc-proxy v0.0.0-20240830132753-a7e17fec5ab7/go.mod h1:FX7/bVdoep147QQhsOPkYsPEXhGZjeYx6lBSaSXtZOA= -github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d h1:LokA9PoCNb8mm8mDT52c3RECPMRsGz1eCQORq+J3n74= -github.com/smartcontractkit/libocr v0.0.0-20250912173940-f3ab0246e23d/go.mod h1:Acy3BTBxou83ooMESLO90s8PKSu7RvLCzwSTbxxfOK0= +github.com/smartcontractkit/libocr v0.0.0-20251027221354-bdc84e1ed858 h1:dz+lxAW+B+PUq32ODppSq5UKw06+EF6+EO6kk684bcQ= +github.com/smartcontractkit/libocr v0.0.0-20251027221354-bdc84e1ed858/go.mod h1:oJkBKVn8zoBQm7Feah9CiuEHyCqAhnp1LJBzrvloQtM= github.com/spf13/pflag v1.0.10 h1:4EBh2KAYBwaONj6b2Ye1GiHfwjqyROoF4RwYO+vPwFk= github.com/spf13/pflag v1.0.10/go.mod h1:McXfInJRrz4CZXVZOBLb0bTZqETkiAhM9Iw0y3An2Bg= github.com/stretchr/objx v0.1.0/go.mod h1:HFkY916IF+rwdDfMAkV7OtwuqBVzrE8GR6GFx+wExME= diff --git a/relayer/chain/chain.go b/relayer/chain/chain.go index ad53db34b..8b555abc1 100644 --- a/relayer/chain/chain.go +++ b/relayer/chain/chain.go @@ -35,6 +35,7 @@ type Chain interface { ID() string Config() *config.TOMLConfig DataSource() sqlutil.DataSource + KeyStore() loop.Keystore TxManager() *txm.AptosTxm LogPoller() *logpoller.AptosLogPoller @@ -77,6 +78,7 @@ type chain struct { cfg *config.TOMLConfig lggr logger.Logger ds sqlutil.DataSource + ks loop.Keystore // Sub-services txm *txm.AptosTxm @@ -117,6 +119,7 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger, cfg: cfg, lggr: logger.Named(lggr, "Chain"), ds: ds, + ks: loopKs, } ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient) @@ -165,6 +168,10 @@ func (c *chain) DataSource() sqlutil.DataSource { return c.ds } +func (c *chain) KeyStore() loop.Keystore { + return c.ks +} + func (c *chain) ChainID() string { return c.id } @@ -261,16 +268,6 @@ func (c *chain) ID() string { return c.id } -func (c *chain) GetChainInfo(ctx context.Context) (types.ChainInfo, error) { - _ = ctx - return types.ChainInfo{ - FamilyName: config.ChainFamilyName, - ChainID: c.id, - NetworkName: c.cfg.NetworkName, - NetworkNameFull: c.cfg.NetworkNameFull, - }, nil -} - // LatestHead returns the latest head for the underlying chain. // TODO: should be replaced with a head tracker component func (c *chain) LatestHead(ctx context.Context) (types.Head, error) { @@ -306,6 +303,16 @@ func (c *chain) LatestHead(ctx context.Context) (types.Head, error) { }, nil } +func (c *chain) GetChainInfo(ctx context.Context) (types.ChainInfo, error) { + ci := c.chainInfo() + return types.ChainInfo{ + FamilyName: ci.ChainFamilyName, + ChainID: ci.ChainID, + NetworkName: ci.NetworkName, + NetworkNameFull: ci.NetworkNameFull, + }, nil +} + // ChainService interface func (c *chain) GetChainStatus(ctx context.Context) (types.ChainStatus, error) { toml, err := c.cfg.TOMLString() diff --git a/relayer/chainreader/chainreader.go b/relayer/chainreader/chainreader.go index eaedbee6a..eed74909d 100644 --- a/relayer/chainreader/chainreader.go +++ b/relayer/chainreader/chainreader.go @@ -327,6 +327,26 @@ func (a *aptosChainReader) GetLatestValue(ctx context.Context, readIdentifier st return codec.DecodeAptosJsonValue(transformedData, returnVal) } +// GetLatestValueWithHeadData returns the latest value and the current chain head (block height, timestamp). +// It calls GetLatestValue to fill returnVal, then fetches ledger info from the Aptos client to build the Head. +func (a *aptosChainReader) GetLatestValueWithHeadData(ctx context.Context, readIdentifier string, confidenceLevel primitives.ConfidenceLevel, params, returnVal any) (*types.Head, error) { + if err := a.GetLatestValue(ctx, readIdentifier, confidenceLevel, params, returnVal); err != nil { + return nil, err + } + nodeInfo, err := a.client.Info() + if err != nil { + return nil, fmt.Errorf("failed to get ledger info for head data: %w", err) + } + // LedgerTimestamp is in microseconds; Head.Timestamp is Unix seconds. + timestampSecs := nodeInfo.LedgerTimestamp() / 1000000 + head := &types.Head{ + Height: strconv.FormatUint(nodeInfo.BlockHeight(), 10), + Hash: nil, // Aptos NodeInfo does not expose block hash; view reads are at latest state + Timestamp: timestampSecs, + } + return head, nil +} + func (a *aptosChainReader) BatchGetLatestValues(ctx context.Context, request types.BatchGetLatestValuesRequest) (types.BatchGetLatestValuesResult, error) { result := make(types.BatchGetLatestValuesResult) diff --git a/relayer/relay.go b/relayer/relay.go index 06f7f799d..72e3b137a 100644 --- a/relayer/relay.go +++ b/relayer/relay.go @@ -7,16 +7,22 @@ import ( "fmt" "math/big" + aptosdk "github.com/aptos-labs/aptos-go-sdk" + aptosapi "github.com/aptos-labs/aptos-go-sdk/api" + "github.com/aptos-labs/aptos-go-sdk/bcs" + "github.com/google/uuid" "github.com/smartcontractkit/chainlink-common/pkg/logger" "github.com/smartcontractkit/chainlink-common/pkg/services" "github.com/smartcontractkit/chainlink-common/pkg/types" + typeaptos "github.com/smartcontractkit/chainlink-common/pkg/types/chains/aptos" "github.com/smartcontractkit/chainlink-common/pkg/types/core" - "github.com/smartcontractkit/chainlink-common/pkg/utils" + commonutils "github.com/smartcontractkit/chainlink-common/pkg/utils" "github.com/smartcontractkit/chainlink-aptos/relayer/chain" "github.com/smartcontractkit/chainlink-aptos/relayer/chainreader" crconfig "github.com/smartcontractkit/chainlink-aptos/relayer/chainreader/config" "github.com/smartcontractkit/chainlink-aptos/relayer/chainwriter" + rutils "github.com/smartcontractkit/chainlink-aptos/relayer/utils" write_target "github.com/smartcontractkit/chainlink-aptos/relayer/write_target/aptos" ) @@ -26,7 +32,7 @@ type relayer struct { chain chain.Chain lggr logger.Logger - starter utils.StartStopOnce + starter commonutils.StartStopOnce stopCh services.StopChan } @@ -154,6 +160,10 @@ func (r *relayer) NewOCR3CapabilityProvider(ctx context.Context, rargs types.Rel return nil, errors.New("ocr3 capability provider is not supported for aptos") } +func (r *relayer) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) { + return nil, errors.New("ccip provider is not supported for aptos") +} + func (r *relayer) NewCCIPCommitProvider(ctx context.Context, rargs types.RelayArgs, pargs types.PluginArgs) (types.CCIPCommitProvider, error) { return nil, errors.New("ccip.commit is not supported for aptos") } @@ -162,12 +172,6 @@ func (r *relayer) NewCCIPExecProvider(ctx context.Context, rargs types.RelayArgs return nil, errors.New("ccip.exec is not supported for aptos") } -func (r *relayer) NewCCIPProvider(ctx context.Context, cargs types.CCIPProviderArgs) (types.CCIPProvider, error) { - _ = ctx - _ = cargs - return nil, errors.New("ccip provider is not supported for aptos") -} - func (r *relayer) EVM() (types.EVMService, error) { return nil, errors.New("EVMService is not supported for aptos") } @@ -180,6 +184,9 @@ func (r *relayer) Solana() (types.SolanaService, error) { return nil, errors.New("SolanaService is not supported for aptos") } +func (r *relayer) Aptos() (types.AptosService, error) { + return r, nil +} func (r *relayer) Replay(ctx context.Context, fromBlock string, args map[string]any) error { return errors.ErrUnsupported } @@ -204,3 +211,332 @@ func (r *relayer) ListNodeStatuses(ctx context.Context, pageSize int32, pageToke func (r *relayer) Transact(ctx context.Context, from, to string, amount *big.Int, balanceCheck bool) error { return r.chain.Transact(ctx, from, to, amount, balanceCheck) } + +func (r *relayer) AccountAPTBalance(ctx context.Context, req typeaptos.AccountAPTBalanceRequest) (*typeaptos.AccountAPTBalanceReply, error) { + client, err := r.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + balance, err := client.AccountAPTBalance(aptosdk.AccountAddress(req.Address)) + if err != nil { + return nil, err + } + + return &typeaptos.AccountAPTBalanceReply{Value: balance}, nil +} + +func (r *relayer) LedgerVersion(ctx context.Context) (uint64, error) { + client, err := r.chain.GetClient() + if err != nil { + return 0, fmt.Errorf("failed to get client: %w", err) + } + + info, err := client.Info() + if err != nil { + return 0, fmt.Errorf("failed to fetch node info: %w", err) + } + + return info.LedgerVersion(), nil +} + +func (r *relayer) View(ctx context.Context, req typeaptos.ViewRequest) (*typeaptos.ViewReply, error) { + client, err := r.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + payload, err := toSDKViewPayload(req.Payload) + if err != nil { + return nil, err + } + + var result []any + if req.LedgerVersion != nil { + result, err = client.View(payload, *req.LedgerVersion) + } else { + result, err = client.View(payload) + } + if err != nil { + return nil, err + } + + data, err := json.Marshal(result) + if err != nil { + return nil, fmt.Errorf("failed to marshal view response: %w", err) + } + + return &typeaptos.ViewReply{Data: data}, nil +} + +func (r *relayer) EventsByHandle(ctx context.Context, req typeaptos.EventsByHandleRequest) (*typeaptos.EventsByHandleReply, error) { + client, err := r.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + events, err := client.EventsByHandle(aptosdk.AccountAddress(req.Account), req.EventHandle, req.FieldName, req.Start, req.Limit) + if err != nil { + return nil, err + } + + out := &typeaptos.EventsByHandleReply{Events: make([]*typeaptos.Event, 0, len(events))} + for _, e := range events { + if e == nil { + continue + } + + ev := &typeaptos.Event{ + Version: e.Version, + Type: e.Type, + SequenceNumber: e.SequenceNumber, + } + if e.Guid != nil { + var addr typeaptos.AccountAddress + if e.Guid.AccountAddress != nil { + copy(addr[:], e.Guid.AccountAddress[:]) + } + ev.Guid = &typeaptos.GUID{ + CreationNumber: e.Guid.CreationNumber, + AccountAddress: addr, + } + } + if e.Data != nil { + data, marshalErr := json.Marshal(e.Data) + if marshalErr != nil { + return nil, fmt.Errorf("failed to marshal event data: %w", marshalErr) + } + ev.Data = data + } + out.Events = append(out.Events, ev) + } + + return out, nil +} + +func (r *relayer) TransactionByHash(ctx context.Context, req typeaptos.TransactionByHashRequest) (*typeaptos.TransactionByHashReply, error) { + client, err := r.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + tx, err := client.TransactionByHash(req.Hash) + if err != nil { + return nil, err + } + if tx == nil { + return &typeaptos.TransactionByHashReply{}, nil + } + + converted, err := convertSDKTransaction(tx) + if err != nil { + return nil, err + } + return &typeaptos.TransactionByHashReply{Transaction: converted}, nil +} + +func (r *relayer) SubmitTransaction(ctx context.Context, req typeaptos.SubmitTransactionRequest) (*typeaptos.SubmitTransactionReply, error) { + cfg := r.chain.Config() + if cfg.Workflow == nil || cfg.Workflow.PublicKey == "" { + return nil, fmt.Errorf("workflow public key not configured") + } + + client, err := r.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + var payload aptosdk.TransactionPayload + if err := bcs.Deserialize(&payload, req.EncodedPayload); err != nil { + return nil, fmt.Errorf("failed to decode transaction payload: %w", err) + } + + publicKey, err := rutils.HexPublicKeyToEd25519PublicKey(cfg.Workflow.PublicKey) + if err != nil { + return nil, fmt.Errorf("invalid workflow public key: %w", err) + } + fromAddress := rutils.Ed25519PublicKeyToAddress(publicKey) + + var maxGasAmount uint64 + var gasUnitPrice uint64 + if req.GasConfig != nil { + maxGasAmount = req.GasConfig.MaxGasAmount + gasUnitPrice = req.GasConfig.GasUnitPrice + } else { + gasInfo, gasErr := client.EstimateGasPrice() + if gasErr != nil { + return nil, fmt.Errorf("failed to estimate gas price: %w", gasErr) + } + maxGasAmount = cfg.TransactionManager.DefaultMaxGasAmount + cfg.TransactionManager.GasLimitOverhead + gasUnitPrice = gasInfo.GasEstimate + } + + txID, err := uuid.NewUUID() + if err != nil { + return nil, err + } + submittedTx, err := r.chain.TxManager().SubmitPayload( + ctx, + txID.String(), + nil, + fromAddress, + publicKey, + payload, + maxGasAmount, + gasUnitPrice, + ) + if err != nil { + return nil, fmt.Errorf("failed to submit transaction via txm: %w", err) + } + if submittedTx == nil || submittedTx.Hash == "" { + return nil, fmt.Errorf("submit transaction returned empty hash") + } + + sender := typeaptos.AccountAddress(submittedTx.Sender) + + return &typeaptos.SubmitTransactionReply{ + PendingTransaction: &typeaptos.PendingTransaction{ + Hash: submittedTx.Hash, + Sender: sender, + SequenceNumber: submittedTx.Nonce, + MaxGasAmount: submittedTx.MaxGasAmount, + GasUnitPrice: submittedTx.GasUnitPrice, + ExpirationTimestampSecs: submittedTx.ExpirationTimestampSecs, + Payload: req.EncodedPayload, + Signature: submittedTx.Signature, + }, + }, nil +} + +func (r *relayer) AccountTransactions(ctx context.Context, req typeaptos.AccountTransactionsRequest) (*typeaptos.AccountTransactionsReply, error) { + client, err := r.chain.GetClient() + if err != nil { + return nil, fmt.Errorf("failed to get client: %w", err) + } + + txs, err := client.AccountTransactions(aptosdk.AccountAddress(req.Address), req.Start, req.Limit) + if err != nil { + return nil, err + } + + out := &typeaptos.AccountTransactionsReply{Transactions: make([]*typeaptos.Transaction, 0, len(txs))} + for _, tx := range txs { + if tx == nil { + continue + } + asTxn := &aptosapi.Transaction{Type: tx.Type, Inner: tx.Inner} + converted, convErr := convertSDKTransaction(asTxn) + if convErr != nil { + return nil, convErr + } + out.Transactions = append(out.Transactions, converted) + } + + return out, nil +} + +func convertSDKTransaction(tx *aptosapi.Transaction) (*typeaptos.Transaction, error) { + if tx == nil { + return nil, nil + } + + var version *uint64 + if v := tx.Version(); v != nil { + vv := *v + version = &vv + } + + var success *bool + if s := tx.Success(); s != nil { + ss := *s + success = &ss + } + + data, err := json.Marshal(tx) + if err != nil { + return nil, fmt.Errorf("failed to marshal transaction: %w", err) + } + + return &typeaptos.Transaction{ + Type: typeaptos.TransactionVariant(tx.Type), + Hash: string(tx.Hash()), + Version: version, + Success: success, + Data: data, + }, nil +} + +func toSDKViewPayload(payload *typeaptos.ViewPayload) (*aptosdk.ViewPayload, error) { + if payload == nil { + return nil, fmt.Errorf("view payload is nil") + } + + argTypes := make([]aptosdk.TypeTag, 0, len(payload.ArgTypes)) + for _, t := range payload.ArgTypes { + converted, err := toSDKTypeTag(t) + if err != nil { + return nil, err + } + argTypes = append(argTypes, converted) + } + + return &aptosdk.ViewPayload{ + Module: aptosdk.ModuleId{ + Address: aptosdk.AccountAddress(payload.Module.Address), + Name: payload.Module.Name, + }, + Function: payload.Function, + ArgTypes: argTypes, + Args: payload.Args, + }, nil +} + +func toSDKTypeTag(tag typeaptos.TypeTag) (aptosdk.TypeTag, error) { + switch v := tag.Value.(type) { + case typeaptos.BoolTag: + return aptosdk.TypeTag{Value: &aptosdk.BoolTag{}}, nil + case typeaptos.U8Tag: + return aptosdk.TypeTag{Value: &aptosdk.U8Tag{}}, nil + case typeaptos.U16Tag: + return aptosdk.TypeTag{Value: &aptosdk.U16Tag{}}, nil + case typeaptos.U32Tag: + return aptosdk.TypeTag{Value: &aptosdk.U32Tag{}}, nil + case typeaptos.U64Tag: + return aptosdk.TypeTag{Value: &aptosdk.U64Tag{}}, nil + case typeaptos.U128Tag: + return aptosdk.TypeTag{Value: &aptosdk.U128Tag{}}, nil + case typeaptos.U256Tag: + return aptosdk.TypeTag{Value: &aptosdk.U256Tag{}}, nil + case typeaptos.AddressTag: + return aptosdk.TypeTag{Value: &aptosdk.AddressTag{}}, nil + case typeaptos.SignerTag: + return aptosdk.TypeTag{Value: &aptosdk.SignerTag{}}, nil + case typeaptos.VectorTag: + inner, err := toSDKTypeTag(v.ElementType) + if err != nil { + return aptosdk.TypeTag{}, err + } + return aptosdk.TypeTag{Value: &aptosdk.VectorTag{TypeParam: inner}}, nil + case typeaptos.StructTag: + typeParams := make([]aptosdk.TypeTag, 0, len(v.TypeParams)) + for _, p := range v.TypeParams { + inner, err := toSDKTypeTag(p) + if err != nil { + return aptosdk.TypeTag{}, err + } + typeParams = append(typeParams, inner) + } + return aptosdk.TypeTag{ + Value: &aptosdk.StructTag{ + Address: aptosdk.AccountAddress(v.Address), + Module: v.Module, + Name: v.Name, + TypeParams: typeParams, + }, + }, nil + case typeaptos.GenericTag: + return aptosdk.TypeTag{Value: &aptosdk.GenericTag{Num: uint64(v.Index)}}, nil + default: + return aptosdk.TypeTag{}, fmt.Errorf("unsupported aptos type tag: %T", tag.Value) + } +} diff --git a/relayer/txm/tx.go b/relayer/txm/tx.go index a3128dc2e..2ce13a7d1 100644 --- a/relayer/txm/tx.go +++ b/relayer/txm/tx.go @@ -20,8 +20,18 @@ type AptosTx struct { FunctionName string TypeTags []aptos.TypeTag BcsValues [][]byte - Attempt uint64 - Status commontypes.TransactionStatus - Simulate bool - Fee *big.Int // Transaction fee in octas (1e-8 APT) + // Payload, when set, is used directly instead of reconstructing an EntryFunction from ModuleName/FunctionName/Args. + Payload *aptos.TransactionPayload + GasUnitPriceOverride *uint64 + Attempt uint64 + Status commontypes.TransactionStatus + Simulate bool + Fee *big.Int // Transaction fee in octas (1e-8 APT) + + LastSubmittedHash string + LastSubmittedNonce uint64 + LastSubmittedExpirationTimestampSecs uint64 + LastSubmittedMaxGasAmount uint64 + LastSubmittedGasUnitPrice uint64 + LastSubmittedSignature []byte } diff --git a/relayer/txm/txm.go b/relayer/txm/txm.go index 4c8f4e1bc..6f1525ec6 100644 --- a/relayer/txm/txm.go +++ b/relayer/txm/txm.go @@ -45,6 +45,16 @@ type AptosTxm struct { getClient func() (aptos.AptosRpcClient, error) } +type SubmittedTransaction struct { + Hash string + Nonce uint64 + ExpirationTimestampSecs uint64 + MaxGasAmount uint64 + GasUnitPrice uint64 + Signature []byte + Sender aptos.AccountAddress +} + // TODO: Config input is not validated for sanity func New(lgr logger.Logger, keystore loop.Keystore, config Config, getClient func() (aptos.AptosRpcClient, error)) (*AptosTxm, error) { return &AptosTxm{ @@ -221,6 +231,85 @@ func (a *AptosTxm) Enqueue(transactionID string, txMetadata *commontypes.TxMeta, return nil } +// SubmitPayload submits a pre-encoded Aptos payload via TXM synchronously and +// returns the first submitted tx details (hash/nonce/gas/signature) when +// accepted into mempool. +func (a *AptosTxm) SubmitPayload( + ctx context.Context, + transactionID string, + txMetadata *commontypes.TxMeta, + fromAddress aptos.AccountAddress, + publicKey ed25519.PublicKey, + payload aptos.TransactionPayload, + maxGasAmount uint64, + gasUnitPrice uint64, +) (*SubmittedTransaction, error) { + if transactionID == "" { + transactionID = uuid.New().String() + } + + if err := ctx.Err(); err != nil { + return nil, err + } + + currentTimestamp := getTimestampSecs() + tx := &AptosTx{ + ID: transactionID, + Metadata: txMetadata, + Timestamp: currentTimestamp, + FromAddress: fromAddress, + PublicKey: publicKey, + Payload: &payload, + Status: commontypes.Pending, + Simulate: false, + } + if maxGasAmount > 0 { + if tx.Metadata == nil { + tx.Metadata = &commontypes.TxMeta{} + } + tx.Metadata.GasLimit = new(big.Int).SetUint64(maxGasAmount) + } + if gasUnitPrice > 0 { + price := gasUnitPrice + tx.GasUnitPriceOverride = &price + } + + a.transactionsLock.Lock() + _, transactionExists := a.transactions[transactionID] + if transactionExists { + a.transactionsLock.Unlock() + return nil, errors.New("transaction already exists") + } + + if (currentTimestamp - a.transactionsLastPruneTime) > a.config.PruneIntervalSecs { + for txID, existingTx := range a.transactions { + if existingTx.Status != commontypes.Finalized && existingTx.Status != commontypes.Failed && existingTx.Status != commontypes.Fatal { + continue + } + if (currentTimestamp - existingTx.Timestamp) < a.config.PruneTxExpirationSecs { + continue + } + delete(a.transactions, txID) + } + a.transactionsLastPruneTime = currentTimestamp + } + + a.transactions[transactionID] = tx + a.transactionsLock.Unlock() + + a.signAndBroadcast(tx) + + status, err := a.GetStatus(transactionID) + if err != nil { + return nil, err + } + if status != commontypes.Unconfirmed && status != commontypes.Finalized { + return nil, fmt.Errorf("transaction not accepted by txm, status=%d", status) + } + + return a.GetSubmittedTransaction(transactionID) +} + func (a *AptosTxm) GetStatus(transactionID string) (commontypes.TransactionStatus, error) { if transactionID == "" { return commontypes.Unknown, errors.New("nil tx id") @@ -259,6 +348,34 @@ func (a *AptosTxm) GetTransactionFee(ctx context.Context, transactionID string) return tx.Fee, nil } +func (a *AptosTxm) GetSubmittedTransaction(transactionID string) (*SubmittedTransaction, error) { + if transactionID == "" { + return nil, errors.New("nil tx id") + } + + a.transactionsLock.RLock() + defer a.transactionsLock.RUnlock() + + tx, ok := a.transactions[transactionID] + if !ok { + return nil, errors.New("no such tx") + } + if tx.LastSubmittedHash == "" { + return nil, errors.New("transaction has not been submitted yet") + } + + sig := append([]byte(nil), tx.LastSubmittedSignature...) + return &SubmittedTransaction{ + Hash: tx.LastSubmittedHash, + Nonce: tx.LastSubmittedNonce, + ExpirationTimestampSecs: tx.LastSubmittedExpirationTimestampSecs, + MaxGasAmount: tx.LastSubmittedMaxGasAmount, + GasUnitPrice: tx.LastSubmittedGasUnitPrice, + Signature: sig, + Sender: tx.FromAddress, + }, nil +} + func (a *AptosTxm) broadcastLoop() { defer a.done.Done() @@ -322,16 +439,21 @@ func (a *AptosTxm) createRawTx(client aptos.AptosRpcClient, tx *AptosTx, nonce u expirationTimestampSecs := ledgerTimestampSecs + a.config.TxExpirationSecs - payload := aptos.TransactionPayload{ - Payload: &aptos.EntryFunction{ - Module: aptos.ModuleId{ - Address: tx.ContractAddress, - Name: tx.ModuleName, + payload := aptos.TransactionPayload{} + if tx.Payload != nil { + payload = *tx.Payload + } else { + payload = aptos.TransactionPayload{ + Payload: &aptos.EntryFunction{ + Module: aptos.ModuleId{ + Address: tx.ContractAddress, + Name: tx.ModuleName, + }, + Function: tx.FunctionName, + ArgTypes: tx.TypeTags, + Args: tx.BcsValues, }, - Function: tx.FunctionName, - ArgTypes: tx.TypeTags, - Args: tx.BcsValues, - }, + } } rawTx := &aptos.RawTransaction{ @@ -372,6 +494,10 @@ func (a *AptosTxm) createRawTx(client aptos.AptosRpcClient, tx *AptosTx, nonce u } } + if tx.GasUnitPriceOverride != nil && *tx.GasUnitPriceOverride > 0 { + rawTx.GasUnitPrice = *tx.GasUnitPriceOverride + } + if rawTx.GasUnitPrice == 0 { // If simulate was disabled or failed, populate the gas unit price. gasInfo, err := client.EstimateGasPrice() @@ -406,21 +532,21 @@ func (a *AptosTxm) createRawTx(client aptos.AptosRpcClient, tx *AptosTx, nonce u return rawTx, nil } -func (a *AptosTxm) createSignedTx(client aptos.AptosRpcClient, rawTx *aptos.RawTransaction, publicKey ed25519.PublicKey, fromAddress aptos.AccountAddress) (*aptos.SignedTransaction, error) { +func (a *AptosTxm) createSignedTx(client aptos.AptosRpcClient, rawTx *aptos.RawTransaction, publicKey ed25519.PublicKey, fromAddress aptos.AccountAddress) (*aptos.SignedTransaction, []byte, error) { signingMessage, err := rawTx.SigningMessage() if err != nil { - return nil, fmt.Errorf("failed to create signing message: %w", err) + return nil, nil, fmt.Errorf("failed to create signing message: %w", err) } signature, err := a.keystore.Sign(context.Background(), fmt.Sprintf("%064x", publicKey), signingMessage) if err != nil { - return nil, fmt.Errorf("failed to sign message for address %s: %w", fromAddress, err) + return nil, nil, fmt.Errorf("failed to sign message for address %s: %w", fromAddress, err) } sig := aptoscrypto.Ed25519Signature{} err = sig.FromBytes(signature) if err != nil { - return nil, fmt.Errorf("failed to deserialize signature: %w", err) + return nil, nil, fmt.Errorf("failed to deserialize signature: %w", err) } authenticator := &aptoscrypto.Ed25519Authenticator{ @@ -433,10 +559,10 @@ func (a *AptosTxm) createSignedTx(client aptos.AptosRpcClient, rawTx *aptos.RawT Auth: authenticator, }) if err != nil { - return nil, fmt.Errorf("failed to sign tx: %w", err) + return nil, nil, fmt.Errorf("failed to sign tx: %w", err) } - return signedTx, nil + return signedTx, signature, nil } func (a *AptosTxm) updateTransactionStatus(tx *AptosTx, status commontypes.TransactionStatus) { @@ -445,6 +571,17 @@ func (a *AptosTxm) updateTransactionStatus(tx *AptosTx, status commontypes.Trans tx.Status = status } +func (a *AptosTxm) updateTransactionSubmittedState(tx *AptosTx, hash string, nonce uint64, expirationTimestampSecs uint64, maxGasAmount uint64, gasUnitPrice uint64, signature []byte) { + a.transactionsLock.Lock() + defer a.transactionsLock.Unlock() + tx.LastSubmittedHash = hash + tx.LastSubmittedNonce = nonce + tx.LastSubmittedExpirationTimestampSecs = expirationTimestampSecs + tx.LastSubmittedMaxGasAmount = maxGasAmount + tx.LastSubmittedGasUnitPrice = gasUnitPrice + tx.LastSubmittedSignature = append([]byte(nil), signature...) +} + func (a *AptosTxm) updateTransactionFee(tx *AptosTx, fee *big.Int) { a.transactionsLock.Lock() defer a.transactionsLock.Unlock() @@ -481,11 +618,16 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { } newTxStore, err := a.accountStore.CreateTxStore(tx.FromAddress.String(), sequenceNumber) if err != nil { - ctxLogger.Errorw("failed to create tx store", "fromAddress", tx.FromAddress.String(), "error", err) - a.updateTransactionStatus(tx, commontypes.Failed) - return + // Another concurrent submission may have created the store first. + txStore = a.accountStore.GetTxStore(tx.FromAddress.String()) + if txStore == nil { + ctxLogger.Errorw("failed to create tx store", "fromAddress", tx.FromAddress.String(), "error", err) + a.updateTransactionStatus(tx, commontypes.Failed) + return + } + } else { + txStore = newTxStore } - txStore = newTxStore } currentAttempt := a.getTransactionAttempt(tx) @@ -507,7 +649,7 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { return } - signedTx, err := a.createSignedTx(client, rawTx, tx.PublicKey, tx.FromAddress) + signedTx, signature, err := a.createSignedTx(client, rawTx, tx.PublicKey, tx.FromAddress) if err != nil { ctxLogger.Errorw("failed to create signed tx", "error", err) a.updateTransactionStatus(tx, commontypes.Failed) @@ -525,6 +667,7 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { // tx included in the Mempool currentAttempt := a.getTransactionAttempt(tx) ctxLogger.Debugw("submit tx successful", "attempt", currentAttempt, "submitResponse", submitResponse) + a.updateTransactionSubmittedState(tx, submitResponse.Hash, nonce, rawTx.ExpirationTimestampSeconds, rawTx.MaxGasAmount, rawTx.GasUnitPrice, signature) err = txStore.AddUnconfirmed(nonce, submitResponse.Hash, rawTx.ExpirationTimestampSeconds, tx) if err != nil { @@ -551,11 +694,20 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { ctxLogger.Errorw("failed to submit signed tx, retrying..", "error", httpError) time.Sleep(time.Duration(a.config.SubmitDelayDuration) * time.Second) - httpErrorBody := string(httpError.Body) - if strings.Contains(httpErrorBody, "SEQUENCE_NUMBER_TOO_OLD") || strings.Contains(httpErrorBody, "SEQUENCE_NUMBER_TOO_NEW") { + httpErrorBody := strings.ToUpper(string(httpError.Body)) + isTooOld := strings.Contains(httpErrorBody, "SEQUENCE_NUMBER_TOO_OLD") + isTooNew := strings.Contains(httpErrorBody, "SEQUENCE_NUMBER_TOO_NEW") + isInvalidUpdate := strings.Contains(httpErrorBody, "INVALID_TRANSACTION_UPDATE") || strings.Contains(httpErrorBody, "TRANSACTION ALREADY IN MEMPOOL WITH A DIFFERENT PAYLOAD") + + if isTooOld || isTooNew { // Try to resync the nonce before the next attempt. _ = a.resyncNonce(client, tx) } + if isTooOld || isInvalidUpdate { + // Mempool accepted a tx with this nonce, but onchain sequence may not + // be updated yet. Move local nonce cursor forward. + txStore.AdvanceNextNonce(nonce + 1) + } } } diff --git a/relayer/txm/txm_error_test.go b/relayer/txm/txm_error_test.go index 04ade5c00..295585fae 100644 --- a/relayer/txm/txm_error_test.go +++ b/relayer/txm/txm_error_test.go @@ -137,7 +137,7 @@ func runErrorsTest(t *testing.T, logger logger.Logger, config Config, rpcURL str require.NoError(t, err) rawTx.SequenceNumber = sequenceNumber - 1 - signedTx, err := txm.createSignedTx(rlClient, rawTx, selectedTx.PublicKey, selectedTx.FromAddress) + signedTx, _, err := txm.createSignedTx(rlClient, rawTx, selectedTx.PublicKey, selectedTx.FromAddress) require.NoError(t, err) _, err = client.SubmitTransaction(signedTx) @@ -147,7 +147,7 @@ func runErrorsTest(t *testing.T, logger logger.Logger, config Config, rpcURL str // Test with expired transaction rawTx.SequenceNumber = sequenceNumber rawTx.ExpirationTimestampSeconds = rawTx.ExpirationTimestampSeconds - txm.config.TxExpirationSecs - 3600 // 1 hour ago - signedTx, err = txm.createSignedTx(rlClient, rawTx, selectedTx.PublicKey, selectedTx.FromAddress) + signedTx, _, err = txm.createSignedTx(rlClient, rawTx, selectedTx.PublicKey, selectedTx.FromAddress) require.NoError(t, err) _, err = client.SubmitTransaction(signedTx) diff --git a/relayer/txm/txstore.go b/relayer/txm/txstore.go index db7717561..ce73e51ef 100644 --- a/relayer/txm/txstore.go +++ b/relayer/txm/txstore.go @@ -86,6 +86,17 @@ func (s *TxStore) GetNextNonce() uint64 { return nextNonce } +// AdvanceNextNonce moves the tx store cursor forward to at least minNextNonce. +// This is used for mempool-only sequence collisions where onchain nonce has not +// moved yet but the previous nonce is already occupied by another in-flight tx. +func (s *TxStore) AdvanceNextNonce(minNextNonce uint64) { + s.lock.Lock() + defer s.lock.Unlock() + if s.nextNonce < minNextNonce { + s.nextNonce = minNextNonce + } +} + func (s *TxStore) AddUnconfirmed(nonce uint64, hash string, expirationTimestampSecs uint64, tx *AptosTx) error { s.lock.Lock() defer s.lock.Unlock()