From 2af1019716cd73aa2b8db68e8c9e1ac3bc6b5e53 Mon Sep 17 00:00:00 2001 From: Devin Hilly Date: Mon, 22 May 2023 16:10:50 -0400 Subject: [PATCH] RSDK-2880: Add timestamp information to the replay camera (#2371) --- components/camera/client_test.go | 65 +++++++++++++++++++ components/camera/replaypcd/replaypcd.go | 25 ++++++- components/camera/replaypcd/replaypcd_test.go | 43 ++++++++++++ .../camera/replaypcd/replaypcd_utils_test.go | 16 ++++- robot/client/client.go | 2 + session/context_test.go | 21 ++---- testutils/rpc.go | 34 ++++++++++ utils/contextutils/context.go | 65 +++++++++++++++++++ utils/contextutils/context_test.go | 51 +++++++++++++++ 9 files changed, 302 insertions(+), 20 deletions(-) create mode 100644 utils/contextutils/context.go create mode 100644 utils/contextutils/context_test.go diff --git a/components/camera/client_test.go b/components/camera/client_test.go index 6723e5276a6..f3e6d8d8d4a 100644 --- a/components/camera/client_test.go +++ b/components/camera/client_test.go @@ -14,6 +14,8 @@ import ( "github.com/edaniels/gostream" "go.viam.com/test" "go.viam.com/utils/rpc" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "go.viam.com/rdk/components/camera" viamgrpc "go.viam.com/rdk/grpc" @@ -24,6 +26,7 @@ import ( "go.viam.com/rdk/testutils" "go.viam.com/rdk/testutils/inject" rutils "go.viam.com/rdk/utils" + "go.viam.com/rdk/utils/contextutils" ) func TestClient(t *testing.T) { @@ -390,3 +393,65 @@ func TestClientLazyImage(t *testing.T) { test.That(t, conn.Close(), test.ShouldBeNil) } + +func TestClientWithInterceptor(t *testing.T) { + // Set up gRPC server + logger := golog.NewTestLogger(t) + listener1, err := net.Listen("tcp", "localhost:0") + test.That(t, err, test.ShouldBeNil) + rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated()) + test.That(t, err, test.ShouldBeNil) + + // Set up camera that adds timestamps into the gRPC response header. + injectCamera := &inject.Camera{} + + pcA := pointcloud.New() + err = pcA.Set(pointcloud.NewVector(5, 5, 5), nil) + test.That(t, err, test.ShouldBeNil) + + k, v := "hello", "world" + injectCamera.NextPointCloudFunc = func(ctx context.Context) (pointcloud.PointCloud, error) { + var grpcMetadata metadata.MD = make(map[string][]string) + grpcMetadata.Set(k, v) + grpc.SendHeader(ctx, grpcMetadata) + return pcA, nil + } + + // Register CameraService API in our gRPC server. + resources := map[resource.Name]camera.Camera{ + camera.Named(testCameraName): injectCamera, + } + cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources) + test.That(t, err, test.ShouldBeNil) + resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API) + test.That(t, err, test.ShouldBeNil) + test.That(t, ok, test.ShouldBeTrue) + test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil) + + // Start serving requests. + go rpcServer.Serve(listener1) + defer rpcServer.Stop() + + // Set up gRPC client with context with metadata interceptor. + conn, err := viamgrpc.Dial( + context.Background(), + listener1.Addr().String(), + logger, + rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor), + ) + test.That(t, err, test.ShouldBeNil) + camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger) + test.That(t, err, test.ShouldBeNil) + + // Construct a ContextWithMetadata to pass into NextPointCloud and check that the + // interceptor correctly injected the metadata from the gRPC response header into the + // context. + ctx, md := contextutils.ContextWithMetadata(context.Background()) + pcB, err := camera1Client.NextPointCloud(ctx) + test.That(t, err, test.ShouldBeNil) + _, got := pcB.At(5, 5, 5) + test.That(t, got, test.ShouldBeTrue) + test.That(t, md[k][0], test.ShouldEqual, v) + + test.That(t, conn.Close(), test.ShouldBeNil) +} diff --git a/components/camera/replaypcd/replaypcd.go b/components/camera/replaypcd/replaypcd.go index 4e55a6937be..73454034112 100644 --- a/components/camera/replaypcd/replaypcd.go +++ b/components/camera/replaypcd/replaypcd.go @@ -14,6 +14,8 @@ import ( datapb "go.viam.com/api/app/data/v1" goutils "go.viam.com/utils" "go.viam.com/utils/rpc" + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" "google.golang.org/protobuf/types/known/timestamppb" "go.viam.com/rdk/components/camera" @@ -21,6 +23,7 @@ import ( "go.viam.com/rdk/pointcloud" "go.viam.com/rdk/resource" "go.viam.com/rdk/rimage/transform" + "go.viam.com/rdk/utils/contextutils" ) // Model is the model of a replay camera. @@ -146,8 +149,28 @@ func (replay *pcdCamera) NextPointCloud(ctx context.Context) (pointcloud.PointCl return nil, errEndOfDataset } + // If the caller is communicating with the replay camera over gRPC, set the timestamps on + // the gRPC header. + md := resp.GetData()[0].GetMetadata() + if stream := grpc.ServerTransportStreamFromContext(ctx); stream != nil { + var grpcMetadata metadata.MD = make(map[string][]string) + + timeReq := md.GetTimeRequested() + if timeReq != nil { + grpcMetadata.Set(contextutils.TimeRequestedMetadataKey, timeReq.AsTime().Format(time.RFC3339Nano)) + } + timeRec := md.GetTimeReceived() + if timeRec != nil { + grpcMetadata.Set(contextutils.TimeReceivedMetadataKey, timeRec.AsTime().Format(time.RFC3339Nano)) + } + + if err := grpc.SetHeader(ctx, grpcMetadata); err != nil { + return nil, err + } + } + replay.lastData = resp.GetLast() - data := resp.Data[0].GetBinary() + data := resp.GetData()[0].GetBinary() r, err := gzip.NewReader(bytes.NewBuffer(data)) if err != nil { diff --git a/components/camera/replaypcd/replaypcd_test.go b/components/camera/replaypcd/replaypcd_test.go index b8e97e475e0..4647c1dfd4d 100644 --- a/components/camera/replaypcd/replaypcd_test.go +++ b/components/camera/replaypcd/replaypcd_test.go @@ -12,9 +12,12 @@ import ( "go.viam.com/test" "go.viam.com/utils" "go.viam.com/utils/artifact" + "google.golang.org/grpc" "go.viam.com/rdk/internal/cloud" "go.viam.com/rdk/pointcloud" + "go.viam.com/rdk/testutils" + "go.viam.com/rdk/utils/contextutils" ) const datasetDirectory = "slam/mock_lidar/%d.pcd" @@ -424,3 +427,43 @@ func TestUnimplementedFunctions(t *testing.T) { test.That(t, serverClose(), test.ShouldBeNil) } + +// TestNextPointCloudTimestamps tests that calls to NextPointCloud on the replay camera will inject +// the time received and time requested metadata into the gRPC response header. +func TestNextPointCloudTimestamps(t *testing.T) { + // Construct replay camera. + ctx := context.Background() + cfg := &Config{Source: "source"} + replayCamera, serverClose, err := createNewReplayPCDCamera(ctx, t, cfg, true) + test.That(t, err, test.ShouldBeNil) + test.That(t, replayCamera, test.ShouldNotBeNil) + + // Repeatedly call NextPointCloud, checking for timestamps in the gRPC header. + for i := 0; i < numPCDFiles; i++ { + serverStream := testutils.NewServerTransportStream() + ctx = grpc.NewContextWithServerTransportStream(ctx, serverStream) + pc, err := replayCamera.NextPointCloud(ctx) + test.That(t, err, test.ShouldBeNil) + test.That(t, pc, test.ShouldResemble, getPointCloudFromArtifact(t, i)) + + expectedTimeReq := fmt.Sprintf(testTime, i) + expectedTimeRec := fmt.Sprintf(testTime, i+1) + + actualTimeReq := serverStream.Value(contextutils.TimeRequestedMetadataKey)[0] + actualTimeRec := serverStream.Value(contextutils.TimeReceivedMetadataKey)[0] + + test.That(t, expectedTimeReq, test.ShouldEqual, actualTimeReq) + test.That(t, expectedTimeRec, test.ShouldEqual, actualTimeRec) + } + + // Confirm the end of the dataset was reached when expected + pc, err := replayCamera.NextPointCloud(ctx) + test.That(t, err, test.ShouldNotBeNil) + test.That(t, err.Error(), test.ShouldContainSubstring, errEndOfDataset.Error()) + test.That(t, pc, test.ShouldBeNil) + + err = replayCamera.Close(ctx) + test.That(t, err, test.ShouldBeNil) + + test.That(t, serverClose(), test.ShouldBeNil) +} diff --git a/components/camera/replaypcd/replaypcd_utils_test.go b/components/camera/replaypcd/replaypcd_utils_test.go index 29c0fef21f0..660899cc562 100644 --- a/components/camera/replaypcd/replaypcd_utils_test.go +++ b/components/camera/replaypcd/replaypcd_utils_test.go @@ -11,6 +11,7 @@ import ( "path/filepath" "strconv" "testing" + "time" "github.com/edaniels/golog" "github.com/pkg/errors" @@ -18,6 +19,7 @@ import ( "go.viam.com/test" "go.viam.com/utils/artifact" "go.viam.com/utils/rpc" + "google.golang.org/protobuf/types/known/timestamppb" "go.viam.com/rdk/components/camera" viamgrpc "go.viam.com/rdk/grpc" @@ -28,6 +30,8 @@ import ( "go.viam.com/rdk/testutils/inject" ) +const testTime = "2000-01-01T12:00:%02dZ" + // mockDataServiceServer is a struct that includes unimplemented versions of all the Data Service endpoints. These // can be overwritten to allow developers to trigger desired behaviors during testing. type mockDataServiceServer struct { @@ -60,9 +64,17 @@ func (mDServer *mockDataServiceServer) BinaryDataByFilter(ctx context.Context, r gz.Close() // Construct response + timeReq, err := time.Parse(time.RFC3339, fmt.Sprintf(testTime, newFileNum)) + if err != nil { + return nil, errors.Wrap(err, "failed parsing time") + } + timeRec := timeReq.Add(time.Second) binaryData := &datapb.BinaryData{ - Binary: dataBuf.Bytes(), - Metadata: &datapb.BinaryMetadata{}, + Binary: dataBuf.Bytes(), + Metadata: &datapb.BinaryMetadata{ + TimeRequested: timestamppb.New(timeReq), + TimeReceived: timestamppb.New(timeRec), + }, } resp := &datapb.BinaryDataByFilterResponse{ diff --git a/robot/client/client.go b/robot/client/client.go index 529e3aa7085..e3ed3615c0f 100644 --- a/robot/client/client.go +++ b/robot/client/client.go @@ -38,6 +38,7 @@ import ( "go.viam.com/rdk/robot/packages" "go.viam.com/rdk/session" "go.viam.com/rdk/spatialmath" + "go.viam.com/rdk/utils/contextutils" ) var ( @@ -278,6 +279,7 @@ func New(ctx context.Context, address string, logger golog.Logger, opts ...Robot // interceptors are applied in order from first to last rc.dialOptions = append( rc.dialOptions, + rpc.WithUnaryClientInterceptor(contextutils.ContextWithMetadataUnaryClientInterceptor), // error handling rpc.WithUnaryClientInterceptor(rc.handleUnaryDisconnect), rpc.WithStreamClientInterceptor(rc.handleStreamDisconnect), diff --git a/session/context_test.go b/session/context_test.go index d948bc84a1b..171dec25760 100644 --- a/session/context_test.go +++ b/session/context_test.go @@ -10,10 +10,10 @@ import ( "github.com/google/uuid" "go.viam.com/test" "google.golang.org/grpc" - "google.golang.org/grpc/metadata" "go.viam.com/rdk/resource" "go.viam.com/rdk/session" + "go.viam.com/rdk/testutils" ) func TestToFromContext(t *testing.T) { @@ -64,7 +64,7 @@ func TestSafetyMonitor(t *testing.T) { } func TestSafetyMonitorForMetadata(t *testing.T) { - stream1 := &myStream{} + stream1 := testutils.NewServerTransportStream() streamCtx := grpc.NewContextWithServerTransportStream(context.Background(), stream1) sess1 := session.New(context.Background(), "ownerID", time.Minute, nil) @@ -73,9 +73,9 @@ func TestSafetyMonitorForMetadata(t *testing.T) { name1 := resource.NewName(resource.APINamespace("foo").WithType("bar").WithSubtype("baz"), "barf") name2 := resource.NewName(resource.APINamespace("woo").WithType("war").WithSubtype("waz"), "warf") session.SafetyMonitor(nextCtx, myThing{Named: name1.AsNamed()}) - test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name1.String()}) + test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name1.String()}) session.SafetyMonitor(nextCtx, myThing{Named: name2.AsNamed()}) - test.That(t, stream1.md[session.SafetyMonitoredResourceMetadataKey], test.ShouldResemble, []string{name2.String()}) + test.That(t, stream1.Value(session.SafetyMonitoredResourceMetadataKey), test.ShouldResemble, []string{name2.String()}) } type myThing struct { @@ -83,16 +83,3 @@ type myThing struct { resource.AlwaysRebuild resource.TriviallyCloseable } - -type myStream struct { - mu sync.Mutex - grpc.ServerTransportStream - md metadata.MD -} - -func (s *myStream) SetHeader(md metadata.MD) error { - s.mu.Lock() - defer s.mu.Unlock() - s.md = md.Copy() - return nil -} diff --git a/testutils/rpc.go b/testutils/rpc.go index 1d1a6cdcefb..05915e62ddf 100644 --- a/testutils/rpc.go +++ b/testutils/rpc.go @@ -3,9 +3,11 @@ package testutils import ( "context" + "sync" "go.viam.com/utils/rpc" "google.golang.org/grpc" + "google.golang.org/grpc/metadata" ) // TrackingDialer tracks dial attempts. @@ -48,3 +50,35 @@ func (td *TrackingDialer) DialFunc( } return conn, cached, err } + +// ServerTransportStream implements grpc.ServerTransportStream and can be used to test setting +// metadata in the gRPC response header. +type ServerTransportStream struct { + mu sync.Mutex + grpc.ServerTransportStream + md metadata.MD +} + +// NewServerTransportStream creates a new ServerTransportStream. +func NewServerTransportStream() *ServerTransportStream { + return &ServerTransportStream{ + md: metadata.New(make(map[string]string)), + } +} + +// SetHeader implements grpc.ServerTransportStream. +func (s *ServerTransportStream) SetHeader(md metadata.MD) error { + s.mu.Lock() + defer s.mu.Unlock() + for k, v := range md { + s.md[k] = v + } + return nil +} + +// Value returns the value in the metadata map corresponding to a given key. +func (s *ServerTransportStream) Value(key string) []string { + s.mu.Lock() + defer s.mu.Unlock() + return s.md[key] +} diff --git a/utils/contextutils/context.go b/utils/contextutils/context.go new file mode 100644 index 00000000000..b7d8737a2a0 --- /dev/null +++ b/utils/contextutils/context.go @@ -0,0 +1,65 @@ +// Package contextutils provides utility for adding and retrieving metadata to/from a context. +package contextutils + +import ( + "context" + + "google.golang.org/grpc" + "google.golang.org/grpc/metadata" +) + +type contextKey string + +const ( + // MetadataContextKey is the key used to access metadata from a context with metadata. + MetadataContextKey = contextKey("viam-metadata") + + // TimeRequestedMetadataKey is optional metadata in the gRPC response header that correlates + // to the time right before the point cloud was captured. + TimeRequestedMetadataKey = "viam-time-requested" + + // TimeReceivedMetadataKey is optional metadata in the gRPC response header that correlates + // to the time right after the point cloud was captured. + TimeReceivedMetadataKey = "viam-time-received" +) + +// ContextWithMetadata attaches a metadata map to the context. +func ContextWithMetadata(ctx context.Context) (context.Context, map[string][]string) { + // If the context already has metadata, return that and leave the context untouched. + existingMD := ctx.Value(MetadataContextKey) + if mdMap, ok := existingMD.(map[string][]string); ok { + return ctx, mdMap + } + + // Otherwise, add a metadata map to the context. + md := make(map[string][]string) + ctx = context.WithValue(ctx, MetadataContextKey, md) + return ctx, md +} + +// ContextWithMetadataUnaryClientInterceptor attempts to read metadata from the gRPC header and +// injects the metadata into the context if the caller has passed in a context with metadata. +func ContextWithMetadataUnaryClientInterceptor( + ctx context.Context, + method string, + req, reply interface{}, + cc *grpc.ClientConn, + invoker grpc.UnaryInvoker, + opts ...grpc.CallOption, +) error { + var header metadata.MD + opts = append(opts, grpc.Header(&header)) + err := invoker(ctx, method, req, reply, cc, opts...) + if err != nil { + return err + } + + md := ctx.Value(MetadataContextKey) + if mdMap, ok := md.(map[string][]string); ok { + for key, value := range header { + mdMap[key] = value + } + } + + return nil +} diff --git a/utils/contextutils/context_test.go b/utils/contextutils/context_test.go new file mode 100644 index 00000000000..33bbc18f785 --- /dev/null +++ b/utils/contextutils/context_test.go @@ -0,0 +1,51 @@ +package contextutils + +import ( + "context" + "testing" + + "go.viam.com/test" +) + +func TestContextWithMetadata(t *testing.T) { + // nothing in context + ctx := context.Background() + mdFromContext := ctx.Value(MetadataContextKey) + test.That(t, mdFromContext, test.ShouldBeNil) + + // initialize metadata, empty at first + ctx, md := ContextWithMetadata(ctx) + test.That(t, md, test.ShouldBeEmpty) + test.That(t, ctx.Value(MetadataContextKey), test.ShouldBeEmpty) + + // add values to local metadata and show context metadata has updated + k, v := "hello", []string{"world"} + md[k] = v + mdFromContext = ctx.Value(MetadataContextKey) + mdMap, ok := mdFromContext.(map[string][]string) + test.That(t, ok, test.ShouldEqual, true) + test.That(t, mdMap[k], test.ShouldResemble, v) + + // after calling ContextWithMetadata second time, old metadata still there + ctx, md = ContextWithMetadata(ctx) + test.That(t, md[k], test.ShouldResemble, v) + + // if metadata value gets overwritten with non-metadata value, next call to + // ContextWithMetadata will add viam-metadata again, but will not be able to access old + // metadata + someString := "iamastring" + ctx = context.WithValue(ctx, MetadataContextKey, someString) + mdFromContext = ctx.Value(MetadataContextKey) + mdString, ok := mdFromContext.(string) + test.That(t, ok, test.ShouldEqual, true) + test.That(t, mdString, test.ShouldEqual, someString) + + ctx, md = ContextWithMetadata(ctx) + test.That(t, md, test.ShouldBeEmpty) + + mdFromContext = ctx.Value(MetadataContextKey) + mdMap, ok = mdFromContext.(map[string][]string) + test.That(t, ok, test.ShouldEqual, true) + test.That(t, mdMap, test.ShouldBeEmpty) + test.That(t, ctx.Value(MetadataContextKey), test.ShouldBeEmpty) +}