Skip to content

Commit

Permalink
feat: expose rebalance and membership metrics #29
Browse files Browse the repository at this point in the history
  • Loading branch information
erayarslan committed Mar 26, 2023
1 parent e8f7c87 commit a399095
Show file tree
Hide file tree
Showing 5 changed files with 167 additions and 24 deletions.
9 changes: 7 additions & 2 deletions api.go
Original file line number Diff line number Diff line change
Expand Up @@ -72,10 +72,15 @@ func (s *api) followers(c *fiber.Ctx) error {
return c.JSON(s.serviceDiscovery.GetAll())
}

func NewAPI(config *helpers.Config, client gDcp.Client, stream Stream, serviceDiscovery servicediscovery.ServiceDiscovery) API {
func NewAPI(config *helpers.Config,
client gDcp.Client,
stream Stream,
serviceDiscovery servicediscovery.ServiceDiscovery,
vBucketDiscovery VBucketDiscovery,
) API {
app := fiber.New(fiber.Config{DisableStartupMessage: true})

metricMiddleware, err := NewMetricMiddleware(app, config, stream, client)
metricMiddleware, err := NewMetricMiddleware(app, config, stream, client, vBucketDiscovery)

if err == nil {
app.Use(metricMiddleware)
Expand Down
2 changes: 1 addition & 1 deletion dcp.go
Original file line number Diff line number Diff line change
Expand Up @@ -130,7 +130,7 @@ func (s *dcp) Start() {
s.api.Shutdown()
}()

s.api = NewAPI(s.config, s.client, s.stream, s.serviceDiscovery)
s.api = NewAPI(s.config, s.client, s.stream, s.serviceDiscovery, s.vBucketDiscovery)
s.api.Listen()
}()
}
Expand Down
131 changes: 120 additions & 11 deletions metric.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,8 +15,9 @@ import (
)

type metricCollector struct {
stream Stream
client gDcp.Client
stream Stream
client gDcp.Client
vBucketDiscovery VBucketDiscovery

mutation *prometheus.Desc
deletion *prometheus.Desc
Expand All @@ -27,8 +28,16 @@ type metricCollector struct {
endSeqNo *prometheus.Desc

averageProcessMs *prometheus.Desc
rebalanceCount *prometheus.Desc

lag *prometheus.Desc

totalMembers *prometheus.Desc
memberNumber *prometheus.Desc
membershipType *prometheus.Desc
vBucketCount *prometheus.Desc
vBucketRangeStart *prometheus.Desc
vBucketRangeEnd *prometheus.Desc
}

func (s *metricCollector) Describe(ch chan<- *prometheus.Desc) {
Expand Down Expand Up @@ -75,21 +84,21 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
for vbID, offset := range offsets {
ch <- prometheus.MustNewConstMetric(
s.currentSeqNo,
prometheus.CounterValue,
prometheus.GaugeValue,
float64(offset.SeqNo),
strconv.Itoa(int(vbID)),
)

ch <- prometheus.MustNewConstMetric(
s.startSeqNo,
prometheus.CounterValue,
prometheus.GaugeValue,
float64(offset.StartSeqNo),
strconv.Itoa(int(vbID)),
)

ch <- prometheus.MustNewConstMetric(
s.endSeqNo,
prometheus.CounterValue,
prometheus.GaugeValue,
float64(offset.EndSeqNo),
strconv.Itoa(int(vbID)),
)
Expand All @@ -108,7 +117,7 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
} else {
ch <- prometheus.MustNewConstMetric(
s.lag,
prometheus.CounterValue,
prometheus.GaugeValue,
lag,
strconv.Itoa(int(vbID)),
)
Expand All @@ -125,12 +134,65 @@ func (s *metricCollector) Collect(ch chan<- prometheus.Metric) {
streamMetric.AverageProcessMs.Value(),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.rebalanceCount,
prometheus.CounterValue,
float64(streamMetric.RebalanceCount),
[]string{}...,
)

vBucketDiscoveryMetric := s.vBucketDiscovery.GetMetric()

ch <- prometheus.MustNewConstMetric(
s.totalMembers,
prometheus.GaugeValue,
float64(vBucketDiscoveryMetric.TotalMembers),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.memberNumber,
prometheus.GaugeValue,
float64(vBucketDiscoveryMetric.MemberNumber),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.membershipType,
prometheus.GaugeValue,
0,
[]string{vBucketDiscoveryMetric.Type}...,
)

ch <- prometheus.MustNewConstMetric(
s.vBucketCount,
prometheus.GaugeValue,
float64(vBucketDiscoveryMetric.VBucketCount),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.vBucketRangeStart,
prometheus.GaugeValue,
float64(vBucketDiscoveryMetric.VBucketRangeStart),
[]string{}...,
)

ch <- prometheus.MustNewConstMetric(
s.vBucketRangeEnd,
prometheus.GaugeValue,
float64(vBucketDiscoveryMetric.VBucketRangeEnd),
[]string{}...,
)
}

func newMetricCollector(client gDcp.Client, stream Stream) *metricCollector {
//nolint:funlen
func newMetricCollector(client gDcp.Client, stream Stream, vBucketDiscovery VBucketDiscovery) *metricCollector {
return &metricCollector{
stream: stream,
client: client,
stream: stream,
client: client,
vBucketDiscovery: vBucketDiscovery,

mutation: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "mutation", "total"),
Expand Down Expand Up @@ -180,11 +242,58 @@ func newMetricCollector(client gDcp.Client, stream Stream) *metricCollector {
[]string{},
nil,
),
rebalanceCount: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "rebalance_count", "current"),
"Rebalance count",
[]string{},
nil,
),
totalMembers: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "total_members", "current"),
"Total members",
[]string{},
nil,
),
memberNumber: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "member_number", "current"),
"Member number",
[]string{},
nil,
),
membershipType: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "membership_type", "current"),
"Membership type",
[]string{"type"},
nil,
),
vBucketCount: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "vbucket_count", "current"),
"VBucket count",
[]string{},
nil,
),
vBucketRangeStart: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "vbucket_range_start", "current"),
"VBucket range start",
[]string{},
nil,
),
vBucketRangeEnd: prometheus.NewDesc(
prometheus.BuildFQName(helpers.Name, "vbucket_range_end", "current"),
"VBucket range end",
[]string{},
nil,
),
}
}

