Skip to content
This repository was archived by the owner on Nov 5, 2021. It is now read-only.
Merged
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
42 changes: 21 additions & 21 deletions go.mod
Original file line number Diff line number Diff line change
@@ -1,32 +1,32 @@
module github.com/google/cloudprober

go 1.13
go 1.15

require (
cloud.google.com/go v0.48.0
cloud.google.com/go/bigquery v1.0.1
cloud.google.com/go/logging v1.0.0
cloud.google.com/go/pubsub v1.0.1
github.com/aws/aws-sdk-go v1.25.37
cloud.google.com/go v0.68.0
cloud.google.com/go/bigquery v1.8.0
cloud.google.com/go/logging v1.1.0
cloud.google.com/go/pubsub v1.8.1
github.com/aws/aws-sdk-go v1.35.7
github.com/golang/glog v0.0.0-20160126235308-23def4e6c14b
github.com/golang/groupcache v0.0.0-20191027212112-611e8accdfc9 // indirect
github.com/golang/protobuf v1.4.2
github.com/google/shlex v0.0.0-20191202100458-e7afc7fbc510
github.com/hashicorp/golang-lru v0.5.3 // indirect
github.com/hoisie/redis v0.0.0-20160730154456-b5c6e81454e0
github.com/jstemmer/go-junit-report v0.9.1 // indirect
github.com/kylelemons/godebug v1.1.0
github.com/lib/pq v1.2.0
github.com/miekg/dns v1.1.22
go.opencensus.io v0.22.2 // indirect
golang.org/x/crypto v0.0.0-20191112222119-e1110fd1c708 // indirect
golang.org/x/net v0.0.0-20191116160921-f9c825593386
golang.org/x/oauth2 v0.0.0-20190604053449-0f29369cfe45
golang.org/x/sync v0.0.0-20190911185100-cd5d95a43a6e // indirect
golang.org/x/sys v0.0.0-20191115151921-52ab43148777
golang.org/x/tools v0.0.0-20191116214431-80313e1ba718 // indirect
google.golang.org/api v0.13.0
google.golang.org/appengine v1.6.5 // indirect
google.golang.org/genproto v0.0.0-20191115221424-83cc0476cb11
google.golang.org/grpc v1.27.0
google.golang.org/protobuf v1.23.0
github.com/lib/pq v1.8.0
github.com/miekg/dns v1.1.33
go.opencensus.io v0.22.5 // indirect
golang.org/x/crypto v0.0.0-20201012173705-84dcc777aaee // indirect
golang.org/x/net v0.0.0-20201010224723-4f7140c49acb
golang.org/x/oauth2 v0.0.0-20200902213428-5d25da1a8d43
golang.org/x/sync v0.0.0-20201008141435-b3e1573b7520 // indirect
golang.org/x/sys v0.0.0-20201013132646-2da7054afaeb
golang.org/x/tools v0.0.0-20201013201025-64a9e34f3752 // indirect
google.golang.org/api v0.33.0
google.golang.org/appengine v1.6.7 // indirect
google.golang.org/genproto v0.0.0-20201013134114-7f9ee70cb474
google.golang.org/grpc v1.33.0
google.golang.org/protobuf v1.25.0
)
269 changes: 269 additions & 0 deletions go.sum

Large diffs are not rendered by default.

2 changes: 2 additions & 0 deletions metrics/eventmetrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -50,6 +50,8 @@ type EventMetrics struct {
// e.g. ptype=ping, dst=google.com, etc.
labels map[string]string
labelsKeys []string

LatencyUnit time.Duration
}

// NewEventMetrics return a new EventMetrics object with internals maps initialized.
Expand Down
2 changes: 2 additions & 0 deletions probes/common/statskeeper/statskeeper.go
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,8 @@ func StatsKeeper(ctx context.Context, ptype, name string, opts *options.Options,
em.AddLabel("dst", t.Name)
em.Timestamp = ts

em.LatencyUnit = opts.LatencyUnit

for _, al := range opts.AdditionalLabels {
em.AddLabel(al.KeyValueForTarget(t.Name))
}
Expand Down
2 changes: 2 additions & 0 deletions probes/external/external.go
Original file line number Diff line number Diff line change
Expand Up @@ -326,6 +326,8 @@ func (p *Probe) defaultMetrics(target string, result *result) *metrics.EventMetr
AddLabel("probe", p.name).
AddLabel("dst", target)

em.LatencyUnit = p.opts.LatencyUnit

for _, al := range p.opts.AdditionalLabels {
em.AddLabel(al.KeyValueForTarget(target))
}
Expand Down
1 change: 1 addition & 0 deletions probes/grpc/grpc.go
Original file line number Diff line number Diff line change
Expand Up @@ -339,6 +339,7 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
AddLabel("probe", p.name).
AddLabel("dst", targetName)
result.Unlock()
em.LatencyUnit = p.opts.LatencyUnit
for _, al := range p.opts.AdditionalLabels {
em.AddLabel(al.KeyValueForTarget(targetName))
}
Expand Down
2 changes: 2 additions & 0 deletions probes/http/http.go
Original file line number Diff line number Diff line change
Expand Up @@ -421,6 +421,8 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
em.AddMetric("connect_event", metrics.NewInt(result.connEvent))
}

