Skip to content

Commit

Permalink
Instrument fs ops only when tracing enabled. (#2542)
Browse files Browse the repository at this point in the history
* Instrument fs ops only when tracing enabled.

While the calls to tracing APIs are cheap,
we can reduce the performance impact by
conditionally instrumenting code only if tracing is enabled.
  • Loading branch information
kislaykishore authored Sep 27, 2024
1 parent 6bb4e4c commit a4b789d
Show file tree
Hide file tree
Showing 6 changed files with 205 additions and 15 deletions.
5 changes: 5 additions & 0 deletions cfg/config_util.go
Original file line number Diff line number Diff line change
Expand Up @@ -34,6 +34,11 @@ func IsParallelDownloadsEnabled(mountConfig *Config) bool {
return IsFileCacheEnabled(mountConfig) && mountConfig.FileCache.EnableParallelDownloads
}

// IsTracingEnabled returns true if tracing is enabled.
func IsTracingEnabled(mountConfig *Config) bool {
return mountConfig.Monitoring.ExperimentalTracingMode != ""
}

// ListCacheTTLSecsToDuration converts TTL in seconds to time.Duration.
func ListCacheTTLSecsToDuration(secs int64) time.Duration {
err := isTTLInSecsValid(secs)
Expand Down
23 changes: 23 additions & 0 deletions cfg/config_util_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -180,3 +180,26 @@ func TestIsNegativeNumber(t *testing.T) {
})
}
}

