Skip to content
This repository has been archived by the owner on Jul 19, 2023. It is now read-only.

Commit

Permalink
WIP: Write up gopatch
Browse files Browse the repository at this point in the history
  • Loading branch information
simonswine committed Jul 11, 2023
1 parent 3a9306c commit 8363ed7
Show file tree
Hide file tree
Showing 13 changed files with 217 additions and 158 deletions.
40 changes: 40 additions & 0 deletions migrate-to-otel.gopatch
Original file line number Diff line number Diff line change
@@ -0,0 +1,40 @@
@@
var a expression
var b expression
var s identifier
var t identifier
@@
-s, t := opentracing.StartSpanFromContext(a,b)
-...
- defer s.Finish()
+import "go.opentelemetry.io/otel"
+t, s := otel.Tracer("github.com/grafana/pyroscope").Start(a,b)
+defer s.End()

@@
var foo,x identifier
@@

-import foo "github.com/opentracing/opentracing-go/log"
+import foo "go.opentelemetry.io/otel/attribute"
foo.x

@@
@@
- otlog
+ attribute

@@
var span identifier
var x expression
@@
- span.LogFields(...)
+import "go.opentelemetry.io/otel/trace"
+ span.AddEvent("TODO", trace.WithAttributes(...))


@@
@@
-opentracing.Span
+import "go.opentelemetry.io/otel/trace"
+trace.Span
66 changes: 33 additions & 33 deletions pkg/phlaredb/block_querier.go
Original file line number Diff line number Diff line change
Expand Up @@ -21,13 +21,15 @@ import (
"github.com/grafana/dskit/runutil"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/promql/parser"
"github.com/samber/lo"
"github.com/segmentio/parquet-go"
"github.com/thanos-io/objstore"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"golang.org/x/sync/errgroup"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -557,8 +559,8 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile
}

func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesStacktraces")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesStacktraces")
defer sp.End()

r, err := stream.Receive()
if err != nil {
Expand All @@ -572,12 +574,11 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
}
request := r.Request
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID)))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
Expand Down Expand Up @@ -621,7 +622,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in

// Signals the end of the profile streaming by sending an empty response.
// This allows the client to not block other streaming ingesters.
sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "signaling the end of the profile streaming")))
if err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{}); err != nil {
return err
}
Expand All @@ -631,7 +632,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
}

// sends the final result to the client.
sp.LogFields(otlog.String("msg", "sending the final result to the client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending the final result to the client")))
err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{
Result: &ingestv1.MergeProfilesStacktracesResult{
Format: ingestv1.StacktracesMergeFormat_MERGE_FORMAT_TREE,
Expand All @@ -649,8 +650,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in
}

func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesLabels")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesLabels")
defer sp.End()

r, err := stream.Receive()
if err != nil {
Expand All @@ -666,13 +667,12 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
request := r.Request
by := r.By
sort.Strings(by)
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
otlog.String("by", strings.Join(by, ",")),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID),
attribute.String("by", strings.Join(by, ","))))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
Expand Down Expand Up @@ -743,8 +743,8 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv
}

func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesPprof")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesPprof")
defer sp.End()

r, err := stream.Receive()
if err != nil {
Expand All @@ -758,12 +758,11 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1
return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request"))
}
request := r.Request
sp.LogFields(
otlog.String("start", model.Time(request.Start).Time().String()),
otlog.String("end", model.Time(request.End).Time().String()),
otlog.String("selector", request.LabelSelector),
otlog.String("profile_id", request.Type.ID),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("start", model.Time(request.Start).Time().String()),
attribute.String("end", model.Time(request.End).Time().String()),
attribute.String("selector", request.LabelSelector),
attribute.String("profile_id", request.Type.ID)))

queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End))
if err != nil {
Expand Down Expand Up @@ -889,8 +888,9 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 {
}

func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - Block")
defer sp.End()

if err := b.Open(ctx); err != nil {
return nil, err
}
Expand Down Expand Up @@ -1045,9 +1045,9 @@ func (q *singleBlockQuerier) openFiles(ctx context.Context) error {
sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - open")
defer func() {
q.metrics.blockOpeningLatency.Observe(time.Since(start).Seconds())
sp.LogFields(
otlog.String("block_ulid", q.meta.ULID.String()),
)
sp.AddEvent("TODO", trace.WithAttributes(
attribute.String("block_ulid", q.meta.ULID.String())))

sp.Finish()
}()
g, ctx := errgroup.WithContext(ctx)
Expand Down
14 changes: 8 additions & 6 deletions pkg/phlaredb/head.go
Original file line number Diff line number Diff line change
Expand Up @@ -17,13 +17,15 @@ import (
"github.com/google/pprof/profile"
"github.com/google/uuid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/prometheus/prometheus/model/labels"
"github.com/prometheus/prometheus/promql/parser"
"github.com/prometheus/prometheus/tsdb/fileutil"
"github.com/samber/lo"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"
"go.uber.org/atomic"
"google.golang.org/grpc/codes"

Expand Down Expand Up @@ -533,8 +535,8 @@ func (h *Head) Queriers() Queriers {

// add the location IDs to the stacktraces
func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stacktracesByMapping) *ingestv1.MergeProfilesStacktracesResult {
sp, _ := opentracing.StartSpanFromContext(ctx, "resolveStacktraces - Head")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolveStacktraces - Head")
defer sp.End()

names := []string{}
functions := map[int64]int{}
Expand All @@ -548,7 +550,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
h.strings.lock.RUnlock()
}()

sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building MergeProfilesStacktracesResult")))
_ = stacktracesByMapping.ForEach(
func(mapping uint64, stacktraceSamples stacktraceSampleMap) error {
mp, ok := h.symbolDB.MappingReader(mapping)
Expand Down Expand Up @@ -595,8 +597,8 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac
}

func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSampleByMapping) *profile.Profile {
sp, _ := opentracing.StartSpanFromContext(ctx, "resolvePprof - Head")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolvePprof - Head")
defer sp.End()

locations := map[int32]*profile.Location{}
functions := map[uint64]*profile.Function{}
Expand Down
33 changes: 17 additions & 16 deletions pkg/phlaredb/head_queriers.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"github.com/segmentio/parquet-go"
"go.opentelemetry.io/otel"

ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
Expand All @@ -34,8 +35,8 @@ func (q *headOnDiskQuerier) Open(_ context.Context) error {
}

func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadOnDisk")
defer sp.End()

// query the index for rows
rowIter, labelsPerFP, err := q.head.profiles.index.selectMatchingRowRanges(ctx, params, q.rowGroupIdx)
Expand Down Expand Up @@ -106,8 +107,8 @@ func (q *headOnDiskQuerier) Bounds() (model.Time, model.Time) {
}

func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadOnDisk")
defer sp.End()

stacktraceSamples := stacktracesByMapping{}

Expand All @@ -120,8 +121,8 @@ func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.It
}

func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByPprof - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByPprof - HeadOnDisk")
defer sp.End()

stacktraceSamples := profileSampleByMapping{}

Expand All @@ -133,8 +134,8 @@ func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[P
}

func (q *headOnDiskQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadOnDisk")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadOnDisk")
defer sp.End()

seriesByLabels := make(seriesByLabels)

Expand Down Expand Up @@ -168,8 +169,8 @@ func (q *headInMemoryQuerier) Open(_ context.Context) error {
}

func (q *headInMemoryQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) {
sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadInMemory")
defer sp.Finish()
ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadInMemory")
defer sp.End()

index := q.head.profiles.index

Expand Down Expand Up @@ -215,8 +216,8 @@ func (q *headInMemoryQuerier) Bounds() (model.Time, model.Time) {
}

func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadInMemory")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadInMemory")
defer sp.End()

stacktraceSamples := stacktracesByMapping{}

Expand All @@ -243,8 +244,8 @@ func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.
}

func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "MergePprof - HeadInMemory")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergePprof - HeadInMemory")
defer sp.End()

stacktraceSamples := profileSampleByMapping{}

Expand All @@ -267,8 +268,8 @@ func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator
}

func (q *headInMemoryQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) {
sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadInMemory")
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadInMemory")
defer sp.End()

labelsByFingerprint := map[model.Fingerprint]string{}
seriesByLabels := make(seriesByLabels)
Expand Down
20 changes: 9 additions & 11 deletions pkg/phlaredb/phlaredb.go
Original file line number Diff line number Diff line change
Expand Up @@ -18,9 +18,11 @@ import (
"github.com/grafana/dskit/services"
"github.com/oklog/ulid"
"github.com/opentracing/opentracing-go"
otlog "github.com/opentracing/opentracing-go/log"
"github.com/pkg/errors"
"github.com/prometheus/common/model"
"go.opentelemetry.io/otel"
attribute "go.opentelemetry.io/otel/attribute"
"go.opentelemetry.io/otel/trace"

profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
Expand Down Expand Up @@ -380,12 +382,8 @@ func filterProfiles[B BidiServerMerge[Res, Req],
Profile: maxBlockProfile,
Index: 0,
}, true, its...), batchProfileSize, func(ctx context.Context, batch []ProfileWithIndex) error {
sp, _ := opentracing.StartSpanFromContext(ctx, "filterProfiles - Filtering batch")
sp.LogFields(
otlog.Int("batch_len", len(batch)),
otlog.Int("batch_requested_size", batchProfileSize),
)
defer sp.Finish()
_, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "filterProfiles - Filtering batch")
defer sp.End()

seriesByFP := map[model.Fingerprint]labelWithIndex{}
selectProfileResult.LabelsSets = selectProfileResult.LabelsSets[:0]
Expand All @@ -409,7 +407,7 @@ func filterProfiles[B BidiServerMerge[Res, Req],
})

}
sp.LogFields(otlog.String("msg", "sending batch to client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending batch to client")))
var err error
switch s := BidiServerMerge[Res, Req](stream).(type) {
case BidiServerMerge[*ingestv1.MergeProfilesStacktracesResponse, *ingestv1.MergeProfilesStacktracesRequest]:
Expand All @@ -433,9 +431,9 @@ func filterProfiles[B BidiServerMerge[Res, Req],
}
return err
}
sp.LogFields(otlog.String("msg", "batch sent to client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "batch sent to client")))

sp.LogFields(otlog.String("msg", "reading selection from client"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "reading selection from client")))

// handle response for the batch.
var selected []bool
Expand All @@ -462,7 +460,7 @@ func filterProfiles[B BidiServerMerge[Res, Req],
}
return err
}
sp.LogFields(otlog.String("msg", "selection received"))
sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "selection received")))
for i, k := range selected {
if k {
selection[batch[i].Index] = append(selection[batch[i].Index], batch[i].Profile)
Expand Down
Loading

0 comments on commit 8363ed7

Please sign in to comment.