Skip to content

Commit

Permalink
Prometheus: Implement selective collection
Browse files Browse the repository at this point in the history
As the Prometheus scraper (prometheus side) is single threaded, it might
be usefull to only poll 1 vcenter.
This commit adds a /scrape page, that has a target parameter.
The parameter is the same as the hostname of a vcenter.

Then only the metrics of that vcenter are returned!
  • Loading branch information
dupondje committed Jun 4, 2021
1 parent bb768c9 commit 0798644
Show file tree
Hide file tree
Showing 5 changed files with 100 additions and 31 deletions.
36 changes: 35 additions & 1 deletion backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,7 @@ import (
// Channels are use for unscheduled backend
type Channels struct {
Request *chan Point
Target string
}

// Backend Interface
Expand Down Expand Up @@ -146,12 +147,14 @@ func (backend *Config) Init() (*chan Channels, error) {
//Initialize Prometheus client
log.Printf("backend %s: initializing\n", backendType)
registry := prometheus.NewRegistry()
err := registry.Register(backend)
err := registry.Register(&PrometheusBackend{Config: backend})
if err != nil {
log.Printf("backend %s: error creating registry - %s\n", backendType, err)
return queries, err
}
http.Handle("/metrics", promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError}))
http.HandleFunc("/scrape", backend.scrapeHandler)
backend.promCollectors = make(map[string]*PrometheusBackend)
go func() error {
address := ""
if len(backend.Hostname) > 0 {
Expand Down Expand Up @@ -217,6 +220,13 @@ func (backend *Config) Init() (*chan Channels, error) {
}
}

// InitPrometheus : Init Multiple channels for prometheus
func (backend *Config) InitPrometheus(vcenter string) error {
backend.promCollectors[vcenter] = &PrometheusBackend{Config: backend, Target: vcenter}

return nil
}

// Clean : take actions on backend when cycle finished
func (backend *Config) Clean() {
switch backendType := strings.ToLower(backend.Type); backendType {
Expand Down Expand Up @@ -448,3 +458,27 @@ func (backend *Config) HasMetadata() bool {
return true
}
}

func (backend *Config) scrapeHandler(w http.ResponseWriter, r *http.Request) {
target := r.URL.Query().Get("target")
if target == "" {
http.Error(w, "'target' parameter must be specified", 400)
return
}

if _, found := backend.promCollectors[target]; !found {
http.Error(w, "VCenter not found", 400)
return
}

log.Printf("Setting target to %s\n", target)
registry := prometheus.NewRegistry()
err := registry.Register(backend.promCollectors[target])
if err != nil {
http.Error(w, "error creating registry ", 400)
return
}

h := promhttp.HandlerFor(registry, promhttp.HandlerOpts{ErrorHandling: promhttp.ContinueOnError})
h.ServeHTTP(w, r)
}
39 changes: 23 additions & 16 deletions backend/config.go
Original file line number Diff line number Diff line change
Expand Up @@ -12,20 +12,27 @@ import (

// Config : storage backend
type Config struct {
Hostname string
ValueField string
Database string
Username string
Password string
Type string
Prefix string
Port int
NoArray bool
Encrypted bool
carbon *graphite.Graphite
influx *influxclient.Client
thininfluxdb *thininfluxclient.ThinInfluxClient
elastic *elastic.Client
fluent *fluent.Fluent
kafka *kafka.Writer
Hostname string
ValueField string
Database string
Username string
Password string
Type string
Prefix string
Port int
NoArray bool
Encrypted bool
carbon *graphite.Graphite
influx *influxclient.Client
thininfluxdb *thininfluxclient.ThinInfluxClient
elastic *elastic.Client
fluent *fluent.Fluent
promCollectors map[string]*PrometheusBackend
kafka *kafka.Writer
}

// PrometheusBackend : Extend prometheus.Collector
type PrometheusBackend struct {
Config *Config
Target string
}
16 changes: 8 additions & 8 deletions backend/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -9,17 +9,17 @@ import (
)

// Describe : Implementation of Prometheus Collector.Describe
func (backend *Config) Describe(ch chan<- *prometheus.Desc) {
func (backend *PrometheusBackend) Describe(ch chan<- *prometheus.Desc) {
prometheus.NewGauge(prometheus.GaugeOpts{Name: "Dummy", Help: "Dummy"}).Describe(ch)
}

// Collect : Implementation of Prometheus Collector.Collect
func (backend *Config) Collect(ch chan<- prometheus.Metric) {
func (backend *PrometheusBackend) Collect(ch chan<- prometheus.Metric) {

log.Println("prometheus: requesting metrics")
log.Printf("prometheus: requesting metrics for %s\n", backend.Target)

request := make(chan Point, 100)
channels := Channels{Request: &request}
request := make(chan Point, 10000)
channels := Channels{Request: &request, Target: backend.Target}

select {
case *queries <- channels:
Expand All @@ -41,8 +41,8 @@ func (backend *Config) Collect(ch chan<- prometheus.Metric) {
}

//PrometheusSend sends a point to prometheus
func (backend *Config) PrometheusSend(ch chan<- prometheus.Metric, point Point) {
tags := point.GetTags(backend.NoArray, ",")
func (backend *PrometheusBackend) PrometheusSend(ch chan<- prometheus.Metric, point Point) {
tags := point.GetTags(backend.Config.NoArray, ",")
labelNames := make([]string, len(tags))
labelValues := make([]string, len(tags))
i := 0
Expand All @@ -51,7 +51,7 @@ func (backend *Config) PrometheusSend(ch chan<- prometheus.Metric, point Point)
labelValues[i] = value
i++
}
key := fmt.Sprintf("%s_%s_%s_%s", backend.Prefix, point.Group, point.Counter, point.Rollup)
key := fmt.Sprintf("%s_%s_%s_%s", backend.Config.Prefix, point.Group, point.Counter, point.Rollup)
desc := prometheus.NewDesc(key, "vSphere collected metric", labelNames, nil)
metric, err := prometheus.NewConstMetric(desc, prometheus.GaugeValue, float64(point.Value), labelValues...)
if err != nil {
Expand Down
13 changes: 11 additions & 2 deletions backend/thinprometheusclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -46,13 +46,22 @@ func (client *ThinPrometheusClient) ListenAndServe() error {
}

func requestHandler(ctx *fasthttp.RequestCtx) {
if string(ctx.Path()) != "/metrics" {
if string(ctx.Path()) != "/metrics" && string(ctx.Path()) != "/scrape" {
ctx.Error("Unsupported path", fasthttp.StatusNotFound)
return
}
target := ""
// id /scape a target should be added
if string(ctx.Path()) == "/scrape" {
target = string(ctx.QueryArgs().Peek("target"))
if target == "" {
ctx.Error("'target' parameter must be specified", fasthttp.StatusNotFound)
return
}
}
// prepare the channels for the request
request := make(chan Point, 100)
channels := Channels{Request: &request}
channels := Channels{Request: &request, Target: target}
// create a buffer to organise metrics per type
buffer := map[string][]string{}
log.Println("thinprom: sending query request")
Expand Down
27 changes: 23 additions & 4 deletions vsphere-graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -242,6 +242,15 @@ func (service *Service) Manage() (string, error) {
}
defer conf.Backend.Disconnect()

if conf.Backend.Type == "prometheus" {
for _, vcenter := range conf.VCenters {
err := conf.Backend.InitPrometheus(vcenter.Hostname)
if err != nil {
return "Could not initialize backend", err
}
}
}

//check properties in function of backend support of metadata
if !conf.Backend.HasMetadata() {
properties := []string{}
Expand Down Expand Up @@ -323,12 +332,22 @@ func (service *Service) Manage() (string, error) {
// wait group for non scheduled metric retrival
var wg sync.WaitGroup

log.Println("adhoc metric retrieval")
wg.Add(len(conf.VCenters))
for _, vcenter := range conf.VCenters {
go queryVCenter(*vcenter, conf, request.Request, &wg)
log.Printf("adhoc metric retrieval for %s\n", request.Target)
if request.Target != "" {
for _, vcenter := range conf.VCenters {
if request.Target == vcenter.Hostname {
wg.Add(1)
go queryVCenter(*vcenter, conf, request.Request, &wg)
}
}
} else {
wg.Add(len(conf.VCenters))
for _, vcenter := range conf.VCenters {
go queryVCenter(*vcenter, conf, request.Request, &wg)
}
}
wg.Wait()
log.Printf("Completed adhoc metric retrieval for %s\n", request.Target)
close(*request.Request)
cleanup <- true
}()
Expand Down

0 comments on commit 0798644

Please sign in to comment.