Skip to content
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
2 changes: 1 addition & 1 deletion aggregator/pkg/subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,7 @@ func (agg *Aggregator) SubscribeToNewTasks() *chainio.ErrorPair {
}

func (agg *Aggregator) subscribeToNewTasks() *chainio.ErrorPair {
errorPair := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber)
errorPair := agg.avsSubscriber.SubscribeToNewTasksV3(agg.NewBatchChan, agg.taskSubscriber, agg.AggregatorConfig.Aggregator.PollLatestBatchInterval)

if errorPair != nil {
agg.AggregatorConfig.BaseConfig.Logger.Info("Failed to create task subscriber", "err", errorPair)
Expand Down
1 change: 1 addition & 0 deletions config-files/config-aggregator-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -38,3 +38,4 @@ aggregator:
# The Gas formula is percentage (gas_base_bump_percentage + gas_bump_incremental_percentage * i) / 100) is checked against this value
# If it is higher, it will default to `gas_bump_percentage_limit`
time_to_wait_before_bump: 72s # The time to wait for the receipt when responding to task. Suggested value 72 seconds (6 blocks)
poll_latest_batch_interval: 20s # The interval to poll for latest batches. Default: 20s
1 change: 1 addition & 0 deletions config-files/config-aggregator-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -34,3 +34,4 @@ aggregator:
gas_base_bump_percentage: 10 # How much to bump gas price when responding to task. Suggested value 10%
gas_bump_incremental_percentage: 2 # An extra percentage to bump every retry i*2 when responding to task. Suggested value 2%
time_to_wait_before_bump: 36s # The time to wait for the receipt when responding to task. Suggested value 36 seconds (3 blocks)
poll_latest_batch_interval: 20s # The interval to poll for latest batches. Default: 20s
19 changes: 1 addition & 18 deletions config-files/config-aggregator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -45,21 +45,4 @@ aggregator:
# The Gas formula is percentage (gas_base_bump_percentage + gas_bump_incremental_percentage * i) / 100) is checked against this value
# If it is higher, it will default to `gas_bump_percentage_limit`
time_to_wait_before_bump: 72s # The time to wait for the receipt when responding to task. Suggested value 72 seconds (6 blocks)

## Operator Configurations
# operator:
# aggregator_rpc_server_ip_port_address: localhost:8090
# address: '0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266'
# earnings_receiver_address: '0xf39Fd6e51aad88F6F4ce6aB8827279cffFb92266'
# delegation_approver_address: "0x0000000000000000000000000000000000000000"
# staker_opt_out_window_blocks: 0
# metadata_url: "https://yetanotherco.github.io/operator_metadata/metadata.json"
# enable_metrics: true
# metrics_ip_port_address: localhost:9092
# max_batch_size: 268435456 # 256 MiB
# # Operators variables needed for register it in EigenLayer
# el_delegation_manager_address: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9"
# private_key_store_path: config-files/anvil.ecdsa.key.json
# bls_private_key_store_path: config-files/anvil.bls.key.json
# signer_type: local_keystore
# chain_id: 31337
poll_latest_batch_interval: 20s # The interval to poll for latest batches. Default: 20s
1 change: 1 addition & 0 deletions config-files/config-operator-1-ethereum-package.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ operator:
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: 'config-files/operator-1.last_processed_batch.json'
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9'
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator-1.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ operator:
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: 'config-files/operator-1.last_processed_batch.json'
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9'
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator-2.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ operator:
metadata_url: 'https://yetanotherco.github.io/operator_metadata/metadata.json'
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: 'config-files/operator-2.last_processed_batch.json'
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9'
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator-3.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ operator:
metadata_url: 'https://yetanotherco.github.io/operator_metadata/metadata.json'
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: 'config-files/operator-3.last_processed_batch.json'
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified

# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: '0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9'
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator-docker.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ operator:
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: config-files/operator.last_processed_batch.json
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified
# Operators variables needed for register it in EigenLayer
el_delegation_manager_address: "0xCf7Ed3AccA5a467e9e704C703E8D87F634fB0Fc9"
private_key_store_path: config-files/anvil.ecdsa.key.json
Expand Down
1 change: 1 addition & 0 deletions config-files/config-operator-holesky.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ operator:
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: 'config-files/operator.last_processed_batch.json'
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified
1 change: 1 addition & 0 deletions config-files/config-operator-mainnet.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ operator:
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: 'config-files/operator.last_processed_batch.json'
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified
1 change: 1 addition & 0 deletions config-files/config-operator.yaml
Original file line number Diff line number Diff line change
Expand Up @@ -32,3 +32,4 @@ operator:
metrics_ip_port_address: localhost:9092
max_batch_size: 268435456 # 256 MiB
last_processed_batch_filepath: 'config-files/operator.last_processed_batch.json'
poll_latest_batch_interval: 20s # Optional: The interval to poll for latest batches. Default: 20s if not specified
158 changes: 2 additions & 156 deletions core/chainio/avs_subscriber.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,7 +24,6 @@ const (
MaxRetries = 100
RetryInterval = 1 * time.Second
BlockInterval uint64 = 1000
PollLatestBatchInterval = 5 * time.Second
RemoveBatchFromSetInterval = 5 * time.Minute
)

Expand Down Expand Up @@ -68,84 +67,7 @@ func NewAvsSubscriberFromConfig(baseConfig *config.BaseConfig) (*AvsSubscriber,
}, nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV2(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, errorPairChannel chan ErrorPair) *ErrorPair {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2)

// Subscribe to new tasks
sub, errMain := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errMain", fmt.Sprintf("%v", errMain))
}

