diff --git a/migrate-to-otel.gopatch b/migrate-to-otel.gopatch new file mode 100644 index 000000000..81ae324c5 --- /dev/null +++ b/migrate-to-otel.gopatch @@ -0,0 +1,40 @@ +@@ +var a expression +var b expression +var s identifier +var t identifier +@@ +-s, t := opentracing.StartSpanFromContext(a,b) +-... +- defer s.Finish() ++import "go.opentelemetry.io/otel" ++t, s := otel.Tracer("github.com/grafana/pyroscope").Start(a,b) ++defer s.End() + +@@ +var foo,x identifier +@@ + +-import foo "github.com/opentracing/opentracing-go/log" ++import foo "go.opentelemetry.io/otel/attribute" +foo.x + +@@ +@@ +- otlog ++ attribute + +@@ +var span identifier +var x expression +@@ +- span.LogFields(...) ++import "go.opentelemetry.io/otel/trace" ++ span.AddEvent("TODO", trace.WithAttributes(...)) + + +@@ +@@ +-opentracing.Span ++import "go.opentelemetry.io/otel/trace" ++trace.Span diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 989e95a1a..fd690d60f 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -21,14 +21,15 @@ import ( "github.com/grafana/dskit/runutil" "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" "github.com/samber/lo" "github.com/segmentio/parquet-go" "github.com/thanos-io/objstore" - "golang.org/x/exp/constraints" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" "google.golang.org/grpc/codes" @@ -462,58 +463,6 @@ func (b *singleBlockQuerier) Bounds() (model.Time, model.Time) { return b.meta.MinTime, b.meta.MaxTime } -type mapPredicate[K constraints.Integer, V any] struct { - min K - max K - m map[K]V -} - -func newMapPredicate[K constraints.Integer, V any](m map[K]V) query.Predicate { - p := &mapPredicate[K, V]{ - m: m, - } - - first := true - for k := range m { - if first || p.max < k { - p.max = k - } - if first || p.min > k { - p.min = k - } - first = false - } - - return p -} - -func (m *mapPredicate[K, V]) KeepColumnChunk(c parquet.ColumnChunk) bool { - if ci := c.ColumnIndex(); ci != nil { - for i := 0; i < ci.NumPages(); i++ { - min := K(ci.MinValue(i).Int64()) - max := K(ci.MaxValue(i).Int64()) - if m.max >= min && m.min <= max { - return true - } - } - return false - } - - return true -} - -func (m *mapPredicate[K, V]) KeepPage(page parquet.Page) bool { - if min, max, ok := page.Bounds(); ok { - return m.max >= K(min.Int64()) && m.min <= K(max.Int64()) - } - return true -} - -func (m *mapPredicate[K, V]) KeepValue(v parquet.Value) bool { - _, exists := m.m[K(v.Int64())] - return exists -} - type labelsInfo struct { fp model.Fingerprint lbs phlaremodel.Labels @@ -610,8 +559,8 @@ func SelectMatchingProfiles(ctx context.Context, request *ingestv1.SelectProfile } func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesStacktracesRequest, ingestv1.MergeProfilesStacktracesResponse], blockGetter BlockGetter) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesStacktraces") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesStacktraces") + defer sp.End() r, err := stream.Receive() if err != nil { @@ -625,12 +574,11 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in return connect.NewError(connect.CodeInvalidArgument, 
errors.New("missing initial select request")) } request := r.Request - sp.LogFields( - otlog.String("start", model.Time(request.Start).Time().String()), - otlog.String("end", model.Time(request.End).Time().String()), - otlog.String("selector", request.LabelSelector), - otlog.String("profile_id", request.Type.ID), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(request.Start).Time().String()), + attribute.String("end", model.Time(request.End).Time().String()), + attribute.String("selector", request.LabelSelector), + attribute.String("profile_id", request.Type.ID))) queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End)) if err != nil { @@ -674,7 +622,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in // Signals the end of the profile streaming by sending an empty response. // This allows the client to not block other streaming ingesters. - sp.LogFields(otlog.String("msg", "signaling the end of the profile streaming")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "signaling the end of the profile streaming"))) if err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{}); err != nil { return err } @@ -684,7 +632,7 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in } // sends the final result to the client. - sp.LogFields(otlog.String("msg", "sending the final result to the client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending the final result to the client"))) err = stream.Send(&ingestv1.MergeProfilesStacktracesResponse{ Result: &ingestv1.MergeProfilesStacktracesResult{ Format: ingestv1.StacktracesMergeFormat_MERGE_FORMAT_TREE, @@ -702,8 +650,8 @@ func MergeProfilesStacktraces(ctx context.Context, stream *connect.BidiStream[in } func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesLabelsRequest, ingestv1.MergeProfilesLabelsResponse], blockGetter BlockGetter) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesLabels") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesLabels") + defer sp.End() r, err := stream.Receive() if err != nil { @@ -719,13 +667,12 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv request := r.Request by := r.By sort.Strings(by) - sp.LogFields( - otlog.String("start", model.Time(request.Start).Time().String()), - otlog.String("end", model.Time(request.End).Time().String()), - otlog.String("selector", request.LabelSelector), - otlog.String("profile_id", request.Type.ID), - otlog.String("by", strings.Join(by, ",")), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(request.Start).Time().String()), + attribute.String("end", model.Time(request.End).Time().String()), + attribute.String("selector", request.LabelSelector), + attribute.String("profile_id", request.Type.ID), + attribute.String("by", strings.Join(by, ",")))) queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End)) if err != nil { @@ -796,8 +743,8 @@ func MergeProfilesLabels(ctx context.Context, stream *connect.BidiStream[ingestv } func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1.MergeProfilesPprofRequest, ingestv1.MergeProfilesPprofResponse], blockGetter BlockGetter) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeProfilesPprof") - defer sp.Finish() + ctx, sp := 
otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeProfilesPprof") + defer sp.End() r, err := stream.Receive() if err != nil { @@ -811,12 +758,11 @@ func MergeProfilesPprof(ctx context.Context, stream *connect.BidiStream[ingestv1 return connect.NewError(connect.CodeInvalidArgument, errors.New("missing initial select request")) } request := r.Request - sp.LogFields( - otlog.String("start", model.Time(request.Start).Time().String()), - otlog.String("end", model.Time(request.End).Time().String()), - otlog.String("selector", request.LabelSelector), - otlog.String("profile_id", request.Type.ID), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(request.Start).Time().String()), + attribute.String("end", model.Time(request.End).Time().String()), + attribute.String("selector", request.LabelSelector), + attribute.String("profile_id", request.Type.ID))) queriers, err := blockGetter(ctx, model.Time(request.Start), model.Time(request.End)) if err != nil { @@ -942,8 +888,9 @@ func retrieveStacktracePartition(buf [][]parquet.Value, pos int) uint64 { } func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - Block") + defer sp.End() + if err := b.Open(ctx); err != nil { return nil, err } @@ -988,26 +935,26 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params } var ( - buf [][]parquet.Value - joinIters []query.Iterator + buf [][]parquet.Value + ) + + pIt := query.NewBinaryJoinIterator( + 0, + b.profiles.columnIter(ctx, "SeriesIndex", query.NewMapPredicate(lblsPerRef), "SeriesIndex"), + b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), ) if b.meta.Version >= 2 { - joinIters = []query.Iterator{ - b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), - b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), + pIt = query.NewBinaryJoinIterator( + 0, + pIt, b.profiles.columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), - } + ) buf = make([][]parquet.Value, 3) } else { - joinIters = []query.Iterator{ - b.profiles.columnIter(ctx, "SeriesIndex", newMapPredicate(lblsPerRef), "SeriesIndex"), - b.profiles.columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(model.Time(params.Start).UnixNano(), model.Time(params.End).UnixNano()), "TimeNanos"), - } buf = make([][]parquet.Value, 2) } - pIt := query.NewJoinIterator(0, joinIters, nil) iters := make([]iter.Iterator[Profile], 0, len(lblsPerRef)) defer pIt.Close() @@ -1098,9 +1045,9 @@ func (q *singleBlockQuerier) openFiles(ctx context.Context) error { sp, ctx := opentracing.StartSpanFromContext(ctx, "BlockQuerier - open") defer func() { q.metrics.blockOpeningLatency.Observe(time.Since(start).Seconds()) - sp.LogFields( - otlog.String("block_ulid", q.meta.ULID.String()), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("block_ulid", q.meta.ULID.String()))) + sp.Finish() }() g, ctx := errgroup.WithContext(ctx) @@ -1206,7 +1153,7 @@ func (r *parquetReader[M, P]) columnIter(ctx context.Context, columnName string, return 
query.NewErrIterator(fmt.Errorf("column '%s' not found in parquet file '%s'", columnName, r.relPath())) } ctx = query.AddMetricsToContext(ctx, r.metrics.query) - return query.NewColumnIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias) + return query.NewSyncIterator(ctx, r.file.RowGroups(), index, columnName, 1000, predicate, alias) } func repeatedColumnIter[T any](ctx context.Context, source Source, columnName string, rows iter.Iterator[T]) iter.Iterator[*query.RepeatedRow[T]] { diff --git a/pkg/phlaredb/block_querier_test.go b/pkg/phlaredb/block_querier_test.go index 828a8411e..495107d00 100644 --- a/pkg/phlaredb/block_querier_test.go +++ b/pkg/phlaredb/block_querier_test.go @@ -190,5 +190,4 @@ func TestBlockCompatability(t *testing.T) { }) } - } diff --git a/pkg/phlaredb/head.go b/pkg/phlaredb/head.go index 3c131920d..b54a787e9 100644 --- a/pkg/phlaredb/head.go +++ b/pkg/phlaredb/head.go @@ -17,13 +17,15 @@ import ( "github.com/google/pprof/profile" "github.com/google/uuid" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/model/labels" "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/tsdb/fileutil" "github.com/samber/lo" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -533,8 +535,8 @@ func (h *Head) Queriers() Queriers { // add the location IDs to the stacktraces func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stacktracesByMapping) *ingestv1.MergeProfilesStacktracesResult { - sp, _ := opentracing.StartSpanFromContext(ctx, "resolveStacktraces - Head") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolveStacktraces - Head") + defer sp.End() names := []string{} functions := map[int64]int{} @@ -548,7 +550,7 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac h.strings.lock.RUnlock() }() - sp.LogFields(otlog.String("msg", "building MergeProfilesStacktracesResult")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building MergeProfilesStacktracesResult"))) _ = stacktracesByMapping.ForEach( func(mapping uint64, stacktraceSamples stacktraceSampleMap) error { mp, ok := h.symbolDB.MappingReader(mapping) @@ -595,8 +597,8 @@ func (h *Head) resolveStacktraces(ctx context.Context, stacktracesByMapping stac } func (h *Head) resolvePprof(ctx context.Context, stacktracesByMapping profileSampleByMapping) *profile.Profile { - sp, _ := opentracing.StartSpanFromContext(ctx, "resolvePprof - Head") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "resolvePprof - Head") + defer sp.End() locations := map[int32]*profile.Location{} functions := map[uint64]*profile.Function{} diff --git a/pkg/phlaredb/head_queriers.go b/pkg/phlaredb/head_queriers.go index bf15d922e..fb132e30f 100644 --- a/pkg/phlaredb/head_queriers.go +++ b/pkg/phlaredb/head_queriers.go @@ -10,6 +10,7 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/segmentio/parquet-go" + "go.opentelemetry.io/otel" ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1" @@ -34,8 +35,8 @@ func (q *headOnDiskQuerier) Open(_ context.Context) error { } func (q *headOnDiskQuerier) 
SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadOnDisk") + defer sp.End() // query the index for rows rowIter, labelsPerFP, err := q.head.profiles.index.selectMatchingRowRanges(ctx, params, q.rowGroupIdx) @@ -48,14 +49,13 @@ func (q *headOnDiskQuerier) SelectMatchingProfiles(ctx context.Context, params * start = model.Time(params.Start) end = model.Time(params.End) ) - pIt := query.NewJoinIterator( - 0, - []query.Iterator{ + pIt := query.NewBinaryJoinIterator(0, + query.NewBinaryJoinIterator( + 0, rowIter, q.rowGroup().columnIter(ctx, "TimeNanos", query.NewIntBetweenPredicate(start.UnixNano(), end.UnixNano()), "TimeNanos"), - q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), - }, - nil, + ), + q.rowGroup().columnIter(ctx, "StacktracePartition", nil, "StacktracePartition"), ) defer pIt.Close() @@ -107,8 +107,8 @@ func (q *headOnDiskQuerier) Bounds() (model.Time, model.Time) { } func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadOnDisk") + defer sp.End() stacktraceSamples := stacktracesByMapping{} @@ -121,8 +121,8 @@ func (q *headOnDiskQuerier) MergeByStacktraces(ctx context.Context, rows iter.It } func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByPprof - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByPprof - HeadOnDisk") + defer sp.End() stacktraceSamples := profileSampleByMapping{} @@ -134,8 +134,8 @@ func (q *headOnDiskQuerier) MergePprof(ctx context.Context, rows iter.Iterator[P } func (q *headOnDiskQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadOnDisk") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadOnDisk") + defer sp.End() seriesByLabels := make(seriesByLabels) @@ -169,8 +169,8 @@ func (q *headInMemoryQuerier) Open(_ context.Context) error { } func (q *headInMemoryQuerier) SelectMatchingProfiles(ctx context.Context, params *ingestv1.SelectProfilesRequest) (iter.Iterator[Profile], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMatchingProfiles - HeadInMemory") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectMatchingProfiles - HeadInMemory") + defer sp.End() index := q.head.profiles.index @@ -216,8 +216,8 @@ func (q *headInMemoryQuerier) Bounds() (model.Time, model.Time) { } func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - HeadInMemory") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - HeadInMemory") + defer 
sp.End() stacktraceSamples := stacktracesByMapping{} @@ -244,8 +244,8 @@ func (q *headInMemoryQuerier) MergeByStacktraces(ctx context.Context, rows iter. } func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "MergePprof - HeadInMemory") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergePprof - HeadInMemory") + defer sp.End() stacktraceSamples := profileSampleByMapping{} @@ -268,8 +268,8 @@ func (q *headInMemoryQuerier) MergePprof(ctx context.Context, rows iter.Iterator } func (q *headInMemoryQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "MergeByLabels - HeadInMemory") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - HeadInMemory") + defer sp.End() labelsByFingerprint := map[model.Fingerprint]string{} seriesByLabels := make(seriesByLabels) diff --git a/pkg/phlaredb/phlaredb.go b/pkg/phlaredb/phlaredb.go index 640410353..391f0755d 100644 --- a/pkg/phlaredb/phlaredb.go +++ b/pkg/phlaredb/phlaredb.go @@ -18,9 +18,11 @@ import ( "github.com/grafana/dskit/services" "github.com/oklog/ulid" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/common/model" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" profilev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1" ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" @@ -380,12 +382,8 @@ func filterProfiles[B BidiServerMerge[Res, Req], Profile: maxBlockProfile, Index: 0, }, true, its...), batchProfileSize, func(ctx context.Context, batch []ProfileWithIndex) error { - sp, _ := opentracing.StartSpanFromContext(ctx, "filterProfiles - Filtering batch") - sp.LogFields( - otlog.Int("batch_len", len(batch)), - otlog.Int("batch_requested_size", batchProfileSize), - ) - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "filterProfiles - Filtering batch") + defer sp.End() seriesByFP := map[model.Fingerprint]labelWithIndex{} selectProfileResult.LabelsSets = selectProfileResult.LabelsSets[:0] @@ -409,7 +407,7 @@ func filterProfiles[B BidiServerMerge[Res, Req], }) } - sp.LogFields(otlog.String("msg", "sending batch to client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "sending batch to client"))) var err error switch s := BidiServerMerge[Res, Req](stream).(type) { case BidiServerMerge[*ingestv1.MergeProfilesStacktracesResponse, *ingestv1.MergeProfilesStacktracesRequest]: @@ -433,9 +431,9 @@ func filterProfiles[B BidiServerMerge[Res, Req], } return err } - sp.LogFields(otlog.String("msg", "batch sent to client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "batch sent to client"))) - sp.LogFields(otlog.String("msg", "reading selection from client")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "reading selection from client"))) // handle response for the batch. 
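+	// The client answers with one boolean per profile in the batch; profiles + // flagged true are collected into the per-block selection below.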
var selected []bool @@ -462,7 +460,7 @@ func filterProfiles[B BidiServerMerge[Res, Req], } return err } - sp.LogFields(otlog.String("msg", "selection received")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "selection received"))) for i, k := range selected { if k { selection[batch[i].Index] = append(selection[batch[i].Index], batch[i].Profile) diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index 5d21f7590..79e44467a 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -498,7 +498,7 @@ func (r *rowGroupOnDisk) columnIter(ctx context.Context, columnName string, pred if !found { return query.NewErrIterator(fmt.Errorf("column '%s' not found in head row group segment '%s'", columnName, r.file.Name())) } - return query.NewColumnIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias) + return query.NewSyncIterator(ctx, []parquet.RowGroup{r.RowGroup}, column.ColumnIndex, columnName, 1000, predicate, alias) } type seriesIDRowsRewriter struct { diff --git a/pkg/phlaredb/profiles.go b/pkg/phlaredb/profiles.go index 5d28d4484..8e236b1aa 100644 --- a/pkg/phlaredb/profiles.go +++ b/pkg/phlaredb/profiles.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/prometheus/promql/parser" "github.com/prometheus/prometheus/storage" "github.com/samber/lo" + "go.opentelemetry.io/otel" "go.uber.org/atomic" "google.golang.org/grpc/codes" @@ -195,8 +196,9 @@ func (pi *profilesIndex) Add(ps *schemav1.InMemoryProfile, lbs phlaremodel.Label } func (pi *profilesIndex) selectMatchingFPs(ctx context.Context, params *ingestv1.SelectProfilesRequest) ([]model.Fingerprint, error) { - sp, _ := opentracing.StartSpanFromContext(ctx, "selectMatchingFPs - Index") - defer sp.Finish() + _, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingFPs - Index") + defer sp.End() + selectors, err := parser.ParseMetricSelector(params.LabelSelector) if err != nil { return nil, status.Error(codes.InvalidArgument, "failed to parse label selectors: "+err.Error()) @@ -246,8 +248,8 @@ func (pi *profilesIndex) selectMatchingRowRanges(ctx context.Context, params *in map[model.Fingerprint]phlaremodel.Labels, error, ) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "selectMatchingRowRanges - Index") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMatchingRowRanges - Index") + defer sp.End() ids, err := pi.selectMatchingFPs(ctx, params) if err != nil { diff --git a/pkg/phlaredb/query/iters.go b/pkg/phlaredb/query/iters.go index 28b2264cc..ac2013fba 100644 --- a/pkg/phlaredb/query/iters.go +++ b/pkg/phlaredb/query/iters.go @@ -8,12 +8,12 @@ import ( "math" "strings" "sync" - "sync/atomic" "github.com/grafana/dskit/multierror" - "github.com/opentracing/opentracing-go" - "github.com/opentracing/opentracing-go/log" "github.com/segmentio/parquet-go" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "github.com/grafana/phlare/pkg/iter" ) @@ -40,6 +40,10 @@ type RowNumberWithDefinitionLevel struct { DefinitionLevel int } +func (r *RowNumberWithDefinitionLevel) String() string { + return fmt.Sprintf("%v:%v", r.RowNumber, r.DefinitionLevel) +} + // EmptyRowNumber creates an empty invalid row number. 
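+// All six levels are set to -1, i.e. undefined, so Valid() reports false until +// Next() has been called at least once.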
func EmptyRowNumber() RowNumber { return RowNumber{-1, -1, -1, -1, -1, -1} @@ -76,7 +80,7 @@ func TruncateRowNumber(t RowNumberWithDefinitionLevel) RowNumber { return n } -func (t RowNumber) Valid() bool { +func (t *RowNumber) Valid() bool { return t[0] >= 0 } @@ -96,17 +100,188 @@ func (t RowNumber) Valid() bool { // gb | 1 | 3 | { 0, 2, 0, 0 } // null | 0 | 1 | { 1, 0, -1, -1 } func (t *RowNumber) Next(repetitionLevel, definitionLevel int) { - // Next row at this level t[repetitionLevel]++ - // New children up through the definition level - for i := repetitionLevel + 1; i <= definitionLevel; i++ { - t[i] = 0 - } - - // Children past the definition level are undefined - for i := definitionLevel + 1; i < len(t); i++ { - t[i] = -1 + // the following is nextSlow() unrolled + switch repetitionLevel { + case 0: + switch definitionLevel { + case 0: + t[1] = -1 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 1: + t[1] = 0 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 2: + t[1] = 0 + t[2] = 0 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 3: + t[1] = 0 + t[2] = 0 + t[3] = 0 + t[4] = -1 + t[5] = -1 + case 4: + t[1] = 0 + t[2] = 0 + t[3] = 0 + t[4] = 0 + t[5] = -1 + case 5: + t[1] = 0 + t[2] = 0 + t[3] = 0 + t[4] = 0 + t[5] = 0 + } + case 1: + switch definitionLevel { + case 0: + t[1] = -1 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 1: + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 2: + t[2] = 0 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 3: + t[2] = 0 + t[3] = 0 + t[4] = -1 + t[5] = -1 + case 4: + t[2] = 0 + t[3] = 0 + t[4] = 0 + t[5] = -1 + case 5: + t[2] = 0 + t[3] = 0 + t[4] = 0 + t[5] = 0 + } + case 2: + switch definitionLevel { + case 0: + t[1] = -1 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 1: + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 2: + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 3: + t[3] = 0 + t[4] = -1 + t[5] = -1 + case 4: + t[3] = 0 + t[4] = 0 + t[5] = -1 + case 5: + t[3] = 0 + t[4] = 0 + t[5] = 0 + } + case 3: + switch definitionLevel { + case 0: + t[1] = -1 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 1: + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 2: + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 3: + t[4] = -1 + t[5] = -1 + case 4: + t[4] = 0 + t[5] = -1 + case 5: + t[4] = 0 + t[5] = 0 + } + case 4: + switch definitionLevel { + case 0: + t[1] = -1 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 1: + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 2: + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 3: + t[4] = -1 + t[5] = -1 + case 4: + t[5] = -1 + case 5: + t[5] = 0 + } + case 5: + switch definitionLevel { + case 0: + t[1] = -1 + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 1: + t[2] = -1 + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 2: + t[3] = -1 + t[4] = -1 + t[5] = -1 + case 3: + t[4] = -1 + t[5] = -1 + case 4: + t[5] = -1 + } } } @@ -118,6 +293,26 @@ func (t *RowNumber) Skip(numRows int64) { } } +// Preceding returns the largest representable row number that is immediately prior to this +// one. Think of it like math.NextAfter but for segmented row numbers. Examples: +// +// RowNumber 1000.0.0 (defined at 3 levels) is preceded by 999.max.max +// RowNumber 1000.-1.-1 (defined at 1 level) is preceded by 999.-1.-1 +func (t RowNumber) Preceding() RowNumber { + for i := len(t) - 1; i >= 0; i-- { + switch t[i] { + case -1: + continue + case 0: + t[i] = math.MaxInt64 + default: + t[i]-- + return t + } + } + return t +} + // IteratorResult is a row of data with a row number and named columns of data. 
// Internally it has an unstructured list for efficient collection. The ToMap() // function can be used to make inspection easier. @@ -179,6 +374,14 @@ func (r *IteratorResult) Columns(buffer [][]parquet.Value, names ...string) [][] return buffer } +func (r *IteratorResult) String() string { + if r == nil { + return "nil" + } + return fmt.Sprintf("rowNum=%d entries=%+#v", r.RowNumber[0], r.ToMap()) + +} + // iterator - Every iterator follows this interface and can be composed. type Iterator = iter.SeekIterator[*IteratorResult, RowNumberWithDefinitionLevel] @@ -186,34 +389,7 @@ func NewErrIterator(err error) Iterator { return iter.NewErrSeekIterator[*IteratorResult, RowNumberWithDefinitionLevel](err) } -var columnIteratorPool = sync.Pool{ - New: func() interface{} { - return &columnIteratorBuffer{} - }, -} - -func columnIteratorPoolGet(capacity, len int) *columnIteratorBuffer { - res := columnIteratorPool.Get().(*columnIteratorBuffer) - if cap(res.rowNumbers) < capacity { - res.rowNumbers = make([]RowNumber, capacity) - } - if cap(res.values) < capacity { - res.values = make([]parquet.Value, capacity) - } - res.rowNumbers = res.rowNumbers[:len] - res.values = res.values[:len] - return res -} - -func columnIteratorPoolPut(b *columnIteratorBuffer) { - b.values = b.values[:cap(b.values)] - for i := range b.values { - b.values[i] = parquet.Value{} - } - columnIteratorPool.Put(b) -} - -var columnIteratorResultPool = sync.Pool{ +var iteratorResultPool = sync.Pool{ New: func() interface{} { return &IteratorResult{Entries: make([]struct { k string @@ -223,459 +399,112 @@ var columnIteratorResultPool = sync.Pool{ }, } -func columnIteratorResultPoolGet() *IteratorResult { - res := columnIteratorResultPool.Get().(*IteratorResult) +func iteratorResultPoolGet() *IteratorResult { + res := iteratorResultPool.Get().(*IteratorResult) return res } -func columnIteratorResultPoolPut(r *IteratorResult) { +func iteratorResultPoolPut(r *IteratorResult) { if r != nil { r.Reset() - columnIteratorResultPool.Put(r) + iteratorResultPool.Put(r) } } -// ColumnIterator asynchronously iterates through the given row groups and column. Applies -// the optional predicate to each chunk, page, and value. Results are read by calling -// Next() until it returns nil. 
-type ColumnIterator struct { - rgs []parquet.RowGroup - col int - colName string - filter *InstrumentedPredicate - - selectAs string - seekTo atomic.Value - - metrics *Metrics - table string - quit chan struct{} - ch chan *columnIteratorBuffer - - curr *columnIteratorBuffer - currN int +type BinaryJoinIterator struct { + left Iterator + right Iterator + definitionLevel int - result *IteratorResult - err error + err error + res *IteratorResult } -var _ Iterator = (*ColumnIterator)(nil) +var _ Iterator = (*BinaryJoinIterator)(nil) -type columnIteratorBuffer struct { - rowNumbers []RowNumber - values []parquet.Value - err error +func NewBinaryJoinIterator(definitionLevel int, left, right Iterator) *BinaryJoinIterator { + return &BinaryJoinIterator{ + left: left, + right: right, + definitionLevel: definitionLevel, + } } -func NewColumnIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *ColumnIterator { - c := &ColumnIterator{ - metrics: getMetricsFromContext(ctx), - table: strings.ToLower(rgs[0].Schema().Name()) + "s", - rgs: rgs, - col: column, - colName: columnName, - filter: &InstrumentedPredicate{pred: filter}, - selectAs: selectAs, - quit: make(chan struct{}), - ch: make(chan *columnIteratorBuffer, 1), - currN: -1, +// nextOrSeek will use Next if the iterator is exactly one row away; otherwise it seeks. +func (bj *BinaryJoinIterator) nextOrSeek(to RowNumberWithDefinitionLevel, it Iterator) bool { + // Seek when the definition level is higher than 0, when there is no previous iteration yet, or when the gap between the current position and to is more than one row + if to.DefinitionLevel != 0 || it.At() == nil || to.RowNumber.Preceding() != it.At().RowNumber { + return it.Seek(to) } - - go c.iterate(ctx, readSize) - return c + return it.Next() } -func (c *ColumnIterator) iterate(ctx context.Context, readSize int) { - defer close(c.ch) - - span, _ := opentracing.StartSpanFromContext(ctx, "columnIterator.iterate", opentracing.Tags{ - "columnIndex": c.col, - "column": c.colName, - }) - defer func() { - span.SetTag("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load()) - span.SetTag("inspectedPages", c.filter.InspectedPages.Load()) - span.SetTag("inspectedValues", c.filter.InspectedValues.Load()) - span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load()) - span.SetTag("keptPages", c.filter.KeptPages.Load()) - span.SetTag("keptValues", c.filter.KeptValues.Load()) - span.Finish() - }() - - rn := EmptyRowNumber() - buffer := make([]parquet.Value, readSize) - - checkSkip := func(numRows int64) bool { - seekTo := c.seekTo.Load() - if seekTo == nil { +func (bj *BinaryJoinIterator) Next() bool { + for { + if !bj.left.Next() { + bj.err = bj.left.Err() return false } + resLeft := bj.left.At() - seekToRN := seekTo.(RowNumber) - - rnNext := rn - rnNext.Skip(numRows) - - return CompareRowNumbers(0, rnNext, seekToRN) == -1 - } - - for _, rg := range c.rgs { - col := rg.ColumnChunks()[c.col] - - if checkSkip(rg.NumRows()) { - // Skip column chunk - rn.Skip(rg.NumRows()) - continue - } - - if c.filter != nil { - if !c.filter.KeepColumnChunk(col) { - // Skip column chunk - rn.Skip(rg.NumRows()) - continue - } - } - - func(col parquet.ColumnChunk) { - pgs := col.Pages() - defer func() { - if err := pgs.Close(); err != nil { - span.LogKV("closing error", err) - } - }() - for { - pg, err := pgs.ReadPage() - if pg == nil || err == io.EOF { - break - } - c.metrics.pageReadsTotal.WithLabelValues(c.table, c.colName).Add(1) - span.LogFields( 
log.String("msg", "reading page"), - log.Int64("page_num_values", pg.NumValues()), - log.Int64("page_size", pg.Size()), - ) - if err != nil { - return - } - - if checkSkip(pg.NumRows()) { - // Skip page - rn.Skip(pg.NumRows()) - continue - } - - if c.filter != nil { - if !c.filter.KeepPage(pg) { - // Skip page - rn.Skip(pg.NumRows()) - continue - } - } - - vr := pg.Values() - for { - count, err := vr.ReadValues(buffer) - if count > 0 { - - // Assign row numbers, filter values, and collect the results. - newBuffer := columnIteratorPoolGet(readSize, 0) - - for i := 0; i < count; i++ { - - v := buffer[i] - - // We have to do this for all values (even if the - // value is excluded by the predicate) - rn.Next(v.RepetitionLevel(), v.DefinitionLevel()) - - if c.filter != nil { - if !c.filter.KeepValue(v) { - continue - } - } - - newBuffer.rowNumbers = append(newBuffer.rowNumbers, rn) - newBuffer.values = append(newBuffer.values, v) - } - - if len(newBuffer.rowNumbers) > 0 { - select { - case c.ch <- newBuffer: - case <-c.quit: - return - case <-ctx.Done(): - return - } - } else { - // All values excluded, we go ahead and immediately - // return the buffer to the pool. - columnIteratorPoolPut(newBuffer) - } - } - - // Error checks MUST occur after processing any returned data - // following io.Reader behavior. - if err == io.EOF { - break - } - if err != nil { - c.ch <- &columnIteratorBuffer{err: err} - return - } - } - - } - }(col) - } -} - -// At returns the current value from the iterator. -func (c *ColumnIterator) At() *IteratorResult { - return c.result -} - -// Next returns the next matching value from the iterator. -// Returns nil when finished. -func (c *ColumnIterator) Next() bool { - t, v := c.next() - if t.Valid() { - c.result = c.makeResult(t, v) - return true - } - - c.result = nil - return false -} - -func (c *ColumnIterator) next() (RowNumber, parquet.Value) { - // Consume current buffer until exhausted - // then read another one from the channel. - if c.curr != nil { - for c.currN++; c.currN < len(c.curr.rowNumbers); { - t := c.curr.rowNumbers[c.currN] - if t.Valid() { - return t, c.curr.values[c.currN] - } + // now seek the right iterator to the left position + if !bj.nextOrSeek(RowNumberWithDefinitionLevel{resLeft.RowNumber, bj.definitionLevel}, bj.right) { + bj.err = bj.right.Err() + return false } - - // Done with this buffer - columnIteratorPoolPut(c.curr) - c.curr = nil - } - - if v, ok := <-c.ch; ok { - if v.err != nil { - c.err = v.err - return EmptyRowNumber(), parquet.Value{} + resRight := bj.right.At() + + makeResult := func() { + bj.res = iteratorResultPoolGet() + bj.res.RowNumber = resLeft.RowNumber + bj.res.Append(resLeft) + bj.res.Append(resRight) + iteratorResultPoolPut(resLeft) + iteratorResultPoolPut(resRight) } - // Got next buffer, guaranteed to have at least 1 element - c.curr = v - c.currN = 0 - return c.curr.rowNumbers[0], c.curr.values[0] - } - - // Failed to read from the channel, means iterator is exhausted. - return EmptyRowNumber(), parquet.Value{} -} - -// SeekTo moves this iterator to the next result that is greater than -// or equal to the given row number (and based on the given definition level) -func (c *ColumnIterator) Seek(to RowNumberWithDefinitionLevel) bool { - var at RowNumber - var v parquet.Value - - // Because iteration happens in the background, we signal the row - // to skip to, and then read until we are at the right spot. 
The - // seek is best-effort and may have no effect if the iteration - // already further ahead, and there may already be older data - // in the buffer. - c.seekTo.Store(to.RowNumber) - for at, v = c.next(); at.Valid() && CompareRowNumbers(to.DefinitionLevel, at, to.RowNumber) < 0; { - at, v = c.next() - } - - if at.Valid() { - c.result = c.makeResult(at, v) - return true - } - - c.result = nil - return false -} - -func (c *ColumnIterator) makeResult(t RowNumber, v parquet.Value) *IteratorResult { - r := columnIteratorResultPoolGet() - r.RowNumber = t - if c.selectAs != "" { - r.AppendValue(c.selectAs, v) - } - return r -} - -func (c *ColumnIterator) Close() error { - close(c.quit) - return nil -} -func (c *ColumnIterator) Err() error { - return c.err -} - -// JoinIterator joins two or more iterators for matches at the given definition level. -// I.e. joining at definitionLevel=0 means that each iterator must produce a result -// within the same root node. -type JoinIterator struct { - definitionLevel int - iters []Iterator - peeks []*IteratorResult - pred GroupPredicate - - result *IteratorResult -} - -var _ Iterator = (*JoinIterator)(nil) - -func NewJoinIterator(definitionLevel int, iters []Iterator, pred GroupPredicate) *JoinIterator { - j := JoinIterator{ - definitionLevel: definitionLevel, - iters: iters, - peeks: make([]*IteratorResult, len(iters)), - pred: pred, - } - return &j -} - -func (j *JoinIterator) At() *IteratorResult { - return j.result -} - -func (j *JoinIterator) Next() bool { - // Here is the algorithm for joins: On each pass of the iterators - // we remember which ones are pointing at the earliest rows. If all - // are the lowest (and therefore pointing at the same thing) then - // there is a successful join and return the result. - // Else we progress the iterators and try again. - // There is an optimization here in that we can seek to the highest - // row seen. It's impossible to have joins before that row. - for { - lowestRowNumber := MaxRowNumber() - highestRowNumber := EmptyRowNumber() - lowestIters := make([]int, 0, len(j.iters)) - - for iterNum := range j.iters { - res := j.peek(iterNum) - - if res == nil { - // Iterator exhausted, no more joins possible - j.result = nil + if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 { + // we have found an element + makeResult() + return true + } else if cmp < 0 { + if !bj.nextOrSeek(RowNumberWithDefinitionLevel{resRight.RowNumber, bj.definitionLevel}, bj.left) { + bj.err = bj.left.Err() return false } + resLeft = bj.left.At() - c := CompareRowNumbers(j.definitionLevel, res.RowNumber, lowestRowNumber) - switch c { - case -1: - // New lowest, reset - lowestIters = lowestIters[:0] - lowestRowNumber = res.RowNumber - fallthrough - - case 0: - // Same, append - lowestIters = append(lowestIters, iterNum) - } - - if CompareRowNumbers(j.definitionLevel, res.RowNumber, highestRowNumber) == 1 { - // New high water mark - highestRowNumber = res.RowNumber - } - } - - // All iterators pointing at same row? - if len(lowestIters) == len(j.iters) { - // Get the data - result := j.collect(lowestRowNumber) - - // Keep group? - if j.pred == nil || j.pred.KeepGroup(result) { - // Yes - j.result = result + if cmp := CompareRowNumbers(bj.definitionLevel, resLeft.RowNumber, resRight.RowNumber); cmp == 0 { + makeResult() return true } - } - - // Skip all iterators to the highest row seen, it's impossible - // to find matches before that. 
- j.seekAll(RowNumberWithDefinitionLevel{RowNumber: highestRowNumber, DefinitionLevel: j.definitionLevel}) - } -} - -func (j *JoinIterator) Seek(to RowNumberWithDefinitionLevel) bool { - j.seekAll(to) - return j.Next() -} -func (j *JoinIterator) seekAll(to RowNumberWithDefinitionLevel) { - to.RowNumber = TruncateRowNumber(to) - for iterNum, iter := range j.iters { - if j.peeks[iterNum] == nil || CompareRowNumbers(to.DefinitionLevel, j.peeks[iterNum].RowNumber, to.RowNumber) == -1 { - columnIteratorResultPoolPut(j.peeks[iterNum]) - if iter.Seek(to) { - j.peeks[iterNum] = iter.At() - } else { - j.peeks[iterNum] = nil - } + } else { + // the right value can't be smaller than the left one because we seeked beyond it + panic("not expected to happen") + } } } -func (j *JoinIterator) peek(iterNum int) *IteratorResult { - if j.peeks[iterNum] == nil { - if j.iters[iterNum].Next() { - j.peeks[iterNum] = j.iters[iterNum].At() - } - } - return j.peeks[iterNum] +func (bj *BinaryJoinIterator) At() *IteratorResult { + return bj.res } -// Collect data from the given iterators until they point at -// the next row (according to the configured definition level) -// or are exhausted. -func (j *JoinIterator) collect(rowNumber RowNumber) *IteratorResult { - result := columnIteratorResultPoolGet() - result.RowNumber = rowNumber - - for i := range j.iters { - for j.peeks[i] != nil && CompareRowNumbers(j.definitionLevel, j.peeks[i].RowNumber, rowNumber) == 0 { - - result.Append(j.peeks[i]) - - columnIteratorResultPoolPut(j.peeks[i]) - - if j.iters[i].Next() { - j.peeks[i] = j.iters[i].At() - } else { - j.peeks[i] = nil - } - } - } - return result +func (bj *BinaryJoinIterator) Seek(to RowNumberWithDefinitionLevel) bool { + bj.left.Seek(to) + bj.right.Seek(to) + return bj.Next() } -func (j *JoinIterator) Close() error { +func (bj *BinaryJoinIterator) Close() error { var merr multierror.MultiError - for _, i := range j.iters { - merr.Add(i.Close()) - } + merr.Add(bj.left.Close()) + merr.Add(bj.right.Close()) return merr.Err() } -func (j *JoinIterator) Err() error { - for _, i := range j.iters { - if err := i.Err(); err != nil { - return err - } - } - return nil +func (bj *BinaryJoinIterator) Err() error { + return bj.err } // UnionIterator produces all results for all given iterators. When iterators @@ -783,7 +612,7 @@ func (u *UnionIterator) peek(iterNum int) *IteratorResult { // the next row (according to the configured definition level) // or are exhausted. func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) *IteratorResult { - result := columnIteratorResultPoolGet() + result := iteratorResultPoolGet() result.RowNumber = rowNumber for _, iterNum := range iterNums { @@ -791,7 +620,7 @@ func (u *UnionIterator) collect(iterNums []int, rowNumber RowNumber) *IteratorRe result.Append(u.peeks[iterNum]) - columnIteratorResultPoolPut(u.peeks[iterNum]) + iteratorResultPoolPut(u.peeks[iterNum]) if u.iters[iterNum].Next() { u.peeks[iterNum] = u.iters[iterNum].At() @@ -902,7 +731,7 @@ func (r *RowNumberIterator[T]) Next() bool { if !r.Iterator.Next() { return false } - r.current = columnIteratorResultPoolGet() + r.current = iteratorResultPoolGet() r.current.Reset() rowGetter, ok := any(r.Iterator.At()).(RowGetter) if !ok { @@ -941,3 +770,463 @@ func (r *RowNumberIterator[T]) Seek(to RowNumberWithDefinitionLevel) bool { } return true } + +// SyncIterator is a synchronous column iterator. It scans through the given row +// groups and column, and applies the optional predicate to each chunk, page, and value. 
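+// Unlike the ColumnIterator it replaces, it reads on the caller's goroutine rather +// than asynchronously in the background, which makes Seek() exact instead of the +// previous best-effort skip signal.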
+// Results are read by calling Next() until it returns nil. +type SyncIterator struct { + // Config + column int + columnName string + table string + rgs []parquet.RowGroup + rgsMin []RowNumber + rgsMax []RowNumber // Exclusive, row number of next one past the row group + readSize int + selectAs string + filter *InstrumentedPredicate + + // Status + ctx context.Context + cancel func() + span trace.Span + metrics *Metrics + curr RowNumber + currRowGroup parquet.RowGroup + currRowGroupMin RowNumber + currRowGroupMax RowNumber + currChunk parquet.ColumnChunk + currPages parquet.Pages + currPage parquet.Page + currPageMax RowNumber + currValues parquet.ValueReader + currBuf []parquet.Value + currBufN int + + err error + res *IteratorResult +} + +var _ Iterator = (*SyncIterator)(nil) + +var syncIteratorPool = sync.Pool{ + New: func() interface{} { + return []parquet.Value{} + }, +} + +func syncIteratorPoolGet(capacity, len int) []parquet.Value { + res := syncIteratorPool.Get().([]parquet.Value) + if cap(res) < capacity { + res = make([]parquet.Value, capacity) + } + res = res[:len] + return res +} + +func syncIteratorPoolPut(b []parquet.Value) { + for i := range b { + b[i] = parquet.Value{} + } + syncIteratorPool.Put(b) // nolint: staticcheck +} + +func NewSyncIterator(ctx context.Context, rgs []parquet.RowGroup, column int, columnName string, readSize int, filter Predicate, selectAs string) *SyncIterator { + + // Assign row group bounds. + // Lower bound is inclusive + // Upper bound is exclusive, points at the first row of the next group + rn := EmptyRowNumber() + rgsMin := make([]RowNumber, len(rgs)) + rgsMax := make([]RowNumber, len(rgs)) + for i, rg := range rgs { + rgsMin[i] = rn + rgsMax[i] = rn + rgsMax[i].Skip(rg.NumRows() + 1) + rn.Skip(rg.NumRows()) + } + + tr := otel.Tracer("query") + + ctx, span := tr.Start(ctx, "syncIterator", trace.WithAttributes( + attribute.String("column", columnName), + attribute.Int("columnIndex", column), + )) + + ctx, cancel := context.WithCancel(ctx) + + return &SyncIterator{ + table: strings.ToLower(rgs[0].Schema().Name()) + "s", + ctx: ctx, + cancel: cancel, + metrics: getMetricsFromContext(ctx), + span: span, + column: column, + columnName: columnName, + rgs: rgs, + readSize: readSize, + selectAs: selectAs, + rgsMin: rgsMin, + rgsMax: rgsMax, + filter: &InstrumentedPredicate{pred: filter}, + curr: EmptyRowNumber(), + } +} + +func (c *SyncIterator) At() *IteratorResult { + return c.res +} + +func (c *SyncIterator) Next() bool { + rn, v, err := c.next() + if err != nil { + c.res = nil + c.err = err + return false + } + if !rn.Valid() { + c.res = nil + c.err = nil + return false + } + c.res = c.makeResult(rn, v) + return true +} + +// Seek moves this iterator to the next result that is greater than +// or equal to the given row number (and based on the given definition level) +func (c *SyncIterator) Seek(to RowNumberWithDefinitionLevel) bool { + + if c.seekRowGroup(to.RowNumber, to.DefinitionLevel) { + c.res = nil + c.err = nil + return false + } + + done, err := c.seekPages(to.RowNumber, to.DefinitionLevel) + if err != nil { + c.res = nil + c.err = err + return false + } + if done { + c.res = nil + c.err = nil + return false + } + + // The row group and page have been selected to where this value is possibly + // located. Now scan through the page and look for it. 
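+	// Scanning happens through next(), so the configured predicate is still + // applied to every value we step over.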
+ for { + rn, v, err := c.next() + if err != nil { + c.res = nil + c.err = err + return false + } + if !rn.Valid() { + c.res = nil + c.err = nil + return false + } + + if CompareRowNumbers(to.DefinitionLevel, rn, to.RowNumber) >= 0 { + c.res = c.makeResult(rn, v) + c.err = nil + return true + } + } +} + +func (c *SyncIterator) popRowGroup() (parquet.RowGroup, RowNumber, RowNumber) { + if len(c.rgs) == 0 { + return nil, EmptyRowNumber(), EmptyRowNumber() + } + + rg := c.rgs[0] + min := c.rgsMin[0] + max := c.rgsMax[0] + + c.rgs = c.rgs[1:] + c.rgsMin = c.rgsMin[1:] + c.rgsMax = c.rgsMax[1:] + + return rg, min, max +} + +// seekRowGroup skips ahead to the row group that could contain the value at the +// desired row number. Does nothing if the current row group is already the correct one. +func (c *SyncIterator) seekRowGroup(seekTo RowNumber, definitionLevel int) (done bool) { + if c.currRowGroup != nil && CompareRowNumbers(definitionLevel, seekTo, c.currRowGroupMax) >= 0 { + // Done with this row group + c.closeCurrRowGroup() + } + + for c.currRowGroup == nil { + + rg, min, max := c.popRowGroup() + if rg == nil { + return true + } + + if CompareRowNumbers(definitionLevel, seekTo, max) != -1 { + continue + } + + cc := rg.ColumnChunks()[c.column] + if c.filter != nil && !c.filter.KeepColumnChunk(cc) { + continue + } + + // This row group matches both row number and filter. + c.setRowGroup(rg, min, max) + } + + return c.currRowGroup == nil +} + +// seekPages skips ahead in the current row group to the page that could contain the value at +// the desired row number. Does nothing if the current page is already the correct one. +func (c *SyncIterator) seekPages(seekTo RowNumber, definitionLevel int) (done bool, err error) { + if c.currPage != nil && CompareRowNumbers(definitionLevel, seekTo, c.currPageMax) >= 0 { + // Value not in this page + c.setPage(nil) + } + + if c.currPage == nil { + + // TODO (mdisibio) :(((((((( + // pages.SeekToRow is more costly than expected. It doesn't reuse existing i/o + // so it can't be called naively every time we swap pages. We need to figure out + // a way to determine when it is worth calling here. + /* + // Seek into the pages. This is relative to the start of the row group + if seekTo[0] > 0 { + // Determine row delta. We subtract 1 because curr points at the previous row + skip := seekTo[0] - c.currRowGroupMin[0] - 1 + if skip > 0 { + if err := c.currPages.SeekToRow(skip); err != nil { + return true, err + } + c.curr.Skip(skip) + } + }*/ + + for c.currPage == nil { + pg, err := c.currPages.ReadPage() + if pg == nil || err != nil { + // No more pages in this column chunk, + // cleanup and exit. + if err == io.EOF { + err = nil + } + parquet.Release(pg) + c.closeCurrRowGroup() + return true, err + } + c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1) + c.span.AddEvent( + "read page (seekPages)", + trace.WithAttributes( + attribute.Int64("page_num_values", pg.NumValues()), + attribute.Int64("page_size", pg.Size()), + )) + + // Skip based on row number? + newRN := c.curr + newRN.Skip(pg.NumRows() + 1) + if CompareRowNumbers(definitionLevel, seekTo, newRN) >= 0 { + c.curr.Skip(pg.NumRows()) + parquet.Release(pg) + continue + } + + // Skip based on filter? + if c.filter != nil && !c.filter.KeepPage(pg) { + c.curr.Skip(pg.NumRows()) + parquet.Release(pg) + continue + } + + c.setPage(pg) + } + } + + return false, nil +} + +// next is the core functionality of this iterator and returns the next matching result. 
This + // may involve inspecting multiple row groups, pages, and values until a match is found. When + // we run out of things to inspect, it returns nil. The reason this method is distinct from + // Next() is that it doesn't wrap the results in an IteratorResult, which is more efficient + // when being called multiple times and throwing away the results like in Seek(). +func (c *SyncIterator) next() (RowNumber, *parquet.Value, error) { + for { + + // return if context is cancelled + select { + case <-c.ctx.Done(): + return EmptyRowNumber(), nil, c.ctx.Err() + default: + } + + if c.currRowGroup == nil { + rg, min, max := c.popRowGroup() + if rg == nil { + return EmptyRowNumber(), nil, nil + } + + cc := rg.ColumnChunks()[c.column] + if c.filter != nil && !c.filter.KeepColumnChunk(cc) { + continue + } + + c.setRowGroup(rg, min, max) + } + + if c.currPage == nil { + pg, err := c.currPages.ReadPage() + if pg == nil || err == io.EOF { + // This row group is exhausted + c.closeCurrRowGroup() + continue + } + if err != nil { + return EmptyRowNumber(), nil, err + } + c.metrics.pageReadsTotal.WithLabelValues(c.table, c.columnName).Add(1) + c.span.AddEvent( + "read page (next)", + trace.WithAttributes( + attribute.Int64("page_num_values", pg.NumValues()), + attribute.Int64("page_size", pg.Size()), + )) + + if c.filter != nil && !c.filter.KeepPage(pg) { + // This page filtered out + c.curr.Skip(pg.NumRows()) + parquet.Release(pg) + continue + } + c.setPage(pg) + } + + // Read next batch of values if needed + if c.currBuf == nil { + c.currBuf = syncIteratorPoolGet(c.readSize, 0) + } + if c.currBufN >= len(c.currBuf) || len(c.currBuf) == 0 { + c.currBuf = c.currBuf[:cap(c.currBuf)] + n, err := c.currValues.ReadValues(c.currBuf) + if err != nil && err != io.EOF { + return EmptyRowNumber(), nil, err + } + c.currBuf = c.currBuf[:n] + c.currBufN = 0 + if n == 0 { + // This value reader and page are exhausted. + c.setPage(nil) + continue + } + } + + // Consume current buffer until empty + for c.currBufN < len(c.currBuf) { + v := &c.currBuf[c.currBufN] + + // Inspect all values to track the current row number, + // even if the value is filtered out next. + c.curr.Next(v.RepetitionLevel(), v.DefinitionLevel()) + c.currBufN++ + + if c.filter != nil && !c.filter.KeepValue(*v) { + continue + } + + return c.curr, v, nil + } + } +} + +func (c *SyncIterator) setRowGroup(rg parquet.RowGroup, min, max RowNumber) { + c.closeCurrRowGroup() + c.curr = min + c.currRowGroup = rg + c.currRowGroupMin = min + c.currRowGroupMax = max + c.currChunk = rg.ColumnChunks()[c.column] + c.currPages = c.currChunk.Pages() +} + +func (c *SyncIterator) setPage(pg parquet.Page) { + + // Handle an outgoing page + if c.currPage != nil { + c.curr = c.currPageMax.Preceding() // Reposition current row number to end of this page. + parquet.Release(c.currPage) + c.currPage = nil + } + + // Reset value buffers + c.currValues = nil + c.currPageMax = EmptyRowNumber() + c.currBufN = 0 + + // If we don't immediately have a new incoming page + // then return the buffer to the pool. 
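+	// (If another page follows immediately, currBuf is kept so the next + // ReadValues call in next() can reuse it.)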
+ if pg == nil && c.currBuf != nil { + syncIteratorPoolPut(c.currBuf) + c.currBuf = nil + } + + // Handle an incoming page + if pg != nil { + rn := c.curr + rn.Skip(pg.NumRows() + 1) // Exclusive upper bound, points at the first rownumber in the next page + c.currPage = pg + c.currPageMax = rn + c.currValues = pg.Values() + } +} + +func (c *SyncIterator) closeCurrRowGroup() { + if c.currPages != nil { + c.currPages.Close() + } + + c.currRowGroup = nil + c.currRowGroupMin = EmptyRowNumber() + c.currRowGroupMax = EmptyRowNumber() + c.currChunk = nil + c.currPages = nil + c.setPage(nil) +} + +func (c *SyncIterator) makeResult(t RowNumber, v *parquet.Value) *IteratorResult { + r := iteratorResultPoolGet() + r.RowNumber = t + if c.selectAs != "" { + r.AppendValue(c.selectAs, v.Clone()) + } + return r +} + +func (c *SyncIterator) Err() error { + return c.err +} + +func (c *SyncIterator) Close() error { + c.cancel() + c.closeCurrRowGroup() + + c.span.SetAttributes(attribute.Int64("inspectedColumnChunks", c.filter.InspectedColumnChunks.Load())) + /* + c.span.SetTag("inspectedPages", c.filter.InspectedPages.Load()) + c.span.SetTag("inspectedValues", c.filter.InspectedValues.Load()) + c.span.SetTag("keptColumnChunks", c.filter.KeptColumnChunks.Load()) + c.span.SetTag("keptPages", c.filter.KeptPages.Load()) + c.span.SetTag("keptValues", c.filter.KeptValues.Load()) + */ + c.span.End() + return nil +} diff --git a/pkg/phlaredb/query/iters_test.go b/pkg/phlaredb/query/iters_test.go index ba7828d85..1b06b344f 100644 --- a/pkg/phlaredb/query/iters_test.go +++ b/pkg/phlaredb/query/iters_test.go @@ -1,175 +1,586 @@ package query import ( + "bytes" "context" - "errors" + "fmt" + "log" + "math" + "os" "testing" + "github.com/prometheus/client_golang/prometheus" + "github.com/prometheus/client_golang/prometheus/testutil" "github.com/segmentio/parquet-go" "github.com/stretchr/testify/require" + "go.opentelemetry.io/otel" + "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/exporters/jaeger" + "go.opentelemetry.io/otel/sdk/resource" + tracesdk "go.opentelemetry.io/otel/sdk/trace" + semconv "go.opentelemetry.io/otel/semconv/v1.17.0" + "go.opentelemetry.io/otel/trace" ) -type testData struct { - ID int64 `parquet:"id"` - Name string `parquet:"name"` +const MaxDefinitionLevel = 5 + +type makeTestIterFn func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator + +var iterTestCases = []struct { + name string + makeIter makeTestIterFn +}{ + {"sync", func(pf *parquet.File, idx int, filter Predicate, selectAs string) Iterator { + return NewSyncIterator(context.TODO(), pf.RowGroups(), idx, selectAs, 1000, filter, selectAs) + }}, } -func newTestBuffer[A any](rows []A) parquet.RowGroup { - buffer := parquet.NewBuffer() - for i := range rows { - err := buffer.Write(rows[i]) - if err != nil { - panic(err.Error()) +/* +// TestNext compares the unrolled Next() with the original nextSlow() to +// prevent drift +func TestNext(t *testing.T) { + rn1 := RowNumber{0, 0, 0, 0, 0, 0} + rn2 := RowNumber{0, 0, 0, 0, 0, 0} + + for i := 0; i < 1000; i++ { + r := rand.Intn(6) + d := rand.Intn(6) + + rn1.Next(r, d) + rn2.nextSlow(r, d) + + require.Equal(t, rn1, rn2) + } +} +*/ + +func TestRowNumber(t *testing.T) { + tr := EmptyRowNumber() + require.Equal(t, RowNumber{-1, -1, -1, -1, -1, -1}, tr) + + steps := []struct { + repetitionLevel int + definitionLevel int + expected RowNumber + }{ + // Name.Language.Country examples from the Dremel whitepaper + {0, 3, RowNumber{0, 0, 0, 0, -1, -1}}, + {2, 2, 
RowNumber{0, 0, 1, -1, -1, -1}}, + {1, 1, RowNumber{0, 1, -1, -1, -1, -1}}, + {1, 3, RowNumber{0, 2, 0, 0, -1, -1}}, + {0, 1, RowNumber{1, 0, -1, -1, -1, -1}}, + } + + for _, step := range steps { + tr.Next(step.repetitionLevel, step.definitionLevel) + require.Equal(t, step.expected, tr) + } +} + +func TestCompareRowNumbers(t *testing.T) { + testCases := []struct { + a, b RowNumber + expected int + }{ + {RowNumber{-1}, RowNumber{0}, -1}, + {RowNumber{0}, RowNumber{0}, 0}, + {RowNumber{1}, RowNumber{0}, 1}, + + {RowNumber{0, 1}, RowNumber{0, 2}, -1}, + {RowNumber{0, 2}, RowNumber{0, 1}, 1}, + } + + for _, tc := range testCases { + require.Equal(t, tc.expected, CompareRowNumbers(MaxDefinitionLevel, tc.a, tc.b)) + } +} + +func TestRowNumberPreceding(t *testing.T) { + testCases := []struct { + start, preceding RowNumber + }{ + {RowNumber{1000, -1, -1, -1, -1, -1}, RowNumber{999, -1, -1, -1, -1, -1}}, + {RowNumber{1000, 0, 0, 0, 0, 0}, RowNumber{999, math.MaxInt64, math.MaxInt64, math.MaxInt64, math.MaxInt64, math.MaxInt64}}, + } + + for _, tc := range testCases { + require.Equal(t, tc.preceding, tc.start.Preceding()) + } +} + +func TestColumnIterator(t *testing.T) { + for _, tc := range iterTestCases { + t.Run(tc.name, func(t *testing.T) { + testColumnIterator(t, tc.makeIter) + }) + } +} + +func testColumnIterator(t *testing.T, makeIter makeTestIterFn) { + count := 100_000 + pf := createTestFile(t, count) + + idx, _ := GetColumnIndexByPath(pf, "A") + iter := makeIter(pf, idx, nil, "A") + defer iter.Close() + + for i := 0; i < count; i++ { + require.True(t, iter.Next()) + res := iter.At() + require.NotNil(t, res, "i=%d", i) + require.Equal(t, RowNumber{int64(i), -1, -1, -1, -1, -1}, res.RowNumber) + require.Equal(t, int64(i), res.ToMap()["A"][0].Int64()) + } + + require.False(t, iter.Next()) + require.NoError(t, iter.Err()) +} + +func TestColumnIteratorSeek(t *testing.T) { + for _, tc := range iterTestCases { + t.Run(tc.name, func(t *testing.T) { + testColumnIteratorSeek(t, tc.makeIter) + }) + } +} + +func testColumnIteratorSeek(t *testing.T, makeIter makeTestIterFn) { + count := 10_000 + pf := createTestFile(t, count) + + idx, _ := GetColumnIndexByPath(pf, "A") + iter := makeIter(pf, idx, nil, "A") + defer iter.Close() + + seekTos := []int64{ + 100, + 1234, + 4567, + 5000, + 7890, + } + + for _, seekTo := range seekTos { + rn := EmptyRowNumber() + rn[0] = seekTo + require.True(t, iter.Seek(RowNumberWithDefinitionLevel{rn, 0})) + res := iter.At() + require.NotNil(t, res, "seekTo=%v", seekTo) + require.Equal(t, RowNumber{seekTo, -1, -1, -1, -1, -1}, res.RowNumber) + require.Equal(t, seekTo, res.ToMap()["A"][0].Int64()) + } +} + +func TestColumnIteratorPredicate(t *testing.T) { + for _, tc := range iterTestCases { + t.Run(tc.name, func(t *testing.T) { + testColumnIteratorPredicate(t, tc.makeIter) + }) + } +} + +func testColumnIteratorPredicate(t *testing.T, makeIter makeTestIterFn) { + count := 10_000 + pf := createTestFile(t, count) + + pred := NewIntBetweenPredicate(7001, 7003) + + idx, _ := GetColumnIndexByPath(pf, "A") + iter := makeIter(pf, idx, pred, "A") + defer iter.Close() + + expectedResults := []int64{ + 7001, + 7002, + 7003, + } + + for _, expectedResult := range expectedResults { + require.True(t, iter.Next()) + res := iter.At() + require.NotNil(t, res) + require.Equal(t, RowNumber{expectedResult, -1, -1, -1, -1, -1}, res.RowNumber) + require.Equal(t, expectedResult, res.ToMap()["A"][0].Int64()) + } +} + +func TestColumnIteratorExitEarly(t *testing.T) { + type T struct{ A int } + + 
rows := []T{}
+	count := 10_000
+	for i := 0; i < count; i++ {
+		rows = append(rows, T{i})
+	}
+
+	pf := createFileWith(t, rows, 2)
+	idx, _ := GetColumnIndexByPath(pf, "A")
+	readSize := 1000
+
+	readIter := func(iter Iterator) (int, error) {
+		received := 0
+		for iter.Next() {
+			received++
 		}
+		return received, iter.Err()
 	}
-	return buffer
+
+	t.Run("cancelledEarly", func(t *testing.T) {
+		// Cancel before iterating
+		ctx, cancel := context.WithCancel(context.TODO())
+		cancel()
+		iter := NewSyncIterator(ctx, pf.RowGroups(), idx, "", readSize, nil, "A")
+		count, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+		require.Equal(t, 0, count)
+	})
+
+	t.Run("cancelledPartial", func(t *testing.T) {
+		ctx, cancel := context.WithCancel(context.TODO())
+		iter := NewSyncIterator(ctx, pf.RowGroups(), idx, "", readSize, nil, "A")
+
+		// Read some results
+		require.True(t, iter.Next())
+
+		// Then cancel
+		cancel()
+
+		// Read again = context cancelled
+		_, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+	})
+
+	t.Run("closedEarly", func(t *testing.T) {
+		// Close before iterating
+		iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A")
+		iter.Close()
+		count, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+		require.Equal(t, 0, count)
+	})
+
+	t.Run("closedPartial", func(t *testing.T) {
+		iter := NewSyncIterator(context.TODO(), pf.RowGroups(), idx, "", readSize, nil, "A")
+
+		// Read some results
+		require.True(t, iter.Next())
+
+		// Then close
+		iter.Close()
+
+		// Read again = should close early
+		res2, err := readIter(iter)
+		require.ErrorContains(t, err, "context canceled")
+		require.Less(t, readSize+res2, count)
+	})
 }
 
-type errRowGroup struct {
-	parquet.RowGroup
+func BenchmarkColumnIterator(b *testing.B) {
+	for _, tc := range iterTestCases {
+		b.Run(tc.name, func(b *testing.B) {
+			benchmarkColumnIterator(b, tc.makeIter)
+		})
+	}
 }
 
-func (e *errRowGroup) ColumnChunks() []parquet.ColumnChunk {
-	chunks := e.RowGroup.ColumnChunks()
-	for pos := range chunks {
-		chunks[pos] = &errColumnChunk{chunks[pos]}
+func benchmarkColumnIterator(b *testing.B, makeIter makeTestIterFn) {
+	count := 100_000
+	pf := createTestFile(b, count)
+
+	idx, _ := GetColumnIndexByPath(pf, "A")
+
+	b.ResetTimer()
+
+	for i := 0; i < b.N; i++ {
+		iter := makeIter(pf, idx, nil, "A")
+		actualCount := 0
+		for iter.Next() {
+			actualCount++
+		}
+		iter.Close()
+		require.Equal(b, count, actualCount)
+	}
+}
+
+func createTestFile(t testing.TB, count int) *parquet.File {
+	type T struct{ A int }
+
+	rows := []T{}
+	for i := 0; i < count; i++ {
+		rows = append(rows, T{i})
+	}
+
+	pf := createFileWith(t, rows, 2)
+	return pf
+}
+
+func createProfileLikeFile(t testing.TB, count int) *parquet.File {
+	type T struct {
+		SeriesID  uint32
+		TimeNanos int64
+	}
+
+	// Every row group is ordered by SeriesID and then TimeNanos;
+	// time is always increasing between row groups.
+	rowGroups := 10
+	series := 8
+	rowsPerRowGroup := count / rowGroups
+	seriesPerRowGroup := rowsPerRowGroup / series
+
+	rows := make([]T, count)
+	for i := range rows {
+		rowGroupNum := i / rowsPerRowGroup
+		rows[i] = T{
+			SeriesID:  uint32(i % rowsPerRowGroup / seriesPerRowGroup),
+			TimeNanos: int64(i%seriesPerRowGroup+rowGroupNum*seriesPerRowGroup) * 1000,
+		}
 	}
-	return chunks
+
+	return createFileWith[T](t, rows, rowGroups)
 }
 
-type errColumnChunk struct {
-	parquet.ColumnChunk
+func createFileWith[T any](t testing.TB, rows []T, rowGroups int) *parquet.File {
+	f, err := os.CreateTemp(t.TempDir(), "data.parquet")
+	require.NoError(t, err)
+	t.Logf("Created temp file %s", f.Name())
+
+	perRG := len(rows) / rowGroups
+
+	w := parquet.NewGenericWriter[T](f)
+	for i := 0; i < (rowGroups - 1); i++ {
+		_, err = w.Write(rows[0:perRG])
+		require.NoError(t, err)
+		require.NoError(t, w.Flush())
+		rows = rows[perRG:]
+	}
+
+	_, err = w.Write(rows)
+	require.NoError(t, err)
+	require.NoError(t, w.Flush())
+
+	require.NoError(t, w.Close())
+
+	stat, err := f.Stat()
+	require.NoError(t, err)
+
+	pf, err := parquet.OpenFile(f, stat.Size())
+	require.NoError(t, err)
+
+	return pf
 }
 
-func (e *errColumnChunk) Pages() parquet.Pages {
-	return &errPages{e.ColumnChunk.Pages()}
+type iteratorTracer struct {
+	it        Iterator
+	span      trace.Span
+	name      string
+	nextCount uint32
+	seekCount uint32
 }
 
-type errPages struct {
-	parquet.Pages
+// Pointer receivers are required here so that the nextCount/seekCount
+// counters actually accumulate across calls.
+func (i *iteratorTracer) Next() bool {
+	i.nextCount++
+	posBefore := i.it.At()
+	result := i.it.Next()
+	posAfter := i.it.At()
+	i.span.AddEvent("next", trace.WithAttributes(
+		attribute.String("column", i.name),
+		attribute.Bool("result", result),
+		attribute.Stringer("posBefore", posBefore),
+		attribute.Stringer("posAfter", posAfter),
+	))
+	return result
 }
 
-func (e *errPages) ReadPage() (parquet.Page, error) {
-	p, err := e.Pages.ReadPage()
-	return &errPage{p}, err
+func (i *iteratorTracer) At() *IteratorResult {
+	return i.it.At()
 }
 
-type errPage struct {
-	parquet.Page
+func (i *iteratorTracer) Err() error {
+	return i.it.Err()
 }
 
-func (e *errPage) Values() parquet.ValueReader {
-	return &errValueReader{e.Page.Values()}
+func (i *iteratorTracer) Close() error {
+	return i.it.Close()
 }
 
-type errValueReader struct {
-	parquet.ValueReader
+func (i *iteratorTracer) Seek(pos RowNumberWithDefinitionLevel) bool {
+	i.seekCount++
+	posBefore := i.it.At()
+	result := i.it.Seek(pos)
+	posAfter := i.it.At()
+	i.span.AddEvent("seek", trace.WithAttributes(
+		attribute.String("column", i.name),
+		attribute.Bool("result", result),
+		attribute.Stringer("seekTo", &pos),
+		attribute.Stringer("posBefore", posBefore),
+		attribute.Stringer("posAfter", posAfter),
+	))
+	return result
 }
 
-func (e *errValueReader) ReadValues(vals []parquet.Value) (int, error) {
-	_, _ = e.ValueReader.ReadValues(vals)
-	return 0, errors.New("read error")
+func newIteratorTracer(span trace.Span, name string, it Iterator) Iterator {
+	return &iteratorTracer{
+		span: span,
+		name: name,
+		it:   it,
+	}
 }
 
-func withReadValueError(rg []parquet.RowGroup) []parquet.RowGroup {
-	for pos := range rg {
-		rg[pos] = &errRowGroup{rg[pos]}
+// tracerProvider returns an OpenTelemetry TracerProvider configured to use
+// the Jaeger exporter that will send spans to the provided url. The returned
+// TracerProvider will also use a Resource configured with all the information
+// about the application.
+func tracerProvider(url string) (*tracesdk.TracerProvider, error) {
+	// Create the Jaeger exporter
+	exp, err := jaeger.New(jaeger.WithCollectorEndpoint(jaeger.WithEndpoint(url)))
+	if err != nil {
+		return nil, err
 	}
-	return rg
+	tp := tracesdk.NewTracerProvider(
+		// Always be sure to batch in production.
+		tracesdk.WithBatcher(exp),
+		// Record information about this application in a Resource.
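+		// The service name set below is what the spans from this test
+		// run are grouped under in the Jaeger UI.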
+		tracesdk.WithResource(resource.NewWithAttributes(
+			semconv.SchemaURL,
+			semconv.ServiceName("phlare-go-test"),
+		)),
+	)
+	return tp, nil
 }
 
-func newTestSet() []parquet.RowGroup {
-	return []parquet.RowGroup{
-		newTestBuffer(
-			[]testData{
-				{1, "one"},
-				{2, "two"},
-			}),
-		newTestBuffer(
-			[]testData{
-				{3, "three"},
-				{5, "five"},
-			}),
+func TestMain(m *testing.M) {
+	tp, err := tracerProvider("http://localhost:14268/api/traces")
+	if err != nil {
+		log.Fatal(err)
 	}
+
+	// Register our TracerProvider as the global so any imported
+	// instrumentation in the future will default to using it.
+	otel.SetTracerProvider(tp)
+
+	result := m.Run()
+
+	fmt.Println("shutting down tracer")
+	tp.Shutdown(context.Background())
+
+	os.Exit(result)
 }
 
-func TestColumnIterator(t *testing.T) {
+func TestBinaryJoinIterator(t *testing.T) {
+	tr := otel.Tracer("query")
+
+	_, span := tr.Start(context.Background(), "TestBinaryJoinIterator")
+	defer span.End()
+
+	rowCount := 1600
+	pf := createProfileLikeFile(t, rowCount)
+
 	for _, tc := range []struct {
-		name      string
-		result    []parquet.Value
-		rowGroups []parquet.RowGroup
-		err       error
+		name                string
+		seriesPredicate     Predicate
+		seriesPageReads     int
+		timePredicate       Predicate
+		timePageReads       int
+		expectedResultCount int
 	}{
 		{
-			name:      "read-int-column",
-			rowGroups: newTestSet(),
-			result: []parquet.Value{
-				parquet.ValueOf(1),
-				parquet.ValueOf(2),
-				parquet.ValueOf(3),
-				parquet.ValueOf(5),
-			},
+			name:                "no predicate",
+			expectedResultCount: rowCount, // expect everything
+			seriesPageReads:     10,
+			timePageReads:       10,
+		},
+		{
+			name:                "one series ID",
+			expectedResultCount: rowCount / 8, // expect an eighth of the rows
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}}),
+			seriesPageReads:     10,
+			timePageReads:       10,
+		},
+		{
+			name:                "two series IDs",
+			expectedResultCount: rowCount / 8 * 2, // expect two eighths of the rows
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+			seriesPageReads:     10,
+			timePageReads:       10,
+		},
+		{
+			name:                "missing series",
+			expectedResultCount: 0,
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{10: {}}),
+		},
+		{
+			name:                "first two timestamps each",
+			expectedResultCount: 2 * 8, // expect two profiles for each series
+			timePredicate:       NewIntBetweenPredicate(0, 1000),
+			seriesPageReads:     1,
+			timePageReads:       1,
 		},
 		{
-			name:      "err-read-values",
-			rowGroups: withReadValueError(newTestSet()),
-			err:       errors.New("read error"),
+			name:                "time before results",
+			expectedResultCount: 0,
+			timePredicate:       NewIntBetweenPredicate(-10, -1),
+			seriesPageReads:     1,
+			timePageReads:       0,
+		},
+		{
+			name:                "time after results",
+			expectedResultCount: 0,
+			timePredicate:       NewIntBetweenPredicate(200000, 20001000),
+			seriesPredicate:     NewMapPredicate(map[int64]struct{}{0: {}, 1: {}}),
+			seriesPageReads:     1,
+			timePageReads:       0,
 		},
 	} {
 		t.Run(tc.name, func(t *testing.T) {
-			var (
-				buffer [][]parquet.Value
+			ctx, cancel := context.WithCancel(context.Background())
+			defer cancel()
 
-				ctx = context.Background()
-				i   = NewColumnIterator(ctx, tc.rowGroups, 0, "id", 10, nil, "id")
-			)
-			for i.Next() {
-				require.Nil(t, i.Err())
-				buffer = i.At().Columns(buffer, "id")
-			}
+			reg := prometheus.NewRegistry()
+			metrics := NewMetrics(reg)
+			metrics.pageReadsTotal.WithLabelValues("ts", "SeriesId").Add(0)
+			metrics.pageReadsTotal.WithLabelValues("ts", "TimeNanos").Add(0)
+			ctx = AddMetricsToContext(ctx, metrics)
 
-			require.Equal(t, tc.err, i.Err())
-		})
-	}
-}
+			seriesIt := newIteratorTracer(span, "SeriesId", NewSyncIterator(ctx, pf.RowGroups(), 0, 
"SeriesId", 1000, tc.seriesPredicate, "SeriesId")) + timeIt := newIteratorTracer(span, "TimeNanos", NewSyncIterator(ctx, pf.RowGroups(), 1, "TimeNanos", 1000, tc.timePredicate, "TimeNanos")) -func TestRowNumber(t *testing.T) { - tr := EmptyRowNumber() - require.Equal(t, RowNumber{-1, -1, -1, -1, -1, -1}, tr) + ctx, span := tr.Start(ctx, t.Name()) + defer span.End() - steps := []struct { - repetitionLevel int - definitionLevel int - expected RowNumber - }{ - // Name.Language.Country examples from the Dremel whitepaper - {0, 3, RowNumber{0, 0, 0, 0, -1, -1}}, - {2, 2, RowNumber{0, 0, 1, -1, -1, -1}}, - {1, 1, RowNumber{0, 1, -1, -1, -1, -1}}, - {1, 3, RowNumber{0, 2, 0, 0, -1, -1}}, - {0, 1, RowNumber{1, 0, -1, -1, -1, -1}}, - } + it := NewBinaryJoinIterator( + 0, + seriesIt, + timeIt, + ) - for _, step := range steps { - tr.Next(step.repetitionLevel, step.definitionLevel) - require.Equal(t, step.expected, tr) - } -} + results := 0 + for it.Next() { + span.AddEvent("match", trace.WithAttributes( + attribute.Stringer("element", it.At()), + )) + results++ + } + require.NoError(t, it.Err()) -func TestCompareRowNumbers(t *testing.T) { - testCases := []struct { - a, b RowNumber - expected int - }{ - {RowNumber{-1}, RowNumber{0}, -1}, - {RowNumber{0}, RowNumber{0}, 0}, - {RowNumber{1}, RowNumber{0}, 1}, + require.NoError(t, it.Close()) - {RowNumber{0, 1}, RowNumber{0, 2}, -1}, - {RowNumber{0, 2}, RowNumber{0, 1}, 1}, - } + require.Equal(t, tc.expectedResultCount, results) - for _, tc := range testCases { - require.Equal(t, tc.expected, CompareRowNumbers(5, tc.a, tc.b)) + require.NoError(t, testutil.GatherAndCompare(reg, bytes.NewReader([]byte(fmt.Sprintf( + ` + # HELP pyroscopedb_page_reads_total Total number of pages read while querying + # TYPE pyroscopedb_page_reads_total counter + pyroscopedb_page_reads_total{column="SeriesId",table="ts"} %d + pyroscopedb_page_reads_total{column="TimeNanos",table="ts"} %d + `, tc.seriesPageReads, tc.timePageReads))), "pyroscopedb_page_reads_total")) + + }) } } diff --git a/pkg/phlaredb/query/predicate_test.go b/pkg/phlaredb/query/predicate_test.go index 798d84a96..734041f87 100644 --- a/pkg/phlaredb/query/predicate_test.go +++ b/pkg/phlaredb/query/predicate_test.go @@ -75,7 +75,7 @@ func testPredicate[T any](t *testing.T, tc predicateTestCase[T]) { p := InstrumentedPredicate{pred: tc.predicate} - i := NewColumnIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "") + i := NewSyncIterator(context.TODO(), r.RowGroups(), 0, "test", 100, &p, "") for i.Next() { } diff --git a/pkg/phlaredb/query/predicates.go b/pkg/phlaredb/query/predicates.go index bccdd95cc..5df379154 100644 --- a/pkg/phlaredb/query/predicates.go +++ b/pkg/phlaredb/query/predicates.go @@ -6,6 +6,7 @@ import ( pq "github.com/segmentio/parquet-go" "go.uber.org/atomic" + "golang.org/x/exp/constraints" ) // Predicate is a pushdown predicate that can be applied at @@ -254,3 +255,42 @@ func (p *InstrumentedPredicate) KeepValue(v pq.Value) bool { return false } + +type mapPredicate[K constraints.Integer, V any] struct { + inbetweenPred Predicate + m map[K]V +} + +func NewMapPredicate[K constraints.Integer, V any](m map[K]V) Predicate { + + var min, max int64 + + first := true + for k := range m { + if first || max < int64(k) { + max = int64(k) + } + if first || min > int64(k) { + min = int64(k) + } + first = false + } + + return &mapPredicate[K, V]{ + inbetweenPred: NewIntBetweenPredicate(min, max), + m: m, + } +} + +func (m *mapPredicate[K, V]) KeepColumnChunk(c pq.ColumnChunk) bool { + return 
m.inbetweenPred.KeepColumnChunk(c)
+}
+
+func (m *mapPredicate[K, V]) KeepPage(page pq.Page) bool {
+	return m.inbetweenPred.KeepPage(page)
+}
+
+func (m *mapPredicate[K, V]) KeepValue(v pq.Value) bool {
+	_, exists := m.m[K(v.Int64())]
+	return exists
+}
diff --git a/pkg/phlaredb/query/repeated.go b/pkg/phlaredb/query/repeated.go
index d2264321c..7164d1a1c 100644
--- a/pkg/phlaredb/query/repeated.go
+++ b/pkg/phlaredb/query/repeated.go
@@ -7,9 +7,10 @@ import (
 
 	"github.com/grafana/dskit/multierror"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/samber/lo"
 	"github.com/segmentio/parquet-go"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 
 	"github.com/grafana/phlare/pkg/iter"
 )
@@ -24,7 +25,7 @@ type repeatedPageIterator[T any] struct {
 	column   int
 	readSize int
 	ctx      context.Context
-	span     opentracing.Span
+	span     trace.Span
 
 	rgs                 []parquet.RowGroup
 	startRowGroupRowNum int64
@@ -134,10 +135,10 @@ Outer:
 			return false
 		}
-		it.span.LogFields(
-			otlog.String("msg", "Page read"),
-			otlog.Int64("startRowGroupRowNum", it.startRowGroupRowNum),
-			otlog.Int64("startPageRowNum", it.startPageRowNum),
-			otlog.Int64("pageRowNum", it.currentPage.NumRows()),
-		)
+		it.span.AddEvent("TODO", trace.WithAttributes(
+			attribute.String("msg", "Page read"),
+			attribute.Int64("startRowGroupRowNum", it.startRowGroupRowNum),
+			attribute.Int64("startPageRowNum", it.startPageRowNum),
+			attribute.Int64("pageRowNum", it.currentPage.NumRows()),
+		))
 		it.valueReader = it.currentPage.Values()
 	}
diff --git a/pkg/phlaredb/sample_merge.go b/pkg/phlaredb/sample_merge.go
index 9b5681530..bde145544 100644
--- a/pkg/phlaredb/sample_merge.go
+++ b/pkg/phlaredb/sample_merge.go
@@ -6,10 +6,12 @@ import (
 
 	"github.com/google/pprof/profile"
 	"github.com/opentracing/opentracing-go"
-	otlog "github.com/opentracing/opentracing-go/log"
 	"github.com/prometheus/common/model"
 	"github.com/samber/lo"
 	"github.com/segmentio/parquet-go"
+	"go.opentelemetry.io/otel"
+	attribute "go.opentelemetry.io/otel/attribute"
+	"go.opentelemetry.io/otel/trace"
 
 	googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1"
 	ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1"
@@ -20,8 +22,8 @@ import (
 )
 
 func (b *singleBlockQuerier) MergeByStacktraces(ctx context.Context, rows iter.Iterator[Profile]) (*ingestv1.MergeProfilesStacktracesResult, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByStacktraces - Block")
+	defer sp.End()
 
 	stacktraceAggrValues := make(stacktracesByMapping)
 	if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil {
@@ -33,8 +35,8 @@
 }
 
 func (b *singleBlockQuerier) MergePprof(ctx context.Context, rows iter.Iterator[Profile]) (*profile.Profile, error) {
-	sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByStacktraces - Block")
-	defer sp.Finish()
+	ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergePprof - Block")
+	defer sp.End()
 
 	stacktraceAggrValues := make(profileSampleByMapping)
 	if err := mergeByStacktraces(ctx, b.profiles.file, rows, stacktraceAggrValues); err != nil {
@@ -85,18 +87,18 @@ func (b *singleBlockQuerier) resolveLocations(ctx context.Context, mapping uint6
 }
 
 func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSampleByMapping profileSampleByMapping) (*profile.Profile, error) {
-	sp, ctx 
:= opentracing.StartSpanFromContext(ctx, "ResolvePprofSymbols - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolvePprofSymbols - Block") + defer sp.End() locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(profileSampleByMapping) * 1024) // gather stacktraces if err := profileSampleByMapping.ForEach(func(mapping uint64, samples profileSampleMap) error { stacktraceIDs := samples.Ids() - sp.LogFields( - otlog.Int("stacktraces", len(stacktraceIDs)), - otlog.Uint64("mapping", mapping), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.Int("stacktraces", len(stacktraceIDs)), + attribute.Uint64("mapping", mapping))) + return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs) }); err != nil { return nil, err @@ -245,26 +247,27 @@ func (b *singleBlockQuerier) resolvePprofSymbols(ctx context.Context, profileSam } func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMapping stacktracesByMapping) (*ingestv1.MergeProfilesStacktracesResult, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "ResolveSymbols - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ResolveSymbols - Block") + defer sp.End() + locationsIdsByStacktraceID := newLocationsIdsByStacktraceID(len(stacktracesByMapping) * 1024) // gather stacktraces if err := stacktracesByMapping.ForEach(func(mapping uint64, samples stacktraceSampleMap) error { stacktraceIDs := samples.Ids() - sp.LogFields( - otlog.Int("stacktraces", len(stacktraceIDs)), - otlog.Uint64("mapping", mapping), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.Int("stacktraces", len(stacktraceIDs)), + attribute.Uint64("mapping", mapping))) + return b.resolveLocations(ctx, mapping, locationsIdsByStacktraceID, stacktraceIDs) }); err != nil { return nil, err } - sp.LogFields(otlog.Int("locationIDs", len(locationsIdsByStacktraceID.locationIds()))) + sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("locationIDs", len(locationsIdsByStacktraceID.locationIds())))) // gather locations - sp.LogFields(otlog.String("msg", "gather locations")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather locations"))) var ( locationIDsByFunctionID = newUniqueIDs[[]int64]() locations = b.locations.retrieveRows(ctx, locationsIdsByStacktraceID.locationIds().iterator()) @@ -279,10 +282,10 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa if err := locations.Err(); err != nil { return nil, err } - sp.LogFields(otlog.Int("functions", len(locationIDsByFunctionID))) + sp.AddEvent("TODO", trace.WithAttributes(attribute.Int("functions", len(locationIDsByFunctionID)))) // gather functions - sp.LogFields(otlog.String("msg", "gather functions")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather functions"))) var ( functionIDsByStringID = newUniqueIDs[[]int64]() functions = b.functions.retrieveRows(ctx, locationIDsByFunctionID.iterator()) @@ -297,7 +300,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa } // gather strings - sp.LogFields(otlog.String("msg", "gather strings")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "gather strings"))) var ( names = make([]string, len(functionIDsByStringID)) idSlice = make([][]int64, len(functionIDsByStringID)) @@ -314,7 +317,7 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa return nil, err } - 
sp.LogFields(otlog.String("msg", "build MergeProfilesStacktracesResult")) + sp.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "build MergeProfilesStacktracesResult"))) // idSlice contains stringIDs and gets rewritten into functionIDs for nameID := range idSlice { var functionIDs []int64 @@ -361,8 +364,8 @@ func (b *singleBlockQuerier) resolveSymbols(ctx context.Context, stacktracesByMa } func (b *singleBlockQuerier) MergeByLabels(ctx context.Context, rows iter.Iterator[Profile], by ...string) ([]*typesv1.Series, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "MergeByLabels - Block") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "MergeByLabels - Block") + defer sp.End() m := make(seriesByLabels) columnName := "TotalValue" @@ -469,8 +472,9 @@ type mapAdder interface { } func mergeByStacktraces(ctx context.Context, profileSource Source, rows iter.Iterator[Profile], m mapAdder) error { - sp, ctx := opentracing.StartSpanFromContext(ctx, "mergeByStacktraces") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "mergeByStacktraces") + defer sp.End() + // clone the rows to be able to iterate over them twice multiRows, err := iter.CloneN(rows, 2) if err != nil { diff --git a/pkg/querier/ingester_querier.go b/pkg/querier/ingester_querier.go index e3361dc67..ab5c18e3f 100644 --- a/pkg/querier/ingester_querier.go +++ b/pkg/querier/ingester_querier.go @@ -8,6 +8,7 @@ import ( ring_client "github.com/grafana/dskit/ring/client" "github.com/opentracing/opentracing-go" "github.com/prometheus/prometheus/promql/parser" + "go.opentelemetry.io/otel" "golang.org/x/sync/errgroup" ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" @@ -58,8 +59,9 @@ func forAllIngesters[T any](ctx context.Context, ingesterQuerier *IngesterQuerie } func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree Ingesters") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree Ingesters") + defer sp.End() + profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) @@ -103,8 +105,9 @@ func (q *Querier) selectTreeFromIngesters(ctx context.Context, req *querierv1.Se } func (q *Querier) selectSeriesFromIngesters(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries Ingesters") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries Ingesters") + defer sp.End() + responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(ctx context.Context, ic IngesterQueryClient) (clientpool.BidiClientMergeProfilesLabels, error) { return ic.MergeProfilesLabels(ctx), nil }) diff --git a/pkg/querier/querier.go b/pkg/querier/querier.go index 4d9ce48fd..edde0b6aa 100644 --- a/pkg/querier/querier.go +++ b/pkg/querier/querier.go @@ -15,13 +15,15 @@ import ( "github.com/grafana/dskit/services" "github.com/grafana/mimir/pkg/util/spanlogger" "github.com/opentracing/opentracing-go" - otlog "github.com/opentracing/opentracing-go/log" "github.com/pkg/errors" "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" 
"github.com/prometheus/common/model" "github.com/prometheus/prometheus/promql/parser" "github.com/samber/lo" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" "golang.org/x/sync/errgroup" googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1" @@ -112,8 +114,8 @@ func (q *Querier) stopping(_ error) error { } func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querierv1.ProfileTypesRequest]) (*connect.Response[querierv1.ProfileTypesResponse], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "ProfileTypes") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "ProfileTypes") + defer sp.End() responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.ProfileType, error) { res, err := ic.ProfileTypes(childCtx, connect.NewRequest(&ingestv1.ProfileTypesRequest{})) @@ -148,9 +150,9 @@ func (q *Querier) ProfileTypes(ctx context.Context, req *connect.Request[querier func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1.LabelValuesRequest]) (*connect.Response[typesv1.LabelValuesResponse], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelValues") defer func() { - sp.LogFields( - otlog.String("name", req.Msg.Name), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("name", req.Msg.Name))) + sp.Finish() }() responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) { @@ -173,8 +175,9 @@ func (q *Querier) LabelValues(ctx context.Context, req *connect.Request[typesv1. } func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.LabelNamesRequest]) (*connect.Response[typesv1.LabelNamesResponse], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "LabelNames") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "LabelNames") + defer sp.End() + responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]string, error) { res, err := ic.LabelNames(childCtx, connect.NewRequest(&typesv1.LabelNamesRequest{ Matchers: req.Msg.Matchers, @@ -196,9 +199,9 @@ func (q *Querier) LabelNames(ctx context.Context, req *connect.Request[typesv1.L func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.SeriesRequest]) (*connect.Response[querierv1.SeriesResponse], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "Series") defer func() { - sp.LogFields( - otlog.String("matchers", strings.Join(req.Msg.Matchers, ",")), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("matchers", strings.Join(req.Msg.Matchers, ",")))) + sp.Finish() }() responses, err := forAllIngesters(ctx, q.ingesterQuerier, func(childCtx context.Context, ic IngesterQueryClient) ([]*typesv1.Labels, error) { @@ -227,13 +230,13 @@ func (q *Querier) Series(ctx context.Context, req *connect.Request[querierv1.Ser func (q *Querier) Diff(ctx context.Context, req *connect.Request[querierv1.DiffRequest]) (*connect.Response[querierv1.DiffResponse], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "Diff") defer func() { - sp.LogFields( - otlog.String("leftStart", model.Time(req.Msg.Left.Start).Time().String()), - otlog.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()), + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("leftStart", 
model.Time(req.Msg.Left.Start).Time().String()), + attribute.String("leftEnd", model.Time(req.Msg.Left.End).Time().String()), // Assume are the same - otlog.String("selector", req.Msg.Left.LabelSelector), - otlog.String("profile_id", req.Msg.Left.ProfileTypeID), - ) + attribute.String("selector", req.Msg.Left.LabelSelector), + attribute.String("profile_id", req.Msg.Left.ProfileTypeID))) + sp.Finish() }() @@ -409,12 +412,12 @@ func splitQueryToStores(start, end model.Time, now model.Time, queryStoreAfter t func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[querierv1.SelectMergeProfileRequest]) (*connect.Response[googlev1.Profile], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectMergeProfile") defer func() { - sp.LogFields( - otlog.String("start", model.Time(req.Msg.Start).Time().String()), - otlog.String("end", model.Time(req.Msg.End).Time().String()), - otlog.String("selector", req.Msg.LabelSelector), - otlog.String("profile_id", req.Msg.ProfileTypeID), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(req.Msg.Start).Time().String()), + attribute.String("end", model.Time(req.Msg.End).Time().String()), + attribute.String("selector", req.Msg.LabelSelector), + attribute.String("profile_id", req.Msg.ProfileTypeID))) + sp.Finish() }() @@ -467,14 +470,14 @@ func (q *Querier) SelectMergeProfile(ctx context.Context, req *connect.Request[q func (q *Querier) SelectSeries(ctx context.Context, req *connect.Request[querierv1.SelectSeriesRequest]) (*connect.Response[querierv1.SelectSeriesResponse], error) { sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries") defer func() { - sp.LogFields( - otlog.String("start", model.Time(req.Msg.Start).Time().String()), - otlog.String("end", model.Time(req.Msg.End).Time().String()), - otlog.String("selector", req.Msg.LabelSelector), - otlog.String("profile_id", req.Msg.ProfileTypeID), - otlog.String("group_by", strings.Join(req.Msg.GroupBy, ",")), - otlog.Float64("step", req.Msg.Step), - ) + sp.AddEvent("TODO", trace.WithAttributes( + attribute.String("start", model.Time(req.Msg.Start).Time().String()), + attribute.String("end", model.Time(req.Msg.End).Time().String()), + attribute.String("selector", req.Msg.LabelSelector), + attribute.String("profile_id", req.Msg.ProfileTypeID), + attribute.String("group_by", strings.Join(req.Msg.GroupBy, ",")), + attribute.Float64("step", req.Msg.Step))) + sp.Finish() }() diff --git a/pkg/querier/select_merge.go b/pkg/querier/select_merge.go index 5f12d94e7..6a912cad4 100644 --- a/pkg/querier/select_merge.go +++ b/pkg/querier/select_merge.go @@ -8,13 +8,6 @@ import ( "github.com/google/pprof/profile" "github.com/grafana/dskit/multierror" - "github.com/opentracing/opentracing-go" - "github.com/prometheus/common/model" - "github.com/samber/lo" - "golang.org/x/sync/errgroup" - - otlog "github.com/opentracing/opentracing-go/log" - googlev1 "github.com/grafana/phlare/api/gen/proto/go/google/v1" ingestv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1" @@ -24,6 +17,13 @@ import ( "github.com/grafana/phlare/pkg/pprof" "github.com/grafana/phlare/pkg/util" "github.com/grafana/phlare/pkg/util/loser" + "github.com/opentracing/opentracing-go" + "github.com/prometheus/common/model" + "github.com/samber/lo" + "go.opentelemetry.io/otel" + attribute "go.opentelemetry.io/otel/attribute" + "go.opentelemetry.io/otel/trace" + "golang.org/x/sync/errgroup" ) type ProfileWithLabels struct { @@ 
-220,8 +220,9 @@ func (s *mergeIterator[R, Req, Res]) Close() error { // skipDuplicates iterates through the iterator and skip duplicates. func skipDuplicates(ctx context.Context, its []MergeIterator) error { - span, _ := opentracing.StartSpanFromContext(ctx, "skipDuplicates") - defer span.Finish() + _, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "skipDuplicates") + defer span.End() + var errors multierror.MultiError tree := loser.New(its, &ProfileWithLabels{ @@ -259,8 +260,8 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error { } duplicates++ } - span.LogFields(otlog.Int("duplicates", duplicates)) - span.LogFields(otlog.Int("total", total)) + span.AddEvent("TODO", trace.WithAttributes(attribute.Int("duplicates", duplicates))) + span.AddEvent("TODO", trace.WithAttributes(attribute.Int("total", total))) return errors.Err() } @@ -268,8 +269,8 @@ func skipDuplicates(ctx context.Context, its []MergeIterator) error { // selectMergeTree selects the profile from each ingester by deduping them and // returns merge of stacktrace samples represented as a tree. func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[clientpool.BidiClientMergeProfilesStacktraces]) (*phlaremodel.Tree, error) { - span, ctx := opentracing.StartSpanFromContext(ctx, "selectMergeTree") - defer span.Finish() + ctx, span := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "selectMergeTree") + defer span.End() mergeResults := make([]MergeResult[*ingestv1.MergeProfilesStacktracesResult], len(responses)) iters := make([]MergeIterator, len(responses)) @@ -294,7 +295,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client } // Collects the results in parallel. - span.LogFields(otlog.String("msg", "collecting merge results")) + span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "collecting merge results"))) g, _ := errgroup.WithContext(ctx) m := phlaremodel.NewTreeMerger() sm := phlaremodel.NewStackTraceMerger() @@ -327,7 +328,7 @@ func selectMergeTree(ctx context.Context, responses []ResponseFromReplica[client } } - span.LogFields(otlog.String("msg", "building tree")) + span.AddEvent("TODO", trace.WithAttributes(attribute.String("msg", "building tree"))) return m.Tree(), nil } diff --git a/pkg/querier/store_gateway_querier.go b/pkg/querier/store_gateway_querier.go index 2f333d95b..787616673 100644 --- a/pkg/querier/store_gateway_querier.go +++ b/pkg/querier/store_gateway_querier.go @@ -15,6 +15,7 @@ import ( "github.com/prometheus/client_golang/prometheus" "github.com/prometheus/client_golang/prometheus/promauto" "github.com/prometheus/prometheus/promql/parser" + "go.opentelemetry.io/otel" "golang.org/x/sync/errgroup" ingesterv1 "github.com/grafana/phlare/api/gen/proto/go/ingester/v1" @@ -151,8 +152,9 @@ func GetShuffleShardingSubring(ring ring.ReadRing, userID string, limits StoreGa } func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1.SelectMergeStacktracesRequest) (*phlaremodel.Tree, error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectTree StoreGateway") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectTree StoreGateway") + defer sp.End() + profileType, err := phlaremodel.ParseProfileTypeSelector(req.ProfileTypeID) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) @@ -200,8 +202,9 @@ func (q *Querier) selectTreeFromStoreGateway(ctx context.Context, req *querierv1 } func (q *Querier) 
selectSeriesFromStoreGateway(ctx context.Context, req *ingesterv1.MergeProfilesLabelsRequest) ([]ResponseFromReplica[clientpool.BidiClientMergeProfilesLabels], error) { - sp, ctx := opentracing.StartSpanFromContext(ctx, "SelectSeries StoreGateway") - defer sp.Finish() + ctx, sp := otel.Tracer("github.com/grafana/pyroscope").Start(ctx, "SelectSeries StoreGateway") + defer sp.End() + tenantID, err := tenant.ExtractTenantIDFromContext(ctx) if err != nil { return nil, connect.NewError(connect.CodeInvalidArgument, err) diff --git a/pkg/scheduler/scheduler.go b/pkg/scheduler/scheduler.go index f4d8d31aa..99a1f8a1f 100644 --- a/pkg/scheduler/scheduler.go +++ b/pkg/scheduler/scheduler.go @@ -27,6 +27,7 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "github.com/weaveworks/common/middleware" "github.com/weaveworks/common/user" + "go.opentelemetry.io/otel/trace" "google.golang.org/grpc" "github.com/grafana/phlare/pkg/frontend/frontendpb" @@ -200,7 +201,7 @@ type schedulerRequest struct { ctx context.Context ctxCancel context.CancelFunc - queueSpan opentracing.Span + queueSpan trace.Span // This is only used for testing. parentSpanContext opentracing.SpanContext