[RSDK-9458] Remove 'Stream' from Camera interface (viamrobotics#4691)
Co-authored-by: hexbabe <[email protected]>
Co-authored-by: sean yu <[email protected]>
Co-authored-by: nicksanford <[email protected]>
Co-authored-by: Dan Gottlieb <[email protected]>
Co-authored-by: Ethan Look-Potts <[email protected]>
Co-authored-by: abe-winter <[email protected]>
Co-authored-by: Eliot Horowitz <[email protected]>
Co-authored-by: Naomi Pentrel <[email protected]>
Co-authored-by: Maxim Pertsov <[email protected]>
Co-authored-by: nicksanford <[email protected]>
Co-authored-by: Devin Hilly <[email protected]>
Co-authored-by: martha-johnston <[email protected]>
Co-authored-by: Kurt S <[email protected]>
Co-authored-by: Vijay Vuyyuru <[email protected]>
Co-authored-by: Peter LoVerso <[email protected]>
Co-authored-by: Nicole Jung <[email protected]>
17 people authored Jan 28, 2025
1 parent ddc16c8 commit c04cefa
Showing 21 changed files with 167 additions and 191 deletions.
22 changes: 9 additions & 13 deletions components/camera/camera.go
@@ -79,12 +79,6 @@ type ImageMetadata struct {
}

// A Camera is a resource that can capture frames.
type Camera interface {
resource.Resource
VideoSource
}

// VideoSource represents anything that can capture frames.
// For more information, see the [camera component docs].
//
// Image example:
@@ -129,7 +123,9 @@ type Camera interface {
// [Images method docs]: https://docs.viam.com/dev/reference/apis/components/camera/#getimages
// [NextPointCloud method docs]: https://docs.viam.com/dev/reference/apis/components/camera/#getpointcloud
// [Close method docs]: https://docs.viam.com/dev/reference/apis/components/camera/#close
type VideoSource interface {
type Camera interface {
resource.Resource

// Image returns a byte slice representing an image that tries to adhere to the MIME type hint.
// Image also may return metadata about the frame.
Image(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, ImageMetadata, error)
@@ -138,20 +134,20 @@ type VideoSource interface {
// along with associated metadata (just timestamp for now). It's not for getting a time series of images from the same imager.
Images(ctx context.Context) ([]NamedImage, resource.ResponseMetadata, error)

// Stream returns a stream that makes a best effort to return consecutive images
// that may have a MIME type hint dictated in the context via gostream.WithMIMETypeHint.
Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error)

// NextPointCloud returns the next immediately available point cloud, not necessarily one
// a part of a sequence. In the future, there could be streaming of point clouds.
NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error)

// Properties returns properties that are intrinsic to the particular
// implementation of a camera.
Properties(ctx context.Context) (Properties, error)
}

// Close shuts down the resource and prevents further use.
Close(ctx context.Context) error
// VideoSource is a camera that has `Stream` embedded to directly integrate with gostream.
// Note that generally, when writing camera components from scratch, embedding `Stream` is an anti-pattern.
type VideoSource interface {
Camera
Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error)
}

// ReadImage reads an image from the given source that is immediately available.
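With Stream removed from the Camera interface, callers get frames through Image (encoded bytes plus metadata) or the DecodeImageFromCamera helper used throughout the tests in this commit. A minimal consumer-side sketch, assuming a camera.Camera obtained elsewhere (for example from a robot client); the package and function names are illustrative only:

package cameraexample // hypothetical package for this sketch

import (
	"context"
	"image"

	"go.viam.com/rdk/components/camera"
	"go.viam.com/rdk/utils"
)

// readFrameBytes asks the camera for one encoded frame, hinting JPEG, and
// returns the bytes along with the MIME type the camera actually produced.
func readFrameBytes(ctx context.Context, cam camera.Camera) ([]byte, string, error) {
	data, meta, err := cam.Image(ctx, utils.MimeTypeJPEG, nil)
	if err != nil {
		return nil, "", err
	}
	return data, meta.MimeType, nil
}

// readFrameDecoded returns a frame already decoded to an image.Image, using
// the DecodeImageFromCamera helper that replaces ReadImage-over-Stream in
// most of the tests in this commit.
func readFrameDecoded(ctx context.Context, cam camera.Camera) (image.Image, error) {
	return camera.DecodeImageFromCamera(ctx, utils.MimeTypeJPEG, nil, cam)
}

Code that still wants gostream-style streaming asserts to camera.VideoSource instead, as the classifier test and transform pipeline below show.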
77 changes: 0 additions & 77 deletions components/camera/client_test.go
@@ -179,7 +179,6 @@ func TestClient(t *testing.T) {
test.That(t, err, test.ShouldBeNil)
camera1Client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
test.That(t, err, test.ShouldBeNil)

frame, err := camera.DecodeImageFromCamera(context.Background(), rutils.MimeTypeRawRGBA, nil, camera1Client)
test.That(t, err, test.ShouldBeNil)
compVal, _, err := rimage.CompareImages(img, frame)
@@ -246,10 +245,6 @@ func TestClient(t *testing.T) {
client2, err := resourceAPI.RPCClient(context.Background(), conn, "", camera.Named(failCameraName), logger)
test.That(t, err, test.ShouldBeNil)

_, _, err = camera.ReadImage(context.Background(), client2)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error())

_, _, err = client2.Image(context.Background(), "", nil)
test.That(t, err, test.ShouldNotBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, errGetImageFailed.Error())
@@ -572,78 +567,6 @@ func TestClientWithInterceptor(t *testing.T) {
test.That(t, conn.Close(), test.ShouldBeNil)
}

func TestClientStreamAfterClose(t *testing.T) {
// Set up gRPC server
logger := logging.NewTestLogger(t)
listener, err := net.Listen("tcp", "localhost:0")
test.That(t, err, test.ShouldBeNil)
rpcServer, err := rpc.NewServer(logger, rpc.WithUnauthenticated())
test.That(t, err, test.ShouldBeNil)

// Set up camera that can stream images
img := image.NewNRGBA(image.Rect(0, 0, 4, 4))
injectCamera := &inject.Camera{}
injectCamera.PropertiesFunc = func(ctx context.Context) (camera.Properties, error) {
return camera.Properties{}, nil
}
injectCamera.ImageFunc = func(ctx context.Context, mimeType string, extra map[string]interface{}) ([]byte, camera.ImageMetadata, error) {
imgBytes, err := rimage.EncodeImage(ctx, img, mimeType)
test.That(t, err, test.ShouldBeNil)
return imgBytes, camera.ImageMetadata{MimeType: mimeType}, nil
}

// Register CameraService API in our gRPC server.
resources := map[resource.Name]camera.Camera{
camera.Named(testCameraName): injectCamera,
}
cameraSvc, err := resource.NewAPIResourceCollection(camera.API, resources)
test.That(t, err, test.ShouldBeNil)
resourceAPI, ok, err := resource.LookupAPIRegistration[camera.Camera](camera.API)
test.That(t, err, test.ShouldBeNil)
test.That(t, ok, test.ShouldBeTrue)
test.That(t, resourceAPI.RegisterRPCService(context.Background(), rpcServer, cameraSvc), test.ShouldBeNil)

// Start serving requests.
go rpcServer.Serve(listener)
defer rpcServer.Stop()

// Make client connection
conn, err := viamgrpc.Dial(context.Background(), listener.Addr().String(), logger)
test.That(t, err, test.ShouldBeNil)
client, err := camera.NewClientFromConn(context.Background(), conn, "", camera.Named(testCameraName), logger)
test.That(t, err, test.ShouldBeNil)

// Get a stream
stream, err := client.Stream(context.Background())
test.That(t, stream, test.ShouldNotBeNil)
test.That(t, err, test.ShouldBeNil)

// Read from stream
media, _, err := stream.Next(context.Background())
test.That(t, media, test.ShouldNotBeNil)
test.That(t, err, test.ShouldBeNil)

// Close client and read from stream
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
media, _, err = stream.Next(context.Background())
test.That(t, media, test.ShouldBeNil)
test.That(t, err.Error(), test.ShouldContainSubstring, "context canceled")

// Get a new stream
stream, err = client.Stream(context.Background())
test.That(t, stream, test.ShouldNotBeNil)
test.That(t, err, test.ShouldBeNil)

// Read from the new stream
media, _, err = stream.Next(context.Background())
test.That(t, media, test.ShouldNotBeNil)
test.That(t, err, test.ShouldBeNil)

// Close client and connection
test.That(t, client.Close(context.Background()), test.ShouldBeNil)
test.That(t, conn.Close(), test.ShouldBeNil)
}

// See modmanager_test.go for the happy path (aka, when the
// client has a webrtc connection).
func TestRTPPassthroughWithoutWebRTC(t *testing.T) {
1 change: 0 additions & 1 deletion components/camera/ffmpeg/ffmpeg.go
@@ -73,7 +73,6 @@ func init() {
}

type ffmpegCamera struct {
resource.Named
gostream.VideoReader
cancelFunc context.CancelFunc
activeBackgroundWorkers sync.WaitGroup
3 changes: 2 additions & 1 deletion components/camera/ffmpeg/ffmpeg_test.go
@@ -8,6 +8,7 @@ import (
"go.viam.com/test"
"go.viam.com/utils/artifact"

"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/utils"
)
@@ -20,7 +21,7 @@ func TestFFMPEGCamera(t *testing.T) {
test.That(t, err, test.ShouldBeNil)
test.That(t, err, test.ShouldBeNil)
for i := 0; i < 5; i++ {
_, _, err = cam.Image(ctx, utils.MimeTypeJPEG, nil)
_, err = camera.DecodeImageFromCamera(ctx, utils.MimeTypeJPEG, nil, cam)
test.That(t, err, test.ShouldBeNil)
}
test.That(t, cam.Close(context.Background()), test.ShouldBeNil)
5 changes: 3 additions & 2 deletions components/camera/transformpipeline/classifier_test.go
@@ -82,7 +82,6 @@ func buildRobotWithClassifier(logger logging.Logger) (robot.Robot, error) {
return r, nil
}

//nolint:dupl
func TestClassifierSource(t *testing.T) {
logger := logging.NewTestLogger(t)
ctx, cancel := context.WithCancel(context.Background())
@@ -99,7 +98,9 @@ func TestClassifierSource(t *testing.T) {
test.That(t, err, test.ShouldBeNil)
defer classifier.Close(ctx)

resImg, _, err := camera.ReadImage(ctx, classifier)
streamClassifier, ok := classifier.(camera.VideoSource)
test.That(t, ok, test.ShouldBeTrue)
resImg, _, err := camera.ReadImage(ctx, streamClassifier)
test.That(t, err, test.ShouldBeNil)
ovImg := rimage.ConvertImage(resImg)

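Because the classifier camera is now typed as a plain camera.Camera, the test above asserts it to camera.VideoSource before calling ReadImage. A sketch of that assertion pattern for any camera, assuming only the interfaces shown in camera.go above; the helper name is illustrative:

package cameraexample // hypothetical package for this sketch

import (
	"context"
	"fmt"
	"image"

	"go.viam.com/rdk/components/camera"
)

// readViaStream pulls one frame through the gostream-backed ReadImage path,
// which after this change requires a camera.VideoSource rather than a Camera.
func readViaStream(ctx context.Context, cam camera.Camera) (image.Image, error) {
	vs, ok := cam.(camera.VideoSource)
	if !ok {
		return nil, fmt.Errorf("%T does not implement Stream", cam)
	}
	img, _, err := camera.ReadImage(ctx, vs)
	return img, err
}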
5 changes: 2 additions & 3 deletions components/camera/transformpipeline/detector_test.go
@@ -104,7 +104,6 @@ func buildRobotWithFakeCamera(logger logging.Logger) (robot.Robot, error) {
return robotimpl.RobotFromConfigPath(context.Background(), newConfFile, logger)
}

//nolint:dupl
func TestColorDetectionSource(t *testing.T) {
logger := logging.NewTestLogger(t)
ctx, cancel := context.WithCancel(context.Background())
@@ -121,7 +120,7 @@ func TestColorDetectionSource(t *testing.T) {
test.That(t, err, test.ShouldBeNil)
defer detector.Close(ctx)

resImg, _, err := camera.ReadImage(ctx, detector)
resImg, err := camera.DecodeImageFromCamera(ctx, rutils.MimeTypePNG, nil, detector)
test.That(t, err, test.ShouldBeNil)
ovImg := rimage.ConvertImage(resImg)
test.That(t, ovImg.GetXY(852, 431), test.ShouldResemble, rimage.Red)
@@ -146,7 +145,7 @@ func BenchmarkColorDetectionSource(b *testing.B) {
b.ResetTimer()
// begin benchmarking
for i := 0; i < b.N; i++ {
_, _, _ = camera.ReadImage(ctx, detector)
_, _ = camera.DecodeImageFromCamera(ctx, rutils.MimeTypeJPEG, nil, detector)
}
test.That(b, detector.Close(context.Background()), test.ShouldBeNil)
}
6 changes: 4 additions & 2 deletions components/camera/transformpipeline/mods_test.go
Original file line number Diff line number Diff line change
@@ -608,7 +608,8 @@ func BenchmarkColorRotate(b *testing.B) {
am := utils.AttributeMap{
"angle_degs": 180,
}
rs, stream, err := newRotateTransform(context.Background(), src, camera.ColorStream, am)
vs := videoSourceFromCamera(context.Background(), src)
rs, stream, err := newRotateTransform(context.Background(), vs, camera.ColorStream, am)
test.That(b, err, test.ShouldBeNil)
test.That(b, stream, test.ShouldEqual, camera.ColorStream)

@@ -633,7 +634,8 @@ func BenchmarkDepthRotate(b *testing.B) {
am := utils.AttributeMap{
"angle_degs": 180,
}
rs, stream, err := newRotateTransform(context.Background(), src, camera.DepthStream, am)
vs := videoSourceFromCamera(context.Background(), src)
rs, stream, err := newRotateTransform(context.Background(), vs, camera.DepthStream, am)
test.That(b, err, test.ShouldBeNil)
test.That(b, stream, test.ShouldEqual, camera.DepthStream)

57 changes: 45 additions & 12 deletions components/camera/transformpipeline/pipeline.go
@@ -12,12 +12,14 @@ import (
"go.opencensus.io/trace"

"go.viam.com/rdk/components/camera"
"go.viam.com/rdk/gostream"
"go.viam.com/rdk/logging"
"go.viam.com/rdk/pointcloud"
"go.viam.com/rdk/resource"
"go.viam.com/rdk/rimage"
"go.viam.com/rdk/rimage/transform"
"go.viam.com/rdk/robot"
camerautils "go.viam.com/rdk/robot/web/stream/camera"
"go.viam.com/rdk/utils"
)

@@ -47,11 +49,12 @@ func init() {
if err != nil {
return nil, fmt.Errorf("no source camera for transform pipeline (%s): %w", sourceName, err)
}
src, err := newTransformPipeline(ctx, source, newConf, actualR, logger)
vs := videoSourceFromCamera(ctx, source)
src, err := newTransformPipeline(ctx, vs, conf.ResourceName().AsNamed(), newConf, actualR, logger)
if err != nil {
return nil, err
}
return camera.FromVideoSource(conf.ResourceName(), src, logger), nil
return src, nil
},
})
}
@@ -84,9 +87,36 @@ func (cfg *transformConfig) Validate(path string) ([]string, error) {
return deps, nil
}

type videoSource struct {
camera.Camera
vs gostream.VideoSource
}

func (sc *videoSource) Stream(ctx context.Context, errHandlers ...gostream.ErrorHandler) (gostream.VideoStream, error) {
if sc.vs != nil {
return sc.vs.Stream(ctx, errHandlers...)
}
return sc.Stream(ctx, errHandlers...)
}

// videoSourceFromCamera is a hack to allow us to use Stream to pipe frames through the pipeline
// and still implement a camera resource.
// We prefer this methodology over passing Image bytes because each transform desires an image.Image over
// a raw byte slice. To use Image would be to wastefully encode and decode the frame multiple times.
func videoSourceFromCamera(ctx context.Context, cam camera.Camera) camera.VideoSource {
if streamCam, ok := cam.(camera.VideoSource); ok {
return streamCam
}
return &videoSource{
Camera: cam,
vs: camerautils.VideoSourceFromCamera(ctx, cam),
}
}

func newTransformPipeline(
ctx context.Context,
source camera.VideoSource,
named resource.Named,
cfg *transformConfig,
r robot.Robot,
logger logging.Logger,
@@ -98,7 +128,7 @@
return nil, errors.New("pipeline has no transforms in it")
}
// check if the source produces a depth image or color image
img, release, err := camera.ReadImage(ctx, source)
img, err := camera.DecodeImageFromCamera(ctx, "", nil, source)

var streamType camera.ImageType
if err != nil {
@@ -110,41 +140,44 @@
} else {
streamType = camera.ColorStream
}
if release != nil {
release()
}
// loop through the pipeline and create the image flow
pipeline := make([]camera.VideoSource, 0, len(cfg.Pipeline))
lastSource := source
lastSource := videoSourceFromCamera(ctx, source)
for _, tr := range cfg.Pipeline {
src, newStreamType, err := buildTransform(ctx, r, lastSource, streamType, tr)
if err != nil {
return nil, err
}
pipeline = append(pipeline, src)
lastSource = src
streamSrc := videoSourceFromCamera(ctx, src)
pipeline = append(pipeline, streamSrc)
lastSource = streamSrc
streamType = newStreamType
}
cameraModel := camera.NewPinholeModelWithBrownConradyDistortion(cfg.CameraParameters, cfg.DistortionParameters)
return camera.NewVideoSourceFromReader(
ctx,
transformPipeline{pipeline, lastSource, cfg.CameraParameters, logger},
transformPipeline{named, pipeline, lastSource, cfg.CameraParameters, logger},
&cameraModel,
streamType,
)
}

type transformPipeline struct {
resource.Named
pipeline []camera.VideoSource
src camera.VideoSource
src camera.Camera
intrinsicParameters *transform.PinholeCameraIntrinsics
logger logging.Logger
}

func (tp transformPipeline) Read(ctx context.Context) (image.Image, func(), error) {
ctx, span := trace.StartSpan(ctx, "camera::transformpipeline::Read")
defer span.End()
return camera.ReadImage(ctx, tp.src)
img, err := camera.DecodeImageFromCamera(ctx, "", nil, tp.src)
if err != nil {
return nil, func() {}, err
}
return img, func() {}, nil
}

func (tp transformPipeline) NextPointCloud(ctx context.Context) (pointcloud.PointCloud, error) {
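The videoSource wrapper above lets the pipeline keep moving image.Image frames through gostream while each stage still satisfies the camera resource API. A consumer-side sketch of the same adaptation, assuming the single-value VideoSourceFromCamera helper used in pipeline.go; the function name is illustrative and error handling is minimal:

package cameraexample // hypothetical package for this sketch

import (
	"context"
	"image"

	"go.viam.com/rdk/components/camera"
	camerautils "go.viam.com/rdk/robot/web/stream/camera"
)

// nextFrame adapts cam to a gostream VideoSource and reads a single frame
// from the resulting stream, mirroring what videoSourceFromCamera does
// inside the transform pipeline.
func nextFrame(ctx context.Context, cam camera.Camera) (image.Image, func(), error) {
	vs := camerautils.VideoSourceFromCamera(ctx, cam)
	stream, err := vs.Stream(ctx)
	if err != nil {
		return nil, nil, err
	}
	defer stream.Close(ctx)
	return stream.Next(ctx)
}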