subFallback, errFallback := SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback != nil {
s.logger.Error(fmt.Sprintf("Fallback failed to subscribe to new AlignedLayer V3 tasks after %d retries", MaxRetries), "errFallback", fmt.Sprintf("%v", errFallback))
}

if errMain != nil && errFallback != nil {
s.logger.Error("Failed to subscribe to new AlignedLayer V2 tasks with both RPCs", "errMain", errMain, "errFallback", errFallback)
return &ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}

s.logger.Info("Subscribed to new AlignedLayer V2 tasks")

pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)

// Forward the new tasks to the provided channel
go func() {
defer pollLatestBatchTicker.Stop()
newBatchMutex := &sync.Mutex{}
batchesSet := make(map[[32]byte]struct{})
for {
select {
case newBatch := <-internalChannel:
s.processNewBatchV2(newBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
case <-pollLatestBatchTicker.C:
latestBatch, err := s.getLatestNotRespondedTaskFromEthereumV2()
if err != nil {
s.logger.Debug("Failed to get latest task from blockchain", "err", err)
continue
}
if latestBatch != nil {
s.processNewBatchV2(latestBatch, batchesSet, newBatchMutex, newTaskCreatedChan)
}
}
}

}()

// Handle errors and resubscribe
go func() {
var errMain, errFallback error
var auxSub, auxSubFallback event.Subscription
for errMain == nil || errFallback == nil { //while one is active
select {
case err := <-sub.Err():
s.logger.Warn("Error in new task subscription of main connection", "err", err)

auxSub, errMain = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManager, internalChannel, nil, retry.NetworkRetryParams())
if errMain == nil {
sub = auxSub // update the subscription only if it was successful
s.logger.Info("Main connection resubscribed to new task subscription")
}
case err := <-subFallback.Err():
s.logger.Warn("Error in new task subscription of fallback connection", "err", err)

auxSubFallback, errFallback = SubscribeToNewTasksV2Retryable(&bind.WatchOpts{}, s.AvsContractBindings.ServiceManagerFallback, internalChannel, nil, retry.NetworkRetryParams())
if errFallback == nil {
subFallback = auxSubFallback // update the subscription only if it was successful
s.logger.Info("Resubscribed to fallback new task subscription")
}
}
}
errorPairChannel <- ErrorPair{ErrorMainRPC: errMain, ErrorFallbackRPC: errFallback}
}()