func NewMetricMiddleware(app *fiber.App, config *helpers.Config, stream Stream, client gDcp.Client) (func(ctx *fiber.Ctx) error, error) {
err := prometheus.DefaultRegisterer.Register(newMetricCollector(client, stream))
func NewMetricMiddleware(app *fiber.App,
config *helpers.Config,
stream Stream,
client gDcp.Client,
vBucketDiscovery VBucketDiscovery,
) (func(ctx *fiber.Ctx) error, error) {
err := prometheus.DefaultRegisterer.Register(newMetricCollector(client, stream, vBucketDiscovery))
if err != nil {
return nil, err
}
Expand Down
11 changes: 7 additions & 4 deletions stream.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,19 +24,20 @@ type Stream interface {
UnlockOffsets()
GetOffsets() (map[uint16]*models.Offset, map[uint16]bool, bool)
GetObserver() gDcp.Observer
GetMetric() StreamMetric
GetMetric() *StreamMetric
UnmarkDirtyOffsets()
}

type StreamMetric struct {
AverageProcessMs ewma.MovingAverage
RebalanceCount int
}

type stream struct {
client gDcp.Client
metadata Metadata
checkpoint Checkpoint
metric StreamMetric
metric *StreamMetric
observer gDcp.Observer
vBucketDiscovery VBucketDiscovery
collectionIDs map[uint32]string
Expand Down Expand Up @@ -162,6 +163,8 @@ func (s *stream) rebalance() {
s.balancing = false

logger.Log.Printf("rebalance is finished")

s.metric.RebalanceCount++
}

func (s *stream) Rebalance() {
Expand Down Expand Up @@ -249,7 +252,7 @@ func (s *stream) GetObserver() gDcp.Observer {
return s.observer
}

func (s *stream) GetMetric() StreamMetric {
func (s *stream) GetMetric() *StreamMetric {
return s.metric
}

Expand Down Expand Up @@ -277,7 +280,7 @@ func NewStream(client gDcp.Client,
collectionIDs: collectionIDs,
activeStreams: &sync.WaitGroup{},
stopCh: stopCh,
metric: StreamMetric{
metric: &StreamMetric{
AverageProcessMs: ewma.NewMovingAverage(config.Metric.AverageWindowSec),
},
}
Expand Down
38 changes: 32 additions & 6 deletions vbucket_discovery.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,11 +15,22 @@ import (
type VBucketDiscovery interface {
Get() []uint16
Close()
GetMetric() *VBucketDiscoveryMetric
}

type vBucketDiscovery struct {
membership membership.Membership
vBucketNumber int
membership membership.Membership
vBucketNumber int
vBucketDiscoveryMetric *VBucketDiscoveryMetric
}

type VBucketDiscoveryMetric struct {
TotalMembers int
MemberNumber int
Type string
VBucketCount int
VBucketRangeStart uint16
VBucketRangeEnd uint16
}

func (s *vBucketDiscovery) Get() []uint16 {
Expand All @@ -32,14 +43,21 @@ func (s *vBucketDiscovery) Get() []uint16 {
receivedInfo := s.membership.GetInfo()

readyToStreamVBuckets := helpers.ChunkSlice[uint16](vBuckets, receivedInfo.TotalMembers)[receivedInfo.MemberNumber-1]

start := readyToStreamVBuckets[0]
end := readyToStreamVBuckets[len(readyToStreamVBuckets)-1]

logger.Log.Printf(
"member: %v/%v, vbucket range: %v-%v",
receivedInfo.MemberNumber,
receivedInfo.TotalMembers,
readyToStreamVBuckets[0],
readyToStreamVBuckets[len(readyToStreamVBuckets)-1],
receivedInfo.MemberNumber, receivedInfo.TotalMembers,
start, end,
)

s.vBucketDiscoveryMetric.TotalMembers = receivedInfo.TotalMembers
s.vBucketDiscoveryMetric.MemberNumber = receivedInfo.MemberNumber
s.vBucketDiscoveryMetric.VBucketRangeStart = start
s.vBucketDiscoveryMetric.VBucketRangeEnd = end

return readyToStreamVBuckets
}

Expand All @@ -48,6 +66,10 @@ func (s *vBucketDiscovery) Close() {
logger.Log.Printf("vbucket discovery closed")
}

func (s *vBucketDiscovery) GetMetric() *VBucketDiscoveryMetric {
return s.vBucketDiscoveryMetric
}

func NewVBucketDiscovery(client gDcp.Client,
config *helpers.Config,
vBucketNumber int,
Expand Down Expand Up @@ -75,5 +97,9 @@ func NewVBucketDiscovery(client gDcp.Client,
return &vBucketDiscovery{
vBucketNumber: vBucketNumber,
membership: ms,
vBucketDiscoveryMetric: &VBucketDiscoveryMetric{
VBucketCount: vBucketNumber,
Type: config.Dcp.Group.Membership.Type,
},
}
}

0 comments on commit a399095

Please sign in to comment.