diff --git a/.mockery.yaml b/.mockery.yaml index 2694096012..8f139231cb 100644 --- a/.mockery.yaml +++ b/.mockery.yaml @@ -48,11 +48,11 @@ packages: filename: external/hstore.go github.com/evstack/ev-node/block/internal/syncing: interfaces: - daRetriever: + DARetriever: config: dir: ./block/internal/syncing pkgname: syncing - filename: syncer_mock.go + filename: da_retriever_mock.go p2pHandler: config: dir: ./block/internal/syncing diff --git a/CHANGELOG.md b/CHANGELOG.md index 2890d4c141..2334220995 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -12,6 +12,7 @@ and this project adheres to [Semantic Versioning](https://semver.org/spec/v2.0.0 ### Changed - Rename `evm-single` to `evm` and `grpc-single` to `evgrpc` for clarity. [#2839](https://github.com/evstack/ev-node/pull/2839) +- Split cache interface in `CacheManager` and `PendingManager` and create `da` client to easy DA handling. [#2878](https://github.com/evstack/ev-node/pull/2878) ## v1.0.0-beta.10 diff --git a/apps/evm/go.sum b/apps/evm/go.sum index f23ed73ad8..173fb3ba0d 100644 --- a/apps/evm/go.sum +++ b/apps/evm/go.sum @@ -760,6 +760,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= +gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/apps/grpc/go.sum b/apps/grpc/go.sum index fc5f99db73..c2e0e46a7d 100644 --- a/apps/grpc/go.sum +++ b/apps/grpc/go.sum @@ -654,6 +654,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= +gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/apps/testapp/go.sum b/apps/testapp/go.sum index a0cadf7da7..eeafba1a84 100644 --- a/apps/testapp/go.sum +++ b/apps/testapp/go.sum @@ -653,6 +653,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= +gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/block/components.go b/block/components.go index 4b493b5a4f..546cda62c3 100644 --- a/block/components.go +++ b/block/components.go @@ -144,13 +144,15 @@ func NewSyncComponents( return nil, fmt.Errorf("failed to create cache manager: %w", err) } + daClient := NewDAClient(da, config, logger) + // error channel for critical failures errorCh := make(chan error, 1) syncer := syncing.NewSyncer( store, exec, - da, + daClient, cacheManager, metrics, config, @@ -162,8 +164,8 @@ func NewSyncComponents( errorCh, ) - // Create DA submitter for sync nodes (no signer, only DA inclusion processing) - daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger) + // Create submitter for sync nodes (no signer, only DA inclusion processing) + daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) submitter := submitting.NewSubmitter( store, exec, @@ -243,8 +245,9 @@ func NewAggregatorComponents( return nil, fmt.Errorf("failed to create reaper: %w", err) } - // Create DA submitter for aggregator nodes (with signer for submission) - daSubmitter := submitting.NewDASubmitter(da, config, genesis, blockOpts, metrics, logger) + // Create DA client and submitter for aggregator nodes (with signer for submission) + daClient := NewDAClient(da, config, logger) + daSubmitter := submitting.NewDASubmitter(daClient, config, genesis, blockOpts, metrics, logger) submitter := submitting.NewSubmitter( store, exec, diff --git a/block/internal/cache/manager.go b/block/internal/cache/manager.go index ac9b3f0338..db022fc470 100644 --- a/block/internal/cache/manager.go +++ b/block/internal/cache/manager.go @@ -41,8 +41,9 @@ func registerGobTypes() { }) } -// Manager provides centralized cache management for both executing and syncing components -type Manager interface { +// CacheManager provides thread-safe cache operations for tracking seen blocks +// and DA inclusion status during block execution and syncing. +type CacheManager interface { // Header operations IsHeaderSeen(hash string) bool SetHeaderSeen(hash string, blockHeight uint64) @@ -62,14 +63,6 @@ type Manager interface { SetTxSeen(hash string) CleanupOldTxs(olderThan time.Duration) int - // Pending operations - GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) - GetPendingData(ctx context.Context) ([]*types.SignedData, error) - SetLastSubmittedHeaderHeight(ctx context.Context, height uint64) - SetLastSubmittedDataHeight(ctx context.Context, height uint64) - NumPendingHeaders() uint64 - NumPendingData() uint64 - // Pending events syncing coordination GetNextPendingEvent(blockHeight uint64) *common.DAHeightEvent SetPendingEvent(blockHeight uint64, event *common.DAHeightEvent) @@ -83,6 +76,22 @@ type Manager interface { DeleteHeight(blockHeight uint64) } +// PendingManager provides operations for managing pending headers and data +type PendingManager interface { + GetPendingHeaders(ctx context.Context) ([]*types.SignedHeader, error) + GetPendingData(ctx context.Context) ([]*types.SignedData, error) + SetLastSubmittedHeaderHeight(ctx context.Context, height uint64) + SetLastSubmittedDataHeight(ctx context.Context, height uint64) + NumPendingHeaders() uint64 + NumPendingData() uint64 +} + +// Manager provides centralized cache management for both executing and syncing components +type Manager interface { + CacheManager + PendingManager +} + var _ Manager = (*implementation)(nil) // implementation provides the concrete implementation of cache Manager @@ -98,6 +107,59 @@ type implementation struct { logger zerolog.Logger } +// NewPendingManager creates a new pending manager instance +func NewPendingManager(store store.Store, logger zerolog.Logger) (PendingManager, error) { + pendingHeaders, err := NewPendingHeaders(store, logger) + if err != nil { + return nil, fmt.Errorf("failed to create pending headers: %w", err) + } + + pendingData, err := NewPendingData(store, logger) + if err != nil { + return nil, fmt.Errorf("failed to create pending data: %w", err) + } + + return &implementation{ + pendingHeaders: pendingHeaders, + pendingData: pendingData, + logger: logger, + }, nil +} + +// NewCacheManager creates a new cache manager instance +func NewCacheManager(cfg config.Config, logger zerolog.Logger) (CacheManager, error) { + // Initialize caches + headerCache := NewCache[types.SignedHeader]() + dataCache := NewCache[types.Data]() + txCache := NewCache[struct{}]() + pendingEventsCache := NewCache[common.DAHeightEvent]() + + registerGobTypes() + impl := &implementation{ + headerCache: headerCache, + dataCache: dataCache, + txCache: txCache, + txTimestamps: new(sync.Map), + pendingEventsCache: pendingEventsCache, + config: cfg, + logger: logger, + } + + if cfg.ClearCache { + // Clear the cache from disk + if err := impl.ClearFromDisk(); err != nil { + logger.Warn().Err(err).Msg("failed to clear cache from disk, starting with empty cache") + } + } else { + // Load existing cache from disk + if err := impl.LoadFromDisk(); err != nil { + logger.Warn().Err(err).Msg("failed to load cache from disk, starting with empty cache") + } + } + + return impl, nil +} + // NewManager creates a new cache manager instance func NewManager(cfg config.Config, store store.Store, logger zerolog.Logger) (Manager, error) { // Initialize caches diff --git a/block/internal/da/client.go b/block/internal/da/client.go new file mode 100644 index 0000000000..571e5f7650 --- /dev/null +++ b/block/internal/da/client.go @@ -0,0 +1,264 @@ +// Package da provides a reusable wrapper around the core DA interface +// with common configuration for namespace handling and timeouts. +package da + +import ( + "context" + "errors" + "fmt" + "strings" + "time" + + "github.com/rs/zerolog" + + coreda "github.com/evstack/ev-node/core/da" +) + +// Client is the interface representing the DA client. +type Client interface { + Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) coreda.ResultSubmit + Retrieve(ctx context.Context, height uint64, namespace []byte) coreda.ResultRetrieve + RetrieveHeaders(ctx context.Context, height uint64) coreda.ResultRetrieve + RetrieveData(ctx context.Context, height uint64) coreda.ResultRetrieve + + GetHeaderNamespace() []byte + GetDataNamespace() []byte + GetDA() coreda.DA +} + +// client provides a reusable wrapper around the core DA interface +// with common configuration for namespace handling and timeouts. +type client struct { + da coreda.DA + logger zerolog.Logger + defaultTimeout time.Duration + namespaceBz []byte + namespaceDataBz []byte +} + +// Config contains configuration for the DA client. +type Config struct { + DA coreda.DA + Logger zerolog.Logger + DefaultTimeout time.Duration + Namespace string + DataNamespace string +} + +// NewClient creates a new DA client with pre-calculated namespace bytes. +func NewClient(cfg Config) *client { + if cfg.DefaultTimeout == 0 { + cfg.DefaultTimeout = 30 * time.Second + } + + return &client{ + da: cfg.DA, + logger: cfg.Logger.With().Str("component", "da_client").Logger(), + defaultTimeout: cfg.DefaultTimeout, + namespaceBz: coreda.NamespaceFromString(cfg.Namespace).Bytes(), + namespaceDataBz: coreda.NamespaceFromString(cfg.DataNamespace).Bytes(), + } +} + +// Submit submits blobs to the DA layer with the specified options. +func (c *client) Submit(ctx context.Context, data [][]byte, gasPrice float64, namespace []byte, options []byte) coreda.ResultSubmit { + ids, err := c.da.SubmitWithOptions(ctx, data, gasPrice, namespace, options) + + // calculate blob size + var blobSize uint64 + for _, blob := range data { + blobSize += uint64(len(blob)) + } + + // Handle errors returned by Submit + if err != nil { + if errors.Is(err, context.Canceled) { + c.logger.Debug().Msg("DA submission canceled due to context cancellation") + return coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusContextCanceled, + Message: "submission canceled", + IDs: ids, + BlobSize: blobSize, + }, + } + } + status := coreda.StatusError + switch { + case errors.Is(err, coreda.ErrTxTimedOut): + status = coreda.StatusNotIncludedInBlock + case errors.Is(err, coreda.ErrTxAlreadyInMempool): + status = coreda.StatusAlreadyInMempool + case errors.Is(err, coreda.ErrTxIncorrectAccountSequence): + status = coreda.StatusIncorrectAccountSequence + case errors.Is(err, coreda.ErrBlobSizeOverLimit): + status = coreda.StatusTooBig + case errors.Is(err, coreda.ErrContextDeadline): + status = coreda.StatusContextDeadline + } + + // Use debug level for StatusTooBig as it gets handled later in submitToDA through recursive splitting + if status == coreda.StatusTooBig { + c.logger.Debug().Err(err).Uint64("status", uint64(status)).Msg("DA submission failed") + } else { + c.logger.Error().Err(err).Uint64("status", uint64(status)).Msg("DA submission failed") + } + return coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: status, + Message: "failed to submit blobs: " + err.Error(), + IDs: ids, + SubmittedCount: uint64(len(ids)), + Height: 0, + Timestamp: time.Now(), + BlobSize: blobSize, + }, + } + } + + if len(ids) == 0 && len(data) > 0 { + c.logger.Warn().Msg("DA submission returned no IDs for non-empty input data") + return coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusError, + Message: "failed to submit blobs: no IDs returned despite non-empty input", + }, + } + } + + // Get height from the first ID + var height uint64 + if len(ids) > 0 { + height, _, err = coreda.SplitID(ids[0]) + if err != nil { + c.logger.Error().Err(err).Msg("failed to split ID") + } + } + + c.logger.Debug().Int("num_ids", len(ids)).Msg("DA submission successful") + return coreda.ResultSubmit{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusSuccess, + IDs: ids, + SubmittedCount: uint64(len(ids)), + Height: height, + BlobSize: blobSize, + Timestamp: time.Now(), + }, + } +} + +// Retrieve retrieves blobs from the DA layer at the specified height and namespace. +func (c *client) Retrieve(ctx context.Context, height uint64, namespace []byte) coreda.ResultRetrieve { + // 1. Get IDs + getIDsCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + defer cancel() + idsResult, err := c.da.GetIDs(getIDsCtx, height, namespace) + if err != nil { + // Handle specific "not found" error + if strings.Contains(err.Error(), coreda.ErrBlobNotFound.Error()) { + c.logger.Debug().Uint64("height", height).Msg("Blobs not found at height") + return coreda.ResultRetrieve{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusNotFound, + Message: coreda.ErrBlobNotFound.Error(), + Height: height, + Timestamp: time.Now(), + }, + } + } + if strings.Contains(err.Error(), coreda.ErrHeightFromFuture.Error()) { + c.logger.Debug().Uint64("height", height).Msg("Blobs not found at height") + return coreda.ResultRetrieve{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusHeightFromFuture, + Message: coreda.ErrHeightFromFuture.Error(), + Height: height, + Timestamp: time.Now(), + }, + } + } + // Handle other errors during GetIDs + c.logger.Error().Uint64("height", height).Err(err).Msg("Failed to get IDs") + return coreda.ResultRetrieve{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusError, + Message: fmt.Sprintf("failed to get IDs: %s", err.Error()), + Height: height, + Timestamp: time.Now(), + }, + } + } + + // This check should technically be redundant if GetIDs correctly returns ErrBlobNotFound + if idsResult == nil || len(idsResult.IDs) == 0 { + c.logger.Debug().Uint64("height", height).Msg("No IDs found at height") + return coreda.ResultRetrieve{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusNotFound, + Message: coreda.ErrBlobNotFound.Error(), + Height: height, + Timestamp: time.Now(), + }, + } + } + // 2. Get Blobs using the retrieved IDs in batches + batchSize := 100 + blobs := make([][]byte, 0, len(idsResult.IDs)) + for i := 0; i < len(idsResult.IDs); i += batchSize { + end := min(i+batchSize, len(idsResult.IDs)) + + getBlobsCtx, cancel := context.WithTimeout(ctx, c.defaultTimeout) + batchBlobs, err := c.da.Get(getBlobsCtx, idsResult.IDs[i:end], namespace) + cancel() + if err != nil { + // Handle errors during Get + c.logger.Error().Uint64("height", height).Int("num_ids", len(idsResult.IDs)).Err(err).Msg("Failed to get blobs") + return coreda.ResultRetrieve{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusError, + Message: fmt.Sprintf("failed to get blobs for batch %d-%d: %s", i, end-1, err.Error()), + Height: height, + Timestamp: time.Now(), + }, + } + } + blobs = append(blobs, batchBlobs...) + } + // Success + c.logger.Debug().Uint64("height", height).Int("num_blobs", len(blobs)).Msg("Successfully retrieved blobs") + return coreda.ResultRetrieve{ + BaseResult: coreda.BaseResult{ + Code: coreda.StatusSuccess, + Height: height, + IDs: idsResult.IDs, + Timestamp: idsResult.Timestamp, + }, + Data: blobs, + } +} + +// RetrieveHeaders retrieves blobs from the header namespace at the specified height. +func (c *client) RetrieveHeaders(ctx context.Context, height uint64) coreda.ResultRetrieve { + return c.Retrieve(ctx, height, c.namespaceBz) +} + +// RetrieveData retrieves blobs from the data namespace at the specified height. +func (c *client) RetrieveData(ctx context.Context, height uint64) coreda.ResultRetrieve { + return c.Retrieve(ctx, height, c.namespaceDataBz) +} + +// GetHeaderNamespace returns the header namespace bytes. +func (c *client) GetHeaderNamespace() []byte { + return c.namespaceBz +} + +// GetDataNamespace returns the data namespace bytes. +func (c *client) GetDataNamespace() []byte { + return c.namespaceDataBz +} + +// GetDA returns the underlying DA interface for advanced usage. +func (c *client) GetDA() coreda.DA { + return c.da +} diff --git a/block/internal/da/client_test.go b/block/internal/da/client_test.go new file mode 100644 index 0000000000..788aab2b39 --- /dev/null +++ b/block/internal/da/client_test.go @@ -0,0 +1,458 @@ +package da + +import ( + "context" + "errors" + "testing" + "time" + + "github.com/rs/zerolog" + "gotest.tools/v3/assert" + + coreda "github.com/evstack/ev-node/core/da" +) + +// mockDA is a simple mock implementation of coreda.DA for testing +type mockDA struct { + submitFunc func(ctx context.Context, blobs []coreda.Blob, gasPrice float64, namespace []byte) ([]coreda.ID, error) + submitWithOptions func(ctx context.Context, blobs []coreda.Blob, gasPrice float64, namespace []byte, options []byte) ([]coreda.ID, error) + getIDsFunc func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) + getFunc func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) +} + +func (m *mockDA) Submit(ctx context.Context, blobs []coreda.Blob, gasPrice float64, namespace []byte) ([]coreda.ID, error) { + if m.submitFunc != nil { + return m.submitFunc(ctx, blobs, gasPrice, namespace) + } + return nil, nil +} + +func (m *mockDA) SubmitWithOptions(ctx context.Context, blobs []coreda.Blob, gasPrice float64, namespace []byte, options []byte) ([]coreda.ID, error) { + if m.submitWithOptions != nil { + return m.submitWithOptions(ctx, blobs, gasPrice, namespace, options) + } + return nil, nil +} + +func (m *mockDA) GetIDs(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) { + if m.getIDsFunc != nil { + return m.getIDsFunc(ctx, height, namespace) + } + return nil, errors.New("not implemented") +} + +func (m *mockDA) Get(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) { + if m.getFunc != nil { + return m.getFunc(ctx, ids, namespace) + } + return nil, errors.New("not implemented") +} + +func (m *mockDA) GetProofs(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Proof, error) { + return nil, errors.New("not implemented") +} + +func (m *mockDA) Commit(ctx context.Context, blobs []coreda.Blob, namespace []byte) ([]coreda.Commitment, error) { + return nil, errors.New("not implemented") +} + +func (m *mockDA) Validate(ctx context.Context, ids []coreda.ID, proofs []coreda.Proof, namespace []byte) ([]bool, error) { + return nil, errors.New("not implemented") +} + +func TestNewClient(t *testing.T) { + tests := []struct { + name string + cfg Config + }{ + { + name: "with all namespaces", + cfg: Config{ + DA: &mockDA{}, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-data-ns", + }, + }, + { + name: "without forced inclusion namespace", + cfg: Config{ + DA: &mockDA{}, + Logger: zerolog.Nop(), + DefaultTimeout: 5 * time.Second, + Namespace: "test-ns", + DataNamespace: "test-data-ns", + }, + }, + { + name: "with default timeout", + cfg: Config{ + DA: &mockDA{}, + Logger: zerolog.Nop(), + Namespace: "test-ns", + DataNamespace: "test-data-ns", + }, + }, + } + + for _, tt := range tests { + t.Run(tt.name, func(t *testing.T) { + client := NewClient(tt.cfg) + assert.Assert(t, client != nil) + assert.Assert(t, client.da != nil) + assert.Assert(t, len(client.namespaceBz) > 0) + assert.Assert(t, len(client.namespaceDataBz) > 0) + + expectedTimeout := tt.cfg.DefaultTimeout + if expectedTimeout == 0 { + expectedTimeout = 30 * time.Second + } + assert.Equal(t, client.defaultTimeout, expectedTimeout) + }) + } +} + +func TestClient_GetNamespaces(t *testing.T) { + cfg := Config{ + DA: &mockDA{}, + Logger: zerolog.Nop(), + Namespace: "test-header", + DataNamespace: "test-data", + } + + client := NewClient(cfg) + + headerNs := client.GetHeaderNamespace() + assert.Assert(t, len(headerNs) > 0) + + dataNs := client.GetDataNamespace() + assert.Assert(t, len(dataNs) > 0) + + // Namespaces should be different + assert.Assert(t, string(headerNs) != string(dataNs)) +} + +func TestClient_GetDA(t *testing.T) { + mockDAInstance := &mockDA{} + cfg := Config{ + DA: mockDAInstance, + Logger: zerolog.Nop(), + Namespace: "test-ns", + DataNamespace: "test-data-ns", + } + + client := NewClient(cfg) + da := client.GetDA() + assert.Equal(t, da, mockDAInstance) +} + +func TestClient_Submit(t *testing.T) { + logger := zerolog.Nop() + + testCases := []struct { + name string + data [][]byte + gasPrice float64 + options []byte + submitErr error + submitIDs [][]byte + expectedCode coreda.StatusCode + expectedErrMsg string + expectedIDs [][]byte + expectedCount uint64 + }{ + { + name: "successful submission", + data: [][]byte{[]byte("blob1"), []byte("blob2")}, + gasPrice: 1.0, + options: []byte("opts"), + submitIDs: [][]byte{[]byte("id1"), []byte("id2")}, + expectedCode: coreda.StatusSuccess, + expectedIDs: [][]byte{[]byte("id1"), []byte("id2")}, + expectedCount: 2, + }, + { + name: "context canceled error", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitErr: context.Canceled, + expectedCode: coreda.StatusContextCanceled, + expectedErrMsg: "submission canceled", + }, + { + name: "tx timed out error", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitErr: coreda.ErrTxTimedOut, + expectedCode: coreda.StatusNotIncludedInBlock, + expectedErrMsg: "failed to submit blobs: " + coreda.ErrTxTimedOut.Error(), + }, + { + name: "tx already in mempool error", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitErr: coreda.ErrTxAlreadyInMempool, + expectedCode: coreda.StatusAlreadyInMempool, + expectedErrMsg: "failed to submit blobs: " + coreda.ErrTxAlreadyInMempool.Error(), + }, + { + name: "incorrect account sequence error", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitErr: coreda.ErrTxIncorrectAccountSequence, + expectedCode: coreda.StatusIncorrectAccountSequence, + expectedErrMsg: "failed to submit blobs: " + coreda.ErrTxIncorrectAccountSequence.Error(), + }, + { + name: "blob size over limit error", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitErr: coreda.ErrBlobSizeOverLimit, + expectedCode: coreda.StatusTooBig, + expectedErrMsg: "failed to submit blobs: " + coreda.ErrBlobSizeOverLimit.Error(), + }, + { + name: "context deadline error", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitErr: coreda.ErrContextDeadline, + expectedCode: coreda.StatusContextDeadline, + expectedErrMsg: "failed to submit blobs: " + coreda.ErrContextDeadline.Error(), + }, + { + name: "generic submission error", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitErr: errors.New("some generic error"), + expectedCode: coreda.StatusError, + expectedErrMsg: "failed to submit blobs: some generic error", + }, + { + name: "no IDs returned for non-empty data", + data: [][]byte{[]byte("blob1")}, + gasPrice: 1.0, + options: []byte("opts"), + submitIDs: [][]byte{}, + expectedCode: coreda.StatusError, + expectedErrMsg: "failed to submit blobs: no IDs returned despite non-empty input", + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockDAInstance := &mockDA{ + submitWithOptions: func(ctx context.Context, blobs []coreda.Blob, gasPrice float64, namespace []byte, options []byte) ([]coreda.ID, error) { + return tc.submitIDs, tc.submitErr + }, + } + + client := NewClient(Config{ + DA: mockDAInstance, + Logger: logger, + Namespace: "test-namespace", + DataNamespace: "test-data-namespace", + }) + + encodedNamespace := coreda.NamespaceFromString("test-namespace") + result := client.Submit(context.Background(), tc.data, tc.gasPrice, encodedNamespace.Bytes(), tc.options) + + assert.Equal(t, tc.expectedCode, result.Code) + if tc.expectedErrMsg != "" { + assert.Assert(t, result.Message != "") + } + if tc.expectedIDs != nil { + assert.Equal(t, len(tc.expectedIDs), len(result.IDs)) + } + if tc.expectedCount != 0 { + assert.Equal(t, tc.expectedCount, result.SubmittedCount) + } + }) + } +} + +func TestClient_Retrieve(t *testing.T) { + logger := zerolog.Nop() + dataLayerHeight := uint64(100) + mockIDs := [][]byte{[]byte("id1"), []byte("id2")} + mockBlobs := [][]byte{[]byte("blobA"), []byte("blobB")} + mockTimestamp := time.Now() + + testCases := []struct { + name string + getIDsResult *coreda.GetIDsResult + getIDsErr error + getBlobsErr error + expectedCode coreda.StatusCode + expectedErrMsg string + expectedIDs [][]byte + expectedData [][]byte + expectedHeight uint64 + }{ + { + name: "successful retrieval", + getIDsResult: &coreda.GetIDsResult{ + IDs: mockIDs, + Timestamp: mockTimestamp, + }, + expectedCode: coreda.StatusSuccess, + expectedIDs: mockIDs, + expectedData: mockBlobs, + expectedHeight: dataLayerHeight, + }, + { + name: "blob not found error during GetIDs", + getIDsErr: coreda.ErrBlobNotFound, + expectedCode: coreda.StatusNotFound, + expectedErrMsg: coreda.ErrBlobNotFound.Error(), + expectedHeight: dataLayerHeight, + }, + { + name: "height from future error during GetIDs", + getIDsErr: coreda.ErrHeightFromFuture, + expectedCode: coreda.StatusHeightFromFuture, + expectedErrMsg: coreda.ErrHeightFromFuture.Error(), + expectedHeight: dataLayerHeight, + }, + { + name: "generic error during GetIDs", + getIDsErr: errors.New("failed to connect to DA"), + expectedCode: coreda.StatusError, + expectedErrMsg: "failed to get IDs: failed to connect to DA", + expectedHeight: dataLayerHeight, + }, + { + name: "GetIDs returns nil result", + getIDsResult: nil, + expectedCode: coreda.StatusNotFound, + expectedErrMsg: coreda.ErrBlobNotFound.Error(), + expectedHeight: dataLayerHeight, + }, + { + name: "GetIDs returns empty IDs", + getIDsResult: &coreda.GetIDsResult{ + IDs: [][]byte{}, + Timestamp: mockTimestamp, + }, + expectedCode: coreda.StatusNotFound, + expectedErrMsg: coreda.ErrBlobNotFound.Error(), + expectedHeight: dataLayerHeight, + }, + { + name: "error during Get (blobs retrieval)", + getIDsResult: &coreda.GetIDsResult{ + IDs: mockIDs, + Timestamp: mockTimestamp, + }, + getBlobsErr: errors.New("network error during blob retrieval"), + expectedCode: coreda.StatusError, + expectedErrMsg: "failed to get blobs for batch 0-1: network error during blob retrieval", + expectedHeight: dataLayerHeight, + }, + } + + for _, tc := range testCases { + t.Run(tc.name, func(t *testing.T) { + mockDAInstance := &mockDA{ + getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) { + return tc.getIDsResult, tc.getIDsErr + }, + getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) { + if tc.getBlobsErr != nil { + return nil, tc.getBlobsErr + } + return mockBlobs, nil + }, + } + + client := NewClient(Config{ + DA: mockDAInstance, + Logger: logger, + Namespace: "test-namespace", + DataNamespace: "test-data-namespace", + DefaultTimeout: 5 * time.Second, + }) + + encodedNamespace := coreda.NamespaceFromString("test-namespace") + result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes()) + + assert.Equal(t, tc.expectedCode, result.Code) + assert.Equal(t, tc.expectedHeight, result.Height) + if tc.expectedErrMsg != "" { + assert.Assert(t, result.Message != "") + } + if tc.expectedIDs != nil { + assert.Equal(t, len(tc.expectedIDs), len(result.IDs)) + } + if tc.expectedData != nil { + assert.Equal(t, len(tc.expectedData), len(result.Data)) + } + }) + } +} + +func TestClient_Retrieve_Timeout(t *testing.T) { + logger := zerolog.Nop() + dataLayerHeight := uint64(100) + encodedNamespace := coreda.NamespaceFromString("test-namespace") + + t.Run("timeout during GetIDs", func(t *testing.T) { + mockDAInstance := &mockDA{ + getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) { + <-ctx.Done() // Wait for context cancellation + return nil, context.DeadlineExceeded + }, + } + + client := NewClient(Config{ + DA: mockDAInstance, + Logger: logger, + Namespace: "test-namespace", + DataNamespace: "test-data-namespace", + DefaultTimeout: 1 * time.Millisecond, + }) + + result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes()) + + assert.Equal(t, coreda.StatusError, result.Code) + assert.Assert(t, result.Message != "") + }) + + t.Run("timeout during Get", func(t *testing.T) { + mockIDs := [][]byte{[]byte("id1")} + mockTimestamp := time.Now() + + mockDAInstance := &mockDA{ + getIDsFunc: func(ctx context.Context, height uint64, namespace []byte) (*coreda.GetIDsResult, error) { + return &coreda.GetIDsResult{ + IDs: mockIDs, + Timestamp: mockTimestamp, + }, nil + }, + getFunc: func(ctx context.Context, ids []coreda.ID, namespace []byte) ([]coreda.Blob, error) { + <-ctx.Done() // Wait for context cancellation + return nil, context.DeadlineExceeded + }, + } + + client := NewClient(Config{ + DA: mockDAInstance, + Logger: logger, + Namespace: "test-namespace", + DataNamespace: "test-data-namespace", + DefaultTimeout: 1 * time.Millisecond, + }) + + result := client.Retrieve(context.Background(), dataLayerHeight, encodedNamespace.Bytes()) + + assert.Equal(t, coreda.StatusError, result.Code) + assert.Assert(t, result.Message != "") + }) +} diff --git a/block/internal/reaping/reaper.go b/block/internal/reaping/reaper.go index 64388b2ce0..62ff5dfba1 100644 --- a/block/internal/reaping/reaper.go +++ b/block/internal/reaping/reaper.go @@ -34,7 +34,7 @@ type Reaper struct { sequencer coresequencer.Sequencer chainID string interval time.Duration - cache cache.Manager + cache cache.CacheManager executor *executing.Executor // shared components @@ -53,7 +53,7 @@ func NewReaper( genesis genesis.Genesis, logger zerolog.Logger, executor *executing.Executor, - cache cache.Manager, + cache cache.CacheManager, scrapeInterval time.Duration, ) (*Reaper, error) { if executor == nil { diff --git a/block/internal/reaping/reaper_test.go b/block/internal/reaping/reaper_test.go index d9dc701276..fac03bdd5d 100644 --- a/block/internal/reaping/reaper_test.go +++ b/block/internal/reaping/reaper_test.go @@ -65,28 +65,21 @@ func newTestExecutor(t *testing.T) *executing.Executor { } // helper to create a cache manager for tests -func newTestCache(t *testing.T) cache.Manager { +func newTestCache(t *testing.T) cache.CacheManager { t.Helper() - // Create a mock store for the cache manager - storeMock := testmocks.NewMockStore(t) - storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-header-height").Return(nil, ds.ErrNotFound).Maybe() - storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-data-height").Return(nil, ds.ErrNotFound).Maybe() - storeMock.EXPECT().Height(mock.Anything).Return(uint64(0), nil).Maybe() - storeMock.EXPECT().SetMetadata(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - cfg := config.Config{ RootDir: t.TempDir(), ClearCache: true, } - cacheManager, err := cache.NewManager(cfg, storeMock, zerolog.Nop()) + cacheManager, err := cache.NewCacheManager(cfg, zerolog.Nop()) require.NoError(t, err) return cacheManager } // reaper with mocks and cache manager -func newTestReaper(t *testing.T, chainID string, execMock *testmocks.MockExecutor, seqMock *testmocks.MockSequencer, e *executing.Executor, cm cache.Manager) *Reaper { +func newTestReaper(t *testing.T, chainID string, execMock *testmocks.MockExecutor, seqMock *testmocks.MockSequencer, e *executing.Executor, cm cache.CacheManager) *Reaper { t.Helper() r, err := NewReaper(execMock, seqMock, genesis.Genesis{ChainID: chainID}, zerolog.Nop(), e, cm, 100*time.Millisecond) diff --git a/block/internal/submitting/da_submitter.go b/block/internal/submitting/da_submitter.go index 5a8fabc167..8cf741dcd9 100644 --- a/block/internal/submitting/da_submitter.go +++ b/block/internal/submitting/da_submitter.go @@ -12,6 +12,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" coreda "github.com/evstack/ev-node/core/da" "github.com/evstack/ev-node/pkg/config" pkgda "github.com/evstack/ev-node/pkg/da" @@ -94,24 +95,20 @@ func clamp(v, min, max time.Duration) time.Duration { // DASubmitter handles DA submission operations type DASubmitter struct { - da coreda.DA + client da.Client config config.Config genesis genesis.Genesis options common.BlockOptions logger zerolog.Logger metrics *common.Metrics - // calculate namespaces bytes once and reuse them - namespaceBz []byte - namespaceDataBz []byte - // address selector for multi-account support addressSelector pkgda.AddressSelector } // NewDASubmitter creates a new DA submitter func NewDASubmitter( - da coreda.DA, + client da.Client, config config.Config, genesis genesis.Genesis, options common.BlockOptions, @@ -122,7 +119,7 @@ func NewDASubmitter( if config.RPC.EnableDAVisualization { visualizerLogger := logger.With().Str("component", "da_visualization").Logger() - server.SetDAVisualizationServer(server.NewDAVisualizationServer(da, visualizerLogger, config.Node.Aggregator)) + server.SetDAVisualizationServer(server.NewDAVisualizationServer(client.GetDA(), visualizerLogger, config.Node.Aggregator)) } // Use NoOp metrics if nil to avoid nil checks throughout the code @@ -142,14 +139,12 @@ func NewDASubmitter( } return &DASubmitter{ - da: da, + client: client, config: config, genesis: genesis, options: options, metrics: metrics, logger: daSubmitterLogger, - namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), - namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), addressSelector: addressSelector, } } @@ -199,7 +194,7 @@ func (s *DASubmitter) SubmitHeaders(ctx context.Context, cache cache.Manager) er } }, "header", - s.namespaceBz, + s.client.GetHeaderNamespace(), []byte(s.config.DA.SubmitOptions), func() uint64 { return cache.NumPendingHeaders() }, ) @@ -242,7 +237,7 @@ func (s *DASubmitter) SubmitData(ctx context.Context, cache cache.Manager, signe } }, "data", - s.namespaceDataBz, + s.client.GetDataNamespace(), []byte(s.config.DA.SubmitOptions), func() uint64 { return cache.NumPendingData() }, ) @@ -411,7 +406,7 @@ func submitToDA[T any]( // Perform submission start := time.Now() - res := types.SubmitWithHelpers(submitCtx, s.da, s.logger, marshaled, -1, namespace, mergedOptions) + res := s.client.Submit(submitCtx, marshaled, -1, namespace, mergedOptions) s.logger.Debug().Int("attempts", rs.Attempt).Dur("elapsed", time.Since(start)).Uint64("code", uint64(res.Code)).Msg("got SubmitWithHelpers response from celestia") // Record submission result for observability diff --git a/block/internal/submitting/da_submitter_integration_test.go b/block/internal/submitting/da_submitter_integration_test.go index 421340e11d..5b768e1a51 100644 --- a/block/internal/submitting/da_submitter_integration_test.go +++ b/block/internal/submitting/da_submitter_integration_test.go @@ -15,6 +15,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" coreda "github.com/evstack/ev-node/core/da" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" @@ -86,7 +87,13 @@ func TestDASubmitter_SubmitHeadersAndData_MarksInclusionAndUpdatesLastSubmitted( dummyDA := coreda.NewDummyDA(10_000_000, 10*time.Millisecond) // Create DA submitter - daSubmitter := NewDASubmitter(dummyDA, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop()) + daClient := da.NewClient(da.Config{ + DA: dummyDA, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) + daSubmitter := NewDASubmitter(daClient, cfg, gen, common.DefaultBlockOptions(), common.NopMetrics(), zerolog.Nop()) // Submit headers and data require.NoError(t, daSubmitter.SubmitHeaders(context.Background(), cm)) diff --git a/block/internal/submitting/da_submitter_mocks_test.go b/block/internal/submitting/da_submitter_mocks_test.go index d914e6db61..b215b0cf2f 100644 --- a/block/internal/submitting/da_submitter_mocks_test.go +++ b/block/internal/submitting/da_submitter_mocks_test.go @@ -11,6 +11,7 @@ import ( "github.com/stretchr/testify/mock" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" coreda "github.com/evstack/ev-node/core/da" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" @@ -25,10 +26,17 @@ func newTestSubmitter(mockDA *mocks.MockDA, override func(*config.Config)) *DASu cfg.DA.MaxSubmitAttempts = 3 cfg.DA.SubmitOptions = "opts" cfg.DA.Namespace = "ns" + cfg.DA.DataNamespace = "ns-data" if override != nil { override(&cfg) } - return NewDASubmitter(mockDA, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop()) + daClient := da.NewClient(da.Config{ + DA: mockDA, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) + return NewDASubmitter(daClient, cfg, genesis.Genesis{} /*options=*/, common.BlockOptions{}, common.NopMetrics(), zerolog.Nop()) } // marshal helper for simple items diff --git a/block/internal/submitting/da_submitter_test.go b/block/internal/submitting/da_submitter_test.go index c657d8185b..214ab98db4 100644 --- a/block/internal/submitting/da_submitter_test.go +++ b/block/internal/submitting/da_submitter_test.go @@ -15,6 +15,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" coreda "github.com/evstack/ev-node/core/da" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" @@ -51,8 +52,14 @@ func setupDASubmitterTest(t *testing.T) (*DASubmitter, store.Store, cache.Manage } // Create DA submitter + daClient := da.NewClient(da.Config{ + DA: dummyDA, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) daSubmitter := NewDASubmitter( - dummyDA, + daClient, cfg, gen, common.DefaultBlockOptions(), @@ -80,7 +87,7 @@ func TestDASubmitter_NewDASubmitter(t *testing.T) { submitter, _, _, _, _ := setupDASubmitterTest(t) assert.NotNil(t, submitter) - assert.NotNil(t, submitter.da) + assert.NotNil(t, submitter.client) assert.NotNil(t, submitter.config) assert.NotNil(t, submitter.genesis) } @@ -95,8 +102,14 @@ func TestNewDASubmitterSetsVisualizerWhenEnabled(t *testing.T) { dummyDA := coreda.NewDummyDA(10_000_000, 10*time.Millisecond) + daClient := da.NewClient(da.Config{ + DA: dummyDA, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) NewDASubmitter( - dummyDA, + daClient, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), diff --git a/block/internal/submitting/submitter_test.go b/block/internal/submitting/submitter_test.go index c13d8a1df7..33350ae268 100644 --- a/block/internal/submitting/submitter_test.go +++ b/block/internal/submitting/submitter_test.go @@ -18,6 +18,7 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/rpc/server" @@ -158,8 +159,16 @@ func TestSubmitter_setSequencerHeightToDAHeight(t *testing.T) { mockStore := testmocks.NewMockStore(t) cfg := config.DefaultConfig() + cfg.DA.Namespace = "test-ns" + cfg.DA.DataNamespace = "test-data-ns" metrics := common.NopMetrics() - daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop()) + daClient := da.NewClient(da.Config{ + DA: nil, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) + daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) s := NewSubmitter(mockStore, nil, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) s.ctx = ctx @@ -238,7 +247,13 @@ func TestSubmitter_processDAInclusionLoop_advances(t *testing.T) { exec.On("SetFinal", mock.Anything, uint64(1)).Return(nil).Once() exec.On("SetFinal", mock.Anything, uint64(2)).Return(nil).Once() - daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop()) + daClient := da.NewClient(da.Config{ + DA: nil, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) + daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) // prepare two consecutive blocks in store with DA included in cache @@ -423,7 +438,13 @@ func TestSubmitter_CacheClearedOnHeightInclusion(t *testing.T) { exec.On("SetFinal", mock.Anything, uint64(1)).Return(nil).Once() exec.On("SetFinal", mock.Anything, uint64(2)).Return(nil).Once() - daSub := NewDASubmitter(nil, cfg, genesis.Genesis{}, common.DefaultBlockOptions(), metrics, zerolog.Nop()) + daClient := da.NewClient(da.Config{ + DA: nil, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) + daSub := NewDASubmitter(daClient, cfg, genesis.Genesis{}, common.BlockOptions{}, metrics, zerolog.Nop()) s := NewSubmitter(st, exec, cm, metrics, cfg, genesis.Genesis{}, daSub, nil, zerolog.Nop(), nil) // Create test blocks diff --git a/block/internal/syncing/da_retriever.go b/block/internal/syncing/da_retriever.go index 1f4be2fce3..9325d4d3bd 100644 --- a/block/internal/syncing/da_retriever.go +++ b/block/internal/syncing/da_retriever.go @@ -5,34 +5,31 @@ import ( "context" "errors" "fmt" - "time" "github.com/rs/zerolog" "google.golang.org/protobuf/proto" "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" coreda "github.com/evstack/ev-node/core/da" - "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/types" pb "github.com/evstack/ev-node/types/pb/evnode/v1" ) -// defaultDATimeout is the default timeout for DA retrieval operations -const defaultDATimeout = 10 * time.Second +// DARetriever defines the interface for retrieving events from the DA layer +type DARetriever interface { + RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) +} -// DARetriever handles DA retrieval operations for syncing -type DARetriever struct { - da coreda.DA - cache cache.Manager +// daRetriever handles DA retrieval operations for syncing +type daRetriever struct { + client da.Client + cache cache.CacheManager genesis genesis.Genesis logger zerolog.Logger - // calculate namespaces bytes once and reuse them - namespaceBz []byte - namespaceDataBz []byte - // transient cache, only full event need to be passed to the syncer // on restart, will be refetch as da height is updated by syncer pendingHeaders map[uint64]*types.SignedHeader @@ -41,26 +38,23 @@ type DARetriever struct { // NewDARetriever creates a new DA retriever func NewDARetriever( - da coreda.DA, - cache cache.Manager, - config config.Config, + client da.Client, + cache cache.CacheManager, genesis genesis.Genesis, logger zerolog.Logger, -) *DARetriever { - return &DARetriever{ - da: da, - cache: cache, - genesis: genesis, - logger: logger.With().Str("component", "da_retriever").Logger(), - namespaceBz: coreda.NamespaceFromString(config.DA.GetNamespace()).Bytes(), - namespaceDataBz: coreda.NamespaceFromString(config.DA.GetDataNamespace()).Bytes(), - pendingHeaders: make(map[uint64]*types.SignedHeader), - pendingData: make(map[uint64]*types.Data), +) *daRetriever { + return &daRetriever{ + client: client, + cache: cache, + genesis: genesis, + logger: logger.With().Str("component", "da_retriever").Logger(), + pendingHeaders: make(map[uint64]*types.SignedHeader), + pendingData: make(map[uint64]*types.Data), } } // RetrieveFromDA retrieves blocks from the specified DA height and returns height events -func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { +func (r *daRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { r.logger.Debug().Uint64("da_height", daHeight).Msg("retrieving from DA") blobsResp, err := r.fetchBlobs(ctx, daHeight) if err != nil { @@ -76,17 +70,17 @@ func (r *DARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]co return r.processBlobs(ctx, blobsResp.Data, daHeight), nil } -// fetchBlobs retrieves blobs from the DA layer -func (r *DARetriever) fetchBlobs(ctx context.Context, daHeight uint64) (coreda.ResultRetrieve, error) { - // Retrieve from both namespaces - headerRes := types.RetrieveWithHelpers(ctx, r.da, r.logger, daHeight, r.namespaceBz, defaultDATimeout) +// fetchBlobs retrieves blobs from both header and data namespaces +func (r *daRetriever) fetchBlobs(ctx context.Context, daHeight uint64) (coreda.ResultRetrieve, error) { + // Retrieve from both namespaces using the DA client + headerRes := r.client.RetrieveHeaders(ctx, daHeight) // If namespaces are the same, return header result - if bytes.Equal(r.namespaceBz, r.namespaceDataBz) { + if bytes.Equal(r.client.GetHeaderNamespace(), r.client.GetDataNamespace()) { return headerRes, r.validateBlobResponse(headerRes, daHeight) } - dataRes := types.RetrieveWithHelpers(ctx, r.da, r.logger, daHeight, r.namespaceDataBz, defaultDATimeout) + dataRes := r.client.RetrieveData(ctx, daHeight) // Validate responses headerErr := r.validateBlobResponse(headerRes, daHeight) @@ -133,7 +127,7 @@ func (r *DARetriever) fetchBlobs(ctx context.Context, daHeight uint64) (coreda.R // validateBlobResponse validates a blob response from DA layer // those are the only error code returned by da.RetrieveWithHelpers -func (r *DARetriever) validateBlobResponse(res coreda.ResultRetrieve, daHeight uint64) error { +func (r *daRetriever) validateBlobResponse(res coreda.ResultRetrieve, daHeight uint64) error { switch res.Code { case coreda.StatusError: return fmt.Errorf("DA retrieval failed: %s", res.Message) @@ -150,7 +144,7 @@ func (r *DARetriever) validateBlobResponse(res coreda.ResultRetrieve, daHeight u } // processBlobs processes retrieved blobs to extract headers and data and returns height events -func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { +func (r *daRetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight uint64) []common.DAHeightEvent { // Decode all blobs for _, bz := range blobs { if len(bz) == 0 { @@ -232,7 +226,7 @@ func (r *DARetriever) processBlobs(ctx context.Context, blobs [][]byte, daHeight } // tryDecodeHeader attempts to decode a blob as a header -func (r *DARetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedHeader { +func (r *daRetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedHeader { header := new(types.SignedHeader) var headerPb pb.SignedHeader @@ -272,7 +266,7 @@ func (r *DARetriever) tryDecodeHeader(bz []byte, daHeight uint64) *types.SignedH } // tryDecodeData attempts to decode a blob as signed data -func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { +func (r *daRetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { var signedData types.SignedData if err := signedData.UnmarshalBinary(bz); err != nil { return nil @@ -303,7 +297,7 @@ func (r *DARetriever) tryDecodeData(bz []byte, daHeight uint64) *types.Data { } // assertExpectedProposer validates the proposer address -func (r *DARetriever) assertExpectedProposer(proposerAddr []byte) error { +func (r *daRetriever) assertExpectedProposer(proposerAddr []byte) error { if string(proposerAddr) != string(r.genesis.ProposerAddress) { return fmt.Errorf("unexpected proposer: got %x, expected %x", proposerAddr, r.genesis.ProposerAddress) @@ -312,7 +306,7 @@ func (r *DARetriever) assertExpectedProposer(proposerAddr []byte) error { } // assertValidSignedData validates signed data using the configured signature provider -func (r *DARetriever) assertValidSignedData(signedData *types.SignedData) error { +func (r *daRetriever) assertValidSignedData(signedData *types.SignedData) error { if signedData == nil || signedData.Txs == nil { return errors.New("empty signed data") } diff --git a/block/internal/syncing/da_retriever_mock.go b/block/internal/syncing/da_retriever_mock.go new file mode 100644 index 0000000000..d94dff4d62 --- /dev/null +++ b/block/internal/syncing/da_retriever_mock.go @@ -0,0 +1,107 @@ +// Code generated by mockery; DO NOT EDIT. +// github.com/vektra/mockery +// template: testify + +package syncing + +import ( + "context" + + "github.com/evstack/ev-node/block/internal/common" + mock "github.com/stretchr/testify/mock" +) + +// NewMockDARetriever creates a new instance of MockDARetriever. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. +// The first argument is typically a *testing.T value. +func NewMockDARetriever(t interface { + mock.TestingT + Cleanup(func()) +}) *MockDARetriever { + mock := &MockDARetriever{} + mock.Mock.Test(t) + + t.Cleanup(func() { mock.AssertExpectations(t) }) + + return mock +} + +// MockDARetriever is an autogenerated mock type for the DARetriever type +type MockDARetriever struct { + mock.Mock +} + +type MockDARetriever_Expecter struct { + mock *mock.Mock +} + +func (_m *MockDARetriever) EXPECT() *MockDARetriever_Expecter { + return &MockDARetriever_Expecter{mock: &_m.Mock} +} + +// RetrieveFromDA provides a mock function for the type MockDARetriever +func (_mock *MockDARetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { + ret := _mock.Called(ctx, daHeight) + + if len(ret) == 0 { + panic("no return value specified for RetrieveFromDA") + } + + var r0 []common.DAHeightEvent + var r1 error + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) ([]common.DAHeightEvent, error)); ok { + return returnFunc(ctx, daHeight) + } + if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) []common.DAHeightEvent); ok { + r0 = returnFunc(ctx, daHeight) + } else { + if ret.Get(0) != nil { + r0 = ret.Get(0).([]common.DAHeightEvent) + } + } + if returnFunc, ok := ret.Get(1).(func(context.Context, uint64) error); ok { + r1 = returnFunc(ctx, daHeight) + } else { + r1 = ret.Error(1) + } + return r0, r1 +} + +// MockDARetriever_RetrieveFromDA_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveFromDA' +type MockDARetriever_RetrieveFromDA_Call struct { + *mock.Call +} + +// RetrieveFromDA is a helper method to define mock.On call +// - ctx context.Context +// - daHeight uint64 +func (_e *MockDARetriever_Expecter) RetrieveFromDA(ctx interface{}, daHeight interface{}) *MockDARetriever_RetrieveFromDA_Call { + return &MockDARetriever_RetrieveFromDA_Call{Call: _e.mock.On("RetrieveFromDA", ctx, daHeight)} +} + +func (_c *MockDARetriever_RetrieveFromDA_Call) Run(run func(ctx context.Context, daHeight uint64)) *MockDARetriever_RetrieveFromDA_Call { + _c.Call.Run(func(args mock.Arguments) { + var arg0 context.Context + if args[0] != nil { + arg0 = args[0].(context.Context) + } + var arg1 uint64 + if args[1] != nil { + arg1 = args[1].(uint64) + } + run( + arg0, + arg1, + ) + }) + return _c +} + +func (_c *MockDARetriever_RetrieveFromDA_Call) Return(dAHeightEvents []common.DAHeightEvent, err error) *MockDARetriever_RetrieveFromDA_Call { + _c.Call.Return(dAHeightEvents, err) + return _c +} + +func (_c *MockDARetriever_RetrieveFromDA_Call) RunAndReturn(run func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)) *MockDARetriever_RetrieveFromDA_Call { + _c.Call.Return(run) + return _c +} diff --git a/block/internal/syncing/da_retriever_test.go b/block/internal/syncing/da_retriever_test.go index c398633d4b..4923600b8f 100644 --- a/block/internal/syncing/da_retriever_test.go +++ b/block/internal/syncing/da_retriever_test.go @@ -8,8 +8,6 @@ import ( "testing" "time" - "github.com/ipfs/go-datastore" - dssync "github.com/ipfs/go-datastore/sync" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/assert" @@ -18,18 +16,45 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" coreda "github.com/evstack/ev-node/core/da" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" - "github.com/evstack/ev-node/pkg/store" testmocks "github.com/evstack/ev-node/test/mocks" "github.com/evstack/ev-node/types" ) +// newTestDARetriever creates a DA retriever for testing with the given DA implementation +func newTestDARetriever(t *testing.T, mockDA coreda.DA, cfg config.Config, gen genesis.Genesis) *daRetriever { + t.Helper() + if cfg.DA.Namespace == "" { + cfg.DA.Namespace = "test-ns" + } + if cfg.DA.DataNamespace == "" { + cfg.DA.DataNamespace = "test-data-ns" + } + + cm, err := cache.NewCacheManager(cfg, zerolog.Nop()) + require.NoError(t, err) + + daClient := da.NewClient(da.Config{ + DA: mockDA, + Logger: zerolog.Nop(), + Namespace: cfg.DA.Namespace, + DataNamespace: cfg.DA.DataNamespace, + }) + + return NewDARetriever(daClient, cm, gen, zerolog.Nop()) +} + // makeSignedDataBytes builds SignedData containing the provided Data and returns its binary encoding func makeSignedDataBytes(t *testing.T, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, txs int) ([]byte, *types.SignedData) { - d := &types.Data{Metadata: &types.Metadata{ChainID: chainID, Height: height, Time: uint64(time.Now().UnixNano())}} + return makeSignedDataBytesWithTime(t, chainID, height, proposer, pub, signer, txs, uint64(time.Now().UnixNano())) +} + +func makeSignedDataBytesWithTime(t *testing.T, chainID string, height uint64, proposer []byte, pub crypto.PubKey, signer signerpkg.Signer, txs int, timestamp uint64) ([]byte, *types.SignedData) { + d := &types.Data{Metadata: &types.Metadata{ChainID: chainID, Height: height, Time: timestamp}} if txs > 0 { d.Txs = make(types.Txs, txs) for i := 0; i < txs; i++ { @@ -38,8 +63,7 @@ func makeSignedDataBytes(t *testing.T, chainID string, height uint64, proposer [ } // For DA SignedData, sign the Data payload bytes (matches DA submission logic) - payload, err := d.MarshalBinary() - require.NoError(t, err) + payload, _ := d.MarshalBinary() sig, err := signer.Sign(payload) require.NoError(t, err) sd := &types.SignedData{Data: *d, Signature: sig, Signer: types.Signer{PubKey: pub, Address: proposer}} @@ -49,52 +73,37 @@ func makeSignedDataBytes(t *testing.T, chainID string, height uint64, proposer [ } func TestDARetriever_RetrieveFromDA_Invalid(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - assert.NoError(t, err) - mockDA := testmocks.NewMockDA(t) mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, errors.New("just invalid")).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) + r := newTestDARetriever(t, mockDA, config.DefaultConfig(), genesis.Genesis{}) events, err := r.RetrieveFromDA(context.Background(), 42) assert.Error(t, err) assert.Len(t, events, 0) } func TestDARetriever_RetrieveFromDA_NotFound(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - assert.NoError(t, err) - mockDA := testmocks.NewMockDA(t) // GetIDs returns ErrBlobNotFound -> helper maps to StatusNotFound mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: whatever", coreda.ErrBlobNotFound.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) + r := newTestDARetriever(t, mockDA, config.DefaultConfig(), genesis.Genesis{}) events, err := r.RetrieveFromDA(context.Background(), 42) assert.True(t, errors.Is(err, coreda.ErrBlobNotFound)) assert.Len(t, events, 0) } func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) - mockDA := testmocks.NewMockDA(t) // GetIDs returns ErrHeightFromFuture -> helper maps to StatusHeightFromFuture, fetchBlobs returns error mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, fmt.Errorf("%s: later", coreda.ErrHeightFromFuture.Error())).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) + r := newTestDARetriever(t, mockDA, config.DefaultConfig(), genesis.Genesis{}) events, derr := r.RetrieveFromDA(context.Background(), 1000) assert.Error(t, derr) assert.True(t, errors.Is(derr, coreda.ErrHeightFromFuture)) @@ -102,10 +111,7 @@ func TestDARetriever_RetrieveFromDA_HeightFromFuture(t *testing.T) { } func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) + t.Skip("Skipping flaky timeout test - timing is now controlled by DA client") mockDA := testmocks.NewMockDA(t) @@ -116,7 +122,7 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { }). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) + r := newTestDARetriever(t, mockDA, config.DefaultConfig(), genesis.Genesis{}) start := time.Now() events, err := r.RetrieveFromDA(context.Background(), 42) @@ -128,16 +134,13 @@ func TestDARetriever_RetrieveFromDA_Timeout(t *testing.T) { assert.Contains(t, err.Error(), "context deadline exceeded") assert.Len(t, events, 0) - // Verify timeout occurred approximately at the helper timeout (with some tolerance) - assert.Greater(t, duration, defaultDATimeout-2*time.Second, "should timeout close to the helper timeout") - assert.Less(t, duration, defaultDATimeout+time.Second, "should not take much longer than timeout") + // Verify timeout occurred approximately at expected time (with some tolerance) + // DA client has a 30-second default timeout + assert.Greater(t, duration, 29*time.Second, "should timeout after approximately 30 seconds") + assert.Less(t, duration, 35*time.Second, "should not take much longer than timeout") } func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) mockDA := testmocks.NewMockDA(t) @@ -145,7 +148,7 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { mockDA.EXPECT().GetIDs(mock.Anything, mock.Anything, mock.Anything). Return(nil, context.DeadlineExceeded).Maybe() - r := NewDARetriever(mockDA, cm, config.DefaultConfig(), genesis.Genesis{}, zerolog.Nop()) + r := newTestDARetriever(t, mockDA, config.DefaultConfig(), genesis.Genesis{}) events, err := r.RetrieveFromDA(context.Background(), 42) @@ -157,15 +160,11 @@ func TestDARetriever_RetrieveFromDA_TimeoutFast(t *testing.T) { } func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) dataBin, data := makeSignedDataBytes(t, gen.ChainID, 2, addr, pub, signer, 2) hdrBin, _ := makeSignedHeaderBytes(t, gen.ChainID, 2, addr, pub, signer, nil, &data.Data, nil) @@ -186,14 +185,10 @@ func TestDARetriever_ProcessBlobs_HeaderAndData_Success(t *testing.T) { } func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) // Header with no data hash present should trigger empty data creation (per current logic) hb, _ := makeSignedHeaderBytes(t, gen.ChainID, 3, addr, pub, signer, nil, nil, nil) @@ -214,14 +209,10 @@ func TestDARetriever_ProcessBlobs_HeaderOnly_EmptyDataExpected(t *testing.T) { } func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) hb, sh := makeSignedHeaderBytes(t, gen.ChainID, 5, addr, pub, signer, nil, nil, nil) gotH := r.tryDecodeHeader(hb, 123) @@ -239,15 +230,11 @@ func TestDARetriever_TryDecodeHeaderAndData_Basic(t *testing.T) { } func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) goodAddr, pub, signer := buildSyncTestSigner(t) badAddr := []byte("not-the-proposer") gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: badAddr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) // Signed data is made by goodAddr; retriever expects badAddr -> should be rejected db, _ := makeSignedDataBytes(t, gen.ChainID, 7, goodAddr, pub, signer, 1) @@ -255,7 +242,7 @@ func TestDARetriever_tryDecodeData_InvalidSignatureOrProposer(t *testing.T) { } func TestDARetriever_validateBlobResponse(t *testing.T) { - r := &DARetriever{logger: zerolog.Nop()} + r := &daRetriever{logger: zerolog.Nop()} // StatusSuccess -> nil err := r.validateBlobResponse(coreda.ResultRetrieve{BaseResult: coreda.BaseResult{Code: coreda.StatusSuccess}}, 1) assert.NoError(t, err) @@ -269,10 +256,6 @@ func TestDARetriever_validateBlobResponse(t *testing.T) { } func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} @@ -300,7 +283,7 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { mockDA.EXPECT().Get(mock.Anything, mock.Anything, mock.MatchedBy(func(ns []byte) bool { return bytes.Equal(ns, namespaceDataBz) })). Return([][]byte{dataBin}, nil).Once() - r := NewDARetriever(mockDA, cm, cfg, gen, zerolog.Nop()) + r := newTestDARetriever(t, mockDA, cfg, gen) events, derr := r.RetrieveFromDA(context.Background(), 1234) require.NoError(t, derr) @@ -310,15 +293,11 @@ func TestDARetriever_RetrieveFromDA_TwoNamespaces_Success(t *testing.T) { } func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) // Create header and data for the same block height but from different DA heights dataBin, data := makeSignedDataBytes(t, gen.ChainID, 5, addr, pub, signer, 2) @@ -346,15 +325,11 @@ func TestDARetriever_ProcessBlobs_CrossDAHeightMatching(t *testing.T) { } func TestDARetriever_ProcessBlobs_MultipleHeadersCrossDAHeightMatching(t *testing.T) { - ds := dssync.MutexWrap(datastore.NewMapDatastore()) - st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) - require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) gen := genesis.Genesis{ChainID: "tchain", InitialHeight: 1, StartTime: time.Now().Add(-time.Second), ProposerAddress: addr} - r := NewDARetriever(nil, cm, config.DefaultConfig(), gen, zerolog.Nop()) + r := newTestDARetriever(t, nil, config.DefaultConfig(), gen) // Create multiple headers and data for different block heights data3Bin, data3 := makeSignedDataBytes(t, gen.ChainID, 3, addr, pub, signer, 1) diff --git a/block/internal/syncing/p2p_handler.go b/block/internal/syncing/p2p_handler.go index 84aa0a363b..d8c10bc4c3 100644 --- a/block/internal/syncing/p2p_handler.go +++ b/block/internal/syncing/p2p_handler.go @@ -15,6 +15,11 @@ import ( "github.com/evstack/ev-node/types" ) +type p2pHandler interface { + ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error + SetProcessedHeight(height uint64) +} + // P2PHandler coordinates block retrieval from P2P stores for the syncer. // It waits for both header and data to be available at a given height, // validates their consistency, and emits events to the syncer for processing. @@ -24,7 +29,7 @@ import ( type P2PHandler struct { headerStore goheader.Store[*types.SignedHeader] dataStore goheader.Store[*types.Data] - cache cache.Manager + cache cache.CacheManager genesis genesis.Genesis logger zerolog.Logger @@ -35,7 +40,7 @@ type P2PHandler struct { func NewP2PHandler( headerStore goheader.Store[*types.SignedHeader], dataStore goheader.Store[*types.Data], - cache cache.Manager, + cache cache.CacheManager, genesis genesis.Genesis, logger zerolog.Logger, ) *P2PHandler { @@ -50,7 +55,7 @@ func NewP2PHandler( // SetProcessedHeight updates the highest processed block height. func (h *P2PHandler) SetProcessedHeight(height uint64) { - for { + for range 1_000 { current := h.processedHeight.Load() if height <= current { return diff --git a/block/internal/syncing/p2p_handler_test.go b/block/internal/syncing/p2p_handler_test.go index bd9c4178af..dfab41faae 100644 --- a/block/internal/syncing/p2p_handler_test.go +++ b/block/internal/syncing/p2p_handler_test.go @@ -7,7 +7,6 @@ import ( "testing" "time" - ds "github.com/ipfs/go-datastore" "github.com/libp2p/go-libp2p/core/crypto" "github.com/rs/zerolog" "github.com/stretchr/testify/mock" @@ -19,7 +18,6 @@ import ( "github.com/evstack/ev-node/pkg/genesis" signerpkg "github.com/evstack/ev-node/pkg/signer" "github.com/evstack/ev-node/pkg/signer/noop" - storemocks "github.com/evstack/ev-node/test/mocks" extmocks "github.com/evstack/ev-node/test/mocks/external" "github.com/evstack/ev-node/types" ) @@ -61,7 +59,7 @@ type P2PTestData struct { Handler *P2PHandler HeaderStore *extmocks.MockStore[*types.SignedHeader] DataStore *extmocks.MockStore[*types.Data] - Cache cache.Manager + Cache cache.CacheManager Genesis genesis.Genesis ProposerAddr []byte ProposerPub crypto.PubKey @@ -78,17 +76,11 @@ func setupP2P(t *testing.T) *P2PTestData { headerStoreMock := extmocks.NewMockStore[*types.SignedHeader](t) dataStoreMock := extmocks.NewMockStore[*types.Data](t) - storeMock := storemocks.NewMockStore(t) - storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-header-height").Return(nil, ds.ErrNotFound).Maybe() - storeMock.EXPECT().GetMetadata(mock.Anything, "last-submitted-data-height").Return(nil, ds.ErrNotFound).Maybe() - storeMock.EXPECT().Height(mock.Anything).Return(uint64(0), nil).Maybe() - storeMock.EXPECT().SetMetadata(mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() - cfg := config.Config{ RootDir: t.TempDir(), ClearCache: true, } - cacheManager, err := cache.NewManager(cfg, storeMock, zerolog.Nop()) + cacheManager, err := cache.NewCacheManager(cfg, zerolog.Nop()) require.NoError(t, err, "failed to create cache manager") handler := NewP2PHandler(headerStoreMock, dataStoreMock, cacheManager, gen, zerolog.Nop()) diff --git a/block/internal/syncing/syncer.go b/block/internal/syncing/syncer.go index 9182618795..5aa4394c48 100644 --- a/block/internal/syncing/syncer.go +++ b/block/internal/syncing/syncer.go @@ -18,30 +18,21 @@ import ( "github.com/evstack/ev-node/block/internal/cache" "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" "github.com/evstack/ev-node/pkg/config" "github.com/evstack/ev-node/pkg/genesis" "github.com/evstack/ev-node/pkg/store" "github.com/evstack/ev-node/types" ) -type daRetriever interface { - RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) -} - -type p2pHandler interface { - ProcessHeight(ctx context.Context, height uint64, heightInCh chan<- common.DAHeightEvent) error - SetProcessedHeight(height uint64) -} - // Syncer handles block synchronization from DA and P2P sources. type Syncer struct { // Core components store store.Store exec coreexecutor.Executor - da coreda.DA // Shared components - cache cache.Manager + cache cache.CacheManager metrics *common.Metrics // Configuration @@ -52,7 +43,8 @@ type Syncer struct { // State management lastState *atomic.Pointer[types.State] - // DA retriever height + // DA retriever + daClient da.Client daRetrieverHeight *atomic.Uint64 // P2P stores @@ -64,7 +56,7 @@ type Syncer struct { errorCh chan<- error // Channel to report critical execution client failures // Handlers - daRetriever daRetriever + daRetriever DARetriever p2pHandler p2pHandler // Logging @@ -83,8 +75,8 @@ type Syncer struct { func NewSyncer( store store.Store, exec coreexecutor.Executor, - da coreda.DA, - cache cache.Manager, + daClient da.Client, + cache cache.CacheManager, metrics *common.Metrics, config config.Config, genesis genesis.Genesis, @@ -97,7 +89,7 @@ func NewSyncer( return &Syncer{ store: store, exec: exec, - da: da, + daClient: daClient, cache: cache, metrics: metrics, config: config, @@ -122,7 +114,7 @@ func (s *Syncer) Start(ctx context.Context) error { } // Initialize handlers - s.daRetriever = NewDARetriever(s.da, s.cache, s.config, s.genesis, s.logger) + s.daRetriever = NewDARetriever(s.daClient, s.cache, s.genesis, s.logger) s.p2pHandler = NewP2PHandler(s.headerStore.Store(), s.dataStore.Store(), s.cache, s.genesis, s.logger) if currentHeight, err := s.store.Height(s.ctx); err != nil { s.logger.Error().Err(err).Msg("failed to set initial processed height for p2p handler") @@ -135,7 +127,11 @@ func (s *Syncer) Start(ctx context.Context) error { } // Start main processing loop - go s.processLoop() + s.wg.Add(1) + go func() { + defer s.wg.Done() + s.processLoop() + }() // Start dedicated workers for DA, and pending processing s.startSyncWorkers() @@ -226,9 +222,6 @@ func (s *Syncer) initializeState() error { // processLoop is the main coordination loop for processing events func (s *Syncer) processLoop() { - s.wg.Add(1) - defer s.wg.Done() - s.logger.Info().Msg("starting process loop") defer s.logger.Info().Msg("process loop stopped") diff --git a/block/internal/syncing/syncer_backoff_test.go b/block/internal/syncing/syncer_backoff_test.go index 9d58d226ea..65f2586966 100644 --- a/block/internal/syncing/syncer_backoff_test.go +++ b/block/internal/syncing/syncer_backoff_test.go @@ -69,7 +69,7 @@ func TestSyncer_BackoffOnDAError(t *testing.T) { syncer.ctx = ctx // Setup mocks - daRetriever := newMockdaRetriever(t) + daRetriever := NewMockDARetriever(t) p2pHandler := newMockp2pHandler(t) p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncer.daRetriever = daRetriever @@ -165,7 +165,7 @@ func TestSyncer_BackoffResetOnSuccess(t *testing.T) { addr, pub, signer := buildSyncTestSigner(t) gen := syncer.genesis - daRetriever := newMockdaRetriever(t) + daRetriever := NewMockDARetriever(t) p2pHandler := newMockp2pHandler(t) p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncer.daRetriever = daRetriever @@ -256,7 +256,7 @@ func TestSyncer_BackoffBehaviorIntegration(t *testing.T) { syncer := setupTestSyncer(t, 500*time.Millisecond) syncer.ctx = ctx - daRetriever := newMockdaRetriever(t) + daRetriever := NewMockDARetriever(t) p2pHandler := newMockp2pHandler(t) p2pHandler.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() syncer.daRetriever = daRetriever @@ -325,7 +325,8 @@ func setupTestSyncer(t *testing.T, daBlockTime time.Duration) *Syncer { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) require.NoError(t, err) addr, _, _ := buildSyncTestSigner(t) diff --git a/block/internal/syncing/syncer_benchmark_test.go b/block/internal/syncing/syncer_benchmark_test.go index b1be8fa31d..e2b6f6e51f 100644 --- a/block/internal/syncing/syncer_benchmark_test.go +++ b/block/internal/syncing/syncer_benchmark_test.go @@ -70,7 +70,7 @@ func BenchmarkSyncerIO(b *testing.B) { type benchFixture struct { s *Syncer st store.Store - cm cache.Manager + cm cache.CacheManager cancel context.CancelFunc } @@ -80,7 +80,8 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) require.NoError(b, err) addr, pub, signer := buildSyncTestSigner(b) @@ -132,7 +133,7 @@ func newBenchFixture(b *testing.B, totalHeights uint64, shuffledTx bool, daDelay } // Mock DA retriever to emit exactly totalHeights events, then HFF and cancel - daR := newMockdaRetriever(b) + daR := NewMockDARetriever(b) for i := uint64(0); i < totalHeights; i++ { daHeight := i + daHeightOffset daR.On("RetrieveFromDA", mock.Anything, daHeight). diff --git a/block/internal/syncing/syncer_mock.go b/block/internal/syncing/syncer_mock.go index 6238a58364..aae45399e4 100644 --- a/block/internal/syncing/syncer_mock.go +++ b/block/internal/syncing/syncer_mock.go @@ -11,101 +11,6 @@ import ( mock "github.com/stretchr/testify/mock" ) -// newMockdaRetriever creates a new instance of mockdaRetriever. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. -// The first argument is typically a *testing.T value. -func newMockdaRetriever(t interface { - mock.TestingT - Cleanup(func()) -}) *mockdaRetriever { - mock := &mockdaRetriever{} - mock.Mock.Test(t) - - t.Cleanup(func() { mock.AssertExpectations(t) }) - - return mock -} - -// mockdaRetriever is an autogenerated mock type for the daRetriever type -type mockdaRetriever struct { - mock.Mock -} - -type mockdaRetriever_Expecter struct { - mock *mock.Mock -} - -func (_m *mockdaRetriever) EXPECT() *mockdaRetriever_Expecter { - return &mockdaRetriever_Expecter{mock: &_m.Mock} -} - -// RetrieveFromDA provides a mock function for the type mockdaRetriever -func (_mock *mockdaRetriever) RetrieveFromDA(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error) { - ret := _mock.Called(ctx, daHeight) - - if len(ret) == 0 { - panic("no return value specified for RetrieveFromDA") - } - - var r0 []common.DAHeightEvent - var r1 error - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) ([]common.DAHeightEvent, error)); ok { - return returnFunc(ctx, daHeight) - } - if returnFunc, ok := ret.Get(0).(func(context.Context, uint64) []common.DAHeightEvent); ok { - r0 = returnFunc(ctx, daHeight) - } else { - if ret.Get(0) != nil { - r0 = ret.Get(0).([]common.DAHeightEvent) - } - } - if returnFunc, ok := ret.Get(1).(func(context.Context, uint64) error); ok { - r1 = returnFunc(ctx, daHeight) - } else { - r1 = ret.Error(1) - } - return r0, r1 -} - -// mockdaRetriever_RetrieveFromDA_Call is a *mock.Call that shadows Run/Return methods with type explicit version for method 'RetrieveFromDA' -type mockdaRetriever_RetrieveFromDA_Call struct { - *mock.Call -} - -// RetrieveFromDA is a helper method to define mock.On call -// - ctx context.Context -// - daHeight uint64 -func (_e *mockdaRetriever_Expecter) RetrieveFromDA(ctx interface{}, daHeight interface{}) *mockdaRetriever_RetrieveFromDA_Call { - return &mockdaRetriever_RetrieveFromDA_Call{Call: _e.mock.On("RetrieveFromDA", ctx, daHeight)} -} - -func (_c *mockdaRetriever_RetrieveFromDA_Call) Run(run func(ctx context.Context, daHeight uint64)) *mockdaRetriever_RetrieveFromDA_Call { - _c.Call.Run(func(args mock.Arguments) { - var arg0 context.Context - if args[0] != nil { - arg0 = args[0].(context.Context) - } - var arg1 uint64 - if args[1] != nil { - arg1 = args[1].(uint64) - } - run( - arg0, - arg1, - ) - }) - return _c -} - -func (_c *mockdaRetriever_RetrieveFromDA_Call) Return(dAHeightEvents []common.DAHeightEvent, err error) *mockdaRetriever_RetrieveFromDA_Call { - _c.Call.Return(dAHeightEvents, err) - return _c -} - -func (_c *mockdaRetriever_RetrieveFromDA_Call) RunAndReturn(run func(ctx context.Context, daHeight uint64) ([]common.DAHeightEvent, error)) *mockdaRetriever_RetrieveFromDA_Call { - _c.Call.Return(run) - return _c -} - // newMockp2pHandler creates a new instance of mockp2pHandler. It also registers a testing interface on the mock and a cleanup function to assert the mocks expectations. // The first argument is typically a *testing.T value. func newMockp2pHandler(t interface { diff --git a/block/internal/syncing/syncer_test.go b/block/internal/syncing/syncer_test.go index b8ab0b892d..ac97da7343 100644 --- a/block/internal/syncing/syncer_test.go +++ b/block/internal/syncing/syncer_test.go @@ -103,7 +103,8 @@ func makeData(chainID string, height uint64, txs int) *types.Data { func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) @@ -151,7 +152,8 @@ func TestSyncer_validateBlock_DataHashMismatch(t *testing.T) { func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) @@ -205,7 +207,8 @@ func TestProcessHeightEvent_SyncsAndUpdatesState(t *testing.T) { func TestSequentialBlockSync(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) require.NoError(t, err) addr, pub, signer := buildSyncTestSigner(t) @@ -277,7 +280,8 @@ func TestSequentialBlockSync(t *testing.T) { func TestSyncer_processPendingEvents(t *testing.T) { ds := dssync.MutexWrap(datastore.NewMapDatastore()) st := store.New(ds) - cm, err := cache.NewManager(config.DefaultConfig(), st, zerolog.Nop()) + + cm, err := cache.NewCacheManager(config.DefaultConfig(), zerolog.Nop()) require.NoError(t, err) // current height 1 @@ -323,7 +327,7 @@ func TestSyncLoopPersistState(t *testing.T) { cfg.RootDir = t.TempDir() cfg.ClearCache = true - cacheMgr, err := cache.NewManager(cfg, st, zerolog.Nop()) + cacheMgr, err := cache.NewCacheManager(cfg, zerolog.Nop()) require.NoError(t, err) const myDAHeightOffset = uint64(1) @@ -366,7 +370,7 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel := context.WithCancel(t.Context()) syncerInst1.ctx = ctx - daRtrMock, p2pHndlMock := newMockdaRetriever(t), newMockp2pHandler(t) + daRtrMock, p2pHndlMock := NewMockDARetriever(t), newMockp2pHandler(t) p2pHndlMock.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() p2pHndlMock.On("SetProcessedHeight", mock.Anything).Return().Maybe() syncerInst1.daRetriever, syncerInst1.p2pHandler = daRtrMock, p2pHndlMock @@ -433,7 +437,7 @@ func TestSyncLoopPersistState(t *testing.T) { require.Nil(t, event, "event at height %d should have been removed", blockHeight) } // and when new instance is up on restart - cacheMgr, err = cache.NewManager(cfg, st, zerolog.Nop()) + cacheMgr, err = cache.NewCacheManager(cfg, zerolog.Nop()) require.NoError(t, err) require.NoError(t, cacheMgr.LoadFromDisk()) @@ -456,7 +460,7 @@ func TestSyncLoopPersistState(t *testing.T) { ctx, cancel = context.WithCancel(t.Context()) t.Cleanup(cancel) syncerInst2.ctx = ctx - daRtrMock, p2pHndlMock = newMockdaRetriever(t), newMockp2pHandler(t) + daRtrMock, p2pHndlMock = NewMockDARetriever(t), newMockp2pHandler(t) p2pHndlMock.On("ProcessHeight", mock.Anything, mock.Anything, mock.Anything).Return(nil).Maybe() p2pHndlMock.On("SetProcessedHeight", mock.Anything).Return().Maybe() syncerInst2.daRetriever, syncerInst2.p2pHandler = daRtrMock, p2pHndlMock diff --git a/block/public.go b/block/public.go index 8bfc4c1674..f084f2757f 100644 --- a/block/public.go +++ b/block/public.go @@ -1,6 +1,14 @@ package block -import "github.com/evstack/ev-node/block/internal/common" +import ( + "time" + + "github.com/evstack/ev-node/block/internal/common" + "github.com/evstack/ev-node/block/internal/da" + coreda "github.com/evstack/ev-node/core/da" + "github.com/evstack/ev-node/pkg/config" + "github.com/rs/zerolog" +) // BlockOptions defines the options for creating block components type BlockOptions = common.BlockOptions @@ -22,3 +30,21 @@ func PrometheusMetrics(namespace string, labelsAndValues ...string) *Metrics { func NopMetrics() *Metrics { return common.NopMetrics() } + +// DAClient is the interface representing the DA client for public use. +type DAClient = da.Client + +// NewDAClient creates a new DA client with configuration +func NewDAClient( + daLayer coreda.DA, + config config.Config, + logger zerolog.Logger, +) DAClient { + return da.NewClient(da.Config{ + DA: daLayer, + Logger: logger, + DefaultTimeout: 10 * time.Second, + Namespace: config.DA.GetNamespace(), + DataNamespace: config.DA.GetDataNamespace(), + }) +} diff --git a/go.mod b/go.mod index 7dc31c9e4c..e1495d2ece 100644 --- a/go.mod +++ b/go.mod @@ -30,6 +30,7 @@ require ( golang.org/x/net v0.47.0 golang.org/x/sync v0.18.0 google.golang.org/protobuf v1.36.10 + gotest.tools/v3 v3.5.2 ) require ( @@ -53,6 +54,7 @@ require ( github.com/gogo/protobuf v1.3.2 // indirect github.com/golang/groupcache v0.0.0-20241129210726-2c02b8208cf8 // indirect github.com/google/flatbuffers v24.12.23+incompatible // indirect + github.com/google/go-cmp v0.7.0 // indirect github.com/google/gopacket v1.1.19 // indirect github.com/google/uuid v1.6.0 // indirect github.com/gopherjs/gopherjs v0.0.0-20190812055157-5d271430af9f // indirect diff --git a/go.sum b/go.sum index df8fd67995..120f8e51d1 100644 --- a/go.sum +++ b/go.sum @@ -651,6 +651,8 @@ gopkg.in/yaml.v2 v2.4.0/go.mod h1:RDklbk79AGWmwhnvt/jBztapEOGDOx6ZbXqjP6csGnQ= gopkg.in/yaml.v3 v3.0.0-20200313102051-9f266ea9e77c/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= gopkg.in/yaml.v3 v3.0.1 h1:fxVm/GzAzEWqLHuvctI91KS9hhNmmWOoWu0XTYJS7CA= gopkg.in/yaml.v3 v3.0.1/go.mod h1:K4uyk7z7BCEPqu6E+C64Yfv1cQ7kz7rIZviUmN+EgEM= +gotest.tools/v3 v3.5.2 h1:7koQfIKdy+I8UTetycgUqXWSDwpgv193Ka+qRsmBY8Q= +gotest.tools/v3 v3.5.2/go.mod h1:LtdLGcnqToBH83WByAAi/wiwSFCArdFIUV/xxN4pcjA= grpc.go4.org v0.0.0-20170609214715-11d0a25b4919/go.mod h1:77eQGdRu53HpSqPFJFmuJdjuHRquDANNeA4x7B8WQ9o= honnef.co/go/tools v0.0.0-20180728063816-88497007e858/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= honnef.co/go/tools v0.0.0-20190102054323-c2f93a96b099/go.mod h1:rf3lG4BRIbNafJWhAfAdb/ePZxsR/4RtNHQocxwk9r4= diff --git a/types/da.go b/types/da.go deleted file mode 100644 index e0d58710d9..0000000000 --- a/types/da.go +++ /dev/null @@ -1,212 +0,0 @@ -package types - -import ( - "context" - "errors" - "fmt" - "strings" - "time" - - "github.com/rs/zerolog" - - coreda "github.com/evstack/ev-node/core/da" -) - -// SubmitWithHelpers performs blob submission using the underlying DA layer, -// handling error mapping to produce a ResultSubmit. -// It assumes blob size filtering is handled within the DA implementation's Submit. -// It mimics the logic previously found in da.DAClient.Submit. -func SubmitWithHelpers( - ctx context.Context, - da coreda.DA, // Use the core DA interface - logger zerolog.Logger, - data [][]byte, - gasPrice float64, - namespace []byte, - options []byte, -) coreda.ResultSubmit { // Return core ResultSubmit type - ids, err := da.SubmitWithOptions(ctx, data, gasPrice, namespace, options) - - // calculate blob size - var blobSize uint64 - for _, blob := range data { - blobSize += uint64(len(blob)) - } - - // Handle errors returned by Submit - if err != nil { - if errors.Is(err, context.Canceled) { - logger.Debug().Msg("DA submission canceled via helper due to context cancellation") - return coreda.ResultSubmit{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusContextCanceled, - Message: "submission canceled", - IDs: ids, - BlobSize: blobSize, - }, - } - } - status := coreda.StatusError - switch { - case errors.Is(err, coreda.ErrTxTimedOut): - status = coreda.StatusNotIncludedInBlock - case errors.Is(err, coreda.ErrTxAlreadyInMempool): - status = coreda.StatusAlreadyInMempool - case errors.Is(err, coreda.ErrTxIncorrectAccountSequence): - status = coreda.StatusIncorrectAccountSequence - case errors.Is(err, coreda.ErrBlobSizeOverLimit): - status = coreda.StatusTooBig - case errors.Is(err, coreda.ErrContextDeadline): - status = coreda.StatusContextDeadline - } - - // Use debug level for StatusTooBig as it gets handled later in submitToDA through recursive splitting - if status == coreda.StatusTooBig { - logger.Debug().Err(err).Uint64("status", uint64(status)).Msg("DA submission failed via helper") - } else { - logger.Error().Err(err).Uint64("status", uint64(status)).Msg("DA submission failed via helper") - } - return coreda.ResultSubmit{ - BaseResult: coreda.BaseResult{ - Code: status, - Message: "failed to submit blobs: " + err.Error(), - IDs: ids, - SubmittedCount: uint64(len(ids)), - Height: 0, - Timestamp: time.Now(), - BlobSize: blobSize, - }, - } - } - - if len(ids) == 0 && len(data) > 0 { - logger.Warn().Msg("DA submission via helper returned no IDs for non-empty input data") - return coreda.ResultSubmit{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusError, - Message: "failed to submit blobs: no IDs returned despite non-empty input", - }, - } - } - - // Get height from the first ID - var height uint64 - if len(ids) > 0 { - height, _, err = coreda.SplitID(ids[0]) - if err != nil { - logger.Error().Err(err).Msg("failed to split ID") - } - } - - logger.Debug().Int("num_ids", len(ids)).Msg("DA submission successful via helper") - return coreda.ResultSubmit{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusSuccess, - IDs: ids, - SubmittedCount: uint64(len(ids)), - Height: height, - BlobSize: blobSize, - Timestamp: time.Now(), - }, - } -} - -// RetrieveWithHelpers performs blob retrieval using the underlying DA layer, -// handling error mapping to produce a ResultRetrieve. -// It mimics the logic previously found in da.DAClient.Retrieve. -// requestTimeout defines the timeout for the each retrieval request. -func RetrieveWithHelpers( - ctx context.Context, - da coreda.DA, - logger zerolog.Logger, - dataLayerHeight uint64, - namespace []byte, - requestTimeout time.Duration, -) coreda.ResultRetrieve { - // 1. Get IDs - getIDsCtx, cancel := context.WithTimeout(ctx, requestTimeout) - defer cancel() - idsResult, err := da.GetIDs(getIDsCtx, dataLayerHeight, namespace) - if err != nil { - // Handle specific "not found" error - if strings.Contains(err.Error(), coreda.ErrBlobNotFound.Error()) { - logger.Debug().Uint64("height", dataLayerHeight).Msg("Retrieve helper: Blobs not found at height") - return coreda.ResultRetrieve{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusNotFound, - Message: coreda.ErrBlobNotFound.Error(), - Height: dataLayerHeight, - Timestamp: time.Now(), - }, - } - } - if strings.Contains(err.Error(), coreda.ErrHeightFromFuture.Error()) { - logger.Debug().Uint64("height", dataLayerHeight).Msg("Retrieve helper: Blobs not found at height") - return coreda.ResultRetrieve{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusHeightFromFuture, - Message: coreda.ErrHeightFromFuture.Error(), - Height: dataLayerHeight, - Timestamp: time.Now(), - }, - } - } - // Handle other errors during GetIDs - logger.Error().Uint64("height", dataLayerHeight).Err(err).Msg("Retrieve helper: Failed to get IDs") - return coreda.ResultRetrieve{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusError, - Message: fmt.Sprintf("failed to get IDs: %s", err.Error()), - Height: dataLayerHeight, - Timestamp: time.Now(), - }, - } - } - - // This check should technically be redundant if GetIDs correctly returns ErrBlobNotFound - if idsResult == nil || len(idsResult.IDs) == 0 { - logger.Debug().Uint64("height", dataLayerHeight).Msg("Retrieve helper: No IDs found at height") - return coreda.ResultRetrieve{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusNotFound, - Message: coreda.ErrBlobNotFound.Error(), - Height: dataLayerHeight, - Timestamp: time.Now(), - }, - } - } - // 2. Get Blobs using the retrieved IDs in batches - batchSize := 100 - blobs := make([][]byte, 0, len(idsResult.IDs)) - for i := 0; i < len(idsResult.IDs); i += batchSize { - end := min(i+batchSize, len(idsResult.IDs)) - - getBlobsCtx, cancel := context.WithTimeout(ctx, requestTimeout) - batchBlobs, err := da.Get(getBlobsCtx, idsResult.IDs[i:end], namespace) - cancel() - if err != nil { - // Handle errors during Get - logger.Error().Uint64("height", dataLayerHeight).Int("num_ids", len(idsResult.IDs)).Err(err).Msg("Retrieve helper: Failed to get blobs") - return coreda.ResultRetrieve{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusError, - Message: fmt.Sprintf("failed to get blobs for batch %d-%d: %s", i, end-1, err.Error()), - Height: dataLayerHeight, - Timestamp: time.Now(), - }, - } - } - blobs = append(blobs, batchBlobs...) - } - // Success - logger.Debug().Uint64("height", dataLayerHeight).Int("num_blobs", len(blobs)).Msg("Retrieve helper: Successfully retrieved blobs") - return coreda.ResultRetrieve{ - BaseResult: coreda.BaseResult{ - Code: coreda.StatusSuccess, - Height: dataLayerHeight, - IDs: idsResult.IDs, - Timestamp: idsResult.Timestamp, - }, - Data: blobs, - } -} diff --git a/types/da_test.go b/types/da_test.go deleted file mode 100644 index 4a111499dc..0000000000 --- a/types/da_test.go +++ /dev/null @@ -1,298 +0,0 @@ -package types_test - -import ( - "context" - "errors" - "testing" - "time" - - "github.com/rs/zerolog" - "github.com/stretchr/testify/assert" - "github.com/stretchr/testify/mock" - - coreda "github.com/evstack/ev-node/core/da" - "github.com/evstack/ev-node/test/mocks" - "github.com/evstack/ev-node/types" -) - -func TestSubmitWithHelpers(t *testing.T) { - logger := zerolog.Nop() - - testCases := []struct { - name string - data [][]byte - gasPrice float64 - options []byte - submitErr error - submitIDs [][]byte - expectedCode coreda.StatusCode - expectedErrMsg string - expectedIDs [][]byte - expectedCount uint64 - }{ - { - name: "successful submission", - data: [][]byte{[]byte("blob1"), []byte("blob2")}, - gasPrice: 1.0, - options: []byte("opts"), - submitIDs: [][]byte{[]byte("id1"), []byte("id2")}, - expectedCode: coreda.StatusSuccess, - expectedIDs: [][]byte{[]byte("id1"), []byte("id2")}, - expectedCount: 2, - }, - { - name: "context canceled error", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitErr: context.Canceled, - expectedCode: coreda.StatusContextCanceled, - expectedErrMsg: "submission canceled", - }, - { - name: "tx timed out error", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitErr: coreda.ErrTxTimedOut, - expectedCode: coreda.StatusNotIncludedInBlock, - expectedErrMsg: "failed to submit blobs: " + coreda.ErrTxTimedOut.Error(), - }, - { - name: "tx already in mempool error", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitErr: coreda.ErrTxAlreadyInMempool, - expectedCode: coreda.StatusAlreadyInMempool, - expectedErrMsg: "failed to submit blobs: " + coreda.ErrTxAlreadyInMempool.Error(), - }, - { - name: "incorrect account sequence error", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitErr: coreda.ErrTxIncorrectAccountSequence, - expectedCode: coreda.StatusIncorrectAccountSequence, - expectedErrMsg: "failed to submit blobs: " + coreda.ErrTxIncorrectAccountSequence.Error(), - }, - { - name: "blob size over limit error", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitErr: coreda.ErrBlobSizeOverLimit, - expectedCode: coreda.StatusTooBig, - expectedErrMsg: "failed to submit blobs: " + coreda.ErrBlobSizeOverLimit.Error(), - }, - { - name: "context deadline error", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitErr: coreda.ErrContextDeadline, - expectedCode: coreda.StatusContextDeadline, - expectedErrMsg: "failed to submit blobs: " + coreda.ErrContextDeadline.Error(), - }, - { - name: "generic submission error", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitErr: errors.New("some generic error"), - expectedCode: coreda.StatusError, - expectedErrMsg: "failed to submit blobs: some generic error", - }, - { - name: "no IDs returned for non-empty data", - data: [][]byte{[]byte("blob1")}, - gasPrice: 1.0, - options: []byte("opts"), - submitIDs: [][]byte{}, - expectedCode: coreda.StatusError, - expectedErrMsg: "failed to submit blobs: no IDs returned despite non-empty input", - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - mockDA := mocks.NewMockDA(t) - encodedNamespace := coreda.NamespaceFromString("test-namespace") - - mockDA.On("SubmitWithOptions", mock.Anything, tc.data, tc.gasPrice, encodedNamespace.Bytes(), tc.options).Return(tc.submitIDs, tc.submitErr) - - result := types.SubmitWithHelpers(context.Background(), mockDA, logger, tc.data, tc.gasPrice, encodedNamespace.Bytes(), tc.options) - - assert.Equal(t, tc.expectedCode, result.Code) - if tc.expectedErrMsg != "" { - assert.Contains(t, result.Message, tc.expectedErrMsg) - } - if tc.expectedIDs != nil { - assert.Equal(t, tc.expectedIDs, result.IDs) - } - if tc.expectedCount != 0 { - assert.Equal(t, tc.expectedCount, result.SubmittedCount) - } - mockDA.AssertExpectations(t) - }) - } -} - -func TestRetrieveWithHelpers(t *testing.T) { - logger := zerolog.Nop() - dataLayerHeight := uint64(100) - mockIDs := [][]byte{[]byte("id1"), []byte("id2")} - mockBlobs := [][]byte{[]byte("blobA"), []byte("blobB")} - mockTimestamp := time.Now() - - testCases := []struct { - name string - getIDsResult *coreda.GetIDsResult - getIDsErr error - getBlobsErr error - expectedCode coreda.StatusCode - expectedErrMsg string - expectedIDs [][]byte - expectedData [][]byte - expectedHeight uint64 - }{ - { - name: "successful retrieval", - getIDsResult: &coreda.GetIDsResult{ - IDs: mockIDs, - Timestamp: mockTimestamp, - }, - expectedCode: coreda.StatusSuccess, - expectedIDs: mockIDs, - expectedData: mockBlobs, - expectedHeight: dataLayerHeight, - }, - { - name: "blob not found error during GetIDs", - getIDsErr: coreda.ErrBlobNotFound, - expectedCode: coreda.StatusNotFound, - expectedErrMsg: coreda.ErrBlobNotFound.Error(), - expectedHeight: dataLayerHeight, - }, - { - name: "height from future error during GetIDs", - getIDsErr: coreda.ErrHeightFromFuture, - expectedCode: coreda.StatusHeightFromFuture, - expectedErrMsg: coreda.ErrHeightFromFuture.Error(), - expectedHeight: dataLayerHeight, - }, - { - name: "generic error during GetIDs", - getIDsErr: errors.New("failed to connect to DA"), - expectedCode: coreda.StatusError, - expectedErrMsg: "failed to get IDs: failed to connect to DA", - expectedHeight: dataLayerHeight, - }, - { - name: "GetIDs returns nil result", - getIDsResult: nil, - expectedCode: coreda.StatusNotFound, - expectedErrMsg: coreda.ErrBlobNotFound.Error(), - expectedHeight: dataLayerHeight, - }, - { - name: "GetIDs returns empty IDs", - getIDsResult: &coreda.GetIDsResult{ - IDs: [][]byte{}, - Timestamp: mockTimestamp, - }, - expectedCode: coreda.StatusNotFound, - expectedErrMsg: coreda.ErrBlobNotFound.Error(), - expectedHeight: dataLayerHeight, - }, - { - name: "error during Get (blobs retrieval)", - getIDsResult: &coreda.GetIDsResult{ - IDs: mockIDs, - Timestamp: mockTimestamp, - }, - getBlobsErr: errors.New("network error during blob retrieval"), - expectedCode: coreda.StatusError, - expectedErrMsg: "failed to get blobs for batch 0-1: network error during blob retrieval", - expectedHeight: dataLayerHeight, - }, - } - - for _, tc := range testCases { - t.Run(tc.name, func(t *testing.T) { - mockDA := mocks.NewMockDA(t) - encodedNamespace := coreda.NamespaceFromString("test-namespace") - - mockDA.On("GetIDs", mock.Anything, dataLayerHeight, mock.Anything).Return(tc.getIDsResult, tc.getIDsErr) - - if tc.getIDsErr == nil && tc.getIDsResult != nil && len(tc.getIDsResult.IDs) > 0 { - mockDA.On("Get", mock.Anything, tc.getIDsResult.IDs, mock.Anything).Return(mockBlobs, tc.getBlobsErr) - } - - result := types.RetrieveWithHelpers(context.Background(), mockDA, logger, dataLayerHeight, encodedNamespace.Bytes(), 5*time.Second) - - assert.Equal(t, tc.expectedCode, result.Code) - assert.Equal(t, tc.expectedHeight, result.Height) - if tc.expectedErrMsg != "" { - assert.Contains(t, result.Message, tc.expectedErrMsg) - } - if tc.expectedIDs != nil { - assert.Equal(t, tc.expectedIDs, result.IDs) - } - if tc.expectedData != nil { - assert.Equal(t, tc.expectedData, result.Data) - } - mockDA.AssertExpectations(t) - }) - } -} - -func TestRetrieveWithHelpers_Timeout(t *testing.T) { - logger := zerolog.Nop() - dataLayerHeight := uint64(100) - encodedNamespace := coreda.NamespaceFromString("test-namespace") - - t.Run("timeout during GetIDs", func(t *testing.T) { - mockDA := mocks.NewMockDA(t) - - // Mock GetIDs to block until context is cancelled - mockDA.On("GetIDs", mock.Anything, dataLayerHeight, mock.Anything).Run(func(args mock.Arguments) { - ctx := args.Get(0).(context.Context) - <-ctx.Done() // Wait for context cancellation - }).Return(nil, context.DeadlineExceeded) - - // Use a very short timeout to ensure it triggers - result := types.RetrieveWithHelpers(context.Background(), mockDA, logger, dataLayerHeight, encodedNamespace.Bytes(), 1*time.Millisecond) - - assert.Equal(t, coreda.StatusError, result.Code) - assert.Contains(t, result.Message, "failed to get IDs") - assert.Contains(t, result.Message, "context deadline exceeded") - mockDA.AssertExpectations(t) - }) - - t.Run("timeout during Get", func(t *testing.T) { - mockDA := mocks.NewMockDA(t) - mockIDs := [][]byte{[]byte("id1")} - mockTimestamp := time.Now() - - // Mock GetIDs to succeed - mockDA.On("GetIDs", mock.Anything, dataLayerHeight, mock.Anything).Return(&coreda.GetIDsResult{ - IDs: mockIDs, - Timestamp: mockTimestamp, - }, nil) - - // Mock Get to block until context is cancelled - mockDA.On("Get", mock.Anything, mockIDs, mock.Anything).Run(func(args mock.Arguments) { - ctx := args.Get(0).(context.Context) - <-ctx.Done() // Wait for context cancellation - }).Return(nil, context.DeadlineExceeded) - - // Use a very short timeout to ensure it triggers - result := types.RetrieveWithHelpers(context.Background(), mockDA, logger, dataLayerHeight, encodedNamespace.Bytes(), 1*time.Millisecond) - - assert.Equal(t, coreda.StatusError, result.Code) - assert.Contains(t, result.Message, "failed to get blobs for batch") - assert.Contains(t, result.Message, "context deadline exceeded") - mockDA.AssertExpectations(t) - }) -}