Merged
2 changes: 1 addition & 1 deletion apps/evm/single/cmd/rollback.go
@@ -109,7 +109,7 @@ func NewRollbackCmd() *cobra.Command {

 	fmt.Printf("Rolled back ev-node state to height %d\n", height)
 	if syncNode {
-		fmt.Println("Restart the node with the `--clear-cache` flag")
+		fmt.Println("Restart the node with the `--evnode.clear_cache` flag")
 	}

 	return errs
2 changes: 1 addition & 1 deletion apps/testapp/cmd/rollback.go
@@ -120,7 +120,7 @@ func NewRollbackCmd() *cobra.Command {

 	fmt.Printf("Rolled back ev-node state to height %d\n", height)
 	if syncNode {
-		fmt.Println("Restart the node with the `--clear-cache` flag")
+		fmt.Println("Restart the node with the `--evnode.clear_cache` flag")
 	}

 	return errs
13 changes: 9 additions & 4 deletions block/internal/cache/generic_cache.go
@@ -97,9 +97,14 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint64)
 	c.hashByHeight.Store(blockHeight, hash)

 	// Update max DA height if necessary
-	current := c.maxDAHeight.Load()
-	if daHeight >= current {
-		_ = c.maxDAHeight.CompareAndSwap(current, daHeight)
+	for {
+		current := c.maxDAHeight.Load()
+		if daHeight <= current {
+			return
+		}
+		if c.maxDAHeight.CompareAndSwap(current, daHeight) {
+			return
+		}
 	}
 }

Review thread on the retry loop:
  Contributor: personal preference: not likely to loop forever, but why not put a reasonable cap on retries here?
  Member Author: Consistency with SetProcessedHeight, which uses the same pattern. Wouldn't the max just be an arbitrary number?

Review thread on the CompareAndSwap:
  Contributor: 👍 best effort to store the highest value
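For reference, the merged setDAIncluded loop is the standard lock-free "store the maximum" pattern: CompareAndSwap succeeds only if no other writer changed the value between the Load and the swap, and on failure the loop reloads and retries. A minimal, self-contained sketch of the pattern (storeMax and the driver below are illustrative names, not code from this PR):

package main

import (
	"fmt"
	"sync"
	"sync/atomic"
)

// storeMax raises m to v unless m already holds a value >= v,
// retrying whenever a concurrent writer wins the CAS.
func storeMax(m *atomic.Uint64, v uint64) {
	for {
		current := m.Load()
		if v <= current {
			return // a higher (or equal) value is already stored
		}
		if m.CompareAndSwap(current, v) {
			return // we published v as the new maximum
		}
		// CAS lost a race with another writer; reload and retry
	}
}

func main() {
	var maxDAHeight atomic.Uint64
	var wg sync.WaitGroup
	for _, h := range []uint64{3, 9, 5} {
		wg.Add(1)
		go func(h uint64) {
			defer wg.Done()
			storeMax(&maxDAHeight, h)
		}(h)
	}
	wg.Wait()
	fmt.Println(maxDAHeight.Load()) // always prints 9
}

Each failed CAS means some other writer made progress, so the loop is lock-free; a retry cap would trade that guarantee for an arbitrary constant, which is the author's point in the thread above.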

@@ -250,7 +255,7 @@ func (c *Cache[T]) LoadFromDisk(folderPath string) error {
 			// Update max DA height during load
 			current := c.maxDAHeight.Load()
 			if v > current {
-				_ = c.maxDAHeight.CompareAndSwap(current, v)
+				c.maxDAHeight.Store(v)
 			}
 		}

Review thread on the Store:
  Contributor: 👍 better! Could even move this out of the loop and update the value at the end?
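Since LoadFromDisk presumably runs during startup, before concurrent writers exist, the unconditional Store is safe there. The reviewer's follow-up, moving the update out of the loop entirely, would look roughly like this sketch (applyMaxAfterLoad and the loaded slice are hypothetical stand-ins for the decode loop):

package main

import (
	"fmt"
	"sync/atomic"
)

// applyMaxAfterLoad mirrors the reviewer's suggestion in isolation:
// fold the loaded values into a local max, then hit the atomic once.
func applyMaxAfterLoad(maxDAHeight *atomic.Uint64, loaded []uint64) {
	maxSeen := maxDAHeight.Load()
	for _, v := range loaded {
		if v > maxSeen {
			maxSeen = v
		}
	}
	maxDAHeight.Store(maxSeen)
}

func main() {
	var m atomic.Uint64
	applyMaxAfterLoad(&m, []uint64{4, 12, 7})
	fmt.Println(m.Load()) // prints 12
}

Note the single Store only works because startup is single-writer; under concurrency it could clobber a larger value, which is exactly what the CAS loop in setDAIncluded guards against.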

99 changes: 35 additions & 64 deletions block/internal/syncing/syncer.go
@@ -52,8 +52,8 @@ type Syncer struct {
 	// State management
 	lastState *atomic.Pointer[types.State]

-	// DA state
-	daHeight *atomic.Uint64
+	// DA retriever height
+	daRetrieverHeight *atomic.Uint64

 	// P2P stores
 	headerStore common.Broadcaster[*types.SignedHeader]
@@ -95,29 +95,28 @@ func NewSyncer(
 	errorCh chan<- error,
 ) *Syncer {
 	return &Syncer{
-		store:       store,
-		exec:        exec,
-		da:          da,
-		cache:       cache,
-		metrics:     metrics,
-		config:      config,
-		genesis:     genesis,
-		options:     options,
-		headerStore: headerStore,
-		dataStore:   dataStore,
-		lastState:   &atomic.Pointer[types.State]{},
-		daHeight:    &atomic.Uint64{},
-		heightInCh:  make(chan common.DAHeightEvent, 1_000),
-		errorCh:     errorCh,
-		logger:      logger.With().Str("component", "syncer").Logger(),
+		store:             store,
+		exec:              exec,
+		da:                da,
+		cache:             cache,
+		metrics:           metrics,
+		config:            config,
+		genesis:           genesis,
+		options:           options,
+		headerStore:       headerStore,
+		dataStore:         dataStore,
+		lastState:         &atomic.Pointer[types.State]{},
+		daRetrieverHeight: &atomic.Uint64{},
+		heightInCh:        make(chan common.DAHeightEvent, 1_000),
+		errorCh:           errorCh,
+		logger:            logger.With().Str("component", "syncer").Logger(),
 	}
 }

 // Start begins the syncing component
 func (s *Syncer) Start(ctx context.Context) error {
 	s.ctx, s.cancel = context.WithCancel(ctx)

-	// Initialize state
 	if err := s.initializeState(); err != nil {
 		return fmt.Errorf("failed to initialize syncer state: %w", err)
 	}
@@ -131,12 +130,12 @@ func (s *Syncer) Start(ctx context.Context) error {
 		s.p2pHandler.SetProcessedHeight(currentHeight)
 	}

+	if !s.waitForGenesis() {
+		return nil
+	}
+
 	// Start main processing loop
-	s.wg.Add(1)
-	go func() {
-		defer s.wg.Done()
-		s.processLoop()
-	}()
+	go s.processLoop()

 	// Start dedicated workers for DA, and pending processing
 	s.startSyncWorkers()
@@ -175,16 +174,6 @@ func (s *Syncer) SetLastState(state types.State) {
 	s.lastState.Store(&state)
 }

-// GetDAHeight returns the current DA height
-func (s *Syncer) GetDAHeight() uint64 {
-	return max(s.daHeight.Load(), s.cache.DaHeight())
-}
-
-// SetDAHeight updates the DA height
-func (s *Syncer) SetDAHeight(height uint64) {
-	s.daHeight.Store(height)
-}
-
 // initializeState loads the current sync state
 func (s *Syncer) initializeState() error {
 	// Load state from store
@@ -216,14 +205,13 @@ func (s *Syncer) initializeState() error {
 	}
 	s.SetLastState(state)

-	// Set DA height
-	// we get the max from the genesis da height, the state da height and the cache (fetched) da height
-	// if a user has messed up and sync da too far ahead, on restart they can clear the cache (--clear-cache) and the retrieve will restart fetching from the last known block synced and executed from DA or the set genesis da height.
-	s.SetDAHeight(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))
+	// Set the DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height.
+	// This ensures we resume from the highest known DA height even if the cache is cleared on restart. If the DA height was synced too far ahead by mistake, reset it with --evnode.clear_cache; retrieval then falls back to the last known executed DA height.
+	s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight))

 	s.logger.Info().
 		Uint64("height", state.LastBlockHeight).
-		Uint64("da_height", s.GetDAHeight()).
+		Uint64("da_height", s.daRetrieverHeight.Load()).
 		Str("chain_id", state.ChainID).
 		Msg("initialized syncer state")
@@ -238,6 +226,9 @@ func (s *Syncer) initializeState() error {

 // processLoop is the main coordination loop for processing events
 func (s *Syncer) processLoop() {
+	s.wg.Add(1)
+	defer s.wg.Done()
+
 	s.logger.Info().Msg("starting process loop")
 	defer s.logger.Info().Msg("process loop stopped")

Review thread on the wg.Add(1):
  Contributor: It is not likely in our use case that Start and Stop are called back to back, but this would cause a race between Add and Wait. The previous version was safe.
  Member Author: oh true, because this is in the goroutine. I was trying to make it consistent with startSyncWorkers.
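The race the reviewer describes: a positive wg.Add must happen before the corresponding wg.Wait, but when the Add runs inside the spawned goroutine, a quick Stop can call Wait while the counter is still zero and return before processLoop is even scheduled. A minimal illustration of the two placements (not the PR's code; run with -race to see the second one flagged):

package main

import "sync"

func main() {
	// Safe pattern: increment the counter before spawning, so the
	// Add happens-before both the goroutine and any Wait.
	var safe sync.WaitGroup
	safe.Add(1)
	go func() {
		defer safe.Done()
		// ... work ...
	}()
	safe.Wait() // always observes the in-flight goroutine

	// Racy pattern (what the review flags): Add runs inside the
	// goroutine, so a concurrent Wait can see a zero counter and
	// return before the goroutine even starts.
	var racy sync.WaitGroup
	go func() {
		racy.Add(1) // may execute after racy.Wait() has returned
		defer racy.Done()
		// ... work ...
	}()
	racy.Wait() // may return immediately; Add races with Wait
}

The other worker loops in this PR appear to keep the safe shape: each only defers s.wg.Done(), with the matching Add evidently done in startSyncWorkers before the go statement.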

@@ -261,10 +252,6 @@ func (s *Syncer) startSyncWorkers() {
 func (s *Syncer) daWorkerLoop() {
 	defer s.wg.Done()

-	if !s.waitForGenesis() {
-		return
-	}
-
 	s.logger.Info().Msg("starting DA worker")
 	defer s.logger.Info().Msg("DA worker stopped")
@@ -299,13 +286,13 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
 		default:
 		}

-		daHeight := s.GetDAHeight()
+		daHeight := max(s.daRetrieverHeight.Load(), s.cache.DaHeight())

 		events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight)
 		if err != nil {
 			switch {
 			case errors.Is(err, coreda.ErrBlobNotFound):
-				s.SetDAHeight(daHeight + 1)
+				s.daRetrieverHeight.Store(daHeight + 1)
 				continue // Fetch next height immediately
 			case errors.Is(err, coreda.ErrHeightFromFuture):
 				s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests")
@@ -330,18 +317,14 @@ func (s *Syncer) fetchDAUntilCaughtUp() error {
 			}
 		}

-		// increment DA height on successful retrieval
-		s.SetDAHeight(daHeight + 1)
+		// increment DA retrieval height on successful retrieval
+		s.daRetrieverHeight.Store(daHeight + 1)
 	}
 }

 func (s *Syncer) pendingWorkerLoop() {
 	defer s.wg.Done()

-	if !s.waitForGenesis() {
-		return
-	}
-
 	s.logger.Info().Msg("starting pending worker")
 	defer s.logger.Info().Msg("pending worker stopped")
@@ -361,10 +344,6 @@ func (s *Syncer) p2pWorkerLoop() {
 func (s *Syncer) p2pWorkerLoop() {
 	defer s.wg.Done()

-	if !s.waitForGenesis() {
-		return
-	}
-
 	logger := s.logger.With().Str("worker", "p2p").Logger()
 	logger.Info().Msg("starting P2P worker")
 	defer logger.Info().Msg("P2P worker stopped")
@@ -545,13 +524,14 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error {
 		return err
 	}

 	// Apply block
 	newState, err := s.applyBlock(header.Header, data, currentState)
 	if err != nil {
 		return fmt.Errorf("failed to apply block: %w", err)
 	}

-	// Update DA height if needed
+	// This height is only updated when a height is processed from DA, as P2P
+	// events do not contain DA height information
 	if event.DaHeight > newState.DAHeight {
 		newState.DAHeight = event.DaHeight
 	}
@@ -677,15 +657,6 @@ func (s *Syncer) sendCriticalError(err error) {
 	}
 }

-// sendNonBlockingSignal sends a signal without blocking
-func (s *Syncer) sendNonBlockingSignal(ch chan struct{}, name string) {
-	select {
-	case ch <- struct{}{}:
-	default:
-		s.logger.Debug().Str("channel", name).Msg("channel full, signal dropped")
-	}
-}
-
 // processPendingEvents fetches and processes pending events from cache
 // optimistically fetches the next events from cache until no matching heights are found
 func (s *Syncer) processPendingEvents() {
2 changes: 1 addition & 1 deletion block/internal/syncing/syncer_benchmark_test.go
@@ -58,7 +58,7 @@ func BenchmarkSyncerIO(b *testing.B) {
 	}
 	require.Len(b, fixt.s.heightInCh, 0)

-	assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daHeight)
+	assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)

Review thread on the assertion:
  Contributor (suggested change):
    -	assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight)
    +	assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight.Load())
  Member Author: has this test ever passed? benchmarks aren't run in CI 😅

 	gotStoreHeight, err := fixt.s.store.Height(b.Context())
 	require.NoError(b, err)
 	assert.Equal(b, spec.heights, gotStoreHeight)
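The suggestion matters because fixt.s.daRetrieverHeight is a *atomic.Uint64 while the expected value is a plain uint64; testify's assert.Equal falls back on reflect.DeepEqual for non-byte-slice arguments, and values of different types are never deeply equal, so without .Load() the assertion can only fail. A small illustration with hypothetical values:

package main

import (
	"fmt"
	"reflect"
	"sync/atomic"
)

func main() {
	var h atomic.Uint64
	h.Store(42)

	// Roughly what assert.Equal does for non-[]byte arguments:
	fmt.Println(reflect.DeepEqual(uint64(42), &h))       // false: uint64 vs *atomic.Uint64
	fmt.Println(reflect.DeepEqual(uint64(42), h.Load())) // true: compare the loaded value
}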
37 changes: 10 additions & 27 deletions block/internal/syncing/syncer_test.go
@@ -274,23 +274,6 @@ func TestSequentialBlockSync(t *testing.T) {
 	requireEmptyChan(t, errChan)
 }

-func TestSyncer_sendNonBlockingSignal(t *testing.T) {
-	s := &Syncer{logger: zerolog.Nop()}
-	ch := make(chan struct{}, 1)
-	ch <- struct{}{}
-	done := make(chan struct{})
-	go func() {
-		s.sendNonBlockingSignal(ch, "test")
-		close(done)
-	}()
-	select {
-	case <-done:
-		// ok
-	case <-time.After(200 * time.Millisecond):
-		t.Fatal("sendNonBlockingSignal blocked unexpectedly")
-	}
-}
-
 func TestSyncer_processPendingEvents(t *testing.T) {
 	ds := dssync.MutexWrap(datastore.NewMapDatastore())
 	st := store.New(ds)
@@ -432,7 +415,7 @@ func TestSyncLoopPersistState(t *testing.T) {
 	requireEmptyChan(t, errorCh)

 	t.Log("sync workers on instance1 completed")
-	require.Equal(t, myFutureDAHeight, syncerInst1.GetDAHeight())
+	require.Equal(t, myFutureDAHeight, syncerInst1.daRetrieverHeight.Load())

 	// wait for all events consumed
 	require.NoError(t, cacheMgr.SaveToDisk())
@@ -482,7 +465,7 @@ func TestSyncLoopPersistState(t *testing.T) {
 		Run(func(arg mock.Arguments) {
 			cancel()
 			// retrieve last one again
-			assert.Equal(t, syncerInst2.GetDAHeight(), arg.Get(1).(uint64))
+			assert.Equal(t, syncerInst2.daRetrieverHeight.Load(), arg.Get(1).(uint64))
 		}).
 		Return(nil, nil)
@@ -623,14 +606,14 @@ func TestSyncer_InitializeState_CallsReplayer(t *testing.T) {

 	// Create syncer with minimal dependencies
 	syncer := &Syncer{
-		store:     mockStore,
-		exec:      mockExec,
-		genesis:   gen,
-		lastState: &atomic.Pointer[types.State]{},
-		daHeight:  &atomic.Uint64{},
-		logger:    zerolog.Nop(),
-		ctx:       context.Background(),
-		cache:     cm,
+		store:             mockStore,
+		exec:              mockExec,
+		genesis:           gen,
+		lastState:         &atomic.Pointer[types.State]{},
+		daRetrieverHeight: &atomic.Uint64{},
+		logger:            zerolog.Nop(),
+		ctx:               context.Background(),
+		cache:             cm,
 	}

 	// Initialize state - this should call Replayer