return nil
}

func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair) *ErrorPair {
func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, errorPairChannel chan ErrorPair, pollInterval time.Duration) *ErrorPair {
// Create a new channel to receive new tasks
internalChannel := make(chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3)

Expand All @@ -168,7 +90,7 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema

s.logger.Info("Subscribed to new AlignedLayer V3 tasks")

pollLatestBatchTicker := time.NewTicker(PollLatestBatchInterval)
pollLatestBatchTicker := time.NewTicker(pollInterval)

// Forward the new tasks to the provided channel
go func() {
Expand Down Expand Up @@ -224,32 +146,6 @@ func (s *AvsSubscriber) SubscribeToNewTasksV3(newTaskCreatedChan chan *servicema
return nil
}

func (s *AvsSubscriber) processNewBatchV2(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2) {
newBatchMutex.Lock()
defer newBatchMutex.Unlock()

batchIdentifier := append(batch.BatchMerkleRoot[:], batch.SenderAddress[:]...)
var batchIdentifierHash = *(*[32]byte)(crypto.Keccak256(batchIdentifier))

if _, ok := batchesSet[batchIdentifierHash]; !ok {
s.logger.Info("Received new task",
"batchMerkleRoot", hex.EncodeToString(batch.BatchMerkleRoot[:]),
"senderAddress", hex.EncodeToString(batch.SenderAddress[:]),
"batchIdentifierHash", hex.EncodeToString(batchIdentifierHash[:]))

batchesSet[batchIdentifierHash] = struct{}{}
newTaskCreatedChan <- batch

// Remove the batch from the set after RemoveBatchFromSetInterval time
go func() {
time.Sleep(RemoveBatchFromSetInterval)
newBatchMutex.Lock()
delete(batchesSet, batchIdentifierHash)
newBatchMutex.Unlock()
}()
}
}

func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, batchesSet map[[32]byte]struct{}, newBatchMutex *sync.Mutex, newTaskCreatedChan chan<- *servicemanager.ContractAlignedLayerServiceManagerNewBatchV3) {
newBatchMutex.Lock()
defer newBatchMutex.Unlock()
Expand All @@ -276,56 +172,6 @@ func (s *AvsSubscriber) processNewBatchV3(batch *servicemanager.ContractAlignedL
}
}

// getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV2() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2, error) {

latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
if err != nil {
return nil, err
}

var fromBlock uint64

if latestBlock < BlockInterval {
fromBlock = 0
} else {
fromBlock = latestBlock - BlockInterval
}

logs, err := s.FilterBatchV2Retryable(&bind.FilterOpts{Start: fromBlock, End: nil, Context: context.Background()}, nil, retry.NetworkRetryParams())
if err != nil {
return nil, err
}

var lastLog *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2

// Iterate over the logs until the end
for logs.Next() {
lastLog = logs.Event
}

if err := logs.Error(); err != nil {
return nil, err
}

if lastLog == nil {
return nil, nil
}

batchIdentifier := append(lastLog.BatchMerkleRoot[:], lastLog.SenderAddress[:]...)
batchIdentifierHash := *(*[32]byte)(crypto.Keccak256(batchIdentifier))
state, err := s.BatchesStateRetryable(nil, batchIdentifierHash, retry.NetworkRetryParams())
if err != nil {
return nil, err
}

if state.Responded {
return nil, nil
}

return lastLog, nil
}

