Skip to content

Commit

Permalink
[RSDK-8292] Create Single App Connection and Use for Net Appender and…
Browse files Browse the repository at this point in the history
… Restart Checker (viamrobotics#4746)
  • Loading branch information
bashar-515 authored Jan 29, 2025
1 parent 85e76a9 commit a5a072f
Show file tree
Hide file tree
Showing 4 changed files with 123 additions and 19 deletions.
8 changes: 5 additions & 3 deletions config/reader.go
Original file line number Diff line number Diff line change
Expand Up @@ -182,7 +182,9 @@ func isLocationSecretsEqual(prevCloud, cloud *Cloud) bool {
return true
}

func getTimeoutCtx(ctx context.Context, shouldReadFromCache bool, id string) (context.Context, func()) {
// GetTimeoutCtx returns a context [and its cancel function] with a timeout value determined by whether we are behind a proxy and whether a
// cached config exists.
func GetTimeoutCtx(ctx context.Context, shouldReadFromCache bool, id string) (context.Context, func()) {
timeout := readTimeout
// When environment indicates we are behind a proxy, bump timeout. Network
// operations tend to take longer when behind a proxy.
Expand Down Expand Up @@ -258,7 +260,7 @@ func readFromCloud(
if !cfg.Cloud.SignalingInsecure && (checkForNewCert || tls.certificate == "" || tls.privateKey == "") {
logger.Debug("reading tlsCertificate from the cloud")

ctxWithTimeout, cancel := getTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
certData, err := readCertificateDataFromCloudGRPC(ctxWithTimeout, cloudCfg, logger)
if err != nil {
cancel()
Expand Down Expand Up @@ -649,7 +651,7 @@ func processConfig(unprocessedConfig *Config, fromCloud bool, logger logging.Log
func getFromCloudOrCache(ctx context.Context, cloudCfg *Cloud, shouldReadFromCache bool, logger logging.Logger) (*Config, bool, error) {
var cached bool

ctxWithTimeout, cancel := getTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
ctxWithTimeout, cancel := GetTimeoutCtx(ctx, shouldReadFromCache, cloudCfg.ID)
defer cancel()

cfg, errorShouldCheckCache, err := getFromCloudGRPC(ctxWithTimeout, cloudCfg, logger)
Expand Down
104 changes: 104 additions & 0 deletions grpc/app_conn.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,104 @@
package grpc

import (
"context"
"net/url"
"time"

"go.viam.com/utils"
"go.viam.com/utils/rpc"

"go.viam.com/rdk/config"
"go.viam.com/rdk/logging"
)

// AppConn wraps a `ReconfigurableClientConn` whose underlying client connection is meant to be used globally to connect to App.
// `NewAppConn` makes one blocking dial attempt; if that attempt fails, it starts a background worker that keeps redialing App until a
// connection is successfully established.
type AppConn struct {
// Embedded connection wrapper. `NewAppConn` assigns its `conn` field, taking `connMu` when the assignment happens on the background worker.
ReconfigurableClientConn

// dialer runs the background redial loop. It stays nil when `NewAppConn` is given a nil cloud config or when the initial dial succeeds.
dialer *utils.StoppableWorkers
}

// NewAppConn creates an `AppConn` instance with a gRPC client connection to App at `cloud.AppAddress`.
//
// If `cloud` is nil, an `AppConn` with a nil underlying connection is returned and no background dialer is started. If
// `cloud.AppAddress` cannot be parsed as a URL, the parse error is returned.
//
// Otherwise a single dial attempt blocks under the timeout chosen by `config.GetTimeoutCtx`. If that attempt fails for any reason
// (timeout or otherwise), the failure is logged and an `AppConn` with a nil underlying client connection is returned together with a
// nil error. Serialized attempts at establishing a connection then continue in a background Goroutine, separated by a short pause,
// until a connection is made or the worker's context is done.
func NewAppConn(ctx context.Context, cloud *config.Cloud, logger logging.Logger) (*AppConn, error) {
	const (
		// Upper bound on each background dial attempt.
		dialAttemptTimeout = 5 * time.Second
		// Pause between failed background attempts so an immediately-failing dial cannot spin the loop.
		dialRetryDelay = time.Second
	)

	appConn := &AppConn{}

	if cloud == nil {
		return appConn, nil
	}

	grpcURL, err := url.Parse(cloud.AppAddress)
	if err != nil {
		return nil, err
	}

	opts := dialOpts(cloud)
	if grpcURL.Scheme == "http" {
		opts = append(opts, rpc.WithInsecure())
	}

	ctxWithTimeout, ctxWithTimeoutCancel := config.GetTimeoutCtx(ctx, true, cloud.ID)
	defer ctxWithTimeoutCancel()

	// lock not necessary here because call is blocking and no other Goroutine can see `appConn` yet
	appConn.conn, err = rpc.DialDirectGRPC(ctxWithTimeout, grpcURL.Host, logger, opts...)
	if err == nil {
		return appConn, nil
	}

	// The initial failure is deliberately non-fatal, but it should not vanish silently.
	logger.Warnw("initial dial to App failed; will keep retrying in the background", "error", err)

	appConn.dialer = utils.NewStoppableWorkers(ctx)

	appConn.dialer.Add(func(ctx context.Context) {
		for ctx.Err() == nil {
			attemptCtx, attemptCancel := context.WithTimeout(ctx, dialAttemptTimeout)
			conn, err := rpc.DialDirectGRPC(attemptCtx, grpcURL.Host, logger, opts...)
			attemptCancel()
			if err == nil {
				appConn.connMu.Lock()
				appConn.conn = conn
				appConn.connMu.Unlock()

				return
			}

			logger.Debugw("error while dialing App. Could not establish global, unified connection", "error", err)

			// Wait before the next attempt, but wake up immediately if the worker is being stopped.
			select {
			case <-ctx.Done():
				return
			case <-time.After(dialRetryDelay):
			}
		}
	})

	return appConn, nil
}

// Close stops the background dialer, if one was started, and then closes the embedded `ReconfigurableClientConn`, returning whatever
// error that close reports.
func (ac *AppConn) Close() error {
	if workers := ac.dialer; workers != nil {
		workers.Stop()
	}
	return ac.ReconfigurableClientConn.Close()
}

// dialOpts builds the dial options for connecting to App with the given cloud config. Entity credentials of type "robot-secret" are
// included only when `cloud.Secret` is non-empty; otherwise the returned slice is empty (but non-nil, with spare capacity).
func dialOpts(cloud *config.Cloud) []rpc.DialOption {
	opts := make([]rpc.DialOption, 0, 2)
	if cloud.Secret == "" {
		// No secret configured: dial without credentials.
		return opts
	}

	creds := rpc.Credentials{
		Type:    "robot-secret",
		Payload: cloud.Secret,
	}
	return append(opts, rpc.WithEntityCredentials(cloud.ID, creds))
}
21 changes: 12 additions & 9 deletions web/server/entrypoint.go
Original file line number Diff line number Diff line change
Expand Up @@ -24,6 +24,7 @@ import (
"go.viam.com/utils/rpc"

"go.viam.com/rdk/config"
"go.viam.com/rdk/grpc"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/robot"
Expand Down Expand Up @@ -58,6 +59,7 @@ type robotServer struct {
args Arguments
logger logging.Logger
registry *logging.Registry
conn rpc.ClientConn
}

func logViamEnvVariables(logger logging.Logger) {
Expand Down Expand Up @@ -188,6 +190,13 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error)
defer exporter.Stop()
}

// The underlying connection in `appConn` can be nil: either no cloud config exists (no dialing happens at all), or the initial dial
// failed, in which case a background Goroutine reattempts dials in a serialized manner.
appConn, err := grpc.NewAppConn(ctx, cfgFromDisk.Cloud, logger.Sublogger("networking").Sublogger("app_connection"))
if err != nil {
return err
}

// Start remote logging with config from disk.
// This is to ensure we make our best effort to write logs for failures loading the remote config.
if cfgFromDisk.Cloud != nil && (cfgFromDisk.Cloud.LogPath != "" || cfgFromDisk.Cloud.AppAddress != "") {
Expand All @@ -197,7 +206,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error)
ID: cfgFromDisk.Cloud.ID,
Secret: cfgFromDisk.Cloud.Secret,
},
nil, false, logger.Sublogger("networking").Sublogger("netlogger"),
appConn, false, logger.Sublogger("networking").Sublogger("netlogger"),
)
if err != nil {
return err
Expand All @@ -214,6 +223,7 @@ func RunServer(ctx context.Context, args []string, _ logging.Logger) (err error)
logger: logger,
args: argsParsed,
registry: registry,
conn: appConn,
}

// Run the server with remote logging enabled.
Expand Down Expand Up @@ -457,14 +467,7 @@ func (s *robotServer) serveWeb(ctx context.Context, cfg *config.Config) (err err
cloudRestartCheckerActive = make(chan struct{})
utils.PanicCapturingGo(func() {
defer close(cloudRestartCheckerActive)
restartCheck, err := newRestartChecker(ctx, cfg.Cloud, s.logger)
if err != nil {
if errors.Is(err, context.Canceled) {
return
}
s.logger.Errorw("error creating restart checker", "error", err)
panic(fmt.Sprintf("error creating restart checker: %v", err))
}
restartCheck := newRestartChecker(cfg.Cloud, s.logger, s.conn)
defer restartCheck.close()
restartInterval := defaultNeedsRestartCheckInterval

Expand Down
9 changes: 2 additions & 7 deletions web/server/restart_checker.go
Original file line number Diff line number Diff line change
Expand Up @@ -59,15 +59,10 @@ func (c *needsRestartCheckerGRPC) needsRestart(ctx context.Context) (bool, time.
return res.MustRestart, restartInterval, nil
}

func newRestartChecker(ctx context.Context, cfg *config.Cloud, logger logging.Logger) (needsRestartChecker, error) {
client, err := config.CreateNewGRPCClient(ctx, cfg, logger)
if err != nil {
return nil, err
}

func newRestartChecker(cfg *config.Cloud, logger logging.Logger, client rpc.ClientConn) needsRestartChecker {
return &needsRestartCheckerGRPC{
cfg: cfg,
logger: logger,
client: client,
}, nil
}
}

0 comments on commit a5a072f

Please sign in to comment.