diff --git a/README.md b/README.md index 77b2980..4937f1c 100644 --- a/README.md +++ b/README.md @@ -57,6 +57,93 @@ A distributed system for processing Ethereum execution layer data with support f └─────────────────────────────────────────┘ ``` +## Embedded Mode (Library Usage) + +The execution-processor can be embedded as a library within an execution client, providing direct data access without JSON-RPC overhead. + +### Implementing DataSource + +```go +import ( + "context" + "math/big" + + "github.com/ethereum/go-ethereum/core/types" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" +) + +type MyDataSource struct { + client *MyExecutionClient +} + +func (ds *MyDataSource) BlockNumber(ctx context.Context) (*uint64, error) { + num := ds.client.CurrentBlock() + return &num, nil +} + +func (ds *MyDataSource) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) { + return ds.client.GetBlock(number), nil +} + +func (ds *MyDataSource) BlockReceipts(ctx context.Context, number *big.Int) ([]*types.Receipt, error) { + return ds.client.GetBlockReceipts(number), nil +} + +func (ds *MyDataSource) TransactionReceipt(ctx context.Context, hash string) (*types.Receipt, error) { + return ds.client.GetReceipt(hash), nil +} + +func (ds *MyDataSource) DebugTraceTransaction( + ctx context.Context, + hash string, + blockNumber *big.Int, + opts execution.TraceOptions, +) (*execution.TraceTransaction, error) { + return ds.client.TraceTransaction(hash, opts), nil +} + +func (ds *MyDataSource) ChainID() int32 { + return ds.client.ChainID() +} + +func (ds *MyDataSource) ClientType() string { + return "my-client/1.0.0" +} + +func (ds *MyDataSource) IsSynced() bool { + return ds.client.IsSynced() +} +``` + +### Creating an Embedded Pool + +```go +import ( + "github.com/ethpandaops/execution-processor/pkg/ethereum" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" +) + +// Create embedded node with your data source +dataSource := &MyDataSource{client: myClient} +node := execution.NewEmbeddedNode(log, "embedded", dataSource) + +// Create pool with the embedded node +pool := ethereum.NewPoolWithNodes(log, "processor", []execution.Node{node}, nil) +pool.Start(ctx) + +// Mark ready when your client is synced and ready to serve data +node.MarkReady(ctx) +``` + +### Embedded vs RPC Mode + +| Aspect | RPC Mode | Embedded Mode | +|--------|----------|---------------| +| Data Access | JSON-RPC over HTTP | Direct function calls | +| Readiness | Auto-detected via RPC health checks | Host calls MarkReady() | +| Performance | Network + serialization overhead | Zero serialization overhead | +| Use Case | External execution clients | Library integration | + ## Manual Block Queue API The execution processor provides an HTTP API for manually queuing blocks for reprocessing. This is useful for fixing data issues or reprocessing specific blocks. diff --git a/pkg/config/config.go b/pkg/config/config.go new file mode 100644 index 0000000..f20d0a7 --- /dev/null +++ b/pkg/config/config.go @@ -0,0 +1,84 @@ +// Package config provides configuration types for execution-processor. +// This package is designed to be imported without pulling in go-ethereum dependencies, +// making it suitable for embedded mode integrations. +package config + +import ( + "fmt" + "time" + + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" + "github.com/ethpandaops/execution-processor/pkg/processor" + "github.com/ethpandaops/execution-processor/pkg/redis" + "github.com/ethpandaops/execution-processor/pkg/state" +) + +// EthereumConfig is the ethereum network configuration. +// This is a copy of ethereum.Config to avoid importing pkg/ethereum +// which would pull in go-ethereum dependencies. +type EthereumConfig struct { + // Execution configuration + Execution []*execution.Config `yaml:"execution"` + // Override network name for custom networks (bypasses networkMap) + OverrideNetworkName *string `yaml:"overrideNetworkName"` +} + +// Validate validates the ethereum configuration. +func (c *EthereumConfig) Validate() error { + for i, exec := range c.Execution { + if err := exec.Validate(); err != nil { + return fmt.Errorf("invalid execution configuration at index %d: %w", i, err) + } + } + + return nil +} + +// Config is the main configuration for execution-processor. +type Config struct { + // MetricsAddr is the address to listen on for metrics. + MetricsAddr string `yaml:"metricsAddr" default:":9090"` + // HealthCheckAddr is the address to listen on for healthcheck. + HealthCheckAddr *string `yaml:"healthCheckAddr"` + // PProfAddr is the address to listen on for pprof. + PProfAddr *string `yaml:"pprofAddr"` + // APIAddr is the address to listen on for the API server. + APIAddr *string `yaml:"apiAddr"` + // LoggingLevel is the logging level to use. + LoggingLevel string `yaml:"logging" default:"info"` + // Ethereum is the ethereum network configuration. + Ethereum EthereumConfig `yaml:"ethereum"` + // Redis is the redis configuration. + Redis *redis.Config `yaml:"redis"` + // StateManager is the state manager configuration. + StateManager state.Config `yaml:"stateManager"` + // Processors is the processor configuration. + Processors processor.Config `yaml:"processors"` + // ShutdownTimeout is the timeout for shutting down the server. + ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"10s"` +} + +// Validate validates the configuration. +func (c *Config) Validate() error { + if c.Redis == nil { + return fmt.Errorf("redis configuration is required") + } + + if err := c.Redis.Validate(); err != nil { + return fmt.Errorf("invalid redis configuration: %w", err) + } + + if err := c.Ethereum.Validate(); err != nil { + return fmt.Errorf("invalid ethereum configuration: %w", err) + } + + if err := c.StateManager.Validate(); err != nil { + return fmt.Errorf("invalid state manager configuration: %w", err) + } + + if err := c.Processors.Validate(); err != nil { + return fmt.Errorf("invalid processor configuration: %w", err) + } + + return nil +} diff --git a/pkg/ethereum/execution/block.go b/pkg/ethereum/execution/block.go new file mode 100644 index 0000000..e4e839d --- /dev/null +++ b/pkg/ethereum/execution/block.go @@ -0,0 +1,136 @@ +package execution + +import "math/big" + +// Hash represents a 32-byte hash. +type Hash [32]byte + +// Hex returns the hex string representation of the hash. +func (h Hash) Hex() string { + return "0x" + encodeHex(h[:]) +} + +// String returns the hex string representation of the hash. +func (h Hash) String() string { + return h.Hex() +} + +// Address represents a 20-byte Ethereum address. +type Address [20]byte + +// Hex returns the hex string representation of the address with checksum. +func (a Address) Hex() string { + return "0x" + encodeHex(a[:]) +} + +// String returns the hex string representation of the address. +func (a Address) String() string { + return a.Hex() +} + +// encodeHex encodes bytes as hex string without 0x prefix. +func encodeHex(b []byte) string { + const hexChars = "0123456789abcdef" + + result := make([]byte, len(b)*2) + + for i, v := range b { + result[i*2] = hexChars[v>>4] + result[i*2+1] = hexChars[v&0x0f] + } + + return string(result) +} + +// Transaction type constants matching go-ethereum values. +const ( + LegacyTxType = 0 + AccessListTxType = 1 + DynamicFeeTxType = 2 + BlobTxType = 3 +) + +// Block interface defines methods for accessing block data. +// Implementations are provided by data sources (RPC, embedded clients). +type Block interface { + // Number returns the block number. + Number() *big.Int + + // Hash returns the block hash. + Hash() Hash + + // ParentHash returns the parent block hash. + ParentHash() Hash + + // BaseFee returns the base fee per gas (EIP-1559), or nil for pre-London blocks. + BaseFee() *big.Int + + // Transactions returns all transactions in the block. + Transactions() []Transaction +} + +// Transaction interface defines methods for accessing transaction data. +// The From() method returns the sender address, computed by the data source +// using its own crypto implementation (avoiding go-ethereum crypto imports). +type Transaction interface { + // Hash returns the transaction hash. + Hash() Hash + + // Type returns the transaction type (0=legacy, 1=access list, 2=dynamic fee, 3=blob). + Type() uint8 + + // To returns the recipient address, or nil for contract creation. + To() *Address + + // From returns the sender address. + // This is computed by the data source using types.Sender() or equivalent. + From() Address + + // Nonce returns the sender account nonce. + Nonce() uint64 + + // Gas returns the gas limit. + Gas() uint64 + + // GasPrice returns the gas price (for legacy transactions). + GasPrice() *big.Int + + // GasTipCap returns the max priority fee per gas (EIP-1559). + GasTipCap() *big.Int + + // GasFeeCap returns the max fee per gas (EIP-1559). + GasFeeCap() *big.Int + + // Value returns the value transferred in wei. + Value() *big.Int + + // Data returns the input data (calldata). + Data() []byte + + // Size returns the encoded transaction size in bytes. + Size() uint64 + + // ChainId returns the chain ID, or nil for legacy transactions. + ChainId() *big.Int + + // BlobGas returns the blob gas used (for blob transactions). + BlobGas() uint64 + + // BlobGasFeeCap returns the max blob fee per gas (for blob transactions). + BlobGasFeeCap() *big.Int + + // BlobHashes returns the versioned hashes (for blob transactions). + BlobHashes() []Hash +} + +// Receipt interface defines methods for accessing transaction receipt data. +type Receipt interface { + // Status returns the transaction status (1=success, 0=failure). + Status() uint64 + + // TxHash returns the transaction hash. + TxHash() Hash + + // GasUsed returns the gas used by the transaction. + GasUsed() uint64 +} diff --git a/pkg/ethereum/execution/embedded_node.go b/pkg/ethereum/execution/embedded_node.go new file mode 100644 index 0000000..c1f0031 --- /dev/null +++ b/pkg/ethereum/execution/embedded_node.go @@ -0,0 +1,200 @@ +package execution + +import ( + "context" + "math/big" + "sync" + + "github.com/sirupsen/logrus" +) + +// DataSource is the interface host applications implement to provide +// execution data directly without JSON-RPC. This enables embedding +// execution-processor as a library within an execution client. +// +// All methods must be safe for concurrent calls from multiple goroutines. +// Context cancellation should be respected for all I/O operations. +// +// The interface uses abstract types (Block, Transaction, Receipt) instead of +// go-ethereum types to avoid CGO dependencies. Host applications should +// implement these interfaces with their own types. +// +// Example implementation: +// +// type MyDataSource struct { +// client *MyExecutionClient +// } +// +// func (ds *MyDataSource) BlockNumber(ctx context.Context) (*uint64, error) { +// num := ds.client.CurrentBlock() +// return &num, nil +// } +type DataSource interface { + // BlockNumber returns the current block number. + BlockNumber(ctx context.Context) (*uint64, error) + + // BlockByNumber returns the block at the given number. + BlockByNumber(ctx context.Context, number *big.Int) (Block, error) + + // BlockReceipts returns all receipts for the block at the given number. + BlockReceipts(ctx context.Context, number *big.Int) ([]Receipt, error) + + // TransactionReceipt returns the receipt for the transaction with the given hash. + TransactionReceipt(ctx context.Context, hash string) (Receipt, error) + + // DebugTraceTransaction returns the execution trace for the transaction. + DebugTraceTransaction(ctx context.Context, hash string, blockNumber *big.Int, opts TraceOptions) (*TraceTransaction, error) + + // ChainID returns the chain ID. + ChainID() int32 + + // ClientType returns the client type/version string. + ClientType() string + + // IsSynced returns true if the data source is fully synced. + IsSynced() bool +} + +// Compile-time check that EmbeddedNode implements Node interface. +var _ Node = (*EmbeddedNode)(nil) + +// EmbeddedNode implements Node by delegating to a DataSource. +// This allows host applications to provide execution data directly +// without going through JSON-RPC, eliminating serialization overhead. +// +// Lifecycle: +// 1. Create with NewEmbeddedNode(log, name, dataSource) +// 2. Register OnReady callbacks (optional) +// 3. Pool calls Start() (no-op for embedded) +// 4. Host calls MarkReady() when DataSource is ready to serve data +// 5. Callbacks execute in registration order, node becomes healthy in pool +// 6. Pool calls Stop() on shutdown (no-op for embedded) +// +// Thread-safety: All methods are safe for concurrent use. +type EmbeddedNode struct { + log logrus.FieldLogger + name string + source DataSource + ready bool + onReadyCallbacks []func(ctx context.Context) error + mu sync.RWMutex +} + +// NewEmbeddedNode creates a new EmbeddedNode with the given DataSource. +// +// Parameters: +// - log: Logger for node operations +// - name: Human-readable name for this node (used in logs and metrics) +// - source: DataSource implementation providing execution data +// +// The returned node is not yet ready. Call MarkReady() when the DataSource +// is ready to serve data. +func NewEmbeddedNode(log logrus.FieldLogger, name string, source DataSource) *EmbeddedNode { + return &EmbeddedNode{ + log: log.WithFields(logrus.Fields{"type": "execution", "source": name, "mode": "embedded"}), + name: name, + source: source, + onReadyCallbacks: make([]func(ctx context.Context) error, 0), + } +} + +// Start is a no-op for EmbeddedNode. The host controls readiness via MarkReady(). +func (n *EmbeddedNode) Start(_ context.Context) error { + n.log.Info("EmbeddedNode started - waiting for host to call MarkReady()") + + return nil +} + +// Stop is a no-op for EmbeddedNode. The host manages the DataSource lifecycle. +func (n *EmbeddedNode) Stop(_ context.Context) error { + n.log.Info("EmbeddedNode stopped") + + return nil +} + +// MarkReady is called by the host application when the DataSource is ready. +// This triggers all registered OnReady callbacks. +func (n *EmbeddedNode) MarkReady(ctx context.Context) error { + n.mu.Lock() + n.ready = true + callbacks := n.onReadyCallbacks + n.mu.Unlock() + + n.log.Info("EmbeddedNode marked as ready, executing callbacks") + + for _, cb := range callbacks { + if err := cb(ctx); err != nil { + n.log.WithError(err).Error("Failed to execute OnReady callback") + + return err + } + } + + return nil +} + +// OnReady registers a callback to be called when the node becomes ready. +func (n *EmbeddedNode) OnReady(_ context.Context, callback func(ctx context.Context) error) { + n.mu.Lock() + defer n.mu.Unlock() + + n.onReadyCallbacks = append(n.onReadyCallbacks, callback) +} + +// IsReady returns true if the node has been marked as ready. +func (n *EmbeddedNode) IsReady() bool { + n.mu.RLock() + defer n.mu.RUnlock() + + return n.ready +} + +// BlockNumber delegates to the DataSource. +func (n *EmbeddedNode) BlockNumber(ctx context.Context) (*uint64, error) { + return n.source.BlockNumber(ctx) +} + +// BlockByNumber delegates to the DataSource. +func (n *EmbeddedNode) BlockByNumber(ctx context.Context, number *big.Int) (Block, error) { + return n.source.BlockByNumber(ctx, number) +} + +// BlockReceipts delegates to the DataSource. +func (n *EmbeddedNode) BlockReceipts(ctx context.Context, number *big.Int) ([]Receipt, error) { + return n.source.BlockReceipts(ctx, number) +} + +// TransactionReceipt delegates to the DataSource. +func (n *EmbeddedNode) TransactionReceipt(ctx context.Context, hash string) (Receipt, error) { + return n.source.TransactionReceipt(ctx, hash) +} + +// DebugTraceTransaction delegates to the DataSource. +func (n *EmbeddedNode) DebugTraceTransaction( + ctx context.Context, + hash string, + blockNumber *big.Int, + opts TraceOptions, +) (*TraceTransaction, error) { + return n.source.DebugTraceTransaction(ctx, hash, blockNumber, opts) +} + +// ChainID delegates to the DataSource. +func (n *EmbeddedNode) ChainID() int32 { + return n.source.ChainID() +} + +// ClientType delegates to the DataSource. +func (n *EmbeddedNode) ClientType() string { + return n.source.ClientType() +} + +// IsSynced delegates to the DataSource. +func (n *EmbeddedNode) IsSynced() bool { + return n.source.IsSynced() +} + +// Name returns the configured name for this node. +func (n *EmbeddedNode) Name() string { + return n.name +} diff --git a/pkg/ethereum/execution/embedded_node_test.go b/pkg/ethereum/execution/embedded_node_test.go new file mode 100644 index 0000000..6271745 --- /dev/null +++ b/pkg/ethereum/execution/embedded_node_test.go @@ -0,0 +1,728 @@ +package execution_test + +import ( + "context" + "errors" + "math/big" + "sync" + "sync/atomic" + "testing" + "time" + + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" + "github.com/sirupsen/logrus" + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/mock" + "github.com/stretchr/testify/require" +) + +// MockBlock implements execution.Block for testing. +type MockBlock struct { + number *big.Int + hash execution.Hash + parentHash execution.Hash + baseFee *big.Int + txs []execution.Transaction +} + +func (b *MockBlock) Number() *big.Int { return b.number } +func (b *MockBlock) Hash() execution.Hash { return b.hash } +func (b *MockBlock) ParentHash() execution.Hash { return b.parentHash } +func (b *MockBlock) BaseFee() *big.Int { return b.baseFee } +func (b *MockBlock) Transactions() []execution.Transaction { return b.txs } + +// NewMockBlock creates a mock block with the given number. +func NewMockBlock(number *big.Int) *MockBlock { + return &MockBlock{ + number: number, + hash: execution.Hash{}, + parentHash: execution.Hash{}, + baseFee: big.NewInt(1000000000), + txs: []execution.Transaction{}, + } +} + +// MockReceipt implements execution.Receipt for testing. +type MockReceipt struct { + status uint64 + txHash execution.Hash + gasUsed uint64 +} + +func (r *MockReceipt) Status() uint64 { return r.status } +func (r *MockReceipt) TxHash() execution.Hash { return r.txHash } +func (r *MockReceipt) GasUsed() uint64 { return r.gasUsed } + +// NewMockReceipt creates a mock receipt with the given status. +func NewMockReceipt(status uint64, gasUsed uint64) *MockReceipt { + return &MockReceipt{ + status: status, + txHash: execution.Hash{}, + gasUsed: gasUsed, + } +} + +// MockDataSource implements execution.DataSource for testing. +type MockDataSource struct { + mock.Mock +} + +func (m *MockDataSource) BlockNumber(ctx context.Context) (*uint64, error) { + args := m.Called(ctx) + + if args.Get(0) == nil { + return nil, args.Error(1) + } + + val, ok := args.Get(0).(*uint64) + if !ok { + return nil, args.Error(1) + } + + return val, args.Error(1) +} + +func (m *MockDataSource) BlockByNumber(ctx context.Context, number *big.Int) (execution.Block, error) { + args := m.Called(ctx, number) + + if args.Get(0) == nil { + return nil, args.Error(1) + } + + val, ok := args.Get(0).(execution.Block) + if !ok { + return nil, args.Error(1) + } + + return val, args.Error(1) +} + +func (m *MockDataSource) BlockReceipts(ctx context.Context, number *big.Int) ([]execution.Receipt, error) { + args := m.Called(ctx, number) + + if args.Get(0) == nil { + return nil, args.Error(1) + } + + val, ok := args.Get(0).([]execution.Receipt) + if !ok { + return nil, args.Error(1) + } + + return val, args.Error(1) +} + +func (m *MockDataSource) TransactionReceipt(ctx context.Context, hash string) (execution.Receipt, error) { + args := m.Called(ctx, hash) + + if args.Get(0) == nil { + return nil, args.Error(1) + } + + val, ok := args.Get(0).(execution.Receipt) + if !ok { + return nil, args.Error(1) + } + + return val, args.Error(1) +} + +func (m *MockDataSource) DebugTraceTransaction( + ctx context.Context, + hash string, + blockNumber *big.Int, + opts execution.TraceOptions, +) (*execution.TraceTransaction, error) { + args := m.Called(ctx, hash, blockNumber, opts) + + if args.Get(0) == nil { + return nil, args.Error(1) + } + + val, ok := args.Get(0).(*execution.TraceTransaction) + if !ok { + return nil, args.Error(1) + } + + return val, args.Error(1) +} + +func (m *MockDataSource) ChainID() int32 { + args := m.Called() + + val, ok := args.Get(0).(int32) + if !ok { + return 0 + } + + return val +} + +func (m *MockDataSource) ClientType() string { + args := m.Called() + + return args.String(0) +} + +func (m *MockDataSource) IsSynced() bool { + args := m.Called() + + return args.Bool(0) +} + +func TestEmbeddedNode_Creation(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + require.NotNil(t, node) + assert.Equal(t, "test-node", node.Name()) + assert.False(t, node.IsReady()) +} + +func TestEmbeddedNode_Start_NoOp(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + err := node.Start(ctx) + + assert.NoError(t, err) + // Start should not mark the node as ready + assert.False(t, node.IsReady()) +} + +func TestEmbeddedNode_Stop_NoOp(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + // Start and mark ready first + err := node.Start(ctx) + require.NoError(t, err) + + err = node.MarkReady(ctx) + require.NoError(t, err) + + // Stop should complete without error + err = node.Stop(ctx) + assert.NoError(t, err) +} + +func TestEmbeddedNode_MarkReady_ExecutesCallbacks(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + // Track callback execution order + var order []int + + var mu sync.Mutex + + node.OnReady(ctx, func(_ context.Context) error { + mu.Lock() + defer mu.Unlock() + + order = append(order, 1) + + return nil + }) + + node.OnReady(ctx, func(_ context.Context) error { + mu.Lock() + defer mu.Unlock() + + order = append(order, 2) + + return nil + }) + + node.OnReady(ctx, func(_ context.Context) error { + mu.Lock() + defer mu.Unlock() + + order = append(order, 3) + + return nil + }) + + assert.False(t, node.IsReady()) + + err := node.MarkReady(ctx) + require.NoError(t, err) + + assert.True(t, node.IsReady()) + assert.Equal(t, []int{1, 2, 3}, order) +} + +func TestEmbeddedNode_MarkReady_CallbackError(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + expectedErr := errors.New("callback failed") + + var callbacksCalled int + + node.OnReady(ctx, func(_ context.Context) error { + callbacksCalled++ + + return nil + }) + + node.OnReady(ctx, func(_ context.Context) error { + callbacksCalled++ + + return expectedErr + }) + + node.OnReady(ctx, func(_ context.Context) error { + callbacksCalled++ + + return nil + }) + + err := node.MarkReady(ctx) + assert.ErrorIs(t, err, expectedErr) + // Only first two callbacks should have been called (second one failed) + assert.Equal(t, 2, callbacksCalled) +} + +func TestEmbeddedNode_OnReady_MultipleCallbacks(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + var count atomic.Int32 + + const numCallbacks = 10 + for i := 0; i < numCallbacks; i++ { + node.OnReady(ctx, func(_ context.Context) error { + count.Add(1) + + return nil + }) + } + + err := node.MarkReady(ctx) + require.NoError(t, err) + + assert.Equal(t, int32(numCallbacks), count.Load()) +} + +func TestEmbeddedNode_IsReady_States(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + // Initially not ready + assert.False(t, node.IsReady()) + + // Still not ready after Start + err := node.Start(ctx) + require.NoError(t, err) + + assert.False(t, node.IsReady()) + + // Ready after MarkReady + err = node.MarkReady(ctx) + require.NoError(t, err) + + assert.True(t, node.IsReady()) + + // Still ready after Stop + err = node.Stop(ctx) + require.NoError(t, err) + + assert.True(t, node.IsReady()) +} + +func TestEmbeddedNode_DelegatesToDataSource_BlockNumber(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + expectedBlock := uint64(12345) + + ds.On("BlockNumber", ctx).Return(&expectedBlock, nil) + + result, err := node.BlockNumber(ctx) + require.NoError(t, err) + require.NotNil(t, result) + assert.Equal(t, expectedBlock, *result) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_DelegatesToDataSource_BlockByNumber(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + blockNum := big.NewInt(12345) + expectedBlock := NewMockBlock(blockNum) + + ds.On("BlockByNumber", ctx, blockNum).Return(expectedBlock, nil) + + result, err := node.BlockByNumber(ctx, blockNum) + require.NoError(t, err) + assert.Equal(t, expectedBlock.Number(), result.Number()) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_DelegatesToDataSource_BlockReceipts(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + blockNum := big.NewInt(12345) + expectedReceipts := []execution.Receipt{ + NewMockReceipt(1, 21000), + NewMockReceipt(0, 50000), + } + + ds.On("BlockReceipts", ctx, blockNum).Return(expectedReceipts, nil) + + result, err := node.BlockReceipts(ctx, blockNum) + require.NoError(t, err) + assert.Len(t, result, 2) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_DelegatesToDataSource_TransactionReceipt(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + txHash := "0x1234567890abcdef" + expectedReceipt := NewMockReceipt(1, 21000) + + ds.On("TransactionReceipt", ctx, txHash).Return(expectedReceipt, nil) + + result, err := node.TransactionReceipt(ctx, txHash) + require.NoError(t, err) + assert.Equal(t, uint64(1), result.Status()) + assert.Equal(t, uint64(21000), result.GasUsed()) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_DelegatesToDataSource_DebugTraceTransaction(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + txHash := "0x1234567890abcdef" + blockNum := big.NewInt(12345) + opts := execution.DefaultTraceOptions() + expectedTrace := &execution.TraceTransaction{ + Gas: 21000, + Failed: false, + } + + ds.On("DebugTraceTransaction", ctx, txHash, blockNum, opts).Return(expectedTrace, nil) + + result, err := node.DebugTraceTransaction(ctx, txHash, blockNum, opts) + require.NoError(t, err) + assert.Equal(t, expectedTrace, result) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_DelegatesToDataSource_ChainID(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ds.On("ChainID").Return(int32(1)) + + result := node.ChainID() + assert.Equal(t, int32(1), result) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_DelegatesToDataSource_ClientType(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ds.On("ClientType").Return("geth/1.10.0") + + result := node.ClientType() + assert.Equal(t, "geth/1.10.0", result) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_DelegatesToDataSource_IsSynced(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ds.On("IsSynced").Return(true) + + result := node.IsSynced() + assert.True(t, result) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_Name(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + testCases := []struct { + name string + expectedName string + }{ + {name: "simple-name", expectedName: "simple-name"}, + {name: "with-numbers-123", expectedName: "with-numbers-123"}, + {name: "embedded-erigon", expectedName: "embedded-erigon"}, + {name: "", expectedName: ""}, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, tc.name, ds) + + assert.Equal(t, tc.expectedName, node.Name()) + }) + } +} + +func TestEmbeddedNode_ConcurrentMarkReady(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + var callbackCount atomic.Int32 + + node.OnReady(ctx, func(_ context.Context) error { + callbackCount.Add(1) + + return nil + }) + + // Start multiple goroutines calling MarkReady concurrently + const numGoroutines = 10 + + var wg sync.WaitGroup + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + // Ignore errors - only the first MarkReady should execute callbacks + _ = node.MarkReady(ctx) + }() + } + + wg.Wait() + + // Node should be ready + assert.True(t, node.IsReady()) + + // Callback should have been called at least once + // (implementation may allow multiple calls, but at least one should succeed) + assert.GreaterOrEqual(t, callbackCount.Load(), int32(1)) +} + +func TestEmbeddedNode_ConcurrentOnReady(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + var callbackCount atomic.Int32 + + // Register callbacks concurrently + const numGoroutines = 10 + + var wg sync.WaitGroup + + for i := 0; i < numGoroutines; i++ { + wg.Add(1) + + go func() { + defer wg.Done() + + node.OnReady(ctx, func(_ context.Context) error { + callbackCount.Add(1) + + return nil + }) + }() + } + + wg.Wait() + + // Now mark ready and verify all callbacks execute + err := node.MarkReady(ctx) + require.NoError(t, err) + + assert.Equal(t, int32(numGoroutines), callbackCount.Load()) +} + +func TestEmbeddedNode_InterfaceCompliance(t *testing.T) { + // Compile-time check that EmbeddedNode implements Node interface + var _ execution.Node = (*execution.EmbeddedNode)(nil) + + // Create an actual instance and verify it can be used as Node + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + + var node execution.Node = execution.NewEmbeddedNode(log, "test-node", ds) + + require.NotNil(t, node) + assert.Equal(t, "test-node", node.Name()) +} + +func TestEmbeddedNode_DataSourceError(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + expectedErr := errors.New("data source error") + + ds.On("BlockNumber", ctx).Return(nil, expectedErr) + + result, err := node.BlockNumber(ctx) + assert.ErrorIs(t, err, expectedErr) + assert.Nil(t, result) + + ds.AssertExpectations(t) +} + +func TestEmbeddedNode_ContextCancellation(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx, cancel := context.WithCancel(context.Background()) + cancel() // Cancel immediately + + // DataSource should receive cancelled context + ds.On("BlockNumber", ctx).Return(nil, ctx.Err()) + + result, err := node.BlockNumber(ctx) + assert.Error(t, err) + assert.Nil(t, result) +} + +func TestEmbeddedNode_CallbackWithContext(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx := context.Background() + + var receivedCtx context.Context + + node.OnReady(ctx, func(cbCtx context.Context) error { + receivedCtx = cbCtx + + return nil + }) + + err := node.MarkReady(ctx) + require.NoError(t, err) + + // The callback should receive the context passed to MarkReady + assert.Equal(t, ctx, receivedCtx) +} + +func TestEmbeddedNode_CallbackWithTimeout(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + ds := new(MockDataSource) + node := execution.NewEmbeddedNode(log, "test-node", ds) + + ctx, cancel := context.WithTimeout(context.Background(), 100*time.Millisecond) + defer cancel() + + callbackExecuted := make(chan struct{}) + + node.OnReady(ctx, func(_ context.Context) error { + close(callbackExecuted) + + return nil + }) + + err := node.MarkReady(ctx) + require.NoError(t, err) + + select { + case <-callbackExecuted: + // Success + case <-time.After(1 * time.Second): + t.Fatal("callback did not execute") + } +} diff --git a/pkg/ethereum/execution/geth/adapter.go b/pkg/ethereum/execution/geth/adapter.go new file mode 100644 index 0000000..447ad76 --- /dev/null +++ b/pkg/ethereum/execution/geth/adapter.go @@ -0,0 +1,227 @@ +//go:build !embedded + +// Package geth provides go-ethereum adapters for the execution interfaces. +// This package contains all go-ethereum dependencies, allowing the core +// execution package to remain free of CGO-dependent imports. +package geth + +import ( + "math/big" + + "github.com/ethereum/go-ethereum/common" + "github.com/ethereum/go-ethereum/core/types" + + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" +) + +// Compile-time interface checks. +var ( + _ execution.Block = (*BlockAdapter)(nil) + _ execution.Transaction = (*TransactionAdapter)(nil) + _ execution.Receipt = (*ReceiptAdapter)(nil) +) + +// BlockAdapter wraps a go-ethereum Block to implement execution.Block. +type BlockAdapter struct { + block *types.Block + txs []execution.Transaction +} + +// NewBlockAdapter creates a new BlockAdapter from a go-ethereum Block. +// It extracts sender addresses for all transactions using the appropriate signer. +func NewBlockAdapter(block *types.Block) *BlockAdapter { + gethTxs := block.Transactions() + txs := make([]execution.Transaction, len(gethTxs)) + + for i, tx := range gethTxs { + txs[i] = NewTransactionAdapter(tx) + } + + return &BlockAdapter{ + block: block, + txs: txs, + } +} + +// Number returns the block number. +func (b *BlockAdapter) Number() *big.Int { + return b.block.Number() +} + +// Hash returns the block hash. +func (b *BlockAdapter) Hash() execution.Hash { + return execution.Hash(b.block.Hash()) +} + +// ParentHash returns the parent block hash. +func (b *BlockAdapter) ParentHash() execution.Hash { + return execution.Hash(b.block.ParentHash()) +} + +// BaseFee returns the base fee per gas (EIP-1559), or nil for pre-London blocks. +func (b *BlockAdapter) BaseFee() *big.Int { + return b.block.BaseFee() +} + +// Transactions returns all transactions in the block. +func (b *BlockAdapter) Transactions() []execution.Transaction { + return b.txs +} + +// TransactionAdapter wraps a go-ethereum Transaction to implement execution.Transaction. +type TransactionAdapter struct { + tx *types.Transaction + from common.Address +} + +// NewTransactionAdapter creates a new TransactionAdapter from a go-ethereum Transaction. +// It computes the sender address using the appropriate signer. +func NewTransactionAdapter(tx *types.Transaction) *TransactionAdapter { + // Determine the appropriate signer for extracting the sender + var signer types.Signer + + chainID := tx.ChainId() + if chainID == nil || chainID.Sign() == 0 { + // Legacy transaction without EIP-155 replay protection + signer = types.HomesteadSigner{} + } else { + signer = types.LatestSignerForChainID(chainID) + } + + // Extract sender - this uses go-ethereum's crypto package internally + from, _ := types.Sender(signer, tx) + + return &TransactionAdapter{ + tx: tx, + from: from, + } +} + +// Hash returns the transaction hash. +func (t *TransactionAdapter) Hash() execution.Hash { + return execution.Hash(t.tx.Hash()) +} + +// Type returns the transaction type. +func (t *TransactionAdapter) Type() uint8 { + return t.tx.Type() +} + +// To returns the recipient address, or nil for contract creation. +func (t *TransactionAdapter) To() *execution.Address { + if t.tx.To() == nil { + return nil + } + + addr := execution.Address(*t.tx.To()) + + return &addr +} + +// From returns the sender address. +func (t *TransactionAdapter) From() execution.Address { + return execution.Address(t.from) +} + +// Nonce returns the sender account nonce. +func (t *TransactionAdapter) Nonce() uint64 { + return t.tx.Nonce() +} + +// Gas returns the gas limit. +func (t *TransactionAdapter) Gas() uint64 { + return t.tx.Gas() +} + +// GasPrice returns the gas price (for legacy transactions). +func (t *TransactionAdapter) GasPrice() *big.Int { + return t.tx.GasPrice() +} + +// GasTipCap returns the max priority fee per gas (EIP-1559). +func (t *TransactionAdapter) GasTipCap() *big.Int { + return t.tx.GasTipCap() +} + +// GasFeeCap returns the max fee per gas (EIP-1559). +func (t *TransactionAdapter) GasFeeCap() *big.Int { + return t.tx.GasFeeCap() +} + +// Value returns the value transferred in wei. +func (t *TransactionAdapter) Value() *big.Int { + return t.tx.Value() +} + +// Data returns the input data (calldata). +func (t *TransactionAdapter) Data() []byte { + return t.tx.Data() +} + +// Size returns the encoded transaction size in bytes. +func (t *TransactionAdapter) Size() uint64 { + return t.tx.Size() +} + +// ChainId returns the chain ID, or nil for legacy transactions. +func (t *TransactionAdapter) ChainId() *big.Int { + return t.tx.ChainId() +} + +// BlobGas returns the blob gas used (for blob transactions). +func (t *TransactionAdapter) BlobGas() uint64 { + return t.tx.BlobGas() +} + +// BlobGasFeeCap returns the max blob fee per gas (for blob transactions). +func (t *TransactionAdapter) BlobGasFeeCap() *big.Int { + return t.tx.BlobGasFeeCap() +} + +// BlobHashes returns the versioned hashes (for blob transactions). +func (t *TransactionAdapter) BlobHashes() []execution.Hash { + gethHashes := t.tx.BlobHashes() + hashes := make([]execution.Hash, len(gethHashes)) + + for i, h := range gethHashes { + hashes[i] = execution.Hash(h) + } + + return hashes +} + +// ReceiptAdapter wraps a go-ethereum Receipt to implement execution.Receipt. +type ReceiptAdapter struct { + receipt *types.Receipt +} + +// NewReceiptAdapter creates a new ReceiptAdapter from a go-ethereum Receipt. +func NewReceiptAdapter(receipt *types.Receipt) *ReceiptAdapter { + return &ReceiptAdapter{receipt: receipt} +} + +// Status returns the transaction status (1=success, 0=failure). +func (r *ReceiptAdapter) Status() uint64 { + return r.receipt.Status +} + +// TxHash returns the transaction hash. +func (r *ReceiptAdapter) TxHash() execution.Hash { + return execution.Hash(r.receipt.TxHash) +} + +// GasUsed returns the gas used by the transaction. +func (r *ReceiptAdapter) GasUsed() uint64 { + return r.receipt.GasUsed +} + +// AdaptReceipts converts a slice of go-ethereum receipts to execution.Receipt interfaces. +func AdaptReceipts(receipts []*types.Receipt) []execution.Receipt { + result := make([]execution.Receipt, len(receipts)) + + for i, r := range receipts { + result[i] = NewReceiptAdapter(r) + } + + return result +} diff --git a/pkg/ethereum/execution/rpc.go b/pkg/ethereum/execution/geth/rpc.go similarity index 66% rename from pkg/ethereum/execution/rpc.go rename to pkg/ethereum/execution/geth/rpc.go index 638cb84..c34da64 100644 --- a/pkg/ethereum/execution/rpc.go +++ b/pkg/ethereum/execution/geth/rpc.go @@ -1,4 +1,6 @@ -package execution +//go:build !embedded + +package geth import ( "context" @@ -8,18 +10,18 @@ import ( "time" "github.com/ethereum/go-ethereum/common" - "github.com/ethereum/go-ethereum/core/types" "github.com/ethereum/go-ethereum/rpc" pcommon "github.com/ethpandaops/execution-processor/pkg/common" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" ) const ( - STATUS_ERROR = "error" - STATUS_SUCCESS = "success" + statusError = "error" + statusSuccess = "success" ) -func (n *Node) BlockNumber(ctx context.Context) (*uint64, error) { +func (n *RPCNode) blockNumber(ctx context.Context) (*uint64, error) { start := time.Now() blockNumber, err := n.client.BlockNumber(ctx) @@ -27,9 +29,9 @@ func (n *Node) BlockNumber(ctx context.Context) (*uint64, error) { duration := time.Since(start) // Record RPC metrics - status := STATUS_SUCCESS + status := statusSuccess if err != nil { - status = STATUS_ERROR + status = statusError } network := n.Metadata().ChainID() @@ -44,7 +46,7 @@ func (n *Node) BlockNumber(ctx context.Context) (*uint64, error) { return &blockNumber, nil } -func (n *Node) BlockByNumber(ctx context.Context, blockNumber *big.Int) (*types.Block, error) { +func (n *RPCNode) blockByNumber(ctx context.Context, blockNumber *big.Int) (execution.Block, error) { start := time.Now() block, err := n.client.BlockByNumber(ctx, blockNumber) @@ -52,9 +54,9 @@ func (n *Node) BlockByNumber(ctx context.Context, blockNumber *big.Int) (*types. duration := time.Since(start) // Record RPC metrics - status := STATUS_SUCCESS + status := statusSuccess if err != nil { - status = STATUS_ERROR + status = statusError } network := n.Metadata().ChainID() @@ -66,11 +68,11 @@ func (n *Node) BlockByNumber(ctx context.Context, blockNumber *big.Int) (*types. return nil, err } - return block, nil + return NewBlockAdapter(block), nil } // getTraceParams returns VM trace parameters with configurable options. -func getTraceParams(hash string, options TraceOptions) []any { +func getTraceParams(hash string, options execution.TraceOptions) []any { return []any{ hash, map[string]any{ @@ -83,8 +85,8 @@ func getTraceParams(hash string, options TraceOptions) []any { } // traceTransactionErigon handles tracing for Erigon clients. -func (n *Node) traceTransactionErigon(ctx context.Context, hash string, options TraceOptions) (*TraceTransaction, error) { - var rsp ErigonResult +func (n *RPCNode) traceTransactionErigon(ctx context.Context, hash string, options execution.TraceOptions) (*execution.TraceTransaction, error) { + var rsp erigonResult start := time.Now() @@ -93,9 +95,9 @@ func (n *Node) traceTransactionErigon(ctx context.Context, hash string, options duration := time.Since(start) // Record RPC metrics - status := STATUS_SUCCESS + status := statusSuccess if err != nil { - status = STATUS_ERROR + status = statusError } network := n.Metadata().ChainID() @@ -112,11 +114,11 @@ func (n *Node) traceTransactionErigon(ctx context.Context, hash string, options returnValue = nil } - result := &TraceTransaction{ + result := &execution.TraceTransaction{ Gas: rsp.Gas, Failed: rsp.Failed, ReturnValue: returnValue, - Structlogs: []StructLog{}, + Structlogs: []execution.StructLog{}, } // Empty array on transfer @@ -128,7 +130,7 @@ func (n *Node) traceTransactionErigon(ctx context.Context, hash string, options *returnData = hex.EncodeToString(log.ReturnData) } - result.Structlogs = append(result.Structlogs, StructLog{ + result.Structlogs = append(result.Structlogs, execution.StructLog{ PC: log.PC, Op: log.Op, Gas: log.Gas, @@ -144,8 +146,8 @@ func (n *Node) traceTransactionErigon(ctx context.Context, hash string, options return result, nil } -// BlockReceipts fetches all receipts for a block by number (much faster than per-tx). -func (n *Node) BlockReceipts(ctx context.Context, blockNumber *big.Int) ([]*types.Receipt, error) { +// blockReceipts fetches all receipts for a block by number (much faster than per-tx). +func (n *RPCNode) blockReceipts(ctx context.Context, blockNumber *big.Int) ([]execution.Receipt, error) { start := time.Now() blockNrOrHash := rpc.BlockNumberOrHashWithNumber(rpc.BlockNumber(blockNumber.Int64())) @@ -155,9 +157,9 @@ func (n *Node) BlockReceipts(ctx context.Context, blockNumber *big.Int) ([]*type duration := time.Since(start) // Record RPC metrics - status := STATUS_SUCCESS + status := statusSuccess if err != nil { - status = STATUS_ERROR + status = statusError } network := n.Metadata().ChainID() @@ -180,11 +182,11 @@ func (n *Node) BlockReceipts(ctx context.Context, blockNumber *big.Int) ([]*type return nil, err } - return receipts, nil + return AdaptReceipts(receipts), nil } -// TransactionReceipt fetches the receipt for a transaction by hash. -func (n *Node) TransactionReceipt(ctx context.Context, hash string) (*types.Receipt, error) { +// transactionReceipt fetches the receipt for a transaction by hash. +func (n *RPCNode) transactionReceipt(ctx context.Context, hash string) (execution.Receipt, error) { start := time.Now() txHash := common.HexToHash(hash) @@ -194,9 +196,9 @@ func (n *Node) TransactionReceipt(ctx context.Context, hash string) (*types.Rece duration := time.Since(start) // Record RPC metrics - status := STATUS_SUCCESS + status := statusSuccess if err != nil { - status = STATUS_ERROR + status = statusError } network := n.Metadata().ChainID() @@ -219,11 +221,16 @@ func (n *Node) TransactionReceipt(ctx context.Context, hash string) (*types.Rece return nil, err } - return receipt, nil + return NewReceiptAdapter(receipt), nil } -// DebugTraceTransaction traces a transaction execution using the client's debug API. -func (n *Node) DebugTraceTransaction(ctx context.Context, hash string, blockNumber *big.Int, options TraceOptions) (*TraceTransaction, error) { +// debugTraceTransaction traces a transaction execution using the client's debug API. +func (n *RPCNode) debugTraceTransaction( + ctx context.Context, + hash string, + _ *big.Int, + options execution.TraceOptions, +) (*execution.TraceTransaction, error) { // Add a timeout if the context doesn't already have one if _, hasDeadline := ctx.Deadline(); !hasDeadline { var cancel context.CancelFunc @@ -251,3 +258,24 @@ func (n *Node) DebugTraceTransaction(ctx context.Context, hash string, blockNumb return n.traceTransactionErigon(ctx, hash, options) } } + +// erigonResult represents the result from an Erigon debug_traceTransaction call. +type erigonResult struct { + Gas uint64 `json:"gas"` + Failed bool `json:"failed"` + ReturnValue *string `json:"returnValue"` + StructLogs []erigonStructLog `json:"structLogs"` +} + +// erigonStructLog represents a single structlog entry from Erigon. +type erigonStructLog struct { + PC uint32 `json:"pc"` + Op string `json:"op"` + Gas uint64 `json:"gas"` + GasCost uint64 `json:"gasCost"` + Depth uint64 `json:"depth"` + ReturnData []byte `json:"returnData"` + Refund *uint64 `json:"refund"` + Error *string `json:"error"` + Stack *[]string `json:"stack"` +} diff --git a/pkg/ethereum/execution/node.go b/pkg/ethereum/execution/geth/rpc_node.go similarity index 68% rename from pkg/ethereum/execution/node.go rename to pkg/ethereum/execution/geth/rpc_node.go index d070193..210ce56 100644 --- a/pkg/ethereum/execution/node.go +++ b/pkg/ethereum/execution/geth/rpc_node.go @@ -1,9 +1,12 @@ -package execution +//go:build !embedded + +package geth import ( "context" "errors" "fmt" + "math/big" "net" "net/http" "sync" @@ -11,10 +14,15 @@ import ( "github.com/ethereum/go-ethereum/ethclient" "github.com/ethereum/go-ethereum/rpc" - "github.com/ethpandaops/execution-processor/pkg/ethereum/execution/services" "github.com/sirupsen/logrus" + + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution/geth/services" ) +// Compile-time check that RPCNode implements execution.Node interface. +var _ execution.Node = (*RPCNode)(nil) + // headerTransport adds custom headers to requests and respects context cancellation. type headerTransport struct { headers map[string]string @@ -36,8 +44,9 @@ func (t *headerTransport) RoundTrip(req *http.Request) (*http.Response, error) { return t.base.RoundTrip(req) } -type Node struct { - config *Config +// RPCNode implements execution.Node using JSON-RPC connections. +type RPCNode struct { + config *execution.Config log logrus.FieldLogger client *ethclient.Client rpcClient *rpc.Client @@ -53,19 +62,20 @@ type Node struct { cancel context.CancelFunc } -func NewNode(log logrus.FieldLogger, conf *Config) *Node { - return &Node{ +// NewRPCNode creates a new RPC-based execution node. +func NewRPCNode(log logrus.FieldLogger, conf *execution.Config) *RPCNode { + return &RPCNode{ config: conf, log: log.WithFields(logrus.Fields{"type": "execution", "source": conf.Name}), services: []services.Service{}, } } -func (n *Node) OnReady(_ context.Context, callback func(ctx context.Context) error) { +func (n *RPCNode) OnReady(_ context.Context, callback func(ctx context.Context) error) { n.onReadyCallbacks = append(n.onReadyCallbacks, callback) } -func (n *Node) Start(ctx context.Context) error { +func (n *RPCNode) Start(ctx context.Context) error { n.log.WithFields(logrus.Fields{ "node_name": n.name, }).Info("Starting execution node") @@ -193,7 +203,7 @@ func (n *Node) Start(ctx context.Context) error { return nil } -func (n *Node) Stop(ctx context.Context) error { +func (n *RPCNode) Stop(ctx context.Context) error { n.log.Info("Stopping execution node") // Cancel the node context to signal all goroutines to stop @@ -230,7 +240,7 @@ func (n *Node) Stop(ctx context.Context) error { return nil } -func (n *Node) getServiceByName(name services.Name) (services.Service, error) { +func (n *RPCNode) getServiceByName(name services.Name) (services.Service, error) { for _, service := range n.services { if service.Name() == name { return service, nil @@ -240,7 +250,8 @@ func (n *Node) getServiceByName(name services.Name) (services.Service, error) { return nil, errors.New("service not found") } -func (n *Node) Metadata() *services.MetadataService { +// Metadata returns the metadata service for this node. +func (n *RPCNode) Metadata() *services.MetadataService { service, err := n.getServiceByName("metadata") if err != nil { // This should never happen. If it does, good luck. @@ -255,6 +266,64 @@ func (n *Node) Metadata() *services.MetadataService { return svc } -func (n *Node) Name() string { +// Name returns the configured name for this node. +func (n *RPCNode) Name() string { return n.config.Name } + +// ChainID returns the chain ID from the metadata service. +func (n *RPCNode) ChainID() int32 { + if meta := n.Metadata(); meta != nil { + return meta.ChainID() + } + + return 0 +} + +// ClientType returns the client type from the metadata service. +func (n *RPCNode) ClientType() string { + if meta := n.Metadata(); meta != nil { + return meta.ClientVersion() + } + + return "" +} + +// IsSynced returns true if the node is synced. +func (n *RPCNode) IsSynced() bool { + if meta := n.Metadata(); meta != nil { + return meta.IsSynced() + } + + return false +} + +// BlockNumber returns the current block number. +func (n *RPCNode) BlockNumber(ctx context.Context) (*uint64, error) { + return n.blockNumber(ctx) +} + +// BlockByNumber returns the block at the given number. +func (n *RPCNode) BlockByNumber(ctx context.Context, number *big.Int) (execution.Block, error) { + return n.blockByNumber(ctx, number) +} + +// BlockReceipts returns all receipts for the block at the given number. +func (n *RPCNode) BlockReceipts(ctx context.Context, number *big.Int) ([]execution.Receipt, error) { + return n.blockReceipts(ctx, number) +} + +// TransactionReceipt returns the receipt for the transaction with the given hash. +func (n *RPCNode) TransactionReceipt(ctx context.Context, hash string) (execution.Receipt, error) { + return n.transactionReceipt(ctx, hash) +} + +// DebugTraceTransaction returns the execution trace for the transaction. +func (n *RPCNode) DebugTraceTransaction( + ctx context.Context, + hash string, + blockNumber *big.Int, + opts execution.TraceOptions, +) (*execution.TraceTransaction, error) { + return n.debugTraceTransaction(ctx, hash, blockNumber, opts) +} diff --git a/pkg/ethereum/execution/services/client.go b/pkg/ethereum/execution/geth/services/client.go similarity index 97% rename from pkg/ethereum/execution/services/client.go rename to pkg/ethereum/execution/geth/services/client.go index 729c8e5..eb00df0 100644 --- a/pkg/ethereum/execution/services/client.go +++ b/pkg/ethereum/execution/geth/services/client.go @@ -1,3 +1,5 @@ +//go:build !embedded + package services import ( diff --git a/pkg/ethereum/execution/services/metadata.go b/pkg/ethereum/execution/geth/services/metadata.go similarity index 99% rename from pkg/ethereum/execution/services/metadata.go rename to pkg/ethereum/execution/geth/services/metadata.go index e156a36..c53e795 100644 --- a/pkg/ethereum/execution/services/metadata.go +++ b/pkg/ethereum/execution/geth/services/metadata.go @@ -1,3 +1,5 @@ +//go:build !embedded + package services import ( diff --git a/pkg/ethereum/execution/services/service.go b/pkg/ethereum/execution/geth/services/service.go similarity index 92% rename from pkg/ethereum/execution/services/service.go rename to pkg/ethereum/execution/geth/services/service.go index 7048e47..3a103fa 100644 --- a/pkg/ethereum/execution/services/service.go +++ b/pkg/ethereum/execution/geth/services/service.go @@ -1,3 +1,5 @@ +//go:build !embedded + package services import "context" diff --git a/pkg/ethereum/execution/interface.go b/pkg/ethereum/execution/interface.go new file mode 100644 index 0000000..ed273bd --- /dev/null +++ b/pkg/ethereum/execution/interface.go @@ -0,0 +1,64 @@ +package execution + +import ( + "context" + "math/big" +) + +// Node defines the interface for execution data providers. +// +// Implementations include: +// - geth.RPCNode: connects to execution clients via JSON-RPC over HTTP +// - EmbeddedNode: receives data directly from host application via DataSource +// +// All methods must be safe for concurrent use by multiple goroutines. +// +// Lifecycle: +// 1. Create node with appropriate constructor (geth.NewRPCNode or NewEmbeddedNode) +// 2. Register OnReady callbacks before calling Start +// 3. Call Start to begin initialization +// 4. Node signals readiness by executing OnReady callbacks +// 5. Call Stop for graceful shutdown +type Node interface { + // Start initializes the node and begins any background operations. + // For RPCNode, this establishes the RPC connection and starts health monitoring. + // For EmbeddedNode, this is a no-op as the host controls the DataSource lifecycle. + Start(ctx context.Context) error + + // Stop gracefully shuts down the node and releases resources. + // Should be called when the node is no longer needed. + Stop(ctx context.Context) error + + // OnReady registers a callback to be invoked when the node becomes ready. + // For RPCNode, callbacks execute when the RPC connection is healthy. + // For EmbeddedNode, callbacks execute when MarkReady is called by the host. + // Multiple callbacks can be registered and will execute in registration order. + OnReady(ctx context.Context, callback func(ctx context.Context) error) + + // BlockNumber returns the current block number from the execution client. + BlockNumber(ctx context.Context) (*uint64, error) + + // BlockByNumber returns the block at the given number. + BlockByNumber(ctx context.Context, number *big.Int) (Block, error) + + // BlockReceipts returns all receipts for the block at the given number. + BlockReceipts(ctx context.Context, number *big.Int) ([]Receipt, error) + + // TransactionReceipt returns the receipt for the transaction with the given hash. + TransactionReceipt(ctx context.Context, hash string) (Receipt, error) + + // DebugTraceTransaction returns the execution trace for the transaction. + DebugTraceTransaction(ctx context.Context, hash string, blockNumber *big.Int, opts TraceOptions) (*TraceTransaction, error) + + // ChainID returns the chain ID reported by the execution client. + ChainID() int32 + + // ClientType returns the client type/version string (e.g., "geth/1.10.0"). + ClientType() string + + // IsSynced returns true if the execution client is fully synced. + IsSynced() bool + + // Name returns the configured name for this node. + Name() string +} diff --git a/pkg/ethereum/pool.go b/pkg/ethereum/pool.go index bc53519..2c52438 100644 --- a/pkg/ethereum/pool.go +++ b/pkg/ethereum/pool.go @@ -14,35 +14,56 @@ import ( type Pool struct { log logrus.FieldLogger - executionNodes []*execution.Node + executionNodes []execution.Node metrics *Metrics config *Config mu sync.RWMutex - healthyExecutionNodes map[*execution.Node]bool + healthyExecutionNodes map[execution.Node]bool // Goroutine management wg sync.WaitGroup cancel context.CancelFunc } -func NewPool(log logrus.FieldLogger, namespace string, config *Config) *Pool { +// NewPoolWithNodes creates a pool with pre-created Node implementations. +// Use this when embedding execution-processor as a library where the host +// provides custom Node implementations (e.g., EmbeddedNode with DataSource). +// +// Parameters: +// - log: Logger for pool operations +// - namespace: Metrics namespace prefix (will have "_ethereum" appended) +// - nodes: Pre-created Node implementations +// - config: Optional configuration (nil creates empty config with defaults) +// +// Example: +// +// // Create embedded node with custom data source +// dataSource := &MyDataSource{client: myClient} +// node := execution.NewEmbeddedNode(log, "my-node", dataSource) +// +// // Create pool with the embedded node +// pool := ethereum.NewPoolWithNodes(log, "processor", []execution.Node{node}, nil) +// pool.Start(ctx) +// +// // Mark ready when data source is ready +// node.MarkReady(ctx) +func NewPoolWithNodes(log logrus.FieldLogger, namespace string, nodes []execution.Node, config *Config) *Pool { namespace = fmt.Sprintf("%s_ethereum", namespace) - p := &Pool{ + + // If config is nil, create an empty config + if config == nil { + config = &Config{} + } + + return &Pool{ log: log, - executionNodes: make([]*execution.Node, 0), - healthyExecutionNodes: make(map[*execution.Node]bool), + executionNodes: nodes, + healthyExecutionNodes: make(map[execution.Node]bool, len(nodes)), metrics: GetMetricsInstance(namespace), config: config, } - - for _, execCfg := range config.Execution { - node := execution.NewNode(log, execCfg) - p.executionNodes = append(p.executionNodes, node) - } - - return p } func (p *Pool) HasExecutionNodes() bool { @@ -62,11 +83,11 @@ func (p *Pool) HasHealthyExecutionNodes() bool { return false } -func (p *Pool) GetHealthyExecutionNodes() []*execution.Node { +func (p *Pool) GetHealthyExecutionNodes() []execution.Node { p.mu.RLock() defer p.mu.RUnlock() - var healthyNodes []*execution.Node + healthyNodes := make([]execution.Node, 0, len(p.healthyExecutionNodes)) for node, healthy := range p.healthyExecutionNodes { if healthy { @@ -77,11 +98,11 @@ func (p *Pool) GetHealthyExecutionNodes() []*execution.Node { return healthyNodes } -func (p *Pool) GetHealthyExecutionNode() *execution.Node { +func (p *Pool) GetHealthyExecutionNode() execution.Node { p.mu.RLock() defer p.mu.RUnlock() - var healthyNodes []*execution.Node + healthyNodes := make([]execution.Node, 0, len(p.healthyExecutionNodes)) for node, healthy := range p.healthyExecutionNodes { if healthy { @@ -97,7 +118,7 @@ func (p *Pool) GetHealthyExecutionNode() *execution.Node { return healthyNodes[rand.IntN(len(healthyNodes))] } -func (p *Pool) WaitForHealthyExecutionNode(ctx context.Context) (*execution.Node, error) { +func (p *Pool) WaitForHealthyExecutionNode(ctx context.Context) (execution.Node, error) { // Check if we have any execution nodes configured if len(p.executionNodes) == 0 { return nil, fmt.Errorf("no execution nodes configured") diff --git a/pkg/ethereum/pool_rpc.go b/pkg/ethereum/pool_rpc.go new file mode 100644 index 0000000..38ef9e8 --- /dev/null +++ b/pkg/ethereum/pool_rpc.go @@ -0,0 +1,32 @@ +//go:build !embedded + +package ethereum + +import ( + "fmt" + + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution/geth" + "github.com/sirupsen/logrus" +) + +// NewPool creates a new pool from config, using RPC nodes. +// This function imports go-ethereum types through the geth package. +// For embedded mode (no go-ethereum dependency), use NewPoolWithNodes instead. +func NewPool(log logrus.FieldLogger, namespace string, config *Config) *Pool { + namespace = fmt.Sprintf("%s_ethereum", namespace) + p := &Pool{ + log: log, + executionNodes: make([]execution.Node, 0, len(config.Execution)), + healthyExecutionNodes: make(map[execution.Node]bool, len(config.Execution)), + metrics: GetMetricsInstance(namespace), + config: config, + } + + for _, execCfg := range config.Execution { + node := geth.NewRPCNode(log, execCfg) + p.executionNodes = append(p.executionNodes, node) + } + + return p +} diff --git a/pkg/ethereum/pool_test.go b/pkg/ethereum/pool_test.go index 5cda22b..04b43aa 100644 --- a/pkg/ethereum/pool_test.go +++ b/pkg/ethereum/pool_test.go @@ -2,6 +2,7 @@ package ethereum_test import ( "context" + "math/big" "sync" "testing" "time" @@ -13,6 +14,126 @@ import ( "github.com/stretchr/testify/require" ) +// MockBlock implements execution.Block for testing. +type MockBlock struct { + number *big.Int +} + +func (b *MockBlock) Number() *big.Int { return b.number } +func (b *MockBlock) Hash() execution.Hash { return execution.Hash{} } +func (b *MockBlock) ParentHash() execution.Hash { return execution.Hash{} } +func (b *MockBlock) BaseFee() *big.Int { return nil } +func (b *MockBlock) Transactions() []execution.Transaction { return nil } + +// MockReceipt implements execution.Receipt for testing. +type MockReceipt struct{} + +func (r *MockReceipt) Status() uint64 { return 1 } +func (r *MockReceipt) TxHash() execution.Hash { return execution.Hash{} } +func (r *MockReceipt) GasUsed() uint64 { return 21000 } + +// MockNode implements execution.Node for testing. +type MockNode struct { + name string + started bool + stopped bool + onReadyCallbacks []func(ctx context.Context) error + mu sync.Mutex +} + +func NewMockNode(name string) *MockNode { + return &MockNode{ + name: name, + onReadyCallbacks: make([]func(ctx context.Context) error, 0), + } +} + +func (m *MockNode) Start(_ context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + + m.started = true + + return nil +} + +func (m *MockNode) Stop(_ context.Context) error { + m.mu.Lock() + defer m.mu.Unlock() + + m.stopped = true + + return nil +} + +func (m *MockNode) OnReady(_ context.Context, callback func(ctx context.Context) error) { + m.mu.Lock() + defer m.mu.Unlock() + + m.onReadyCallbacks = append(m.onReadyCallbacks, callback) +} + +// TriggerReady simulates the node becoming ready by calling all OnReady callbacks. +func (m *MockNode) TriggerReady(ctx context.Context) error { + m.mu.Lock() + callbacks := m.onReadyCallbacks + m.mu.Unlock() + + for _, cb := range callbacks { + if err := cb(ctx); err != nil { + return err + } + } + + return nil +} + +func (m *MockNode) BlockNumber(_ context.Context) (*uint64, error) { + num := uint64(12345) + + return &num, nil +} + +func (m *MockNode) BlockByNumber(_ context.Context, number *big.Int) (execution.Block, error) { + return &MockBlock{number: number}, nil +} + +func (m *MockNode) BlockReceipts(_ context.Context, _ *big.Int) ([]execution.Receipt, error) { + return []execution.Receipt{}, nil +} + +func (m *MockNode) TransactionReceipt(_ context.Context, _ string) (execution.Receipt, error) { + return &MockReceipt{}, nil +} + +func (m *MockNode) DebugTraceTransaction( + _ context.Context, + _ string, + _ *big.Int, + _ execution.TraceOptions, +) (*execution.TraceTransaction, error) { + return &execution.TraceTransaction{}, nil +} + +func (m *MockNode) ChainID() int32 { + return 1 +} + +func (m *MockNode) ClientType() string { + return "mock" +} + +func (m *MockNode) IsSynced() bool { + return true +} + +func (m *MockNode) Name() string { + return m.name +} + +// Compile-time check that MockNode implements execution.Node. +var _ execution.Node = (*MockNode)(nil) + func TestPool_Creation(t *testing.T) { log := logrus.New() log.SetLevel(logrus.ErrorLevel) @@ -449,3 +570,240 @@ func TestPool_NodeSelection(t *testing.T) { func stringPtr(s string) *string { return &s } + +// Tests for NewPoolWithNodes + +func TestPool_NewPoolWithNodes_Basic(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + node1 := NewMockNode("mock-node-1") + node2 := NewMockNode("mock-node-2") + + nodes := []execution.Node{node1, node2} + pool := ethereum.NewPoolWithNodes(log, "test", nodes, nil) + + require.NotNil(t, pool) + assert.True(t, pool.HasExecutionNodes()) + assert.False(t, pool.HasHealthyExecutionNodes()) +} + +func TestPool_NewPoolWithNodes_NilConfig(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + node := NewMockNode("mock-node") + + nodes := []execution.Node{node} + pool := ethereum.NewPoolWithNodes(log, "test", nodes, nil) + + require.NotNil(t, pool) + assert.True(t, pool.HasExecutionNodes()) + + // Verify default config behavior - unknown chain ID should error + network, err := pool.GetNetworkByChainID(999999) + assert.Error(t, err) + assert.Nil(t, network) +} + +func TestPool_NewPoolWithNodes_EmptyNodes(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + nodes := []execution.Node{} + pool := ethereum.NewPoolWithNodes(log, "test", nodes, nil) + + require.NotNil(t, pool) + assert.False(t, pool.HasExecutionNodes()) + assert.False(t, pool.HasHealthyExecutionNodes()) + assert.Nil(t, pool.GetHealthyExecutionNode()) + assert.Empty(t, pool.GetHealthyExecutionNodes()) +} + +func TestPool_NewPoolWithNodes_MultipleNodes(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + const numNodes = 5 + + nodes := make([]execution.Node, numNodes) + for i := 0; i < numNodes; i++ { + nodes[i] = NewMockNode("mock-node-" + string(rune('a'+i))) + } + + pool := ethereum.NewPoolWithNodes(log, "test", nodes, nil) + + require.NotNil(t, pool) + assert.True(t, pool.HasExecutionNodes()) +} + +func TestPool_NewPoolWithNodes_WithConfig(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + node := NewMockNode("mock-node") + overrideName := "custom-network" + + config := ðereum.Config{ + OverrideNetworkName: &overrideName, + } + + nodes := []execution.Node{node} + pool := ethereum.NewPoolWithNodes(log, "test", nodes, config) + + require.NotNil(t, pool) + + // Verify config is used - override name should be returned + network, err := pool.GetNetworkByChainID(999999) + require.NoError(t, err) + assert.Equal(t, "custom-network", network.Name) +} + +func TestPool_NewPoolWithNodes_StartStop(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + node := NewMockNode("mock-node") + + nodes := []execution.Node{node} + pool := ethereum.NewPoolWithNodes(log, "test", nodes, nil) + + ctx := context.Background() + pool.Start(ctx) + + // Wait for async Start goroutines to execute + time.Sleep(50 * time.Millisecond) + + // Node should have been started + node.mu.Lock() + assert.True(t, node.started) + node.mu.Unlock() + + stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err := pool.Stop(stopCtx) + assert.NoError(t, err) + + // Node should have been stopped + node.mu.Lock() + assert.True(t, node.stopped) + node.mu.Unlock() +} + +func TestPool_NewPoolWithNodes_NodeBecomesHealthy(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + node := NewMockNode("mock-node") + + nodes := []execution.Node{node} + pool := ethereum.NewPoolWithNodes(log, "test", nodes, nil) + + ctx := context.Background() + pool.Start(ctx) + + defer func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err := pool.Stop(stopCtx) + assert.NoError(t, err) + }() + + // Wait for async Start goroutines to register callbacks + time.Sleep(50 * time.Millisecond) + + // Initially no healthy nodes + assert.False(t, pool.HasHealthyExecutionNodes()) + + // Trigger the node to become ready (simulates OnReady callback) + err := node.TriggerReady(ctx) + require.NoError(t, err) + + // Now the pool should have a healthy node + assert.True(t, pool.HasHealthyExecutionNodes()) + assert.NotNil(t, pool.GetHealthyExecutionNode()) +} + +func TestPool_EmbeddedNodeIntegration(t *testing.T) { + log := logrus.New() + log.SetLevel(logrus.ErrorLevel) + + // Create an EmbeddedNode with a mock data source + ds := &testDataSource{} + embeddedNode := execution.NewEmbeddedNode(log, "embedded-test", ds) + + nodes := []execution.Node{embeddedNode} + pool := ethereum.NewPoolWithNodes(log, "test", nodes, nil) + + ctx := context.Background() + pool.Start(ctx) + + defer func() { + stopCtx, cancel := context.WithTimeout(context.Background(), 2*time.Second) + defer cancel() + + err := pool.Stop(stopCtx) + assert.NoError(t, err) + }() + + // Wait for async Start goroutines to register callbacks + time.Sleep(50 * time.Millisecond) + + // Initially no healthy nodes + assert.False(t, pool.HasHealthyExecutionNodes()) + + // Mark embedded node as ready + err := embeddedNode.MarkReady(ctx) + require.NoError(t, err) + + // Now the pool should have a healthy node + assert.True(t, pool.HasHealthyExecutionNodes()) + + healthyNode := pool.GetHealthyExecutionNode() + require.NotNil(t, healthyNode) + assert.Equal(t, "embedded-test", healthyNode.Name()) +} + +// testDataSource is a minimal DataSource implementation for integration tests. +type testDataSource struct{} + +func (ds *testDataSource) BlockNumber(_ context.Context) (*uint64, error) { + num := uint64(12345) + + return &num, nil +} + +func (ds *testDataSource) BlockByNumber(_ context.Context, number *big.Int) (execution.Block, error) { + return &MockBlock{number: number}, nil +} + +func (ds *testDataSource) BlockReceipts(_ context.Context, _ *big.Int) ([]execution.Receipt, error) { + return []execution.Receipt{}, nil +} + +func (ds *testDataSource) TransactionReceipt(_ context.Context, _ string) (execution.Receipt, error) { + return &MockReceipt{}, nil +} + +func (ds *testDataSource) DebugTraceTransaction( + _ context.Context, + _ string, + _ *big.Int, + _ execution.TraceOptions, +) (*execution.TraceTransaction, error) { + return &execution.TraceTransaction{}, nil +} + +func (ds *testDataSource) ChainID() int32 { + return 1 +} + +func (ds *testDataSource) ClientType() string { + return "test" +} + +func (ds *testDataSource) IsSynced() bool { + return true +} diff --git a/pkg/processor/manager.go b/pkg/processor/manager.go index a6dc920..78ab668 100644 --- a/pkg/processor/manager.go +++ b/pkg/processor/manager.go @@ -116,7 +116,7 @@ func (m *Manager) Start(ctx context.Context) error { return fmt.Errorf("no healthy execution node available") } - m.network, err = m.pool.GetNetworkByChainID(node.Metadata().ChainID()) + m.network, err = m.pool.GetNetworkByChainID(node.ChainID()) if err != nil { return fmt.Errorf("failed to get network by chain ID: %w", err) } diff --git a/pkg/processor/transaction/simple/handlers.go b/pkg/processor/transaction/simple/handlers.go index 43611b7..1fc48d8 100644 --- a/pkg/processor/transaction/simple/handlers.go +++ b/pkg/processor/transaction/simple/handlers.go @@ -6,11 +6,11 @@ import ( "math/big" "time" - "github.com/ethereum/go-ethereum/core/types" "github.com/hibiken/asynq" "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/common" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" c "github.com/ethpandaops/execution-processor/pkg/processor/common" "github.com/ethpandaops/execution-processor/pkg/processor/transaction/structlog" ) @@ -100,13 +100,13 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err blockTxs := block.Transactions() // Build receipt map - try batch first, fall back to per-tx - receiptMap := make(map[string]*types.Receipt, len(blockTxs)) + receiptMap := make(map[string]execution.Receipt, len(blockTxs)) receipts, err := node.BlockReceipts(ctx, blockNumber) if err == nil { // Use batch receipts for _, r := range receipts { - receiptMap[r.TxHash.Hex()] = r + receiptMap[r.TxHash().Hex()] = r } } @@ -169,26 +169,13 @@ func (p *Processor) handleProcessTask(ctx context.Context, task *asynq.Task) err // buildTransactionRow builds a transaction row from block, tx, and receipt data. func (p *Processor) buildTransactionRow( - block *types.Block, - tx *types.Transaction, - receipt *types.Receipt, + block execution.Block, + tx execution.Transaction, + receipt execution.Receipt, index uint64, ) (Transaction, error) { - // Get sender (from) - handle legacy transactions without chain ID - var signer types.Signer - - chainID := tx.ChainId() - if chainID == nil || chainID.Sign() == 0 { - // Legacy transaction without EIP-155 replay protection - signer = types.HomesteadSigner{} - } else { - signer = types.LatestSignerForChainID(chainID) - } - - from, err := types.Sender(signer, tx) - if err != nil { - return Transaction{}, fmt.Errorf("failed to get sender: %w", err) - } + // Get sender (from) - computed by the data source + from := tx.From() // Build to address (nil for contract creation) var toAddress *string @@ -259,7 +246,7 @@ func (p *Processor) buildTransactionRow( Size: txSize, CallDataSize: callDataSize, BlobHashes: []string{}, // Default empty array - Success: receipt.Status == 1, + Success: receipt.Status() == 1, NInputBytes: callDataSize, NInputZeroBytes: nInputZeroBytes, NInputNonzeroBytes: nInputNonzeroBytes, @@ -268,7 +255,7 @@ func (p *Processor) buildTransactionRow( } // Handle blob transaction fields (type 3) - if tx.Type() == types.BlobTxType { + if tx.Type() == execution.BlobTxType { blobGas := tx.BlobGas() txRow.BlobGas = &blobGas @@ -292,11 +279,11 @@ func (p *Processor) buildTransactionRow( // calculateEffectiveGasPrice calculates the effective gas price for a transaction. // For legacy/access list txs: returns tx.GasPrice(). // For EIP-1559+ txs: returns min(max_fee_per_gas, base_fee + max_priority_fee_per_gas). -func calculateEffectiveGasPrice(block *types.Block, tx *types.Transaction) *big.Int { +func calculateEffectiveGasPrice(block execution.Block, tx execution.Transaction) *big.Int { txType := tx.Type() // Legacy and access list transactions use GasPrice directly - if txType == types.LegacyTxType || txType == types.AccessListTxType { + if txType == execution.LegacyTxType || txType == execution.AccessListTxType { if tx.GasPrice() != nil { return tx.GasPrice() } diff --git a/pkg/processor/transaction/structlog/block_processing.go b/pkg/processor/transaction/structlog/block_processing.go index 83dd525..3183308 100644 --- a/pkg/processor/transaction/structlog/block_processing.go +++ b/pkg/processor/transaction/structlog/block_processing.go @@ -7,11 +7,11 @@ import ( "math/big" "strings" - "github.com/ethereum/go-ethereum/core/types" "github.com/hibiken/asynq" "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/common" + "github.com/ethpandaops/execution-processor/pkg/ethereum/execution" c "github.com/ethpandaops/execution-processor/pkg/processor/common" "github.com/ethpandaops/execution-processor/pkg/state" ) @@ -175,7 +175,7 @@ func isBlockNotFoundError(err error) bool { // enqueueTransactionTasks enqueues tasks for all transactions in a block. // EnqueueTransactionTasks enqueues transaction processing tasks for a given block. -func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block *types.Block) (int, error) { +func (p *Processor) EnqueueTransactionTasks(ctx context.Context, block execution.Block) (int, error) { var enqueuedCount int var errs []error diff --git a/pkg/processor/transaction/structlog/transaction_processing.go b/pkg/processor/transaction/structlog/transaction_processing.go index b700cc6..9d0cfec 100644 --- a/pkg/processor/transaction/structlog/transaction_processing.go +++ b/pkg/processor/transaction/structlog/transaction_processing.go @@ -5,7 +5,6 @@ import ( "fmt" "time" - "github.com/ethereum/go-ethereum/core/types" "github.com/sirupsen/logrus" "github.com/ethpandaops/execution-processor/pkg/common" @@ -37,7 +36,7 @@ type Structlog struct { } // ProcessSingleTransaction processes a single transaction and inserts its structlogs directly to ClickHouse. -func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error) { +func (p *Processor) ProcessSingleTransaction(ctx context.Context, block execution.Block, index int, tx execution.Transaction) (int, error) { // Extract structlog data structlogs, err := p.ExtractStructlogs(ctx, block, index, tx) if err != nil { @@ -67,7 +66,7 @@ func (p *Processor) ProcessSingleTransaction(ctx context.Context, block *types.B } // ProcessTransaction processes a transaction using memory-efficient channel-based batching. -func (p *Processor) ProcessTransaction(ctx context.Context, block *types.Block, index int, tx *types.Transaction) (int, error) { +func (p *Processor) ProcessTransaction(ctx context.Context, block execution.Block, index int, tx execution.Transaction) (int, error) { // Get trace from execution node trace, err := p.getTransactionTrace(ctx, tx, block) if err != nil { @@ -207,7 +206,7 @@ func (p *Processor) ProcessTransaction(ctx context.Context, block *types.Block, } // getTransactionTrace gets the trace for a transaction. -func (p *Processor) getTransactionTrace(ctx context.Context, tx *types.Transaction, block *types.Block) (*execution.TraceTransaction, error) { +func (p *Processor) getTransactionTrace(ctx context.Context, tx execution.Transaction, block execution.Block) (*execution.TraceTransaction, error) { // Get execution node node := p.pool.GetHealthyExecutionNode() if node == nil { @@ -239,7 +238,7 @@ func (p *Processor) extractCallAddress(structLog *execution.StructLog) *string { } // ExtractStructlogs extracts structlog data from a transaction without inserting to database. -func (p *Processor) ExtractStructlogs(ctx context.Context, block *types.Block, index int, tx *types.Transaction) ([]Structlog, error) { +func (p *Processor) ExtractStructlogs(ctx context.Context, block execution.Block, index int, tx execution.Transaction) ([]Structlog, error) { start := time.Now() defer func() {