Skip to content
This repository was archived by the owner on Nov 25, 2025. It is now read-only.
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
17 changes: 17 additions & 0 deletions plugin/evm/atomic/sync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -146,6 +146,23 @@ func (s *Syncer) Sync(ctx context.Context) error {
return s.syncer.Sync(ctx)
}

// UpdateSyncTarget updates the target summary to sync to.
// TODO(alarso16): see https://github.com/ava-labs/coreth/issues/1143
func (*Syncer) UpdateSyncTarget(message.Syncable) error {
return errors.New("not yet implemented")
}

// Finalize is called after Sync completes to ensure all data matches the expected state.
// This may block after UpdateSyncTarget is implemented.
func (s *Syncer) Finalize(_ context.Context, summary message.Syncable) error {
// Ensure we synced to the expected target height.
if summary.Height() != s.targetHeight {
return fmt.Errorf("expected to sync to height %d but synced to %d", s.targetHeight, summary.Height())
}

return nil
}

// addZeroes returns the big-endian representation of `height`, prefixed with [common.HashLength] zeroes.
func addZeroes(height uint64) []byte {
// Key format is [height(8 bytes)][blockchainID(32 bytes)]. Start should be the
Expand Down
10 changes: 8 additions & 2 deletions plugin/evm/vmsync/doubles_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,9 +9,13 @@ import (
"sync"
"time"

"github.com/ava-labs/coreth/plugin/evm/message"

syncpkg "github.com/ava-labs/coreth/sync"
)

var _ syncpkg.Syncer = FuncSyncer{}

// FuncSyncer adapts a function to the simple Syncer shape used in tests. It is
// useful for defining small, behavior-driven syncers inline.
type FuncSyncer struct {
Expand All @@ -21,12 +25,14 @@ type FuncSyncer struct {
// Sync calls the wrapped function and returns its result.
func (f FuncSyncer) Sync(ctx context.Context) error { return f.fn(ctx) }

// Unnecessary for these tests.
func (FuncSyncer) UpdateSyncTarget(message.Syncable) error { return nil }
func (FuncSyncer) Finalize(context.Context, message.Syncable) error { return nil }

// Name returns the provided name or a default if unspecified.
func (FuncSyncer) Name() string { return "Test Name" }
func (FuncSyncer) ID() string { return "test_id" }

var _ syncpkg.Syncer = FuncSyncer{}

// NewBarrierSyncer returns a syncer that, upon entering Sync, calls wg.Done() to
// signal it has started, then blocks until either:
// - `releaseCh` is closed, returning nil; or
Expand Down
65 changes: 62 additions & 3 deletions plugin/evm/vmsync/registry.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
"context"
"errors"
"fmt"
"sync"

"github.com/ava-labs/libevm/log"
"golang.org/x/sync/errgroup"
Expand All @@ -16,7 +17,10 @@ import (
synccommon "github.com/ava-labs/coreth/sync"
)

var errSyncerAlreadyRegistered = errors.New("syncer already registered")
var (
errSyncerAlreadyRegistered = errors.New("syncer already registered")
errUpdateSyncTargetNotAllowed = errors.New("UpdateSyncTarget not allowed while syncers are not running")
)

// SyncerTask represents a single syncer with its name for identification.
type SyncerTask struct {
Expand All @@ -28,6 +32,11 @@ type SyncerTask struct {
type SyncerRegistry struct {
syncers []SyncerTask
registeredNames map[string]bool // Track registered IDs to prevent duplicates.

// This lock protects access to target and updateTargetAllowed.
targetLock sync.Mutex
target message.Syncable
updateTargetAllowed bool // Only allowed while syncers are running.
}

// NewSyncerRegistry creates a new empty syncer registry.
Expand Down Expand Up @@ -61,12 +70,12 @@ func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syn
summaryBlockHashHex := summary.GetBlockHash().Hex()
blockHeight := summary.Height()

g, ctx := errgroup.WithContext(ctx)
g, egCtx := errgroup.WithContext(ctx)

for _, task := range r.syncers {
g.Go(func() error {
log.Info("starting syncer", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight)
if err := task.syncer.Sync(ctx); err != nil {
if err := task.syncer.Sync(egCtx); err != nil {
log.Error("failed syncing", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight, "err", err)
return fmt.Errorf("%s failed: %w", task.name, err)
}
Expand All @@ -76,11 +85,61 @@ func (r *SyncerRegistry) RunSyncerTasks(ctx context.Context, summary message.Syn
})
}

// Allow UpdateSyncTarget calls while waiting for syncers to complete.
r.targetLock.Lock()
r.updateTargetAllowed = true
r.target = summary
r.targetLock.Unlock()

if err := g.Wait(); err != nil {
return err
}

log.Info("all syncers completed successfully", "count", len(r.syncers), "summary", summaryBlockHashHex)

// At this point, UpdateSyncTarget should no longer be called, so we should prevent it.
r.targetLock.Lock()
r.updateTargetAllowed = false
finalTarget := r.target
r.targetLock.Unlock()

// Finalize each syncer to ensure all data matches the expected state.
// Use a new error group to finalize in parallel.
g, egCtx = errgroup.WithContext(ctx)
for _, task := range r.syncers {
log.Info("finalizing syncer", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight)
g.Go(func() error {
if err := task.syncer.Finalize(egCtx, finalTarget); err != nil {
log.Error("failed finalizing", "name", task.name, "summary", summaryBlockHashHex, "height", blockHeight, "err", err)
return fmt.Errorf("%s finalize failed: %w", task.name, err)
}
return nil
})
}

return g.Wait()
}

// UpdateSyncTarget updates the target for all registered syncers.
func (r *SyncerRegistry) UpdateSyncTarget(summary message.Syncable) error {
r.targetLock.Lock()
defer r.targetLock.Unlock()

if !r.updateTargetAllowed {
return errUpdateSyncTargetNotAllowed
}

// Since UpdateSyncTarget can block, we call each syncer in its own goroutine.
var eg errgroup.Group
for _, task := range r.syncers {
eg.Go(func() error {
return task.syncer.UpdateSyncTarget(summary)
})
}
if err := eg.Wait(); err != nil {
return fmt.Errorf("failed to update sync target: %w", err)
}

r.target = summary
return nil
}
95 changes: 91 additions & 4 deletions plugin/evm/vmsync/registry_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,9 +24,11 @@ var _ syncpkg.Syncer = (*mockSyncer)(nil)

// mockSyncer implements [syncpkg.Syncer] for testing.
type mockSyncer struct {
name string
syncError error
started bool // Track if already started
name string
syncError error
started bool // Track if already started
updateCount int // Track number of UpdateSyncTarget calls
finalizeTarget message.Syncable
}

func newMockSyncer(name string, syncError error) *mockSyncer {
Expand All @@ -38,6 +40,16 @@ func (m *mockSyncer) Sync(context.Context) error {
return m.syncError
}

func (m *mockSyncer) UpdateSyncTarget(message.Syncable) error {
m.updateCount++
return nil
}

func (m *mockSyncer) Finalize(_ context.Context, target message.Syncable) error {
m.finalizeTarget = target
return nil
}

func (m *mockSyncer) Name() string { return m.name }
func (m *mockSyncer) ID() string { return m.name }

Expand All @@ -47,9 +59,17 @@ type namedSyncer struct {
syncer syncpkg.Syncer
}

func (n *namedSyncer) Sync(ctx context.Context) error { return n.syncer.Sync(ctx) }
func (n *namedSyncer) Name() string { return n.name }
func (n *namedSyncer) ID() string { return n.name }
func (n *namedSyncer) Sync(ctx context.Context) error { return n.syncer.Sync(ctx) }

func (n *namedSyncer) UpdateSyncTarget(target message.Syncable) error {
return n.syncer.UpdateSyncTarget(target)
}

func (n *namedSyncer) Finalize(ctx context.Context, summary message.Syncable) error {
return n.syncer.Finalize(ctx, summary)
}

// syncerConfig describes a test syncer setup for RunSyncerTasks table tests.
type syncerConfig struct {
Expand Down Expand Up @@ -307,6 +327,73 @@ func TestSyncerRegistry_NoSyncersRegistered(t *testing.T) {
require.NoError(t, registry.RunSyncerTasks(ctx, newTestClientSummary(t)))
}

func TestSyncerRegistry_UpdateSyncTarget(t *testing.T) {
t.Parallel()

registry := NewSyncerRegistry()
ctx, cancel := utilstest.NewTestContext(t)
t.Cleanup(cancel)

var allStartedWG sync.WaitGroup
allStartedWG.Add(1)
releaseCh := make(chan struct{})
barrierSyncer := NewBarrierSyncer(&allStartedWG, releaseCh)
registry.Register(barrierSyncer)

// Create a tracked syncer to verify UpdateSyncTarget calls
syncer := newMockSyncer("UpdateTargetSyncer", nil)
registry.Register(syncer)

doneCh := make(chan error, 1)
go func() { doneCh <- registry.RunSyncerTasks(ctx, newTestClientSummary(t)) }()

utilstest.WaitGroupWithTimeout(t, &allStartedWG, 2*time.Second, "timed out waiting for barrier syncers to start")

customSummary, err := message.NewBlockSyncSummary(common.HexToHash("0xabcdef"), 1234, common.HexToHash("0xabcdef"))
require.NoError(t, err)
require.NoError(t, registry.UpdateSyncTarget(customSummary))
close(releaseCh)

require.NoError(t, utilstest.WaitErrWithTimeout(t, doneCh, 4*time.Second))
require.Equal(t, 1, syncer.updateCount)
require.Equal(t, customSummary, syncer.finalizeTarget)
}

func TestSyncerRegistry_UpdateSyncTargetNotAllowed(t *testing.T) {
t.Parallel()

registry := NewSyncerRegistry()
ctx, cancel := utilstest.NewTestContext(t)
t.Cleanup(cancel)

const numUpdateSyncers = 3
var mockSyncers []*mockSyncer
for i := 0; i < numUpdateSyncers; i++ {
name := fmt.Sprintf("UpdateSyncer-%d", i)
mockSyncer := newMockSyncer(name, nil)
mockSyncers = append(mockSyncers, mockSyncer)
require.NoError(t, registry.Register(&namedSyncer{name: name, syncer: mockSyncer}))
}

summary := newTestClientSummary(t)

// Call UpdateSyncTarget before starting syncers
err := registry.UpdateSyncTarget(summary)
require.ErrorIs(t, err, errUpdateSyncTargetNotAllowed)

// First call to RunSyncerTasks
require.NoError(t, registry.RunSyncerTasks(ctx, summary))

// Call UpdateSyncTarget after syncers have completed
err = registry.UpdateSyncTarget(summary)
require.ErrorIs(t, err, errUpdateSyncTargetNotAllowed)

// Verify UpdateSyncTarget was not called for each syncer
for i, mockSyncer := range mockSyncers {
require.Zero(t, mockSyncer.updateCount, "UpdateSyncTarget was called for syncer %d", i)
}
}

func newTestClientSummary(t *testing.T) message.Syncable {
t.Helper()
summary, err := message.NewBlockSyncSummary(common.HexToHash("0xdeadbeef"), 1000, common.HexToHash("0xdeadbeef"))
Expand Down
21 changes: 18 additions & 3 deletions sync/blocksync/syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -13,16 +13,19 @@ import (
"github.com/ava-labs/libevm/ethdb"
"github.com/ava-labs/libevm/log"

"github.com/ava-labs/coreth/plugin/evm/message"

syncpkg "github.com/ava-labs/coreth/sync"
statesyncclient "github.com/ava-labs/coreth/sync/client"
)

const blocksPerRequest = 32

var (
_ syncpkg.Syncer = (*BlockSyncer)(nil)
errBlocksToFetchRequired = errors.New("blocksToFetch must be > 0")
errFromHashRequired = errors.New("fromHash must be non-zero when fromHeight > 0")
_ syncpkg.Syncer = (*BlockSyncer)(nil)

errBlocksToFetchRequired = errors.New("blocksToFetch must be > 0")
errFromHashRequired = errors.New("fromHash must be non-zero when fromHeight > 0")
)

type BlockSyncer struct {
Expand Down Expand Up @@ -112,3 +115,15 @@ func (s *BlockSyncer) Sync(ctx context.Context) error {
log.Info("fetched blocks from peer", "total", blocksToFetch)
return batch.Write()
}

// UpdateSyncTarget updates the target summary to sync to.
// TODO(alarso16): see https://github.com/ava-labs/coreth/issues/1260
func (*BlockSyncer) UpdateSyncTarget(message.Syncable) error {
return errors.New("not yet implemented")
}

// Finalize asserts that all blocks have been requested and written to disk.
// This is a no-op until UpdateSyncTarget is implemented.
func (*BlockSyncer) Finalize(context.Context, message.Syncable) error {
return nil
}
12 changes: 12 additions & 0 deletions sync/statesync/code_syncer.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package statesync

import (
"context"
"errors"
"fmt"
"sync"

Expand Down Expand Up @@ -115,6 +116,17 @@ func (c *CodeSyncer) Sync(ctx context.Context) error {
return eg.Wait()
}

// UpdateSyncTarget is not yet implemented.
// TODO(alarso16): see https://github.com/ava-labs/coreth/issues/1341
func (*CodeSyncer) UpdateSyncTarget(message.Syncable) error {
return errors.New("not yet implemented")
}

// Finalize is currently a no-op since UpdateSyncTarget is not implemented.
func (*CodeSyncer) Finalize(context.Context, message.Syncable) error {
return nil
}

// work fulfills any incoming requests from the producer channel by fetching code bytes from the network
// and fulfilling them by updating the database.
func (c *CodeSyncer) work(ctx context.Context) error {
Expand Down
Loading
Loading