-
-
Notifications
You must be signed in to change notification settings - Fork 352
Commit
This commit does not belong to any branch on this repository, and may belong to a fork outside of the repository.
* add prometheus support * refactored to manage own registry * go mod tidy * removed ProcessCollector as this is only supported on linux
- Loading branch information
Showing
5 changed files
with
262 additions
and
17 deletions.
There are no files selected for viewing
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,151 @@ | ||
package monitor | ||
|
||
import ( | ||
"context" | ||
"net/http" | ||
"strings" | ||
"time" | ||
|
||
"github.com/emitter-io/emitter/internal/async" | ||
"github.com/emitter-io/stats" | ||
"github.com/prometheus/client_golang/prometheus" | ||
"github.com/prometheus/client_golang/prometheus/promhttp" | ||
) | ||
|
||
// Noop implements Storage contract. | ||
var _ Storage = new(Prometheus) | ||
|
||
// Prometheus represents a storage which publishes stats to a statsd sink. | ||
type Prometheus struct { | ||
registry *prometheus.Registry // Prometheus registry | ||
reader stats.Snapshotter // The reader which reads the snapshot of stats. | ||
cancel context.CancelFunc // The cancellation function. | ||
gauges map[string]prometheus.Gauge // The gauges created | ||
histograms map[string]prometheus.Histogram // The histograms created | ||
} | ||
|
||
// NewPrometheus creates a new prometheus endpoint. | ||
func NewPrometheus(snapshotter stats.Snapshotter, mux *http.ServeMux) *Prometheus { | ||
|
||
// manage own prometheus registry | ||
registry := prometheus.NewRegistry() | ||
registry.MustRegister(prometheus.NewGoCollector()) | ||
|
||
mux.Handle("/metrics", promhttp.InstrumentMetricHandler(registry, promhttp.HandlerFor(registry, promhttp.HandlerOpts{}))) | ||
|
||
return &Prometheus{ | ||
registry: registry, | ||
reader: snapshotter, | ||
gauges: make(map[string]prometheus.Gauge, 0), | ||
histograms: make(map[string]prometheus.Histogram, 0), | ||
} | ||
} | ||
|
||
// Name returns the name of the provider. | ||
func (p *Prometheus) Name() string { | ||
return "prometheus" | ||
} | ||
|
||
// Configure configures the storage. The config parameter provided is | ||
// loosely typed, since various storage mechanisms will require different | ||
// configurations. | ||
func (p *Prometheus) Configure(config map[string]interface{}) (err error) { | ||
|
||
// Get the interval from the provider configuration | ||
interval := defaultInterval | ||
if v, ok := config["interval"]; ok { | ||
if i, ok := v.(float64); ok { | ||
interval = time.Duration(i) * time.Millisecond | ||
} | ||
} | ||
|
||
p.cancel = async.Repeat(context.Background(), interval, p.write) | ||
|
||
return | ||
} | ||
|
||
// Flush reads and writes stats into this stats sink. | ||
func (p *Prometheus) write() { | ||
// Create a snapshot and restore it straight away | ||
snapshot := p.reader.Snapshot() | ||
m, err := stats.Restore(snapshot) | ||
if err != nil { | ||
return | ||
} | ||
|
||
// Send the node and process-level metrics through | ||
metrics := m.ToMap() | ||
p.gauge(metrics, "node.peers") | ||
p.gauge(metrics, "node.conns") | ||
p.gauge(metrics, "node.subs") | ||
|
||
for name := range metrics { | ||
prefix := strings.Split(name, ".")[0] | ||
switch prefix { | ||
case "rcv", "send": | ||
p.histogram(metrics, name) | ||
} | ||
} | ||
} | ||
|
||
// addGauge creates a gauge and maps it to a metric name | ||
func (p *Prometheus) addGauge(metric string) prometheus.Gauge { | ||
opts := prometheus.GaugeOpts{ | ||
Name: strings.Replace(metric, ".", "_", -1), | ||
} | ||
|
||
g := prometheus.NewGauge(opts) | ||
if err := p.registry.Register(g); err != nil { | ||
panic(err) | ||
} | ||
|
||
p.gauges[metric] = g | ||
|
||
return g | ||
} | ||
|
||
func (p *Prometheus) addHistogram(metric string) prometheus.Histogram { | ||
opts := prometheus.HistogramOpts{ | ||
Name: strings.Replace(metric, ".", "_", -1), | ||
} | ||
h := prometheus.NewHistogram(opts) | ||
if err := p.registry.Register(h); err != nil { | ||
panic(err) | ||
} | ||
p.histograms[metric] = h | ||
return h | ||
} | ||
|
||
// sends the metric as a gauge | ||
func (p *Prometheus) gauge(source map[string]stats.Snapshot, metric string) { | ||
if v, ok := source[metric]; ok { | ||
g, ok := p.gauges[metric] | ||
if !ok { | ||
g = p.addGauge(metric) | ||
} | ||
g.Set(float64(v.Max())) | ||
} | ||
} | ||
|
||
// sends the metric as a histogram | ||
func (p *Prometheus) histogram(source map[string]stats.Snapshot, metric string) { | ||
if v, ok := source[metric]; ok { | ||
for _, sample := range v.Sample { | ||
h, ok := p.histograms[metric] | ||
if !ok { | ||
h = p.addHistogram(metric) | ||
} | ||
h.Observe(float64(sample)) | ||
} | ||
} | ||
} | ||
|
||
// Close gracefully terminates the storage and ensures that every related | ||
// resource is properly disposed. | ||
func (p *Prometheus) Close() error { | ||
if p.cancel != nil { | ||
p.cancel() | ||
} | ||
|
||
return nil | ||
} |
This file contains bidirectional Unicode text that may be interpreted or compiled differently than what appears below. To review, open the file in an editor that reveals hidden Unicode characters.
Learn more about bidirectional Unicode characters
Original file line number | Diff line number | Diff line change |
---|---|---|
@@ -0,0 +1,84 @@ | ||
package monitor | ||
|
||
import ( | ||
"io/ioutil" | ||
"log" | ||
"net/http" | ||
"net/http/httptest" | ||
"testing" | ||
|
||
"github.com/emitter-io/stats" | ||
"github.com/stretchr/testify/assert" | ||
) | ||
|
||
func TestPrometheus_HappyPath(t *testing.T) { | ||
m := stats.New() | ||
for i := int32(0); i < 100; i++ { | ||
m.Measure("proc.test", i) | ||
m.Measure("node.test", i) | ||
m.Measure("rcv.test", i) | ||
} | ||
|
||
mux := http.NewServeMux() | ||
|
||
s := NewPrometheus(m, mux) | ||
defer s.Close() | ||
|
||
err := s.Configure(map[string]interface{}{ | ||
"interval": 1000000.00, | ||
}) | ||
assert.NoError(t, err) | ||
assert.NotPanics(t, func() { | ||
s.write() | ||
}) | ||
} | ||
|
||
func TestPrometheus_Request(t *testing.T) { | ||
|
||
m := stats.New() | ||
for i := int32(0); i < 100; i++ { | ||
m.Measure("proc.test", i) | ||
m.Measure("node.test", i) | ||
m.Measure("rcv.test", i/10) | ||
m.Measure("node.peers", 2) | ||
m.Measure("node.conns", i) | ||
m.Measure("node.subs", i) | ||
} | ||
|
||
mux := http.NewServeMux() | ||
s := NewPrometheus(m, mux) | ||
defer s.Close() | ||
|
||
err := s.Configure(map[string]interface{}{ | ||
"interval": 1000000.00, | ||
}) | ||
|
||
ts := httptest.NewServer(mux) | ||
defer ts.Close() | ||
|
||
res, err := http.Get(ts.URL + "/metrics") | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
content, err := ioutil.ReadAll(res.Body) | ||
res.Body.Close() | ||
if err != nil { | ||
log.Fatal(err) | ||
} | ||
|
||
// assert gauges | ||
assert.Contains(t, string(content), "node_peers 2") | ||
assert.Contains(t, string(content), "node_subs 99") | ||
assert.Contains(t, string(content), "node_conns 99") | ||
|
||
// assert histograms | ||
assert.Contains(t, string(content), "rcv_test_bucket{le=\"0.01\"} 10") | ||
assert.Contains(t, string(content), "rcv_test_sum 450") | ||
assert.Contains(t, string(content), "rcv_test_count 100") | ||
|
||
// from InstrumentMetricHandler | ||
assert.Contains(t, string(content), "promhttp_metric_handler_requests_total") | ||
|
||
// from the NewGoCollector | ||
assert.Contains(t, string(content), "go_threads") | ||
} |