Skip to content

Commit

Permalink
Make the channels way easier
Browse files Browse the repository at this point in the history
  • Loading branch information
dupondje committed Mar 27, 2020
1 parent 259d562 commit d1d2c85
Show file tree
Hide file tree
Showing 5 changed files with 21 additions and 84 deletions.
1 change: 0 additions & 1 deletion backend/backend.go
Original file line number Diff line number Diff line change
Expand Up @@ -26,7 +26,6 @@ import (
// Channels are use for unscheduled backend
type Channels struct {
Request *chan Point
Done *chan bool
}

// Backend Interface
Expand Down
46 changes: 7 additions & 39 deletions backend/prometheus.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,7 +4,6 @@ package backend
import (
"fmt"
"log"
"time"

"github.com/prometheus/client_golang/prometheus"
)
Expand All @@ -20,8 +19,7 @@ func (backend *Config) Collect(ch chan<- prometheus.Metric) {
log.Println("prometheus: requesting metrics")

request := make(chan Point, 100)
done := make(chan bool)
channels := Channels{Request: &request, Done: &done}
channels := Channels{Request: &request}

select {
case *queries <- channels:
Expand All @@ -33,43 +31,13 @@ func (backend *Config) Collect(ch chan<- prometheus.Metric) {

// points received
points := 0
// handle timeout between point reception
rectimer := time.NewTimer(100 * time.Millisecond)
// check that the collection threads have finished
recdone := false
for {
select {
case point := <-*channels.Request:
// reset timer
if !rectimer.Stop() {
select {
case <-rectimer.C:
default:
}
}
rectimer.Reset(100 * time.Millisecond)
// increase points
points++
// send point to prometheus
backend.PrometheusSend(ch, point)
case <-*channels.Done:
recdone = true
// reset timer
if !rectimer.Stop() {
select {
case <-rectimer.C:
default:
}
}
rectimer.Reset(100 * time.Millisecond)
case <-rectimer.C:
// only exit when done and timeout
if recdone {
log.Printf("prometheus: sent %d points", points)
return
}
}
for point := range *channels.Request {
// increase points
points++
// send point to prometheus
backend.PrometheusSend(ch, point)
}
log.Printf("prometheus: sent %d points", points)
}

//PrometheusSend sends a point to prometheus
Expand Down
53 changes: 11 additions & 42 deletions backend/thinprometheusclient.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,7 +7,6 @@ import (
"log"
"sort"
"strings"
"time"

"github.com/cblomart/vsphere-graphite/utils"
"github.com/valyala/fasthttp"
Expand Down Expand Up @@ -53,8 +52,7 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
}
// prepare the channels for the request
request := make(chan Point, 100)
done := make(chan bool)
channels := Channels{Request: &request, Done: &done}
channels := Channels{Request: &request}
// create a buffer to organise metrics per type
buffer := map[string][]string{}
log.Println("thinprom: sending query request")
Expand All @@ -66,49 +64,20 @@ func requestHandler(ctx *fasthttp.RequestCtx) {
ctx.Error("Query buffer full", fasthttp.StatusConflict)
return
}
// start a timeout
timeout := time.NewTimer(100 * time.Millisecond)

// collected points
points := 0
// receive done
recdone := false

log.Println("thinprom: waiting for query results")
L:
for {
select {
case point := <-*channels.Request:
// reset timer
if !timeout.Stop() {
select {
case <-timeout.C:
default:
}
}
timeout.Reset(100 * time.Millisecond)
// increased received points
points++
// add point to the buffer
addToThinPrometheusBuffer(buffer, &point)
case <-*channels.Done:
// finish consuming metrics and break loop
log.Println("thinprom: signaled the end of the collection")
recdone = true
// reset timer
if !timeout.Stop() {
select {
case <-timeout.C:
default:
}
}
timeout.Reset(100 * time.Millisecond)
case <-timeout.C:
// stop timer
if recdone {
log.Printf("thinprom: sent %d points", points)
break L
}
}
for point := range *channels.Request {
// increased received points
points++
// add point to the buffer
addToThinPrometheusBuffer(buffer, &point)
}
log.Println("thinprom: signaled the end of the collection")
log.Printf("thinprom: sent %d points", points)

ctx.SetContentType("text/plain; charset=utf8")
var outbuff bytes.Buffer
for key, vals := range buffer {
Expand Down
2 changes: 2 additions & 0 deletions vsphere-graphite-example.json
Original file line number Diff line number Diff line change
@@ -1,6 +1,8 @@
{
"Domain": ".contoso.com",
"ReplacePoint": false,
"CPUProfiling": false,
"MEMProfiling": false,
"Properties": [ "all" ],
"Interval": 60,
"FlushSize": 1000,
Expand Down
3 changes: 1 addition & 2 deletions vsphere-graphite.go
Original file line number Diff line number Diff line change
Expand Up @@ -329,8 +329,7 @@ func (service *Service) Manage() (string, error) {
go queryVCenter(*vcenter, conf, request.Request, &wg)
}
wg.Wait()
//time.Sleep(5 * time.Second)
*request.Done <- true
close(*request.Request)
cleanup <- true
}()
case <-ticker.C:
Expand Down

0 comments on commit d1d2c85

Please sign in to comment.