Skip to content

Commit

Permalink
Add tracing API to fs ops
Browse files Browse the repository at this point in the history
  • Loading branch information
kislaykishore committed Aug 21, 2024
1 parent 574b76c commit 351efd6
Show file tree
Hide file tree
Showing 4 changed files with 235 additions and 219 deletions.
5 changes: 3 additions & 2 deletions go.mod
Original file line number Diff line number Diff line change
Expand Up @@ -27,6 +27,9 @@ require (
github.com/stretchr/testify v1.9.0
github.com/urfave/cli v1.22.15
go.opencensus.io v0.24.0
go.opentelemetry.io/otel v1.28.0
go.opentelemetry.io/otel/sdk v1.28.0
go.opentelemetry.io/otel/trace v1.28.0
golang.org/x/net v0.28.0
golang.org/x/oauth2 v0.22.0
golang.org/x/sync v0.8.0
Expand Down Expand Up @@ -98,9 +101,7 @@ require (
github.com/subosito/gotenv v1.6.0 // indirect
go.opentelemetry.io/contrib/instrumentation/google.golang.org/grpc/otelgrpc v0.53.0 // indirect
go.opentelemetry.io/contrib/instrumentation/net/http/otelhttp v0.53.0 // indirect
go.opentelemetry.io/otel v1.28.0 // indirect
go.opentelemetry.io/otel/metric v1.28.0 // indirect
go.opentelemetry.io/otel/trace v1.28.0 // indirect
go.uber.org/multierr v1.11.0 // indirect
golang.org/x/crypto v0.26.0 // indirect
golang.org/x/exp v0.0.0-20240530194437-404ba88c7ed0 // indirect
Expand Down
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1242,8 +1242,8 @@ go.opentelemetry.io/otel/oteltest v0.20.0/go.mod h1:L7bgKf9ZB7qCwT9Up7i9/pn0PWIa
go.opentelemetry.io/otel/sdk v0.20.0/go.mod h1:g/IcepuwNsoiX5Byy2nNV0ySUF1em498m7hBWC279Yc=
go.opentelemetry.io/otel/sdk v1.3.0/go.mod h1:rIo4suHNhQwBIPg9axF8V9CA72Wz2mKF1teNrup8yzs=
go.opentelemetry.io/otel/sdk v1.6.1/go.mod h1:IVYrddmFZ+eJqu2k38qD3WezFR2pymCzm8tdxyh3R4E=
go.opentelemetry.io/otel/sdk v1.24.0 h1:YMPPDNymmQN3ZgczicBY3B6sf9n62Dlj9pWD3ucgoDw=
go.opentelemetry.io/otel/sdk v1.24.0/go.mod h1:KVrIYw6tEubO9E96HQpcmpTKDVn9gdv35HoYiQWGDFg=
go.opentelemetry.io/otel/sdk v1.28.0 h1:b9d7hIry8yZsgtbmM0DKyPWMMUMlK9NEKuIG4aBqWyE=
go.opentelemetry.io/otel/sdk v1.28.0/go.mod h1:oYj7ClPUA7Iw3m+r7GeEjz0qckQRJK2B8zjcZEfu7Pg=
go.opentelemetry.io/otel/sdk/export/metric v0.20.0/go.mod h1:h7RBNMsDJ5pmI1zExLi+bJK+Dr8NQCh0qGhm1KDnNlE=
go.opentelemetry.io/otel/sdk/metric v0.20.0/go.mod h1:knxiS8Xd4E/N+ZqKmUPf3gTTZ4/0TjTXukfxjzSTpHE=
go.opentelemetry.io/otel/trace v0.20.0/go.mod h1:6GjCW8zgDjwGHGa6GkyeB8+/5vjT16gUEi0Nf1iBdgw=
Expand Down
282 changes: 81 additions & 201 deletions internal/fs/wrappers/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,12 +29,20 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

// TODO: name is subject to change.
const name = "cloud.google.com/gcsfuse"

var (
opsCount = stats.Int64("fs/ops_count", "The number of ops processed by the file system.", stats.UnitDimensionless)
opsLatency = stats.Float64("fs/ops_latency", "The latency of a file system operation.", stats.UnitMilliseconds)
opsErrorCount = stats.Int64("fs/ops_error_count", "The number of errors generated by file system operation.", stats.UnitDimensionless)

tracer = otel.Tracer(name)
)

// Initialize the metrics.
Expand Down Expand Up @@ -313,263 +321,135 @@ func (fs *monitoring) Destroy() {
fs.wrapped.Destroy()
}

func (fs *monitoring) StatFS(
ctx context.Context,
op *fuseops.StatFSOp) error {
type wrappedCall func() error

func invokeWrapped(ctx context.Context, opName string, w wrappedCall) error {
ctx, span := tracer.Start(ctx, opName, trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
startTime := time.Now()
err := fs.wrapped.StatFS(ctx, op)
recordOp(ctx, "StatFS", startTime, err)
err := w()

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

recordOp(ctx, opName, startTime, err)
return err
}

func (fs *monitoring) LookUpInode(
ctx context.Context,
op *fuseops.LookUpInodeOp) error {
startTime := time.Now()
err := fs.wrapped.LookUpInode(ctx, op)
recordOp(ctx, "LookUpInode", startTime, err)
return err
func (fs *monitoring) StatFS(ctx context.Context, op *fuseops.StatFSOp) error {
return invokeWrapped(ctx, "StatFS", func() error { return fs.wrapped.StatFS(ctx, op) })
}

func (fs *monitoring) GetInodeAttributes(
ctx context.Context,
op *fuseops.GetInodeAttributesOp) error {
startTime := time.Now()
err := fs.wrapped.GetInodeAttributes(ctx, op)
recordOp(ctx, "GetInodeAttributes", startTime, err)
return err
func (fs *monitoring) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error {
return invokeWrapped(ctx, "LookUpInode", func() error { return fs.wrapped.LookUpInode(ctx, op) })
}

func (fs *monitoring) SetInodeAttributes(
ctx context.Context,
op *fuseops.SetInodeAttributesOp) error {
startTime := time.Now()
err := fs.wrapped.SetInodeAttributes(ctx, op)
recordOp(ctx, "SetInodeAttributes", startTime, err)
return err
func (fs *monitoring) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error {
return invokeWrapped(ctx, "GetInodeAttributes", func() error { return fs.wrapped.GetInodeAttributes(ctx, op) })
}

func (fs *monitoring) ForgetInode(
ctx context.Context,
op *fuseops.ForgetInodeOp) error {
startTime := time.Now()
err := fs.wrapped.ForgetInode(ctx, op)
recordOp(ctx, "ForgetInode", startTime, err)
return err
func (fs *monitoring) SetInodeAttributes(ctx context.Context, op *fuseops.SetInodeAttributesOp) error {
return invokeWrapped(ctx, "SetInodeAttributes", func() error { return fs.wrapped.SetInodeAttributes(ctx, op) })
}

func (fs *monitoring) BatchForget(
ctx context.Context,
op *fuseops.BatchForgetOp) error {
startTime := time.Now()
err := fs.wrapped.BatchForget(ctx, op)
recordOp(ctx, "BatchForget", startTime, err)
return err
func (fs *monitoring) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) error {
return invokeWrapped(ctx, "ForgetInode", func() error { return fs.wrapped.ForgetInode(ctx, op) })
}

func (fs *monitoring) MkDir(
ctx context.Context,
op *fuseops.MkDirOp) error {
startTime := time.Now()
err := fs.wrapped.MkDir(ctx, op)
recordOp(ctx, "MkDir", startTime, err)
return err
func (fs *monitoring) BatchForget(ctx context.Context, op *fuseops.BatchForgetOp) error {
return invokeWrapped(ctx, "BatchForget", func() error { return fs.wrapped.BatchForget(ctx, op) })
}

func (fs *monitoring) MkNode(
ctx context.Context,
op *fuseops.MkNodeOp) error {
startTime := time.Now()
err := fs.wrapped.MkNode(ctx, op)
recordOp(ctx, "MkNode", startTime, err)
return err
func (fs *monitoring) MkDir(ctx context.Context, op *fuseops.MkDirOp) error {
return invokeWrapped(ctx, "MkDir", func() error { return fs.wrapped.MkDir(ctx, op) })
}

func (fs *monitoring) CreateFile(
ctx context.Context,
op *fuseops.CreateFileOp) error {
startTime := time.Now()
err := fs.wrapped.CreateFile(ctx, op)
recordOp(ctx, "CreateFile", startTime, err)
return err
func (fs *monitoring) MkNode(ctx context.Context, op *fuseops.MkNodeOp) error {
return invokeWrapped(ctx, "MkNode", func() error { return fs.wrapped.MkNode(ctx, op) })
}

func (fs *monitoring) CreateLink(
ctx context.Context,
op *fuseops.CreateLinkOp) error {
startTime := time.Now()
err := fs.wrapped.CreateLink(ctx, op)
recordOp(ctx, "CreateLink", startTime, err)
return err
func (fs *monitoring) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) error {
return invokeWrapped(ctx, "CreateFile", func() error { return fs.wrapped.CreateFile(ctx, op) })
}

func (fs *monitoring) CreateSymlink(
ctx context.Context,
op *fuseops.CreateSymlinkOp) error {
startTime := time.Now()
err := fs.wrapped.CreateSymlink(ctx, op)
recordOp(ctx, "CreateSymlink", startTime, err)
return err
func (fs *monitoring) CreateLink(ctx context.Context, op *fuseops.CreateLinkOp) error {
return invokeWrapped(ctx, "CreateLink", func() error { return fs.wrapped.CreateLink(ctx, op) })
}

func (fs *monitoring) Rename(
ctx context.Context,
op *fuseops.RenameOp) error {
startTime := time.Now()
err := fs.wrapped.Rename(ctx, op)
recordOp(ctx, "Rename", startTime, err)
return err
func (fs *monitoring) CreateSymlink(ctx context.Context, op *fuseops.CreateSymlinkOp) error {
return invokeWrapped(ctx, "CreateSymlink", func() error { return fs.wrapped.CreateSymlink(ctx, op) })
}

func (fs *monitoring) RmDir(
ctx context.Context,
op *fuseops.RmDirOp) error {
startTime := time.Now()
err := fs.wrapped.RmDir(ctx, op)
recordOp(ctx, "RmDir", startTime, err)
return err
func (fs *monitoring) Rename(ctx context.Context, op *fuseops.RenameOp) error {
return invokeWrapped(ctx, "Rename", func() error { return fs.wrapped.Rename(ctx, op) })
}

func (fs *monitoring) Unlink(
ctx context.Context,
op *fuseops.UnlinkOp) error {
startTime := time.Now()
err := fs.wrapped.Unlink(ctx, op)
recordOp(ctx, "Unlink", startTime, err)
return err
func (fs *monitoring) RmDir(ctx context.Context, op *fuseops.RmDirOp) error {
return invokeWrapped(ctx, "RmDir", func() error { return fs.wrapped.RmDir(ctx, op) })
}

func (fs *monitoring) OpenDir(
ctx context.Context,
op *fuseops.OpenDirOp) error {
startTime := time.Now()
err := fs.wrapped.OpenDir(ctx, op)
recordOp(ctx, "OpenDir", startTime, err)
return err
func (fs *monitoring) Unlink(ctx context.Context, op *fuseops.UnlinkOp) error {
return invokeWrapped(ctx, "Unlink", func() error { return fs.wrapped.Unlink(ctx, op) })
}

func (fs *monitoring) ReadDir(
ctx context.Context,
op *fuseops.ReadDirOp) error {
startTime := time.Now()
err := fs.wrapped.ReadDir(ctx, op)
recordOp(ctx, "ReadDir", startTime, err)
return err
func (fs *monitoring) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error {
return invokeWrapped(ctx, "OpenDir", func() error { return fs.wrapped.OpenDir(ctx, op) })
}

func (fs *monitoring) ReleaseDirHandle(
ctx context.Context,
op *fuseops.ReleaseDirHandleOp) error {
startTime := time.Now()
err := fs.wrapped.ReleaseDirHandle(ctx, op)
recordOp(ctx, "ReleaseDirHandle", startTime, err)
return err
func (fs *monitoring) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error {
return invokeWrapped(ctx, "ReadDir", func() error { return fs.wrapped.ReadDir(ctx, op) })
}

func (fs *monitoring) OpenFile(
ctx context.Context,
op *fuseops.OpenFileOp) error {
startTime := time.Now()
err := fs.wrapped.OpenFile(ctx, op)
recordOp(ctx, "OpenFile", startTime, err)
return err
func (fs *monitoring) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) error {
return invokeWrapped(ctx, "ReleaseDirHandle", func() error { return fs.wrapped.ReleaseDirHandle(ctx, op) })
}

func (fs *monitoring) ReadFile(
ctx context.Context,
op *fuseops.ReadFileOp) error {
startTime := time.Now()
err := fs.wrapped.ReadFile(ctx, op)
recordOp(ctx, "ReadFile", startTime, err)
return err
func (fs *monitoring) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error {
return invokeWrapped(ctx, "OpenFile", func() error { return fs.wrapped.OpenFile(ctx, op) })
}

func (fs *monitoring) WriteFile(
ctx context.Context,
op *fuseops.WriteFileOp) error {
startTime := time.Now()
err := fs.wrapped.WriteFile(ctx, op)
recordOp(ctx, "WriteFile", startTime, err)
return err
func (fs *monitoring) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error {
return invokeWrapped(ctx, "ReadFile", func() error { return fs.wrapped.ReadFile(ctx, op) })
}

func (fs *monitoring) SyncFile(
ctx context.Context,
op *fuseops.SyncFileOp) error {
startTime := time.Now()
err := fs.wrapped.SyncFile(ctx, op)
recordOp(ctx, "SyncFile", startTime, err)
return err
func (fs *monitoring) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) error {
return invokeWrapped(ctx, "WriteFile", func() error { return fs.wrapped.WriteFile(ctx, op) })
}

func (fs *monitoring) FlushFile(
ctx context.Context,
op *fuseops.FlushFileOp) error {
startTime := time.Now()
err := fs.wrapped.FlushFile(ctx, op)
recordOp(ctx, "FlushFile", startTime, err)
return err
func (fs *monitoring) SyncFile(ctx context.Context, op *fuseops.SyncFileOp) error {
return invokeWrapped(ctx, "SyncFile", func() error { return fs.wrapped.SyncFile(ctx, op) })
}

func (fs *monitoring) ReleaseFileHandle(
ctx context.Context,
op *fuseops.ReleaseFileHandleOp) error {
startTime := time.Now()
err := fs.wrapped.ReleaseFileHandle(ctx, op)
recordOp(ctx, "ReleaseFileHandle", startTime, err)
return err
func (fs *monitoring) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) error {
return invokeWrapped(ctx, "FlushFile", func() error { return fs.wrapped.FlushFile(ctx, op) })
}

func (fs *monitoring) ReadSymlink(
ctx context.Context,
op *fuseops.ReadSymlinkOp) error {
startTime := time.Now()
err := fs.wrapped.ReadSymlink(ctx, op)
recordOp(ctx, "ReadSymlink", startTime, err)
return err
func (fs *monitoring) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) error {
return invokeWrapped(ctx, "ReleaseFileHandle", func() error { return fs.wrapped.ReleaseFileHandle(ctx, op) })
}

func (fs *monitoring) RemoveXattr(
ctx context.Context,
op *fuseops.RemoveXattrOp) error {
startTime := time.Now()
err := fs.wrapped.RemoveXattr(ctx, op)
recordOp(ctx, "RemoveXattr", startTime, err)
return err
func (fs *monitoring) ReadSymlink(ctx context.Context, op *fuseops.ReadSymlinkOp) error {
return invokeWrapped(ctx, "ReadSymlink", func() error { return fs.wrapped.ReadSymlink(ctx, op) })
}

func (fs *monitoring) GetXattr(
ctx context.Context,
op *fuseops.GetXattrOp) error {
startTime := time.Now()
err := fs.wrapped.GetXattr(ctx, op)
recordOp(ctx, "GetXattr", startTime, err)
return err
func (fs *monitoring) RemoveXattr(ctx context.Context, op *fuseops.RemoveXattrOp) error {
return invokeWrapped(ctx, "RemoveXattr", func() error { return fs.wrapped.RemoveXattr(ctx, op) })
}

func (fs *monitoring) ListXattr(
ctx context.Context,
op *fuseops.ListXattrOp) error {
startTime := time.Now()
err := fs.wrapped.ListXattr(ctx, op)
recordOp(ctx, "ListXattr", startTime, err)
return err
func (fs *monitoring) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error {
return invokeWrapped(ctx, "GetXattr", func() error { return fs.wrapped.GetXattr(ctx, op) })
}

func (fs *monitoring) SetXattr(
ctx context.Context,
op *fuseops.SetXattrOp) error {
startTime := time.Now()
err := fs.wrapped.SetXattr(ctx, op)
recordOp(ctx, "SetXattr", startTime, err)
return err
func (fs *monitoring) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) error {
return invokeWrapped(ctx, "ListXattr", func() error { return fs.wrapped.ListXattr(ctx, op) })
}

func (fs *monitoring) Fallocate(
ctx context.Context,
op *fuseops.FallocateOp) error {
startTime := time.Now()
err := fs.wrapped.Fallocate(ctx, op)
recordOp(ctx, "Fallocate", startTime, err)
return err
func (fs *monitoring) SetXattr(ctx context.Context, op *fuseops.SetXattrOp) error {
return invokeWrapped(ctx, "SetXattr", func() error { return fs.wrapped.SetXattr(ctx, op) })
}

func (fs *monitoring) Fallocate(ctx context.Context, op *fuseops.FallocateOp) error {
return invokeWrapped(ctx, "Fallocate", func() error { return fs.wrapped.Fallocate(ctx, op) })
}
Loading

0 comments on commit 351efd6

Please sign in to comment.