Skip to content

Commit

Permalink
fix(inputs.gnmi): Register connection statistics before creating clie…
Browse files Browse the repository at this point in the history
…nt (#16171)
  • Loading branch information
Mrflatt authored Nov 12, 2024
1 parent b4fdd52 commit e5e52f0
Show file tree
Hide file tree
Showing 2 changed files with 21 additions and 19 deletions.
9 changes: 8 additions & 1 deletion plugins/inputs/gnmi/gnmi.go
Original file line number Diff line number Diff line change
Expand Up @@ -7,6 +7,7 @@ import (
_ "embed"
"errors"
"fmt"
"net"
"strings"
"sync"
"time"
Expand Down Expand Up @@ -272,8 +273,14 @@ func (c *GNMI) Start(acc telegraf.Accumulator) error {
go func(addr string) {
defer c.wg.Done()

host, port, err := net.SplitHostPort(addr)
if err != nil {
acc.AddError(fmt.Errorf("unable to parse address %s: %w", addr, err))
return
}
h := handler{
address: addr,
host: host,
port: port,
aliases: c.internalAliases,
tagsubs: c.TagSubscriptions,
maxMsgSize: int(c.MaxMsgSize),
Expand Down
31 changes: 13 additions & 18 deletions plugins/inputs/gnmi/handler.go
Original file line number Diff line number Diff line change
Expand Up @@ -33,7 +33,8 @@ import (
const eidJuniperTelemetryHeader = 1

type handler struct {
address string
host string
port string
aliases map[*pathInfo]string
tagsubs []tagSubscription
maxMsgSize int
Expand Down Expand Up @@ -72,7 +73,14 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
opts = append(opts, grpc.WithKeepaliveParams(h.ClientParameters))
}

client, err := grpc.NewClient(h.address, opts...)
// Used to report the status of the TCP connection to the device. If the
// GNMI connection goes down, but TCP is still up this will still report
// connected until the TCP connection times out.
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.host})
defer connectStat.Set(0)

address := net.JoinHostPort(h.host, h.port)
client, err := grpc.NewClient(address, opts...)
if err != nil {
return fmt.Errorf("failed to dial: %w", err)
}
Expand All @@ -88,21 +96,14 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
if err := subscribeClient.Send(request); err != nil && !errors.Is(err, io.EOF) {
return fmt.Errorf("failed to send subscription request: %w", err)
}

h.log.Debugf("Connection to gNMI device %s established", h.address)

// Used to report the status of the TCP connection to the device. If the
// GNMI connection goes down, but TCP is still up this will still report
// connected until the TCP connection times out.
connectStat := selfstat.Register("gnmi", "grpc_connection_status", map[string]string{"source": h.address})
connectStat.Set(1)
h.log.Debugf("Connection to gNMI device %s established", address)

defer h.log.Debugf("Connection to gNMI device %s closed", h.address)
defer h.log.Debugf("Connection to gNMI device %s closed", address)
for ctx.Err() == nil {
var reply *gnmi.SubscribeResponse
if reply, err = subscribeClient.Recv(); err != nil {
if !errors.Is(err, io.EOF) && ctx.Err() == nil {
connectStat.Set(0)
return fmt.Errorf("aborted gNMI subscription: %w", err)
}
break
Expand All @@ -121,8 +122,6 @@ func (h *handler) subscribeGNMI(ctx context.Context, acc telegraf.Accumulator, t
h.handleSubscribeResponseUpdate(acc, response, reply.GetExtension())
}
}

connectStat.Set(0)
return nil
}

Expand Down Expand Up @@ -164,11 +163,7 @@ func (h *handler) handleSubscribeResponseUpdate(acc telegraf.Accumulator, respon
prefix := newInfoFromPath(response.Update.Prefix)

// Add info to the tags
var err error
headerTags["source"], _, err = net.SplitHostPort(h.address)
if err != nil {
h.log.Errorf("unable to parse address %s: %v", h.address, err)
}
headerTags["source"] = h.host
if !prefix.empty() {
headerTags["path"] = prefix.fullPath()
}
Expand Down

0 comments on commit e5e52f0

Please sign in to comment.