em.LatencyUnit = p.opts.LatencyUnit

for _, al := range p.opts.AdditionalLabels {
em.AddLabel(al.KeyValueForTarget(target.Name))
}
Expand Down
2 changes: 2 additions & 0 deletions probes/ping/ping.go
Original file line number Diff line number Diff line change
Expand Up @@ -483,6 +483,8 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
AddLabel("probe", p.name).
AddLabel("dst", target.Name)

em.LatencyUnit = p.opts.LatencyUnit

for _, al := range p.opts.AdditionalLabels {
em.AddLabel(al.KeyValueForTarget(target.Name))
}
Expand Down
1 change: 1 addition & 0 deletions probes/udp/udp.go
Original file line number Diff line number Diff line change
Expand Up @@ -471,6 +471,7 @@ func (p *Probe) Start(ctx context.Context, dataChan chan *metrics.EventMetrics)
case <-statsExportTicker.C:
for f, result := range p.res {
em := result.eventMetrics(p.name, p.opts, f, p.c)
em.LatencyUnit = p.opts.LatencyUnit
p.opts.LogMetrics(em)
dataChan <- em
}
Expand Down
70 changes: 58 additions & 12 deletions surfacers/stackdriver/stackdriver.go
Original file line number Diff line number Diff line change
Expand Up @@ -54,7 +54,8 @@ type SDSurfacer struct {
allowedMetricsRegex *regexp.Regexp

// Internal cache for saving metric data until a batch is sent
cache map[string]*monitoring.TimeSeries
cache map[string]*monitoring.TimeSeries
knownMetrics map[string]bool

// Channel for writing the data without blocking
writeChan chan *metrics.EventMetrics
Expand Down Expand Up @@ -85,12 +86,13 @@ func New(ctx context.Context, config *configpb.SurfacerConf, l *logger.Logger) (
// Create a cache, which is used for batching write requests together,
// and a channel for writing data.
s := SDSurfacer{
cache: make(map[string]*monitoring.TimeSeries),
writeChan: make(chan *metrics.EventMetrics, 1000),
c: config,
projectName: config.GetProject(),
startTime: time.Now(),
l: l,
cache: make(map[string]*monitoring.TimeSeries),
knownMetrics: make(map[string]bool),
writeChan: make(chan *metrics.EventMetrics, 1000),
c: config,
projectName: config.GetProject(),
startTime: time.Now(),
l: l,
}

if s.c.GetAllowedMetricsRegex() != "" {
Expand Down Expand Up @@ -153,6 +155,31 @@ func (s *SDSurfacer) Write(_ context.Context, em *metrics.EventMetrics) {
}
}

// createMetricDescriptor registers an explicit metric descriptor for the
// metric behind the given time series. Relying on Stackdriver's implicit
// descriptor auto-creation (triggered by writing a time series) would drop
// the unit field, so we create the descriptor ourselves to preserve units.
func (s *SDSurfacer) createMetricDescriptor(ts *monitoring.TimeSeries) error {
	// Every label on the time series becomes a STRING-typed label descriptor.
	var labelDescs []*monitoring.LabelDescriptor
	for labelKey := range ts.Metric.Labels {
		ld := &monitoring.LabelDescriptor{
			Key:       labelKey,
			ValueType: "STRING",
		}
		labelDescs = append(labelDescs, ld)
	}

	desc := &monitoring.MetricDescriptor{
		Name:       "projects/" + s.projectName + "/metricDescriptors/" + ts.Metric.Type,
		Type:       ts.Metric.Type,
		MetricKind: ts.MetricKind,
		Labels:     labelDescs,
		Unit:       ts.Unit,
		ValueType:  ts.ValueType,
	}

	_, err := s.client.Projects.MetricDescriptors.Create("projects/"+s.projectName, desc).Do()
	return err
}

// writeBatch polls the writeChan and the sendChan waiting for either a new
// write packet or a new context. If data comes in on the writeChan, then
// the data is pulled off and put into the cache (if there is already an
Expand Down Expand Up @@ -190,6 +217,13 @@ func (s *SDSurfacer) writeBatch(ctx context.Context) {

var ts []*monitoring.TimeSeries
for _, v := range s.cache {
if !s.knownMetrics[v.Metric.Type] && v.Unit != "" {
if err := s.createMetricDescriptor(v); err != nil {
s.l.Warningf("Error creating metric descriptor for: %s, err: %v", v.Metric.Type, err)
continue
}
s.knownMetrics[v.Metric.Type] = true
}
ts = append(ts, v)
}

Expand Down Expand Up @@ -233,11 +267,12 @@ func (s *SDSurfacer) writeBatch(ctx context.Context) {
//
// More information on the object and specific fields can be found here:
// https://cloud.google.com/monitoring/api/ref_v3/rest/v3/TimeSeries
func (s *SDSurfacer) recordTimeSeries(metricKind, metricName, msgType string, labels map[string]string, timestamp time.Time, tv *monitoring.TypedValue, cacheKey string) *monitoring.TimeSeries {
func (s *SDSurfacer) recordTimeSeries(metricKind, metricName, msgType string, labels map[string]string, timestamp time.Time, tv *monitoring.TypedValue, unit, cacheKey string) *monitoring.TimeSeries {
startTime := s.startTime.Format(time.RFC3339Nano)
if metricKind == "GAUGE" {
startTime = timestamp.Format(time.RFC3339Nano)
}

ts := &monitoring.TimeSeries{
// The URL address for our custom metric, must match the
// name we used in the MetricDescriptor.
Expand All @@ -249,6 +284,7 @@ func (s *SDSurfacer) recordTimeSeries(metricKind, metricName, msgType string, la
// Must match the MetricKind and ValueType of the MetricDescriptor.
MetricKind: metricKind,
ValueType: msgType,
Unit: unit,

// Create a single data point, this could be utilized to create
// a batch of points instead of a single point if the write
Expand Down Expand Up @@ -366,10 +402,20 @@ func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitor
// Create the correct TimeSeries object based on the incoming data
val := em.Metric(k)

unit := "1" // "1" is the default unit for numbers.
if k == "latency" {
unit = map[time.Duration]string{
time.Second: "s",
time.Millisecond: "ms",
time.Microsecond: "us",
time.Nanosecond: "ns",
}[em.LatencyUnit]
}

// If metric value is of type numerical value.
if v, ok := val.(metrics.NumValue); ok {
f := float64(v.Int64())
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, cacheKey))
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, unit, cacheKey))
continue
}

Expand All @@ -383,7 +429,7 @@ func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitor
// for stackdriver.
mLabels["val"] = strings.Trim(v.String(), "\"")
f := float64(1)
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, cacheKey))
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, unit, cacheKey))
continue
}

