diff --git a/README.md b/README.md
index cb99f3d17..bf82a35e7 100644
--- a/README.md
+++ b/README.md
@@ -2,13 +2,11 @@
[](https://travis-ci.com/ChainSafe/ChainBridge)
-
[WIP]
# Contents
- [Installation](#installation)
- [Configuration](#configuration)
-- [Running](#running)
- [Chain Implementations](#chain-implementations)
- [Testing](#testing)
- [Simulations](#simulations)
@@ -97,6 +95,12 @@ To import private keys as keystores, use `chainbridge account import --privateKe
For testing purposes, chainbridge provides 5 test keys. The can be used with `--testkey `, where `name` is one of `Alice`, `Bob`, `Charlie`, `Dave`, or `Eve`.
+## Metrics
+
+A basic health status check can be enabled with the `--metrics` flag (default port `8001`, use `--metricsPort` to specify).
+
+The endpoint `/health` will return the current block height and a timestamp of when it was processed. If the timestamp is at least 120 seconds old an error will be returned.
+
# Chain Implementations
- Ethereum (Solidity): [chainbridge-solidity](https://github.com/ChainSafe/chainbridge-solidity)
@@ -109,7 +113,7 @@ For testing purposes, chainbridge provides 5 test keys. The can be used with `--
A substrate pallet that can be integrated into a chain, as well as an example pallet to demonstrate chain integration.
-# MKdocs
+# Docs
MKdocs will generate static HTML files for Chainsafe markdown files located in `Chainbridge/docs/`
diff --git a/chains/ethereum/chain.go b/chains/ethereum/chain.go
index e718d0aec..8a7bf9862 100644
--- a/chains/ethereum/chain.go
+++ b/chains/ethereum/chain.go
@@ -34,6 +34,7 @@ import (
"github.com/ChainSafe/ChainBridge/crypto/secp256k1"
"github.com/ChainSafe/ChainBridge/keystore"
msg "github.com/ChainSafe/ChainBridge/message"
+ metrics "github.com/ChainSafe/ChainBridge/metrics/types"
"github.com/ChainSafe/ChainBridge/router"
"github.com/ChainSafe/log15"
"github.com/ethereum/go-ethereum/accounts/abi/bind"
@@ -205,6 +206,10 @@ func (c *Chain) Name() string {
return c.cfg.Name
}
+func (c *Chain) LatestBlock() metrics.LatestBlock {
+ return c.listener.latestBlock
+}
+
// Stop signals to any running routines to exit
func (c *Chain) Stop() {
close(c.stop)
diff --git a/chains/ethereum/listener.go b/chains/ethereum/listener.go
index 42ae53881..171149e53 100644
--- a/chains/ethereum/listener.go
+++ b/chains/ethereum/listener.go
@@ -17,6 +17,7 @@ import (
"github.com/ChainSafe/ChainBridge/blockstore"
"github.com/ChainSafe/ChainBridge/chains"
msg "github.com/ChainSafe/ChainBridge/message"
+ metrics "github.com/ChainSafe/ChainBridge/metrics/types"
utils "github.com/ChainSafe/ChainBridge/shared/ethereum"
"github.com/ChainSafe/log15"
eth "github.com/ethereum/go-ethereum"
@@ -41,17 +42,19 @@ type listener struct {
blockstore blockstore.Blockstorer
stop <-chan int
sysErr chan<- error // Reports fatal error to core
+ latestBlock metrics.LatestBlock
}
// NewListener creates and returns a listener
func NewListener(conn Connection, cfg *Config, log log15.Logger, bs blockstore.Blockstorer, stop <-chan int, sysErr chan<- error) *listener {
return &listener{
- cfg: *cfg,
- conn: conn,
- log: log,
- blockstore: bs,
- stop: stop,
- sysErr: sysErr,
+ cfg: *cfg,
+ conn: conn,
+ log: log,
+ blockstore: bs,
+ stop: stop,
+ sysErr: sysErr,
+ latestBlock: metrics.LatestBlock{LastUpdated: time.Now()},
}
}
@@ -132,6 +135,8 @@ func (l *listener) pollBlocks() error {
// Goto next block and reset retry counter
currentBlock.Add(currentBlock, big.NewInt(1))
+ l.latestBlock.Height = big.NewInt(0).Set(latestBlock)
+ l.latestBlock.LastUpdated = time.Now()
retry = BlockRetryLimit
}
}
diff --git a/chains/substrate/chain.go b/chains/substrate/chain.go
index beed65c3a..a6f5c33c9 100644
--- a/chains/substrate/chain.go
+++ b/chains/substrate/chain.go
@@ -29,6 +29,7 @@ import (
"github.com/ChainSafe/ChainBridge/crypto/sr25519"
"github.com/ChainSafe/ChainBridge/keystore"
msg "github.com/ChainSafe/ChainBridge/message"
+ metrics "github.com/ChainSafe/ChainBridge/metrics/types"
"github.com/ChainSafe/ChainBridge/router"
"github.com/ChainSafe/log15"
)
@@ -132,6 +133,10 @@ func (c *Chain) SetRouter(r *router.Router) {
c.listener.setRouter(r)
}
+func (c *Chain) LatestBlock() metrics.LatestBlock {
+ return c.listener.latestBlock
+}
+
func (c *Chain) Id() msg.ChainId {
return c.cfg.Id
}
diff --git a/chains/substrate/listener.go b/chains/substrate/listener.go
index fd0593782..5ad1d9be0 100644
--- a/chains/substrate/listener.go
+++ b/chains/substrate/listener.go
@@ -12,6 +12,7 @@ import (
"github.com/ChainSafe/ChainBridge/blockstore"
"github.com/ChainSafe/ChainBridge/chains"
msg "github.com/ChainSafe/ChainBridge/message"
+ metrics "github.com/ChainSafe/ChainBridge/metrics/types"
utils "github.com/ChainSafe/ChainBridge/shared/substrate"
"github.com/ChainSafe/log15"
"github.com/centrifuge/go-substrate-rpc-client/types"
@@ -28,6 +29,7 @@ type listener struct {
log log15.Logger
stop <-chan int
sysErr chan<- error
+ latestBlock metrics.LatestBlock
}
// Frequency of polling for a new block
@@ -45,6 +47,7 @@ func NewListener(conn *Connection, name string, id msg.ChainId, startBlock uint6
log: log,
stop: stop,
sysErr: sysErr,
+ latestBlock: metrics.LatestBlock{LastUpdated: time.Now()},
}
}
@@ -159,6 +162,8 @@ func (l *listener) pollBlocks() error {
}
currentBlock++
+ l.latestBlock.Height = big.NewInt(0).SetUint64(currentBlock)
+ l.latestBlock.LastUpdated = time.Now()
retry = BlockRetryLimit
}
}
diff --git a/cmd/chainbridge/main.go b/cmd/chainbridge/main.go
index b9efa7efe..af8db3d19 100644
--- a/cmd/chainbridge/main.go
+++ b/cmd/chainbridge/main.go
@@ -17,6 +17,7 @@ import (
"github.com/ChainSafe/ChainBridge/config"
"github.com/ChainSafe/ChainBridge/core"
msg "github.com/ChainSafe/ChainBridge/message"
+ "github.com/ChainSafe/ChainBridge/metrics/health"
log "github.com/ChainSafe/log15"
"github.com/urfave/cli/v2"
)
@@ -30,6 +31,8 @@ var cliFlags = []cli.Flag{
config.BlockstorePathFlag,
config.FreshStartFlag,
config.LatestBlockFlag,
+ config.MetricsFlag,
+ config.MetricsPort,
}
var generateFlags = []cli.Flag{
@@ -188,6 +191,12 @@ func run(ctx *cli.Context) error {
c.AddChain(newChain)
}
+ // Start metrics server
+ if ctx.Bool(config.MetricsFlag.Name) {
+ port := ctx.Int(config.MetricsPort.Name)
+ go health.Start(port, c.Registry)
+ }
+
c.Start()
return nil
diff --git a/config/flags.go b/config/flags.go
index 945384b3e..61eaa88c1 100644
--- a/config/flags.go
+++ b/config/flags.go
@@ -43,6 +43,20 @@ var (
}
)
+// Metrics flags
+var (
+ MetricsFlag = &cli.BoolFlag{
+ Name: "metrics",
+ Usage: "Enables metric server",
+ }
+
+ MetricsPort = &cli.IntFlag{
+ Name: "metricsPort",
+ Usage: "Port to serve metrics on",
+ Value: 8001,
+ }
+)
+
// Generate subcommand flags
var (
PasswordFlag = &cli.StringFlag{
diff --git a/core/chain.go b/core/chain.go
index def795b29..62f45fd47 100644
--- a/core/chain.go
+++ b/core/chain.go
@@ -5,6 +5,7 @@ package core
import (
msg "github.com/ChainSafe/ChainBridge/message"
+ metrics "github.com/ChainSafe/ChainBridge/metrics/types"
"github.com/ChainSafe/ChainBridge/router"
)
@@ -13,6 +14,7 @@ type Chain interface {
SetRouter(*router.Router)
Id() msg.ChainId
Name() string
+ LatestBlock() metrics.LatestBlock
Stop()
}
diff --git a/core/core.go b/core/core.go
index cc83ac16e..937c79cac 100644
--- a/core/core.go
+++ b/core/core.go
@@ -9,14 +9,13 @@ import (
"os/signal"
"syscall"
- msg "github.com/ChainSafe/ChainBridge/message"
"github.com/ChainSafe/ChainBridge/router"
"github.com/ChainSafe/log15"
)
type Core struct {
- registry map[msg.ChainId]Chain
+ Registry []Chain
route *router.Router
log log15.Logger
sysErr <-chan error
@@ -24,22 +23,22 @@ type Core struct {
func NewCore(sysErr <-chan error) *Core {
return &Core{
- registry: make(map[msg.ChainId]Chain),
+ Registry: make([]Chain, 0),
route: router.NewRouter(log15.New("system", "router")),
log: log15.New("system", "core"),
sysErr: sysErr,
}
}
-// AddChain registers the chain in the registry and calls Chain.SetRouter()
+// AddChain registers the chain in the Registry and calls Chain.SetRouter()
func (c *Core) AddChain(chain Chain) {
- c.registry[chain.Id()] = chain
+ c.Registry = append(c.Registry, chain)
chain.SetRouter(c.route)
}
// Start will call all registered chains' Start methods and block forever (or until signal is received)
func (c *Core) Start() {
- for _, chain := range c.registry {
+ for _, chain := range c.Registry {
err := chain.Start()
if err != nil {
c.log.Error(
@@ -65,7 +64,7 @@ func (c *Core) Start() {
}
// Signal chains to shutdown
- for _, chain := range c.registry {
+ for _, chain := range c.Registry {
chain.Stop()
}
}
diff --git a/metrics/health/health.go b/metrics/health/health.go
new file mode 100644
index 000000000..e1752baa7
--- /dev/null
+++ b/metrics/health/health.go
@@ -0,0 +1,142 @@
+// Copyright 2020 ChainSafe Systems
+// SPDX-License-Identifier: LGPL-3.0-only
+
+package health
+
+import (
+ "encoding/json"
+ "fmt"
+ "math/big"
+ "net/http"
+ "strconv"
+ "time"
+
+ "github.com/ChainSafe/ChainBridge/core"
+ msg "github.com/ChainSafe/ChainBridge/message"
+ log "github.com/ChainSafe/log15"
+)
+
+// After this duration with no changes a chain will return an error
+const BlockTimeout = 20
+
+type httpMetricServer struct {
+ port int
+ timeDelay int
+ chains []core.Chain
+ stats []ChainInfo
+}
+
+type httpMetricOptions struct {
+ port int
+ timeDelay int
+ chains []core.Chain
+}
+
+type httpResponse struct {
+ Chains []ChainInfo `json:"chains,omitempty"`
+ Error string `json:"error,omitempty"`
+}
+
+type ChainInfo struct {
+ ChainId msg.ChainId `json:"chainId"`
+ Height *big.Int `json:"height"`
+ LastUpdated time.Time `json:"lastUpdated"`
+}
+
+// Start spins up the metrics server
+func Start(port int, chains []core.Chain) {
+ opts := httpMetricOptions{
+ port: port,
+ timeDelay: BlockTimeout,
+ chains: chains,
+ }
+ httpServer := newhttpMetricServer(opts)
+ httpServer.Start()
+}
+
+func newhttpMetricServer(opts httpMetricOptions) *httpMetricServer {
+ return &httpMetricServer{
+ port: opts.port,
+ chains: opts.chains,
+ timeDelay: opts.timeDelay,
+ stats: make([]ChainInfo, len(opts.chains)),
+ }
+}
+
+// Start starts the http metrics server
+func (s httpMetricServer) Start() {
+ log.Info("Health status server started listening on", "port", s.port)
+
+ // Setup routes
+ http.HandleFunc("/health", s.healthStatus)
+
+ // Start http server
+ err := http.ListenAndServe(":"+strconv.Itoa(s.port), nil)
+
+ if err == http.ErrServerClosed {
+ log.Info("Health status server is shutting down", err)
+ } else {
+ log.Error("Health status server shutting down, server error: ", err)
+ }
+}
+
+// healthStatus is a catch-all update that grabs the latest updates on the running chains
+// It assumes that the configuration was set correctly, therefore the relevant chains are
+// only those that are in the core.Core registry.
+func (s httpMetricServer) healthStatus(w http.ResponseWriter, _ *http.Request) {
+ w.Header().Set("Content-Type", "application/json")
+
+ // Iterate through their block heads and update the cache accordingly
+ for i, chain := range s.chains {
+ current := chain.LatestBlock()
+ prev := s.stats[i]
+ if s.stats[i].LastUpdated.IsZero() {
+ // First time we've received a block for this chain
+ s.stats[chain.Id()] = ChainInfo{
+ ChainId: chain.Id(),
+ Height: current.Height,
+ LastUpdated: current.LastUpdated,
+ }
+ } else {
+ now := time.Now()
+ timeDiff := now.Sub(prev.LastUpdated)
+ // If block has changed, update it
+ if current.Height.Cmp(prev.Height) == 1 {
+ s.stats[chain.Id()].LastUpdated = current.LastUpdated
+ s.stats[chain.Id()].Height = current.Height
+ } else if int(timeDiff.Seconds()) >= s.timeDelay { // Error if we exceeded the time limit
+ response := &httpResponse{
+ Chains: []ChainInfo{},
+ Error: fmt.Sprintf("chain %d height hasn't changed for %f seconds. Current Height: %s", prev.ChainId, timeDiff.Seconds(), current.Height),
+ }
+ w.WriteHeader(http.StatusInternalServerError)
+ err := json.NewEncoder(w).Encode(response)
+ if err != nil {
+ log.Error("Failed to write metrics", "err", err)
+ }
+ return
+ } else if current.Height != nil && prev.Height != nil && current.Height.Cmp(prev.Height) == -1 { // Error for having a smaller blockheight than previous
+ response := &httpResponse{
+ Chains: []ChainInfo{},
+ Error: fmt.Sprintf("unexpected block height. previous = %s current = %s", prev.Height, current.Height),
+ }
+ w.WriteHeader(http.StatusInternalServerError)
+ err := json.NewEncoder(w).Encode(response)
+ if err != nil {
+ log.Error("Failed to write metrics", "err", err)
+ }
+ return
+ }
+ }
+ }
+
+ response := &httpResponse{
+ Chains: s.stats,
+ Error: "",
+ }
+ w.WriteHeader(http.StatusOK)
+ err := json.NewEncoder(w).Encode(response)
+ if err != nil {
+ log.Error("Failed to serve metrics")
+ }
+}
diff --git a/metrics/types/types.go b/metrics/types/types.go
new file mode 100644
index 000000000..cf0fb4dfa
--- /dev/null
+++ b/metrics/types/types.go
@@ -0,0 +1,14 @@
+// Copyright 2020 ChainSafe Systems
+// SPDX-License-Identifier: LGPL-3.0-only
+
+package types
+
+import (
+ "math/big"
+ "time"
+)
+
+type LatestBlock struct {
+ Height *big.Int
+ LastUpdated time.Time
+}