diff --git a/README.md b/README.md index cb99f3d17..bf82a35e7 100644 --- a/README.md +++ b/README.md @@ -2,13 +2,11 @@ [![Build Status](https://travis-ci.com/ChainSafe/ChainBridge.svg?branch=master)](https://travis-ci.com/ChainSafe/ChainBridge) -

[WIP]

# Contents - [Installation](#installation) - [Configuration](#configuration) -- [Running](#running) - [Chain Implementations](#chain-implementations) - [Testing](#testing) - [Simulations](#simulations) @@ -97,6 +95,12 @@ To import private keys as keystores, use `chainbridge account import --privateKe For testing purposes, chainbridge provides 5 test keys. The can be used with `--testkey `, where `name` is one of `Alice`, `Bob`, `Charlie`, `Dave`, or `Eve`. +## Metrics + +A basic health status check can be enabled with the `--metrics` flag (default port `8001`, use `--metricsPort` to specify). + +The endpoint `/health` will return the current block height and a timestamp of when it was processed. If the timestamp is at least 120 seconds old an error will be returned. + # Chain Implementations - Ethereum (Solidity): [chainbridge-solidity](https://github.com/ChainSafe/chainbridge-solidity) @@ -109,7 +113,7 @@ For testing purposes, chainbridge provides 5 test keys. The can be used with `-- A substrate pallet that can be integrated into a chain, as well as an example pallet to demonstrate chain integration. -# MKdocs +# Docs MKdocs will generate static HTML files for Chainsafe markdown files located in `Chainbridge/docs/` diff --git a/chains/ethereum/chain.go b/chains/ethereum/chain.go index e718d0aec..8a7bf9862 100644 --- a/chains/ethereum/chain.go +++ b/chains/ethereum/chain.go @@ -34,6 +34,7 @@ import ( "github.com/ChainSafe/ChainBridge/crypto/secp256k1" "github.com/ChainSafe/ChainBridge/keystore" msg "github.com/ChainSafe/ChainBridge/message" + metrics "github.com/ChainSafe/ChainBridge/metrics/types" "github.com/ChainSafe/ChainBridge/router" "github.com/ChainSafe/log15" "github.com/ethereum/go-ethereum/accounts/abi/bind" @@ -205,6 +206,10 @@ func (c *Chain) Name() string { return c.cfg.Name } +func (c *Chain) LatestBlock() metrics.LatestBlock { + return c.listener.latestBlock +} + // Stop signals to any running routines to exit func (c *Chain) Stop() { close(c.stop) diff --git a/chains/ethereum/listener.go b/chains/ethereum/listener.go index 42ae53881..171149e53 100644 --- a/chains/ethereum/listener.go +++ b/chains/ethereum/listener.go @@ -17,6 +17,7 @@ import ( "github.com/ChainSafe/ChainBridge/blockstore" "github.com/ChainSafe/ChainBridge/chains" msg "github.com/ChainSafe/ChainBridge/message" + metrics "github.com/ChainSafe/ChainBridge/metrics/types" utils "github.com/ChainSafe/ChainBridge/shared/ethereum" "github.com/ChainSafe/log15" eth "github.com/ethereum/go-ethereum" @@ -41,17 +42,19 @@ type listener struct { blockstore blockstore.Blockstorer stop <-chan int sysErr chan<- error // Reports fatal error to core + latestBlock metrics.LatestBlock } // NewListener creates and returns a listener func NewListener(conn Connection, cfg *Config, log log15.Logger, bs blockstore.Blockstorer, stop <-chan int, sysErr chan<- error) *listener { return &listener{ - cfg: *cfg, - conn: conn, - log: log, - blockstore: bs, - stop: stop, - sysErr: sysErr, + cfg: *cfg, + conn: conn, + log: log, + blockstore: bs, + stop: stop, + sysErr: sysErr, + latestBlock: metrics.LatestBlock{LastUpdated: time.Now()}, } } @@ -132,6 +135,8 @@ func (l *listener) pollBlocks() error { // Goto next block and reset retry counter currentBlock.Add(currentBlock, big.NewInt(1)) + l.latestBlock.Height = big.NewInt(0).Set(latestBlock) + l.latestBlock.LastUpdated = time.Now() retry = BlockRetryLimit } } diff --git a/chains/substrate/chain.go b/chains/substrate/chain.go index beed65c3a..a6f5c33c9 100644 --- a/chains/substrate/chain.go +++ b/chains/substrate/chain.go @@ -29,6 +29,7 @@ import ( "github.com/ChainSafe/ChainBridge/crypto/sr25519" "github.com/ChainSafe/ChainBridge/keystore" msg "github.com/ChainSafe/ChainBridge/message" + metrics "github.com/ChainSafe/ChainBridge/metrics/types" "github.com/ChainSafe/ChainBridge/router" "github.com/ChainSafe/log15" ) @@ -132,6 +133,10 @@ func (c *Chain) SetRouter(r *router.Router) { c.listener.setRouter(r) } +func (c *Chain) LatestBlock() metrics.LatestBlock { + return c.listener.latestBlock +} + func (c *Chain) Id() msg.ChainId { return c.cfg.Id } diff --git a/chains/substrate/listener.go b/chains/substrate/listener.go index fd0593782..5ad1d9be0 100644 --- a/chains/substrate/listener.go +++ b/chains/substrate/listener.go @@ -12,6 +12,7 @@ import ( "github.com/ChainSafe/ChainBridge/blockstore" "github.com/ChainSafe/ChainBridge/chains" msg "github.com/ChainSafe/ChainBridge/message" + metrics "github.com/ChainSafe/ChainBridge/metrics/types" utils "github.com/ChainSafe/ChainBridge/shared/substrate" "github.com/ChainSafe/log15" "github.com/centrifuge/go-substrate-rpc-client/types" @@ -28,6 +29,7 @@ type listener struct { log log15.Logger stop <-chan int sysErr chan<- error + latestBlock metrics.LatestBlock } // Frequency of polling for a new block @@ -45,6 +47,7 @@ func NewListener(conn *Connection, name string, id msg.ChainId, startBlock uint6 log: log, stop: stop, sysErr: sysErr, + latestBlock: metrics.LatestBlock{LastUpdated: time.Now()}, } } @@ -159,6 +162,8 @@ func (l *listener) pollBlocks() error { } currentBlock++ + l.latestBlock.Height = big.NewInt(0).SetUint64(currentBlock) + l.latestBlock.LastUpdated = time.Now() retry = BlockRetryLimit } } diff --git a/cmd/chainbridge/main.go b/cmd/chainbridge/main.go index b9efa7efe..af8db3d19 100644 --- a/cmd/chainbridge/main.go +++ b/cmd/chainbridge/main.go @@ -17,6 +17,7 @@ import ( "github.com/ChainSafe/ChainBridge/config" "github.com/ChainSafe/ChainBridge/core" msg "github.com/ChainSafe/ChainBridge/message" + "github.com/ChainSafe/ChainBridge/metrics/health" log "github.com/ChainSafe/log15" "github.com/urfave/cli/v2" ) @@ -30,6 +31,8 @@ var cliFlags = []cli.Flag{ config.BlockstorePathFlag, config.FreshStartFlag, config.LatestBlockFlag, + config.MetricsFlag, + config.MetricsPort, } var generateFlags = []cli.Flag{ @@ -188,6 +191,12 @@ func run(ctx *cli.Context) error { c.AddChain(newChain) } + // Start metrics server + if ctx.Bool(config.MetricsFlag.Name) { + port := ctx.Int(config.MetricsPort.Name) + go health.Start(port, c.Registry) + } + c.Start() return nil diff --git a/config/flags.go b/config/flags.go index 945384b3e..61eaa88c1 100644 --- a/config/flags.go +++ b/config/flags.go @@ -43,6 +43,20 @@ var ( } ) +// Metrics flags +var ( + MetricsFlag = &cli.BoolFlag{ + Name: "metrics", + Usage: "Enables metric server", + } + + MetricsPort = &cli.IntFlag{ + Name: "metricsPort", + Usage: "Port to serve metrics on", + Value: 8001, + } +) + // Generate subcommand flags var ( PasswordFlag = &cli.StringFlag{ diff --git a/core/chain.go b/core/chain.go index def795b29..62f45fd47 100644 --- a/core/chain.go +++ b/core/chain.go @@ -5,6 +5,7 @@ package core import ( msg "github.com/ChainSafe/ChainBridge/message" + metrics "github.com/ChainSafe/ChainBridge/metrics/types" "github.com/ChainSafe/ChainBridge/router" ) @@ -13,6 +14,7 @@ type Chain interface { SetRouter(*router.Router) Id() msg.ChainId Name() string + LatestBlock() metrics.LatestBlock Stop() } diff --git a/core/core.go b/core/core.go index cc83ac16e..937c79cac 100644 --- a/core/core.go +++ b/core/core.go @@ -9,14 +9,13 @@ import ( "os/signal" "syscall" - msg "github.com/ChainSafe/ChainBridge/message" "github.com/ChainSafe/ChainBridge/router" "github.com/ChainSafe/log15" ) type Core struct { - registry map[msg.ChainId]Chain + Registry []Chain route *router.Router log log15.Logger sysErr <-chan error @@ -24,22 +23,22 @@ type Core struct { func NewCore(sysErr <-chan error) *Core { return &Core{ - registry: make(map[msg.ChainId]Chain), + Registry: make([]Chain, 0), route: router.NewRouter(log15.New("system", "router")), log: log15.New("system", "core"), sysErr: sysErr, } } -// AddChain registers the chain in the registry and calls Chain.SetRouter() +// AddChain registers the chain in the Registry and calls Chain.SetRouter() func (c *Core) AddChain(chain Chain) { - c.registry[chain.Id()] = chain + c.Registry = append(c.Registry, chain) chain.SetRouter(c.route) } // Start will call all registered chains' Start methods and block forever (or until signal is received) func (c *Core) Start() { - for _, chain := range c.registry { + for _, chain := range c.Registry { err := chain.Start() if err != nil { c.log.Error( @@ -65,7 +64,7 @@ func (c *Core) Start() { } // Signal chains to shutdown - for _, chain := range c.registry { + for _, chain := range c.Registry { chain.Stop() } } diff --git a/metrics/health/health.go b/metrics/health/health.go new file mode 100644 index 000000000..e1752baa7 --- /dev/null +++ b/metrics/health/health.go @@ -0,0 +1,142 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: LGPL-3.0-only + +package health + +import ( + "encoding/json" + "fmt" + "math/big" + "net/http" + "strconv" + "time" + + "github.com/ChainSafe/ChainBridge/core" + msg "github.com/ChainSafe/ChainBridge/message" + log "github.com/ChainSafe/log15" +) + +// After this duration with no changes a chain will return an error +const BlockTimeout = 20 + +type httpMetricServer struct { + port int + timeDelay int + chains []core.Chain + stats []ChainInfo +} + +type httpMetricOptions struct { + port int + timeDelay int + chains []core.Chain +} + +type httpResponse struct { + Chains []ChainInfo `json:"chains,omitempty"` + Error string `json:"error,omitempty"` +} + +type ChainInfo struct { + ChainId msg.ChainId `json:"chainId"` + Height *big.Int `json:"height"` + LastUpdated time.Time `json:"lastUpdated"` +} + +// Start spins up the metrics server +func Start(port int, chains []core.Chain) { + opts := httpMetricOptions{ + port: port, + timeDelay: BlockTimeout, + chains: chains, + } + httpServer := newhttpMetricServer(opts) + httpServer.Start() +} + +func newhttpMetricServer(opts httpMetricOptions) *httpMetricServer { + return &httpMetricServer{ + port: opts.port, + chains: opts.chains, + timeDelay: opts.timeDelay, + stats: make([]ChainInfo, len(opts.chains)), + } +} + +// Start starts the http metrics server +func (s httpMetricServer) Start() { + log.Info("Health status server started listening on", "port", s.port) + + // Setup routes + http.HandleFunc("/health", s.healthStatus) + + // Start http server + err := http.ListenAndServe(":"+strconv.Itoa(s.port), nil) + + if err == http.ErrServerClosed { + log.Info("Health status server is shutting down", err) + } else { + log.Error("Health status server shutting down, server error: ", err) + } +} + +// healthStatus is a catch-all update that grabs the latest updates on the running chains +// It assumes that the configuration was set correctly, therefore the relevant chains are +// only those that are in the core.Core registry. +func (s httpMetricServer) healthStatus(w http.ResponseWriter, _ *http.Request) { + w.Header().Set("Content-Type", "application/json") + + // Iterate through their block heads and update the cache accordingly + for i, chain := range s.chains { + current := chain.LatestBlock() + prev := s.stats[i] + if s.stats[i].LastUpdated.IsZero() { + // First time we've received a block for this chain + s.stats[chain.Id()] = ChainInfo{ + ChainId: chain.Id(), + Height: current.Height, + LastUpdated: current.LastUpdated, + } + } else { + now := time.Now() + timeDiff := now.Sub(prev.LastUpdated) + // If block has changed, update it + if current.Height.Cmp(prev.Height) == 1 { + s.stats[chain.Id()].LastUpdated = current.LastUpdated + s.stats[chain.Id()].Height = current.Height + } else if int(timeDiff.Seconds()) >= s.timeDelay { // Error if we exceeded the time limit + response := &httpResponse{ + Chains: []ChainInfo{}, + Error: fmt.Sprintf("chain %d height hasn't changed for %f seconds. Current Height: %s", prev.ChainId, timeDiff.Seconds(), current.Height), + } + w.WriteHeader(http.StatusInternalServerError) + err := json.NewEncoder(w).Encode(response) + if err != nil { + log.Error("Failed to write metrics", "err", err) + } + return + } else if current.Height != nil && prev.Height != nil && current.Height.Cmp(prev.Height) == -1 { // Error for having a smaller blockheight than previous + response := &httpResponse{ + Chains: []ChainInfo{}, + Error: fmt.Sprintf("unexpected block height. previous = %s current = %s", prev.Height, current.Height), + } + w.WriteHeader(http.StatusInternalServerError) + err := json.NewEncoder(w).Encode(response) + if err != nil { + log.Error("Failed to write metrics", "err", err) + } + return + } + } + } + + response := &httpResponse{ + Chains: s.stats, + Error: "", + } + w.WriteHeader(http.StatusOK) + err := json.NewEncoder(w).Encode(response) + if err != nil { + log.Error("Failed to serve metrics") + } +} diff --git a/metrics/types/types.go b/metrics/types/types.go new file mode 100644 index 000000000..cf0fb4dfa --- /dev/null +++ b/metrics/types/types.go @@ -0,0 +1,14 @@ +// Copyright 2020 ChainSafe Systems +// SPDX-License-Identifier: LGPL-3.0-only + +package types + +import ( + "math/big" + "time" +) + +type LatestBlock struct { + Height *big.Int + LastUpdated time.Time +}