Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
87 changes: 87 additions & 0 deletions README.md
Original file line number Diff line number Diff line change
Expand Up @@ -57,6 +57,93 @@ A distributed system for processing Ethereum execution layer data with support f
└─────────────────────────────────────────┘
```

## Embedded Mode (Library Usage)

The execution-processor can be embedded as a library within an execution client, providing direct data access without JSON-RPC overhead.

### Implementing DataSource

```go
import (
"context"
"math/big"

"github.com/ethereum/go-ethereum/core/types"
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
)

type MyDataSource struct {
client *MyExecutionClient
}

func (ds *MyDataSource) BlockNumber(ctx context.Context) (*uint64, error) {
num := ds.client.CurrentBlock()
return &num, nil
}

func (ds *MyDataSource) BlockByNumber(ctx context.Context, number *big.Int) (*types.Block, error) {
return ds.client.GetBlock(number), nil
}

func (ds *MyDataSource) BlockReceipts(ctx context.Context, number *big.Int) ([]*types.Receipt, error) {
return ds.client.GetBlockReceipts(number), nil
}

func (ds *MyDataSource) TransactionReceipt(ctx context.Context, hash string) (*types.Receipt, error) {
return ds.client.GetReceipt(hash), nil
}

func (ds *MyDataSource) DebugTraceTransaction(
ctx context.Context,
hash string,
blockNumber *big.Int,
opts execution.TraceOptions,
) (*execution.TraceTransaction, error) {
return ds.client.TraceTransaction(hash, opts), nil
}

func (ds *MyDataSource) ChainID() int32 {
return ds.client.ChainID()
}

func (ds *MyDataSource) ClientType() string {
return "my-client/1.0.0"
}

func (ds *MyDataSource) IsSynced() bool {
return ds.client.IsSynced()
}
```

### Creating an Embedded Pool

```go
import (
"github.com/ethpandaops/execution-processor/pkg/ethereum"
"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
)

// Create embedded node with your data source
dataSource := &MyDataSource{client: myClient}
node := execution.NewEmbeddedNode(log, "embedded", dataSource)

// Create pool with the embedded node
pool := ethereum.NewPoolWithNodes(log, "processor", []execution.Node{node}, nil)
pool.Start(ctx)