Expand All @@ -398,14 +444,14 @@ func (s *SDSurfacer) recordEventMetrics(em *metrics.EventMetrics) (ts []*monitor
}
mmLabels[mapValue.MapName] = mapKey
f := float64(mapValue.GetKey(mapKey).Int64())
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mmLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, cacheKey))
ts = append(ts, s.recordTimeSeries(metricKind, name, "DOUBLE", mmLabels, em.Timestamp, &monitoring.TypedValue{DoubleValue: &f}, unit, cacheKey))
}
continue
}

// If metric value is of type Distribution.
if distValue, ok := val.(*metrics.Distribution); ok {
ts = append(ts, s.recordTimeSeries(metricKind, name, "DISTRIBUTION", mLabels, em.Timestamp, distValue.StackdriverTypedValue(), cacheKey))
ts = append(ts, s.recordTimeSeries(metricKind, name, "DISTRIBUTION", mLabels, em.Timestamp, distValue.StackdriverTypedValue(), unit, cacheKey))
continue
}

Expand Down
4 changes: 4 additions & 0 deletions surfacers/stackdriver/stackdriver_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -80,6 +80,7 @@ func TestTimeSeries(t *testing.T) {
},
MetricKind: "CUMULATIVE",
ValueType: "DOUBLE",
Unit: "1",
Points: []*monitoring.Point{
{
Interval: &monitoring.TimeInterval{
Expand Down Expand Up @@ -120,6 +121,7 @@ func TestTimeSeries(t *testing.T) {
},
MetricKind: "CUMULATIVE",
ValueType: "DOUBLE",
Unit: "1",
Points: []*monitoring.Point{
{
Interval: &monitoring.TimeInterval{
Expand Down Expand Up @@ -160,6 +162,7 @@ func TestTimeSeries(t *testing.T) {
},
MetricKind: "CUMULATIVE",
ValueType: "DOUBLE",
Unit: "1",
Points: []*monitoring.Point{
{
Interval: &monitoring.TimeInterval{
Expand Down Expand Up @@ -190,6 +193,7 @@ func TestTimeSeries(t *testing.T) {
},
MetricKind: "CUMULATIVE",
ValueType: "DOUBLE",
Unit: "1",
Points: []*monitoring.Point{
{
Interval: &monitoring.TimeInterval{
Expand Down