// getLatestNotRespondedTaskFromEthereum queries the blockchain for the latest not responded task using the FilterNewBatch method.
func (s *AvsSubscriber) getLatestNotRespondedTaskFromEthereumV3() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3, error) {
latestBlock, err := s.BlockNumberRetryable(context.Background(), retry.NetworkRetryParams())
Expand Down
37 changes: 2 additions & 35 deletions core/chainio/retryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -126,30 +126,17 @@ func (s *AvsSubscriber) BlockNumberRetryable(ctx context.Context, config *retry.
return retry.RetryWithData(latestBlock_func, config)
}

/*
FilterBatchV2Retryable
Get NewBatchV2 logs from the AVS contract.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
*/
func (s *AvsSubscriber) FilterBatchV2Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte, config *retry.RetryParams) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV2Iterator, error) {
return s.AvsContractBindings.ServiceManager.FilterNewBatchV2(opts, batchMerkleRoot)
}
return retry.RetryWithData(filterNewBatchV2_func, config)
}

/*
FilterBatchV3Retryable
Get NewBatchV3 logs from the AVS contract.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
*/
func (s *AvsSubscriber) FilterBatchV3Retryable(opts *bind.FilterOpts, batchMerkleRoot [][32]byte, config *retry.RetryParams) (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
filterNewBatchV2_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
filterNewBatchV3_func := func() (*servicemanager.ContractAlignedLayerServiceManagerNewBatchV3Iterator, error) {
return s.AvsContractBindings.ServiceManager.FilterNewBatchV3(opts, batchMerkleRoot)
}
return retry.RetryWithData(filterNewBatchV2_func, config)
return retry.RetryWithData(filterNewBatchV3_func, config)
}

/*
Expand Down Expand Up @@ -193,26 +180,6 @@ func (s *AvsSubscriber) SubscribeNewHeadRetryable(ctx context.Context, c chan<-
return retry.RetryWithData(subscribeNewHead_func, config)
}

/*
SubscribeToNewTasksV2Retryable
Subscribe to NewBatchV2 logs from the AVS contract.
- All errors are considered Transient Errors
- Retry times (3 retries): 1 sec, 2 sec, 4 sec.
*/
func SubscribeToNewTasksV2Retryable(
opts *bind.WatchOpts,
serviceManager *servicemanager.ContractAlignedLayerServiceManager,
newTaskCreatedChan chan *servicemanager.ContractAlignedLayerServiceManagerNewBatchV2,
batchMerkleRoot [][32]byte,
config *retry.RetryParams,
) (event.Subscription, error) {
subscribe_func := func() (event.Subscription, error) {
log.Info().Msg("Subscribing to NewBatchV2")
return serviceManager.WatchNewBatchV2(opts, newTaskCreatedChan, batchMerkleRoot)
}
return retry.RetryWithData(subscribe_func, config)
}

/*
SubscribeToNewTasksV3Retryable
Subscribe to NewBatchV3 logs from the AVS contract.
Expand Down
3 changes: 3 additions & 0 deletions core/config/aggregator.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ type AggregatorConfig struct {
GasBumpIncrementalPercentage uint
GasBumpPercentageLimit uint
TimeToWaitBeforeBump time.Duration
PollLatestBatchInterval time.Duration
}
}

Expand All @@ -48,6 +49,7 @@ type AggregatorConfigFromYaml struct {
GasBumpIncrementalPercentage uint `yaml:"gas_bump_incremental_percentage"`
GasBumpPercentageLimit uint `yaml:"gas_bump_percentage_limit"`
TimeToWaitBeforeBump time.Duration `yaml:"time_to_wait_before_bump"`
PollLatestBatchInterval time.Duration `yaml:"poll_latest_batch_interval"`
} `yaml:"aggregator"`
}

Expand Down Expand Up @@ -97,6 +99,7 @@ func NewAggregatorConfig(configFilePath string) *AggregatorConfig {
GasBumpIncrementalPercentage uint
GasBumpPercentageLimit uint
TimeToWaitBeforeBump time.Duration
PollLatestBatchInterval time.Duration
}(aggregatorConfigFromYaml.Aggregator),
}
}
Loading
Loading