Skip to content

Commit 9795d18

Browse files
authored
Made autobahn producer use TxMempool (#3224)
Also * disabled bunch of reactors unused in autobahn mode. * fixed race conditions in mempool processPeerUpdates
1 parent ec0ec44 commit 9795d18

31 files changed

Lines changed: 831 additions & 509 deletions

sei-tendermint/config/config.go

Lines changed: 43 additions & 20 deletions
Original file line numberDiff line numberDiff line change
@@ -11,6 +11,7 @@ import (
1111
"strings"
1212
"time"
1313

14+
mempoolcfg "github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool"
1415
tmos "github.com/sei-protocol/sei-chain/sei-tendermint/libs/os"
1516
"github.com/sei-protocol/sei-chain/sei-tendermint/types"
1617
)
@@ -857,38 +858,60 @@ type MempoolConfig struct {
857858
DropPriorityReservoirSize int `mapstructure:"drop-priority-reservoir-size"`
858859
}
859860

861+
func (cfg *MempoolConfig) ToMempoolConfig() *mempoolcfg.Config {
862+
return &mempoolcfg.Config{
863+
Size: cfg.Size,
864+
MaxTxsBytes: cfg.MaxTxsBytes,
865+
CacheSize: cfg.CacheSize,
866+
DuplicateTxsCacheSize: cfg.DuplicateTxsCacheSize,
867+
KeepInvalidTxsInCache: cfg.KeepInvalidTxsInCache,
868+
MaxTxBytes: cfg.MaxTxBytes,
869+
TTLDuration: cfg.TTLDuration,
870+
TTLNumBlocks: cfg.TTLNumBlocks,
871+
TxNotifyThreshold: cfg.TxNotifyThreshold,
872+
PendingSize: cfg.PendingSize,
873+
MaxPendingTxsBytes: cfg.MaxPendingTxsBytes,
874+
RemoveExpiredTxsFromQueue: cfg.RemoveExpiredTxsFromQueue,
875+
DropPriorityThreshold: cfg.DropPriorityThreshold,
876+
DropUtilisationThreshold: cfg.DropUtilisationThreshold,
877+
DropPriorityReservoirSize: cfg.DropPriorityReservoirSize,
878+
}
879+
}
880+
860881
// DefaultMempoolConfig returns a default configuration for the Tendermint mempool.
861882
func DefaultMempoolConfig() *MempoolConfig {
883+
cfg := mempoolcfg.DefaultConfig()
862884
return &MempoolConfig{
863-
Broadcast: true,
864-
// Each signature verification takes .5ms, Size reduced until we implement
865-
// ABCI Recheck
866-
Size: 5000,
867-
MaxTxsBytes: 1024 * 1024 * 1024, // 1GB
868-
CacheSize: 10000,
869-
DuplicateTxsCacheSize: 100000,
870-
MaxTxBytes: 1024 * 1024, // 1MB
871-
TTLDuration: 5 * time.Second, // prevent stale txs from filling mempool
872-
TTLNumBlocks: 10, // remove txs after 10 blocks
873-
TxNotifyThreshold: 0,
885+
Broadcast: true,
886+
Size: cfg.Size,
887+
MaxTxsBytes: cfg.MaxTxsBytes,
888+
CacheSize: cfg.CacheSize,
889+
DuplicateTxsCacheSize: cfg.DuplicateTxsCacheSize,
890+
KeepInvalidTxsInCache: cfg.KeepInvalidTxsInCache,
891+
MaxTxBytes: cfg.MaxTxBytes,
892+
MaxBatchBytes: 0,
893+
TTLDuration: cfg.TTLDuration,
894+
TTLNumBlocks: cfg.TTLNumBlocks,
895+
TxNotifyThreshold: cfg.TxNotifyThreshold,
874896
CheckTxErrorBlacklistEnabled: true,
875897
CheckTxErrorThreshold: 50,
876-
PendingSize: 5000,
877-
MaxPendingTxsBytes: 1024 * 1024 * 1024, // 1GB
878-
PendingTTLDuration: 0 * time.Second,
898+
PendingSize: cfg.PendingSize,
899+
MaxPendingTxsBytes: cfg.MaxPendingTxsBytes,
900+
PendingTTLDuration: 0,
879901
PendingTTLNumBlocks: 0,
880-
RemoveExpiredTxsFromQueue: true,
881-
DropPriorityThreshold: 0.1,
882-
DropUtilisationThreshold: 1.0,
883-
DropPriorityReservoirSize: 10_240,
902+
RemoveExpiredTxsFromQueue: cfg.RemoveExpiredTxsFromQueue,
903+
DropPriorityThreshold: cfg.DropPriorityThreshold,
904+
DropUtilisationThreshold: cfg.DropUtilisationThreshold,
905+
DropPriorityReservoirSize: cfg.DropPriorityReservoirSize,
884906
}
885907
}
886908

887909
// TestMempoolConfig returns a configuration for testing the Tendermint mempool
888910
func TestMempoolConfig() *MempoolConfig {
889911
cfg := DefaultMempoolConfig()
890-
cfg.CacheSize = 1000
891-
cfg.DropUtilisationThreshold = 0.0
912+
testCfg := mempoolcfg.TestConfig()
913+
cfg.CacheSize = testCfg.CacheSize
914+
cfg.DropUtilisationThreshold = testCfg.DropUtilisationThreshold
892915
return cfg
893916
}
894917

sei-tendermint/internal/autobahn/producer/state.go

Lines changed: 32 additions & 23 deletions
Original file line numberDiff line numberDiff line change
@@ -6,8 +6,8 @@ import (
66
"time"
77

88
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/consensus"
9-
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/pb"
109
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/autobahn/types"
10+
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/mempool"
1111
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
1212
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils/scope"
1313
"golang.org/x/time/rate"
@@ -32,19 +32,18 @@ func (c *Config) maxTxsPerBlock() uint64 {
3232

3333
// State is the block producer state.
3434
type State struct {
35-
cfg *Config
36-
// channel of transactions to build the next block from.
37-
mempool chan *pb.Transaction
35+
cfg *Config
36+
txMempool *mempool.TxMempool
3837
// consensus state to which published blocks will be reported.
3938
consensus *consensus.State
4039
}
4140

4241
// NewState constructs a new block producer state.
4342
// Returns an error if the current node is NOT a producer.
44-
func NewState(cfg *Config, consensus *consensus.State) *State {
43+
func NewState(cfg *Config, txMempool *mempool.TxMempool, consensus *consensus.State) *State {
4544
return &State{
4645
cfg: cfg,
47-
mempool: make(chan *pb.Transaction, cfg.MempoolSize),
46+
txMempool: txMempool,
4847
consensus: consensus,
4948
}
5049
}
@@ -54,22 +53,37 @@ func NewState(cfg *Config, consensus *consensus.State) *State {
5453
func (s *State) makePayload(ctx context.Context) *types.Payload {
5554
ctx, cancel := context.WithTimeout(ctx, s.cfg.BlockInterval)
5655
defer cancel()
57-
maxTxs := s.cfg.maxTxsPerBlock()
58-
var totalGas uint64
59-
var txs [][]byte
60-
for totalGas < s.cfg.MaxGasPerBlock && uint64(len(txs)) < maxTxs {
61-
tx, err := utils.Recv(ctx, s.mempool)
62-
if err != nil {
63-
break
56+
57+
if s.txMempool.NumTxsNotPending() == 0 {
58+
select {
59+
case <-ctx.Done():
60+
case <-s.txMempool.TxsAvailable():
6461
}
65-
txs = append(txs, tx.Payload)
66-
totalGas += tx.GasUsed
6762
}
68-
return types.PayloadBuilder{
63+
64+
txs, gasEstimated := s.txMempool.PopTxs(mempool.ReapLimits{
65+
MaxTxs: utils.Some(min(types.MaxTxsPerBlock, s.cfg.maxTxsPerBlock())),
66+
MaxBytes: utils.Some(utils.Clamp[int64](types.MaxTxsBytesPerBlock)),
67+
MaxGasWanted: utils.Some(utils.Clamp[int64](s.cfg.MaxGasPerBlock)),
68+
MaxGasEstimated: utils.Some(utils.Clamp[int64](s.cfg.MaxGasPerBlock)),
69+
})
70+
payloadTxs := make([][]byte, 0, len(txs))
71+
for _, tx := range txs {
72+
payloadTxs = append(payloadTxs, tx)
73+
}
74+
payload, err := types.PayloadBuilder{
6975
CreatedAt: time.Now(),
70-
TotalGas: totalGas,
71-
Txs: txs,
76+
// TODO: ReapMaxTxsBytesMaxGas does not handle corner cases correctly rn, which actually
77+
// can produce negative total gas. Fixing it right away might be backward incompatible afaict,
78+
// so we leave it as is for now.
79+
TotalGas: uint64(gasEstimated), // nolint:gosec
80+
Txs: payloadTxs,
7281
}.Build()
82+
// This should never happen: we construct the payload from correctly sized data.
83+
if err != nil {
84+
panic(fmt.Errorf("PayloadBuilder{}.Build(): %w", err))
85+
}
86+
return payload
7387
}
7488

7589
// nextPayload constructs the payload for the next block.
@@ -86,11 +100,6 @@ func (s *State) nextPayload(ctx context.Context) (*types.Payload, error) {
86100
}
87101
}
88102

89-
// PushToMempool pushes the transaction to the mempool.
90-
func (s *State) PushToMempool(ctx context.Context, tx *pb.Transaction) error {
91-
return utils.Send(ctx, s.mempool, tx)
92-
}
93-
94103
// Run runs the background tasks of the producer state.
95104
func (s *State) Run(ctx context.Context) error {
96105
return scope.Run(ctx, func(ctx context.Context, scope scope.Scope) error {

sei-tendermint/internal/autobahn/types/block.go

Lines changed: 38 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -72,6 +72,30 @@ func (h *BlockHeader) Verify(c *Committee) error {
7272
return nil
7373
}
7474

75+
const standardTxBytes uint64 = 1024
76+
77+
// Maximum number of transactions in a block.
78+
const MaxTxsPerBlock uint64 = 2000
79+
80+
// Maximum total size of all the transactions.
81+
// It can be split arbitrarily across transactions (1 large, 2000 small ones, etc.)
82+
// up to MaxTxsPerBlock limit.
83+
const MaxTxsBytesPerBlock = MaxTxsPerBlock * standardTxBytes
84+
85+
// Upper bound on the block proto encoding.
86+
var MaxBlockProtoSize = func() uint64 {
87+
// Payload.Txs represents the variable part of the Block size.
88+
// Proto size is maximized if we distribute data evenly across transactions.
89+
tx := make([]byte, standardTxBytes)
90+
txs := make([][]byte, MaxTxsPerBlock)
91+
for i := range txs {
92+
txs[i] = tx
93+
}
94+
// Crude estimate of all other fields.
95+
const otherFields = 100 * 1024
96+
return otherFields + uint64(protoutils.Size(&pb.Block{Payload: &pb.Payload{Txs: txs}}))
97+
}()
98+
7599
// Block .
76100
type Block struct {
77101
utils.ReadOnly
@@ -149,7 +173,19 @@ type Payload struct {
149173
}
150174

151175
// Build builds the Payload.
152-
func (b PayloadBuilder) Build() *Payload { return &Payload{p: b} }
176+
func (b PayloadBuilder) Build() (*Payload, error) {
177+
if uint64(len(b.Txs)) > MaxTxsPerBlock {
178+
return nil, fmt.Errorf("too many transactions")
179+
}
180+
total := uint64(0)
181+
for _, tx := range b.Txs {
182+
total += uint64(len(tx))
183+
}
184+
if total > MaxTxsBytesPerBlock {
185+
return nil, fmt.Errorf("total txs bytes too large")
186+
}
187+
return &Payload{p: b}, nil
188+
}
153189

154190
// ToBuilder converts the Payload to a PayloadBuilder.
155191
func (p *Payload) ToBuilder() PayloadBuilder { return p.p }
@@ -245,7 +281,7 @@ var PayloadConv = protoutils.Conv[*Payload, *pb.Payload]{
245281
Coinbase: p.Coinbase,
246282
Basefee: *p.Basefee,
247283
Txs: p.Txs,
248-
}.Build(), nil
284+
}.Build()
249285
},
250286
}
251287

sei-tendermint/internal/autobahn/types/testonly.go

Lines changed: 2 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -90,14 +90,14 @@ func GenBlockHeader(rng utils.Rng) *BlockHeader {
9090

9191
// GenPayload generates a random Payload.
9292
func GenPayload(rng utils.Rng) *Payload {
93-
return PayloadBuilder{
93+
return utils.OrPanic1(PayloadBuilder{
9494
CreatedAt: utils.GenTimestamp(rng),
9595
TotalGas: rng.Uint64(),
9696
EdgeCount: rng.Int63(),
9797
Coinbase: utils.GenBytes(rng, 10),
9898
Basefee: rng.Int63(),
9999
Txs: utils.GenSlice(rng, func(rng utils.Rng) []byte { return utils.GenBytes(rng, 10) }),
100-
}.Build()
100+
}.Build())
101101
}
102102

103103
// GenBlock generates a random Block.

sei-tendermint/internal/blocksync/reactor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -93,7 +93,7 @@ func makeReactor(
9393
state, err := sm.MakeGenesisState(genDoc)
9494
require.NoError(t, err)
9595
require.NoError(t, stateStore.Save(state))
96-
mp := mempool.NewTxMempool(config.TestMempoolConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher)
96+
mp := mempool.NewTxMempool(mempool.TestConfig(), app, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher)
9797
eventbus := eventbus.NewDefault()
9898
require.NoError(t, eventbus.Start(ctx))
9999

sei-tendermint/internal/consensus/common_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -465,7 +465,7 @@ func newStateWithConfigAndBlockStore(
465465
// Make Mempool
466466

467467
mempool := mempool.NewTxMempool(
468-
thisConfig.Mempool,
468+
thisConfig.Mempool.ToMempoolConfig(),
469469
proxyAppConnMem,
470470
mempool.NopMetrics(),
471471
mempool.NopTxConstraintsFetcher,

sei-tendermint/internal/consensus/mempool_test.go

Lines changed: 2 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -20,6 +20,7 @@ import (
2020
sm "github.com/sei-protocol/sei-chain/sei-tendermint/internal/state"
2121
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/store"
2222
"github.com/sei-protocol/sei-chain/sei-tendermint/internal/test/factory"
23+
"github.com/sei-protocol/sei-chain/sei-tendermint/libs/utils"
2324
"github.com/sei-protocol/sei-chain/sei-tendermint/types"
2425
)
2526

@@ -247,7 +248,7 @@ func TestMempoolRmBadTx(t *testing.T) {
247248

248249
// check for the tx
249250
for {
250-
txs := cs.txMempool.ReapMaxBytesMaxGas(int64(len(txBytes)), -1, -1)
251+
txs := cs.txMempool.ReapMaxBytesMaxGas(int64(len(txBytes)), utils.Max[int64](), utils.Max[int64]())
251252
if len(txs) == 0 {
252253
emptyMempoolCh <- struct{}{}
253254
return

sei-tendermint/internal/consensus/reactor_test.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -273,7 +273,7 @@ func TestReactorWithEvidence(t *testing.T) {
273273
proxyAppConnCon := app
274274

275275
mempool := mempool.NewTxMempool(
276-
thisConfig.Mempool,
276+
thisConfig.Mempool.ToMempoolConfig(),
277277
proxyAppConnMem,
278278
mempool.NopMetrics(),
279279
mempool.NopTxConstraintsFetcher,

sei-tendermint/internal/consensus/replay.go

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -135,7 +135,7 @@ func NewHandshaker(
135135
}
136136

137137
func newReplayTxMempool(appClient abci.Application) *mempool.TxMempool {
138-
return mempool.NewTxMempool(config.DefaultMempoolConfig(), appClient, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher)
138+
return mempool.NewTxMempool(config.DefaultMempoolConfig().ToMempoolConfig(), appClient, mempool.NopMetrics(), mempool.NopTxConstraintsFetcher)
139139
}
140140

141141
// NBlocks returns the number of blocks applied to the state.

0 commit comments

Comments
 (0)