diff --git a/relayer/chain/chain.go b/relayer/chain/chain.go index ad53db34b..2b2a87977 100644 --- a/relayer/chain/chain.go +++ b/relayer/chain/chain.go @@ -119,7 +119,7 @@ func newChain(cfg *config.TOMLConfig, loopKs loop.Keystore, lggr logger.Logger, ds: ds, } - ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient) + ch.txm, err = txm.New(lggr, loopKs, *cfg.TransactionManager, ch.GetClient, cfg.ChainID) if err != nil { return nil, err } diff --git a/relayer/chainreader/chainreader_local_test.go b/relayer/chainreader/chainreader_local_test.go index 4d90e8940..3c73000ae 100644 --- a/relayer/chainreader/chainreader_local_test.go +++ b/relayer/chainreader/chainreader_local_test.go @@ -134,7 +134,7 @@ func runGetLatestValueTest(t *testing.T, logger logger.Logger, rpcUrl string, ac getClient := func() (aptos.AptosRpcClient, error) { return rateLimitedClient, nil } txmConfig := txm.DefaultConfigSet - txmgr, err := txm.New(logger, keystore, txmConfig, getClient) + txmgr, err := txm.New(logger, keystore, txmConfig, getClient, chainInfo.ChainID) require.NoError(t, err) err = txmgr.Start(context.Background()) @@ -560,7 +560,7 @@ func runQueryKeyPersistentTest(t *testing.T, logger logger.Logger, rpcUrl string rateLimitedClient := ratelimit.NewRateLimitedClient(client, chainInfo, rpcUrl, 100, 30*time.Second) getClient := func() (aptos.AptosRpcClient, error) { return rateLimitedClient, nil } - txmgr, err := txm.New(logger, keystore, txm.DefaultConfigSet, getClient) + txmgr, err := txm.New(logger, keystore, txm.DefaultConfigSet, getClient, chainInfo.ChainID) require.NoError(t, err) err = txmgr.Start(context.Background()) require.NoError(t, err) @@ -890,7 +890,7 @@ func TestLoopChainReaderPersistent(t *testing.T) { keystore := testutils.NewTestKeystore(t) keystore.AddKey(privKey) getClient := func() (aptos.AptosRpcClient, error) { return rlClient, nil } - txmgr, err := txm.New(lg, keystore, txm.DefaultConfigSet, getClient) + txmgr, err := txm.New(lg, keystore, txm.DefaultConfigSet, getClient, chainInfo.ChainID) require.NoError(t, err) err = txmgr.Start(context.Background()) require.NoError(t, err) diff --git a/relayer/chainwriter/chainwriter_local_test.go b/relayer/chainwriter/chainwriter_local_test.go index 925742ca3..fa794e142 100644 --- a/relayer/chainwriter/chainwriter_local_test.go +++ b/relayer/chainwriter/chainwriter_local_test.go @@ -75,7 +75,7 @@ func runChainWriterTest(t *testing.T, logger logger.Logger, rpcURL string, accou txmConfig := txm.DefaultConfigSet - txmgr, err := txm.New(logger, keystore, txmConfig, getClient) + txmgr, err := txm.New(logger, keystore, txmConfig, getClient, chainInfo.ChainID) require.NoError(t, err) err = txmgr.Start(context.Background()) require.NoError(t, err) diff --git a/relayer/txm/metrics.go b/relayer/txm/metrics.go new file mode 100644 index 000000000..35283953c --- /dev/null +++ b/relayer/txm/metrics.go @@ -0,0 +1,188 @@ +package txm + +import ( + "context" + "fmt" + + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/promauto" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/metric" + + "github.com/smartcontractkit/chainlink-common/pkg/beholder" + "github.com/smartcontractkit/chainlink-common/pkg/metrics" +) + +var ( + // broadcasted transactions + promAptosTxmBroadcastedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_broadcasted", + Help: "Number of transactions successfully submitted to the mempool", + }, []string{"chainID"}) + + // successful transactions + promAptosTxmSuccessTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_success", + Help: "Number of transactions confirmed successfully on-chain", + }, []string{"chainID"}) + promAptosTxmFinalizedTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_finalized", + Help: "Number of transactions that reached finalized status", + }, []string{"chainID"}) + + // inflight transactions + promAptosTxmPendingTxs = promauto.NewGaugeVec(prometheus.GaugeOpts{ + Name: "aptos_txm_tx_pending", + Help: "Number of unconfirmed transactions currently in-flight", + }, []string{"chainID"}) + + // error cases + promAptosTxmErrorTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_error", + Help: "Total number of transaction errors across all failure modes", + }, []string{"chainID"}) + promAptosTxmRevertTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_error_revert", + Help: "Number of transactions confirmed but unsuccessful on-chain (e.g. out of gas)", + }, []string{"chainID"}) + promAptosTxmRejectTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_error_reject", + Help: "Number of transactions rejected by the RPC after exhausting submit retries", + }, []string{"chainID"}) + promAptosTxmDropTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_error_drop", + Help: "Number of transactions that expired without being committed on-chain", + }, []string{"chainID"}) + promAptosTxmRetryTxs = promauto.NewCounterVec(prometheus.CounterOpts{ + Name: "aptos_txm_tx_retry", + Help: "Number of transaction retries triggered (out-of-gas or expired)", + }, []string{"chainID"}) +) + +type aptosTxmMetrics struct { + metrics.Labeler + chainID string + + broadcastedTxs metric.Int64Counter + successTxs metric.Int64Counter + finalizedTxs metric.Int64Counter + pendingTxs metric.Int64Gauge + errorTxs metric.Int64Counter + revertTxs metric.Int64Counter + rejectTxs metric.Int64Counter + dropTxs metric.Int64Counter + retryTxs metric.Int64Counter +} + +func newAptosTxmMetrics(chainID string) (*aptosTxmMetrics, error) { + m := beholder.GetMeter() + + broadcastedTxs, err := m.Int64Counter("aptos_txm_tx_broadcasted") + if err != nil { + return nil, fmt.Errorf("failed to register broadcasted txs counter: %w", err) + } + + successTxs, err := m.Int64Counter("aptos_txm_tx_success") + if err != nil { + return nil, fmt.Errorf("failed to register success txs counter: %w", err) + } + + finalizedTxs, err := m.Int64Counter("aptos_txm_tx_finalized") + if err != nil { + return nil, fmt.Errorf("failed to register finalized txs counter: %w", err) + } + + pendingTxs, err := m.Int64Gauge("aptos_txm_tx_pending") + if err != nil { + return nil, fmt.Errorf("failed to register pending txs gauge: %w", err) + } + + errorTxs, err := m.Int64Counter("aptos_txm_tx_error") + if err != nil { + return nil, fmt.Errorf("failed to register error txs counter: %w", err) + } + + revertTxs, err := m.Int64Counter("aptos_txm_tx_error_revert") + if err != nil { + return nil, fmt.Errorf("failed to register revert txs counter: %w", err) + } + + rejectTxs, err := m.Int64Counter("aptos_txm_tx_error_reject") + if err != nil { + return nil, fmt.Errorf("failed to register reject txs counter: %w", err) + } + + dropTxs, err := m.Int64Counter("aptos_txm_tx_error_drop") + if err != nil { + return nil, fmt.Errorf("failed to register drop txs counter: %w", err) + } + + retryTxs, err := m.Int64Counter("aptos_txm_tx_retry") + if err != nil { + return nil, fmt.Errorf("failed to register retry txs counter: %w", err) + } + + return &aptosTxmMetrics{ + chainID: chainID, + Labeler: metrics.NewLabeler().With("chainID", chainID), + + broadcastedTxs: broadcastedTxs, + successTxs: successTxs, + finalizedTxs: finalizedTxs, + pendingTxs: pendingTxs, + errorTxs: errorTxs, + revertTxs: revertTxs, + rejectTxs: rejectTxs, + dropTxs: dropTxs, + retryTxs: retryTxs, + }, nil +} + +func (m *aptosTxmMetrics) getOtelAttributes() []attribute.KeyValue { + return beholder.OtelAttributes(m.Labels).AsStringAttributes() +} + +func (m *aptosTxmMetrics) IncrementBroadcastedTxs(ctx context.Context) { + promAptosTxmBroadcastedTxs.WithLabelValues(m.chainID).Add(1) + m.broadcastedTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) IncrementSuccessTxs(ctx context.Context) { + promAptosTxmSuccessTxs.WithLabelValues(m.chainID).Add(1) + m.successTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) IncrementFinalizedTxs(ctx context.Context) { + promAptosTxmFinalizedTxs.WithLabelValues(m.chainID).Add(1) + m.finalizedTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) SetPendingTxs(ctx context.Context, count int) { + promAptosTxmPendingTxs.WithLabelValues(m.chainID).Set(float64(count)) + m.pendingTxs.Record(ctx, int64(count), metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) IncrementErrorTxs(ctx context.Context) { + promAptosTxmErrorTxs.WithLabelValues(m.chainID).Add(1) + m.errorTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) IncrementRevertTxs(ctx context.Context) { + promAptosTxmRevertTxs.WithLabelValues(m.chainID).Add(1) + m.revertTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) IncrementRejectTxs(ctx context.Context) { + promAptosTxmRejectTxs.WithLabelValues(m.chainID).Add(1) + m.rejectTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) IncrementDropTxs(ctx context.Context) { + promAptosTxmDropTxs.WithLabelValues(m.chainID).Add(1) + m.dropTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} + +func (m *aptosTxmMetrics) IncrementRetryTxs(ctx context.Context) { + promAptosTxmRetryTxs.WithLabelValues(m.chainID).Add(1) + m.retryTxs.Add(ctx, 1, metric.WithAttributes(m.getOtelAttributes()...)) +} diff --git a/relayer/txm/txm.go b/relayer/txm/txm.go index 4c8f4e1bc..181073643 100644 --- a/relayer/txm/txm.go +++ b/relayer/txm/txm.go @@ -31,6 +31,8 @@ type AptosTxm struct { baseLogger logger.Logger keystore loop.Keystore config Config + chainID string + metrics *aptosTxmMetrics transactions map[string]*AptosTx transactionsLock sync.RWMutex @@ -46,11 +48,18 @@ type AptosTxm struct { } // TODO: Config input is not validated for sanity -func New(lgr logger.Logger, keystore loop.Keystore, config Config, getClient func() (aptos.AptosRpcClient, error)) (*AptosTxm, error) { +func New(lgr logger.Logger, keystore loop.Keystore, config Config, getClient func() (aptos.AptosRpcClient, error), chainID string) (*AptosTxm, error) { + metrics, err := newAptosTxmMetrics(chainID) + if err != nil { + return nil, fmt.Errorf("failed to initialize metrics: %w", err) + } + return &AptosTxm{ baseLogger: logger.Named(lgr, "AptosTxm"), keystore: keystore, config: config, + chainID: chainID, + metrics: metrics, getClient: getClient, transactions: map[string]*AptosTx{}, @@ -262,7 +271,7 @@ func (a *AptosTxm) GetTransactionFee(ctx context.Context, transactionID string) func (a *AptosTxm) broadcastLoop() { defer a.done.Done() - _, cancel := commonutils.ContextFromChan(a.stop) + ctx, cancel := commonutils.ContextFromChan(a.stop) defer cancel() a.baseLogger.Debugw("broadcastLoop: started") @@ -299,7 +308,7 @@ func (a *AptosTxm) broadcastLoop() { }) for _, tx := range broadcastTxs { - a.signAndBroadcast(tx) + a.signAndBroadcast(ctx, tx) } case <-a.stop: a.baseLogger.Debugw("broadcastLoop: stopped") @@ -463,7 +472,7 @@ func (a *AptosTxm) getTransactionAttempt(tx *AptosTx) uint64 { return tx.Attempt } -func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { +func (a *AptosTxm) signAndBroadcast(ctx context.Context, tx *AptosTx) { ctxLogger := GetContexedTxLogger(a.baseLogger, tx.ID, tx.Metadata) client, err := a.getClient() if err != nil { @@ -477,12 +486,14 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { if err != nil { ctxLogger.Errorw("failed to get sequence number", "fromAddress", tx.FromAddress.String(), "error", err) a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) return } newTxStore, err := a.accountStore.CreateTxStore(tx.FromAddress.String(), sequenceNumber) if err != nil { ctxLogger.Errorw("failed to create tx store", "fromAddress", tx.FromAddress.String(), "error", err) a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) return } txStore = newTxStore @@ -504,6 +515,7 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { if err != nil { ctxLogger.Errorw("failed to create raw tx", "error", err) a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) return } @@ -511,6 +523,7 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { if err != nil { ctxLogger.Errorw("failed to create signed tx", "error", err) a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) return } @@ -519,6 +532,7 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { if submitResponse == nil || submitResponse.Hash == "" { ctxLogger.Errorw("did not receive hash after successful tx submission") a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) return } @@ -531,10 +545,12 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { // TODO: figure out what to do here, this should never occur. ctxLogger.Errorw("failed to add unconfirmed tx", "txHash", submitResponse.Hash, "error", err) a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) return } a.updateTransactionStatus(tx, commontypes.Unconfirmed) + a.metrics.IncrementBroadcastedTxs(ctx) return } else { // In case of http errors (>400) wait gracefully and retry @@ -545,6 +561,7 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { // Do not retry on unknown errors ctxLogger.Errorw("failed to submit signed tx, discarding..", "error", err) a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) return } @@ -561,12 +578,14 @@ func (a *AptosTxm) signAndBroadcast(tx *AptosTx) { ctxLogger.Errorw("reached max retries for submitting the tx") a.updateTransactionStatus(tx, commontypes.Failed) + a.metrics.IncrementRejectTxs(ctx) + a.metrics.IncrementErrorTxs(ctx) } func (a *AptosTxm) confirmLoop() { defer a.done.Done() - _, cancel := commonutils.ContextFromChan(a.stop) + ctx, cancel := commonutils.ContextFromChan(a.stop) defer cancel() pollDuration := time.Duration(a.config.ConfirmPollSecs) * time.Second @@ -579,7 +598,7 @@ func (a *AptosTxm) confirmLoop() { case <-tick: start := time.Now() - a.checkUnconfirmed() + a.checkUnconfirmed(ctx) remaining := pollDuration - time.Since(start) if remaining > 0 { @@ -596,7 +615,7 @@ func (a *AptosTxm) confirmLoop() { } } -func (a *AptosTxm) checkUnconfirmed() { +func (a *AptosTxm) checkUnconfirmed(ctx context.Context) { client, err := a.getClient() if err != nil { a.baseLogger.Errorw("Unable to check unconfirmed: failed to get client", "error", err) @@ -604,6 +623,7 @@ func (a *AptosTxm) checkUnconfirmed() { } allUnconfirmedTxs := a.accountStore.GetAllUnconfirmed() + totalPending := 0 for accountAddress, unconfirmedTxs := range allUnconfirmedTxs { txStore := a.accountStore.GetTxStore(accountAddress) @@ -623,6 +643,7 @@ func (a *AptosTxm) checkUnconfirmed() { if ok { if userTx.Success { ctxLogger.Infow("confirmed tx: successful", "hash", hash, "chainTx", chainTx, "chainTx.Type", chainTx.Type) + a.metrics.IncrementSuccessTxs(ctx) // Calculate and store the transaction fee gasUsed := userTx.GasUsed @@ -634,11 +655,13 @@ func (a *AptosTxm) checkUnconfirmed() { } } else { ctxLogger.Infow("confirmed tx: unsuccessful", "hash", hash, "chainTx", chainTx, "chainTx.Type", chainTx.Type) + a.metrics.IncrementRevertTxs(ctx) + a.metrics.IncrementErrorTxs(ctx) if userTx.VmStatus == "Out of gas" { // https://github.com/aptos-labs/aptos-core/blob/77ff4bf413f54c41206bd5573e1891fa3a0dccf6/api/types/src/convert.rs#L1062 // Example transaction: https://api.testnet.aptoslabs.com/v1/transactions/by_hash/0x7a106db811c8d5dfd71ac98f374ca36e4f630ce5412b99c8f0e871e7feda37ea a.incrementTransactionAttempt(unconfirmedTx.Tx) - if !a.maybeRetry(unconfirmedTx, RetryReasonOutOfGas) { + if !a.maybeRetry(ctx, unconfirmedTx, RetryReasonOutOfGas) { a.updateTransactionStatus(unconfirmedTx.Tx, commontypes.Failed) } continue @@ -652,8 +675,10 @@ func (a *AptosTxm) checkUnconfirmed() { } a.updateTransactionStatus(unconfirmedTx.Tx, commontypes.Finalized) + a.metrics.IncrementFinalizedTxs(ctx) } else { ctxLogger.Debugw("tx is still unconfirmed", "hash", hash, "chainTx", chainTx) + totalPending++ // Check using the ledger timestamp whether the transaction has expired. ledgerTimestampSecs, err := a.getLedgerTimestampSecs(client) if err != nil { @@ -672,16 +697,20 @@ func (a *AptosTxm) checkUnconfirmed() { if err != nil { ctxLogger.Errorw("couldn't confirm expired tx", "error", err) a.updateTransactionStatus(unconfirmedTx.Tx, commontypes.Failed) + a.metrics.IncrementErrorTxs(ctx) continue } + a.metrics.IncrementDropTxs(ctx) + a.metrics.IncrementErrorTxs(ctx) a.incrementTransactionAttempt(unconfirmedTx.Tx) - if !a.maybeRetry(unconfirmedTx, RetryReasonExpired) { + if !a.maybeRetry(ctx, unconfirmedTx, RetryReasonExpired) { a.updateTransactionStatus(unconfirmedTx.Tx, commontypes.Failed) } } } } + a.metrics.SetPendingTxs(ctx, totalPending) } type RetryReason int @@ -702,7 +731,7 @@ func (r RetryReason) String() string { } } -func (a *AptosTxm) maybeRetry(unconfirmedTx *UnconfirmedTx, retryReason RetryReason) bool { +func (a *AptosTxm) maybeRetry(ctx context.Context, unconfirmedTx *UnconfirmedTx, retryReason RetryReason) bool { ctxLogger := GetContexedTxLogger(a.baseLogger, unconfirmedTx.Tx.ID, unconfirmedTx.Tx.Metadata) currentAttempt := a.getTransactionAttempt(unconfirmedTx.Tx) if currentAttempt >= a.config.MaxTxRetryAttempts { @@ -713,6 +742,7 @@ func (a *AptosTxm) maybeRetry(unconfirmedTx *UnconfirmedTx, retryReason RetryRea ctxLogger.Debugw("retrying tx", "attempt", currentAttempt, "hash", unconfirmedTx.Hash, "retryReason", retryReason) select { case a.broadcastChan <- unconfirmedTx.Tx.ID: + a.metrics.IncrementRetryTxs(ctx) default: ctxLogger.Errorw("failed to enqueue tx for rebroadcast", "attempt", currentAttempt, "hash", unconfirmedTx.Hash, "retryReason", retryReason) } diff --git a/relayer/txm/txm_error_test.go b/relayer/txm/txm_error_test.go index 04ade5c00..a2496aea7 100644 --- a/relayer/txm/txm_error_test.go +++ b/relayer/txm/txm_error_test.go @@ -74,7 +74,7 @@ func runErrorsTest(t *testing.T, logger logger.Logger, config Config, rpcURL str return rlClient, nil } - txm, err := New(logger, keystore, config, getClient) + txm, err := New(logger, keystore, config, getClient, chainInfo.ChainID) require.NoError(t, err) err = txm.Start(context.Background()) require.NoError(t, err) diff --git a/relayer/txm/txm_local_test.go b/relayer/txm/txm_local_test.go index 55b021b3b..f3cfd16c3 100644 --- a/relayer/txm/txm_local_test.go +++ b/relayer/txm/txm_local_test.go @@ -75,7 +75,7 @@ func runTxmTest(t *testing.T, logger logger.Logger, config Config, rpcURL string return rlClient, nil } - txm, err := New(logger, keystore, config, getClient) + txm, err := New(logger, keystore, config, getClient, chainInfo.ChainID) require.NoError(t, err) err = txm.Start(context.Background()) require.NoError(t, err) diff --git a/relayer/txm/txm_multisig_deploy_ccip_local_test.go b/relayer/txm/txm_multisig_deploy_ccip_local_test.go index 4b38d9bed..416264513 100644 --- a/relayer/txm/txm_multisig_deploy_ccip_local_test.go +++ b/relayer/txm/txm_multisig_deploy_ccip_local_test.go @@ -268,7 +268,7 @@ func runDeployMCMSAndCCIPInChunks(t *testing.T, logger logger.Logger, rpcURL str deployChainIdBig = new(big.Int).SetUint64(uint64(chainId)) config := DefaultConfigSet - txm, err := New(logger, keystore, config, getClient) + txm, err := New(logger, keystore, config, getClient, chainInfo.ChainID) require.NoError(t, err) err = txm.Start(context.Background()) require.NoError(t, err) diff --git a/relayer/txm/txm_multisig_local_test.go b/relayer/txm/txm_multisig_local_test.go index d6be1e332..659c15639 100644 --- a/relayer/txm/txm_multisig_local_test.go +++ b/relayer/txm/txm_multisig_local_test.go @@ -156,7 +156,7 @@ func runMultisigTest(t *testing.T, logger logger.Logger, rpcURL string, keystore chainIdBig = new(big.Int).SetUint64(uint64(chainId)) config := DefaultConfigSet - txm, err := New(logger, keystore, config, getClient) + txm, err := New(logger, keystore, config, getClient, chainInfo.ChainID) require.NoError(t, err) err = txm.Start(context.Background()) require.NoError(t, err)