func TestIsTracingEnabled(t *testing.T) {
t.Parallel()
var testCases = []struct {
testName string
traceMode string
expected bool
}{
{"empty", "", false},
{"not_empty", "gcptrace", true},
}

for _, tc := range testCases {
tc := tc
t.Run(tc.testName, func(t *testing.T) {
t.Parallel()

assert.Equal(t, tc.expected, IsTracingEnabled(&Config{Monitoring: MonitoringConfig{
ExperimentalTracingMode: tc.traceMode,
}}))
})
}
}
4 changes: 4 additions & 0 deletions internal/fs/server.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,6 +17,7 @@ package fs
import (
"fmt"

newcfg "github.com/googlecloudplatform/gcsfuse/v2/cfg"
"github.com/googlecloudplatform/gcsfuse/v2/internal/fs/wrappers"
"github.com/jacobsa/fuse"
"github.com/jacobsa/fuse/fuseutil"
Expand All @@ -31,6 +32,9 @@ func NewServer(ctx context.Context, cfg *ServerConfig) (fuse.Server, error) {
}

fs = wrappers.WithErrorMapping(fs)
if newcfg.IsTracingEnabled(cfg.NewConfig) {
fs = wrappers.WithTracing(fs)
}
fs = wrappers.WithMonitoring(fs)
return fuseutil.NewFileSystemServer(fs), nil
}
14 changes: 0 additions & 14 deletions internal/fs/wrappers/monitoring.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,9 +30,6 @@ import (
"go.opencensus.io/stats"
"go.opencensus.io/stats/view"
"go.opencensus.io/tag"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

const name = "cloud.google.com/gcsfuse"
Expand Down Expand Up @@ -318,13 +315,11 @@ func recordOp(ctx context.Context, method string, start time.Time, fsErr error)
func WithMonitoring(fs fuseutil.FileSystem) fuseutil.FileSystem {
return &monitoring{
wrapped: fs,
tracer: otel.Tracer(name),
}
}

type monitoring struct {
wrapped fuseutil.FileSystem
tracer trace.Tracer
}

func (fs *monitoring) Destroy() {
Expand All @@ -334,17 +329,8 @@ func (fs *monitoring) Destroy() {
type wrappedCall func(ctx context.Context) error

func (fs *monitoring) invokeWrapped(ctx context.Context, opName string, w wrappedCall) error {
// Span's SpanKid is set to trace.SpanKindServer since GCSFuse is like a server for the requests that the Kernel sends.
ctx, span := fs.tracer.Start(ctx, opName, trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
startTime := time.Now()
err := w(ctx)

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

recordOp(ctx, opName, startTime, err)
return err
}
Expand Down
172 changes: 172 additions & 0 deletions internal/fs/wrappers/tracing.go
Original file line number Diff line number Diff line change
@@ -0,0 +1,172 @@
// Copyright 2024 Google LLC
//
// Licensed under the Apache License, Version 2.0 (the "License");
// you may not use this file except in compliance with the License.
// You may obtain a copy of the License at
//
// http://www.apache.org/licenses/LICENSE-2.0
//
// Unless required by applicable law or agreed to in writing, software
// distributed under the License is distributed on an "AS IS" BASIS,
// WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
// See the License for the specific language governing permissions and
// limitations under the License.

package wrappers

import (
"context"

"github.com/jacobsa/fuse/fuseops"
"github.com/jacobsa/fuse/fuseutil"
"go.opentelemetry.io/otel"
"go.opentelemetry.io/otel/codes"
"go.opentelemetry.io/otel/trace"
)

type tracing struct {
wrapped fuseutil.FileSystem
tracer trace.Tracer
}

// WithTracing wraps a FileSystem and creates a root trace.
func WithTracing(wrapped fuseutil.FileSystem) fuseutil.FileSystem {
return &tracing{
wrapped: wrapped,
tracer: otel.Tracer(name),
}
}

func (fs *tracing) Destroy() {
fs.wrapped.Destroy()
}

func (fs *tracing) invokeWrapped(ctx context.Context, opName string, w wrappedCall) error {
// Span's SpanKind is set to trace.SpanKindServer since GCSFuse is like a server for the requests that the Kernel sends.
ctx, span := fs.tracer.Start(ctx, opName, trace.WithSpanKind(trace.SpanKindServer))
defer span.End()
err := w(ctx)

if err != nil {
span.RecordError(err)
span.SetStatus(codes.Error, err.Error())
}

return err
}

func (fs *tracing) StatFS(ctx context.Context, op *fuseops.StatFSOp) error {
return fs.invokeWrapped(ctx, "StatFS", func(ctx context.Context) error { return fs.wrapped.StatFS(ctx, op) })
}

func (fs *tracing) LookUpInode(ctx context.Context, op *fuseops.LookUpInodeOp) error {
return fs.invokeWrapped(ctx, "LookUpInode", func(ctx context.Context) error { return fs.wrapped.LookUpInode(ctx, op) })
}

func (fs *tracing) GetInodeAttributes(ctx context.Context, op *fuseops.GetInodeAttributesOp) error {
return fs.invokeWrapped(ctx, "GetInodeAttributes", func(ctx context.Context) error { return fs.wrapped.GetInodeAttributes(ctx, op) })
}

func (fs *tracing) SetInodeAttributes(ctx context.Context, op *fuseops.SetInodeAttributesOp) error {
return fs.invokeWrapped(ctx, "SetInodeAttributes", func(ctx context.Context) error { return fs.wrapped.SetInodeAttributes(ctx, op) })
}

func (fs *tracing) ForgetInode(ctx context.Context, op *fuseops.ForgetInodeOp) error {
return fs.invokeWrapped(ctx, "ForgetInode", func(ctx context.Context) error { return fs.wrapped.ForgetInode(ctx, op) })
}

func (fs *tracing) BatchForget(ctx context.Context, op *fuseops.BatchForgetOp) error {
return fs.invokeWrapped(ctx, "BatchForget", func(ctx context.Context) error { return fs.wrapped.BatchForget(ctx, op) })
}

func (fs *tracing) MkDir(ctx context.Context, op *fuseops.MkDirOp) error {
return fs.invokeWrapped(ctx, "MkDir", func(ctx context.Context) error { return fs.wrapped.MkDir(ctx, op) })
}

func (fs *tracing) MkNode(ctx context.Context, op *fuseops.MkNodeOp) error {
return fs.invokeWrapped(ctx, "MkNode", func(ctx context.Context) error { return fs.wrapped.MkNode(ctx, op) })
}

func (fs *tracing) CreateFile(ctx context.Context, op *fuseops.CreateFileOp) error {
return fs.invokeWrapped(ctx, "CreateFile", func(ctx context.Context) error { return fs.wrapped.CreateFile(ctx, op) })
}

func (fs *tracing) CreateLink(ctx context.Context, op *fuseops.CreateLinkOp) error {
return fs.invokeWrapped(ctx, "CreateLink", func(ctx context.Context) error { return fs.wrapped.CreateLink(ctx, op) })
}

func (fs *tracing) CreateSymlink(ctx context.Context, op *fuseops.CreateSymlinkOp) error {
return fs.invokeWrapped(ctx, "CreateSymlink", func(ctx context.Context) error { return fs.wrapped.CreateSymlink(ctx, op) })
}

func (fs *tracing) Rename(ctx context.Context, op *fuseops.RenameOp) error {
return fs.invokeWrapped(ctx, "Rename", func(ctx context.Context) error { return fs.wrapped.Rename(ctx, op) })
}

func (fs *tracing) RmDir(ctx context.Context, op *fuseops.RmDirOp) error {
return fs.invokeWrapped(ctx, "RmDir", func(ctx context.Context) error { return fs.wrapped.RmDir(ctx, op) })
}

func (fs *tracing) Unlink(ctx context.Context, op *fuseops.UnlinkOp) error {
return fs.invokeWrapped(ctx, "Unlink", func(ctx context.Context) error { return fs.wrapped.Unlink(ctx, op) })
}

func (fs *tracing) OpenDir(ctx context.Context, op *fuseops.OpenDirOp) error {
return fs.invokeWrapped(ctx, "OpenDir", func(ctx context.Context) error { return fs.wrapped.OpenDir(ctx, op) })
}

func (fs *tracing) ReadDir(ctx context.Context, op *fuseops.ReadDirOp) error {
return fs.invokeWrapped(ctx, "ReadDir", func(ctx context.Context) error { return fs.wrapped.ReadDir(ctx, op) })
}

func (fs *tracing) ReleaseDirHandle(ctx context.Context, op *fuseops.ReleaseDirHandleOp) error {
return fs.invokeWrapped(ctx, "ReleaseDirHandle", func(ctx context.Context) error { return fs.wrapped.ReleaseDirHandle(ctx, op) })
}

func (fs *tracing) OpenFile(ctx context.Context, op *fuseops.OpenFileOp) error {
return fs.invokeWrapped(ctx, "OpenFile", func(ctx context.Context) error { return fs.wrapped.OpenFile(ctx, op) })
}

func (fs *tracing) ReadFile(ctx context.Context, op *fuseops.ReadFileOp) error {
return fs.invokeWrapped(ctx, "ReadFile", func(ctx context.Context) error { return fs.wrapped.ReadFile(ctx, op) })
}

func (fs *tracing) WriteFile(ctx context.Context, op *fuseops.WriteFileOp) error {
return fs.invokeWrapped(ctx, "WriteFile", func(ctx context.Context) error { return fs.wrapped.WriteFile(ctx, op) })
}

func (fs *tracing) SyncFile(ctx context.Context, op *fuseops.SyncFileOp) error {
return fs.invokeWrapped(ctx, "SyncFile", func(ctx context.Context) error { return fs.wrapped.SyncFile(ctx, op) })
}

func (fs *tracing) FlushFile(ctx context.Context, op *fuseops.FlushFileOp) error {
return fs.invokeWrapped(ctx, "FlushFile", func(ctx context.Context) error { return fs.wrapped.FlushFile(ctx, op) })
}

func (fs *tracing) ReleaseFileHandle(ctx context.Context, op *fuseops.ReleaseFileHandleOp) error {
return fs.invokeWrapped(ctx, "ReleaseFileHandle", func(ctx context.Context) error { return fs.wrapped.ReleaseFileHandle(ctx, op) })
}

func (fs *tracing) ReadSymlink(ctx context.Context, op *fuseops.ReadSymlinkOp) error {
return fs.invokeWrapped(ctx, "ReadSymlink", func(ctx context.Context) error { return fs.wrapped.ReadSymlink(ctx, op) })
}

func (fs *tracing) RemoveXattr(ctx context.Context, op *fuseops.RemoveXattrOp) error {
return fs.invokeWrapped(ctx, "RemoveXattr", func(ctx context.Context) error { return fs.wrapped.RemoveXattr(ctx, op) })
}

func (fs *tracing) GetXattr(ctx context.Context, op *fuseops.GetXattrOp) error {
return fs.invokeWrapped(ctx, "GetXattr", func(ctx context.Context) error { return fs.wrapped.GetXattr(ctx, op) })
}

func (fs *tracing) ListXattr(ctx context.Context, op *fuseops.ListXattrOp) error {
return fs.invokeWrapped(ctx, "ListXattr", func(ctx context.Context) error { return fs.wrapped.ListXattr(ctx, op) })
}

func (fs *tracing) SetXattr(ctx context.Context, op *fuseops.SetXattrOp) error {
return fs.invokeWrapped(ctx, "SetXattr", func(ctx context.Context) error { return fs.wrapped.SetXattr(ctx, op) })
}

func (fs *tracing) Fallocate(ctx context.Context, op *fuseops.FallocateOp) error {
return fs.invokeWrapped(ctx, "Fallocate", func(ctx context.Context) error { return fs.wrapped.Fallocate(ctx, op) })
}
2 changes: 1 addition & 1 deletion internal/fs/wrappers/tracing_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -162,7 +162,7 @@ func TestSpanCreation(t *testing.T) {
t.Cleanup(func() {
ex.Reset()
})
m := monitoring{
m := tracing{
wrapped: dummyFS{},
tracer: otel.Tracer("test"),
}
Expand Down

0 comments on commit a4b789d

Please sign in to comment.