// Mark ready when your client is synced and ready to serve data
node.MarkReady(ctx)
```

### Embedded vs RPC Mode

| Aspect | RPC Mode | Embedded Mode |
|--------|----------|---------------|
| Data Access | JSON-RPC over HTTP | Direct function calls |
| Readiness | Auto-detected via RPC health checks | Host calls MarkReady() |
| Performance | Network + serialization overhead | Zero serialization overhead |
| Use Case | External execution clients | Library integration |

## Manual Block Queue API

The execution processor provides an HTTP API for manually queuing blocks for reprocessing. This is useful for fixing data issues or reprocessing specific blocks.
Expand Down
84 changes: 84 additions & 0 deletions pkg/config/config.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,84 @@
// Package config provides configuration types for execution-processor.
// This package is designed to be imported without pulling in go-ethereum dependencies,
// making it suitable for embedded mode integrations.
package config

import (
"fmt"
"time"

"github.com/ethpandaops/execution-processor/pkg/ethereum/execution"
"github.com/ethpandaops/execution-processor/pkg/processor"
"github.com/ethpandaops/execution-processor/pkg/redis"
"github.com/ethpandaops/execution-processor/pkg/state"
)

// EthereumConfig is the ethereum network configuration.
// This is a copy of ethereum.Config to avoid importing pkg/ethereum
// which would pull in go-ethereum dependencies.
type EthereumConfig struct {
// Execution configuration
Execution []*execution.Config `yaml:"execution"`
// Override network name for custom networks (bypasses networkMap)
OverrideNetworkName *string `yaml:"overrideNetworkName"`
}

// Validate validates the ethereum configuration.
func (c *EthereumConfig) Validate() error {
for i, exec := range c.Execution {
if err := exec.Validate(); err != nil {
return fmt.Errorf("invalid execution configuration at index %d: %w", i, err)
}
}

return nil
}

// Config is the main configuration for execution-processor.
type Config struct {
// MetricsAddr is the address to listen on for metrics.
MetricsAddr string `yaml:"metricsAddr" default:":9090"`
// HealthCheckAddr is the address to listen on for healthcheck.
HealthCheckAddr *string `yaml:"healthCheckAddr"`
// PProfAddr is the address to listen on for pprof.
PProfAddr *string `yaml:"pprofAddr"`
// APIAddr is the address to listen on for the API server.
APIAddr *string `yaml:"apiAddr"`
// LoggingLevel is the logging level to use.
LoggingLevel string `yaml:"logging" default:"info"`
// Ethereum is the ethereum network configuration.
Ethereum EthereumConfig `yaml:"ethereum"`
// Redis is the redis configuration.
Redis *redis.Config `yaml:"redis"`
// StateManager is the state manager configuration.
StateManager state.Config `yaml:"stateManager"`
// Processors is the processor configuration.
Processors processor.Config `yaml:"processors"`
// ShutdownTimeout is the timeout for shutting down the server.
ShutdownTimeout time.Duration `yaml:"shutdownTimeout" default:"10s"`
}

// Validate validates the configuration.
func (c *Config) Validate() error {
if c.Redis == nil {
return fmt.Errorf("redis configuration is required")
}

if err := c.Redis.Validate(); err != nil {
return fmt.Errorf("invalid redis configuration: %w", err)
}

if err := c.Ethereum.Validate(); err != nil {
return fmt.Errorf("invalid ethereum configuration: %w", err)
}

if err := c.StateManager.Validate(); err != nil {
return fmt.Errorf("invalid state manager configuration: %w", err)
}

if err := c.Processors.Validate(); err != nil {
return fmt.Errorf("invalid processor configuration: %w", err)
}

return nil
}
136 changes: 136 additions & 0 deletions pkg/ethereum/execution/block.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,136 @@
package execution

import "math/big"

// Hash represents a 32-byte hash.
type Hash [32]byte

// Hex returns the hex string representation of the hash.
func (h Hash) Hex() string {
return "0x" + encodeHex(h[:])
}

// String returns the hex string representation of the hash.
func (h Hash) String() string {
return h.Hex()
}

// Address represents a 20-byte Ethereum address.
type Address [20]byte

// Hex returns the hex string representation of the address with checksum.
func (a Address) Hex() string {
return "0x" + encodeHex(a[:])
}

// String returns the hex string representation of the address.
func (a Address) String() string {
return a.Hex()
}

// encodeHex encodes bytes as hex string without 0x prefix.
func encodeHex(b []byte) string {
const hexChars = "0123456789abcdef"

result := make([]byte, len(b)*2)

for i, v := range b {
result[i*2] = hexChars[v>>4]
result[i*2+1] = hexChars[v&0x0f]
}

return string(result)
}

// Transaction type constants matching go-ethereum values.
const (
LegacyTxType = 0
AccessListTxType = 1
DynamicFeeTxType = 2
BlobTxType = 3
)

// Block interface defines methods for accessing block data.
// Implementations are provided by data sources (RPC, embedded clients).
type Block interface {
// Number returns the block number.
Number() *big.Int

// Hash returns the block hash.
Hash() Hash

// ParentHash returns the parent block hash.
ParentHash() Hash

// BaseFee returns the base fee per gas (EIP-1559), or nil for pre-London blocks.
BaseFee() *big.Int

// Transactions returns all transactions in the block.
Transactions() []Transaction
}

// Transaction interface defines methods for accessing transaction data.
// The From() method returns the sender address, computed by the data source
// using its own crypto implementation (avoiding go-ethereum crypto imports).
type Transaction interface {
// Hash returns the transaction hash.
Hash() Hash

// Type returns the transaction type (0=legacy, 1=access list, 2=dynamic fee, 3=blob).
Type() uint8

// To returns the recipient address, or nil for contract creation.
To() *Address

// From returns the sender address.
// This is computed by the data source using types.Sender() or equivalent.
From() Address

// Nonce returns the sender account nonce.
Nonce() uint64

// Gas returns the gas limit.
Gas() uint64

// GasPrice returns the gas price (for legacy transactions).
GasPrice() *big.Int

// GasTipCap returns the max priority fee per gas (EIP-1559).
GasTipCap() *big.Int

// GasFeeCap returns the max fee per gas (EIP-1559).
GasFeeCap() *big.Int

// Value returns the value transferred in wei.
Value() *big.Int

// Data returns the input data (calldata).
Data() []byte

// Size returns the encoded transaction size in bytes.
Size() uint64

// ChainId returns the chain ID, or nil for legacy transactions.
ChainId() *big.Int

// BlobGas returns the blob gas used (for blob transactions).
BlobGas() uint64

// BlobGasFeeCap returns the max blob fee per gas (for blob transactions).
BlobGasFeeCap() *big.Int

// BlobHashes returns the versioned hashes (for blob transactions).
BlobHashes() []Hash
}

// Receipt interface defines methods for accessing transaction receipt data.
type Receipt interface {
// Status returns the transaction status (1=success, 0=failure).
Status() uint64

// TxHash returns the transaction hash.
TxHash() Hash

// GasUsed returns the gas used by the transaction.
GasUsed() uint64
}
Loading