-
Notifications
You must be signed in to change notification settings - Fork 245
refactor(syncer,cache): use compare and swap loop and add comments #2873
New issue
Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.
By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.
Already on GitHub? Sign in to your account
Changes from all commits
665854e
f8ebe0a
a24b4af
fedcb3b
5a159b0
a376b8f
9a157c0
79204ae
acf3d0a
File filter
Filter by extension
Conversations
Jump to
Diff view
Diff view
There are no files selected for viewing
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -97,9 +97,14 @@ func (c *Cache[T]) setDAIncluded(hash string, daHeight uint64, blockHeight uint6 | |
| c.hashByHeight.Store(blockHeight, hash) | ||
|
|
||
| // Update max DA height if necessary | ||
| current := c.maxDAHeight.Load() | ||
| if daHeight >= current { | ||
| _ = c.maxDAHeight.CompareAndSwap(current, daHeight) | ||
| for { | ||
| current := c.maxDAHeight.Load() | ||
| if daHeight <= current { | ||
| return | ||
| } | ||
| if c.maxDAHeight.CompareAndSwap(current, daHeight) { | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 best effort to store the highest value |
||
| return | ||
| } | ||
| } | ||
| } | ||
|
|
||
|
|
@@ -250,7 +255,7 @@ func (c *Cache[T]) LoadFromDisk(folderPath string) error { | |
| // Update max DA height during load | ||
| current := c.maxDAHeight.Load() | ||
| if v > current { | ||
| _ = c.maxDAHeight.CompareAndSwap(current, v) | ||
| c.maxDAHeight.Store(v) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. 👍 better! |
||
| } | ||
| } | ||
|
|
||
|
|
||
| Original file line number | Diff line number | Diff line change |
|---|---|---|
|
|
@@ -52,8 +52,8 @@ type Syncer struct { | |
| // State management | ||
| lastState *atomic.Pointer[types.State] | ||
|
|
||
| // DA state | ||
| daHeight *atomic.Uint64 | ||
| // DA retriever height | ||
| daRetrieverHeight *atomic.Uint64 | ||
|
|
||
| // P2P stores | ||
| headerStore common.Broadcaster[*types.SignedHeader] | ||
|
|
@@ -95,29 +95,28 @@ func NewSyncer( | |
| errorCh chan<- error, | ||
| ) *Syncer { | ||
| return &Syncer{ | ||
| store: store, | ||
| exec: exec, | ||
| da: da, | ||
| cache: cache, | ||
| metrics: metrics, | ||
| config: config, | ||
| genesis: genesis, | ||
| options: options, | ||
| headerStore: headerStore, | ||
| dataStore: dataStore, | ||
| lastState: &atomic.Pointer[types.State]{}, | ||
| daHeight: &atomic.Uint64{}, | ||
| heightInCh: make(chan common.DAHeightEvent, 1_000), | ||
| errorCh: errorCh, | ||
| logger: logger.With().Str("component", "syncer").Logger(), | ||
| store: store, | ||
| exec: exec, | ||
| da: da, | ||
| cache: cache, | ||
| metrics: metrics, | ||
| config: config, | ||
| genesis: genesis, | ||
| options: options, | ||
| headerStore: headerStore, | ||
| dataStore: dataStore, | ||
| lastState: &atomic.Pointer[types.State]{}, | ||
| daRetrieverHeight: &atomic.Uint64{}, | ||
| heightInCh: make(chan common.DAHeightEvent, 1_000), | ||
| errorCh: errorCh, | ||
| logger: logger.With().Str("component", "syncer").Logger(), | ||
| } | ||
| } | ||
|
|
||
| // Start begins the syncing component | ||
| func (s *Syncer) Start(ctx context.Context) error { | ||
| s.ctx, s.cancel = context.WithCancel(ctx) | ||
|
|
||
| // Initialize state | ||
| if err := s.initializeState(); err != nil { | ||
| return fmt.Errorf("failed to initialize syncer state: %w", err) | ||
| } | ||
|
|
@@ -131,12 +130,12 @@ func (s *Syncer) Start(ctx context.Context) error { | |
| s.p2pHandler.SetProcessedHeight(currentHeight) | ||
| } | ||
|
|
||
| if !s.waitForGenesis() { | ||
| return nil | ||
| } | ||
|
|
||
| // Start main processing loop | ||
| s.wg.Add(1) | ||
| go func() { | ||
| defer s.wg.Done() | ||
| s.processLoop() | ||
| }() | ||
| go s.processLoop() | ||
|
|
||
| // Start dedicated workers for DA, and pending processing | ||
| s.startSyncWorkers() | ||
|
|
@@ -175,16 +174,6 @@ func (s *Syncer) SetLastState(state types.State) { | |
| s.lastState.Store(&state) | ||
| } | ||
|
|
||
| // GetDAHeight returns the current DA height | ||
| func (s *Syncer) GetDAHeight() uint64 { | ||
| return max(s.daHeight.Load(), s.cache.DaHeight()) | ||
| } | ||
|
|
||
| // SetDAHeight updates the DA height | ||
| func (s *Syncer) SetDAHeight(height uint64) { | ||
| s.daHeight.Store(height) | ||
| } | ||
|
|
||
| // initializeState loads the current sync state | ||
| func (s *Syncer) initializeState() error { | ||
| // Load state from store | ||
|
|
@@ -216,14 +205,13 @@ func (s *Syncer) initializeState() error { | |
| } | ||
| s.SetLastState(state) | ||
|
|
||
| // Set DA height | ||
| // we get the max from the genesis da height, the state da height and the cache (fetched) da height | ||
| // if a user has messed up and sync da too far ahead, on restart they can clear the cache (--clear-cache) and the retrieve will restart fetching from the last known block synced and executed from DA or the set genesis da height. | ||
| s.SetDAHeight(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight)) | ||
| // Set DA height to the maximum of the genesis start height, the state's DA height, and the cached DA height. | ||
| // This ensures we resume from the highest known DA height, even if the cache is cleared on restart. If the DA height is too high because of a user error, reset it with --evnode.clear_cache. The DA height will be back to the last highest known executed DA height for a height. | ||
| s.daRetrieverHeight.Store(max(s.genesis.DAStartHeight, s.cache.DaHeight(), state.DAHeight)) | ||
|
|
||
| s.logger.Info(). | ||
| Uint64("height", state.LastBlockHeight). | ||
| Uint64("da_height", s.GetDAHeight()). | ||
| Uint64("da_height", s.daRetrieverHeight.Load()). | ||
| Str("chain_id", state.ChainID). | ||
| Msg("initialized syncer state") | ||
|
|
||
|
|
@@ -238,6 +226,9 @@ func (s *Syncer) initializeState() error { | |
|
|
||
| // processLoop is the main coordination loop for processing events | ||
| func (s *Syncer) processLoop() { | ||
| s.wg.Add(1) | ||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. It is not likely in our use case that start and stop are called immediately, but this would cause a race between Add/Wait. The previous version was safe.
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. oh true, because this is in the goroutine. I was trying to make it consistent with startSyncWorkers |
||
| defer s.wg.Done() | ||
|
|
||
| s.logger.Info().Msg("starting process loop") | ||
| defer s.logger.Info().Msg("process loop stopped") | ||
|
|
||
|
|
@@ -261,10 +252,6 @@ func (s *Syncer) startSyncWorkers() { | |
| func (s *Syncer) daWorkerLoop() { | ||
| defer s.wg.Done() | ||
|
|
||
| if !s.waitForGenesis() { | ||
| return | ||
| } | ||
|
|
||
| s.logger.Info().Msg("starting DA worker") | ||
| defer s.logger.Info().Msg("DA worker stopped") | ||
|
|
||
|
|
@@ -299,13 +286,13 @@ func (s *Syncer) fetchDAUntilCaughtUp() error { | |
| default: | ||
| } | ||
|
|
||
| daHeight := s.GetDAHeight() | ||
| daHeight := max(s.daRetrieverHeight.Load(), s.cache.DaHeight()) | ||
|
|
||
| events, err := s.daRetriever.RetrieveFromDA(s.ctx, daHeight) | ||
| if err != nil { | ||
| switch { | ||
| case errors.Is(err, coreda.ErrBlobNotFound): | ||
| s.SetDAHeight(daHeight + 1) | ||
| s.daRetrieverHeight.Store(daHeight + 1) | ||
| continue // Fetch next height immediately | ||
| case errors.Is(err, coreda.ErrHeightFromFuture): | ||
| s.logger.Debug().Err(err).Uint64("da_height", daHeight).Msg("DA is ahead of local target; backing off future height requests") | ||
|
|
@@ -330,18 +317,14 @@ func (s *Syncer) fetchDAUntilCaughtUp() error { | |
| } | ||
| } | ||
|
|
||
| // increment DA height on successful retrieval | ||
| s.SetDAHeight(daHeight + 1) | ||
| // increment DA retrieval height on successful retrieval | ||
| s.daRetrieverHeight.Store(daHeight + 1) | ||
| } | ||
| } | ||
|
|
||
| func (s *Syncer) pendingWorkerLoop() { | ||
| defer s.wg.Done() | ||
|
|
||
| if !s.waitForGenesis() { | ||
| return | ||
| } | ||
|
|
||
| s.logger.Info().Msg("starting pending worker") | ||
| defer s.logger.Info().Msg("pending worker stopped") | ||
|
|
||
|
|
@@ -361,10 +344,6 @@ func (s *Syncer) pendingWorkerLoop() { | |
| func (s *Syncer) p2pWorkerLoop() { | ||
| defer s.wg.Done() | ||
|
|
||
| if !s.waitForGenesis() { | ||
| return | ||
| } | ||
|
|
||
| logger := s.logger.With().Str("worker", "p2p").Logger() | ||
| logger.Info().Msg("starting P2P worker") | ||
| defer logger.Info().Msg("P2P worker stopped") | ||
|
|
@@ -545,13 +524,14 @@ func (s *Syncer) trySyncNextBlock(event *common.DAHeightEvent) error { | |
| return err | ||
| } | ||
|
|
||
| // Apply block | ||
| newState, err := s.applyBlock(header.Header, data, currentState) | ||
| if err != nil { | ||
| return fmt.Errorf("failed to apply block: %w", err) | ||
| } | ||
|
|
||
| // Update DA height if needed | ||
| // This height is only updated when a height is processed from DA as P2P | ||
| // events do not contain DA height information | ||
| if event.DaHeight > newState.DAHeight { | ||
| newState.DAHeight = event.DaHeight | ||
| } | ||
|
|
@@ -677,15 +657,6 @@ func (s *Syncer) sendCriticalError(err error) { | |
| } | ||
| } | ||
|
|
||
| // sendNonBlockingSignal sends a signal without blocking | ||
| func (s *Syncer) sendNonBlockingSignal(ch chan struct{}, name string) { | ||
| select { | ||
| case ch <- struct{}{}: | ||
| default: | ||
| s.logger.Debug().Str("channel", name).Msg("channel full, signal dropped") | ||
| } | ||
| } | ||
|
|
||
| // processPendingEvents fetches and processes pending events from cache | ||
| // optimistically fetches the next events from cache until no matching heights are found | ||
| func (s *Syncer) processPendingEvents() { | ||
|
|
||
| Original file line number | Diff line number | Diff line change | ||||
|---|---|---|---|---|---|---|
|
|
@@ -58,7 +58,7 @@ func BenchmarkSyncerIO(b *testing.B) { | |||||
| } | ||||||
| require.Len(b, fixt.s.heightInCh, 0) | ||||||
|
|
||||||
| assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daHeight) | ||||||
| assert.Equal(b, spec.heights+daHeightOffset, fixt.s.daRetrieverHeight) | ||||||
|
Contributor
There was a problem hiding this comment. Choose a reason for hiding this commentThe reason will be displayed to describe this comment to others. Learn more.
Suggested change
Member
Author
There was a problem hiding this comment. Choose a reason for hiding this comment. The reason will be displayed to describe this comment to others. Learn more. has this test ever passed? benchmarks aren't run in CI 😅 |
||||||
| gotStoreHeight, err := fixt.s.store.Height(b.Context()) | ||||||
| require.NoError(b, err) | ||||||
| assert.Equal(b, spec.heights, gotStoreHeight) | ||||||
|
|
||||||
There was a problem hiding this comment.
Choose a reason for hiding this comment.
The reason will be displayed to describe this comment to others. Learn more.
personal preference: it's not likely to loop forever, but why not put a reasonable maximum number of iterations here?
There was a problem hiding this comment.
Choose a reason for hiding this comment
The reason will be displayed to describe this comment to others. Learn more.
consistency with SetProcessedHeight, which uses the same pattern. wouldn't the max be an arbitrary number?