diff --git a/pkg/phlaredb/block/list.go b/pkg/phlaredb/block/list.go
index c8081fd95..abde0100c 100644
--- a/pkg/phlaredb/block/list.go
+++ b/pkg/phlaredb/block/list.go
@@ -1,15 +1,24 @@
 package block
 
 import (
+	"context"
+	"fmt"
 	"os"
 	"path/filepath"
 	"sort"
 	"time"
 
+	"github.com/go-kit/log/level"
 	"github.com/oklog/ulid"
+	"github.com/thanos-io/objstore"
+	"github.com/thanos-io/thanos/pkg/block"
+	"golang.org/x/sync/errgroup"
+
+	phlareobj "github.com/grafana/phlare/pkg/objstore"
+	"github.com/grafana/phlare/pkg/util"
 )
 
-func ListBlock(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) {
+func ListBlocks(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) {
 	result := make(map[ulid.ULID]*Meta)
 	entries, err := os.ReadDir(path)
 	if err != nil {
@@ -32,6 +41,109 @@ func ListBlock(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error)
 	return result, nil
 }
 
+// IterBlockMetas iterates over all block metas in the given time range.
+// It calls fn for each block meta found; fn may be invoked concurrently,
+// so it must be safe for concurrent use.
+// It returns the first error encountered while listing or reading metas,
+// and nil if all reads succeed.
+// Currently doesn't work with a filesystem bucket.
+func IterBlockMetas(ctx context.Context, bkt phlareobj.Bucket, from, to time.Time, fn func(*Meta)) error {
+	allIDs, err := listAllBlocksByPrefixes(ctx, bkt, from, to)
+	if err != nil {
+		return err
+	}
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(128)
+
+	// fetch all meta.json
+	for _, ids := range allIDs {
+		for _, id := range ids {
+			id := id
+			g.Go(func() error {
+				r, err := bkt.Get(ctx, id+block.MetaFilename)
+				if err != nil {
+					return err
+				}
+				defer r.Close()
+
+				m, err := Read(r)
+				if err != nil {
+					return err
+				}
+				fn(m)
+				return nil
+			})
+		}
+	}
+	return g.Wait()
+}
+
+func listAllBlocksByPrefixes(ctx context.Context, bkt phlareobj.Bucket, from, to time.Time) ([][]string, error) {
+	// todo: We should cache the prefix listing per tenant.
+	blockPrefixes, err := blockPrefixesFromTo(from, to, 4)
+	if err != nil {
+		return nil, err
+	}
+	ids := make([][]string, len(blockPrefixes))
+	g, ctx := errgroup.WithContext(ctx)
+	g.SetLimit(64)
+
+	for i, prefix := range blockPrefixes {
+		prefix := prefix
+		i := i
+		g.Go(func() error {
+			level.Debug(util.Logger).Log("msg", "listing blocks", "prefix", prefix, "i", i)
+			prefixIds := []string{}
+			err := bkt.Iter(ctx, prefix, func(name string) error {
+				if _, ok := block.IsBlockDir(name); ok {
+					prefixIds = append(prefixIds, name)
+				}
+				return nil
+			}, objstore.WithoutApendingDirDelim)
+			if err != nil {
+				return err
+			}
+			ids[i] = prefixIds
+			return nil
+		})
+	}
+	if err := g.Wait(); err != nil {
+		return nil, err
+	}
+	return ids, nil
+}
+
+// orderOfSplit is the number of leading characters of the ULID (base32, so
+// 5 bits each) used for the split. The duration covered by one prefix is:
+// 0: 1114y
+// 1: 34.8y
+// 2: 1y
+// 3: 12.4d
+// 4: 9h19m
+// TODO: This needs to be adapted based on the MaxBlockDuration.
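+// For example (illustrative numbers): with orderOfSplit=4 each prefix covers
+// 2^((9-4)*5) ms ≈ 9h19m of ULID timestamps, so a 24h range yields three or
+// four 5-character prefixes such as "01H53" to list in parallel.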
+func blockPrefixesFromTo(from, to time.Time, orderOfSplit uint8) (prefixes []string, err error) { + var id ulid.ULID + + if orderOfSplit > 9 { + return nil, fmt.Errorf("order of split must be between 0 and 9") + } + + byteShift := (9 - orderOfSplit) * 5 + + ms := uint64(from.UnixMilli()) >> byteShift + ms = ms << byteShift + for ms <= uint64(to.UnixMilli()) { + if err := id.SetTime(ms); err != nil { + return nil, err + } + prefixes = append(prefixes, id.String()[:orderOfSplit+1]) + + ms = ms >> byteShift + ms += 1 + ms = ms << byteShift + } + + return prefixes, nil +} + func SortBlocks(metas map[ulid.ULID]*Meta) []*Meta { var blocks []*Meta diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 1211a9ae8..e431f14f5 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -427,6 +427,21 @@ func newStacktraceResolverV2(bucketReader phlareobj.Bucket) StacktraceDB { } } +func (b *singleBlockQuerier) Profiles() []parquet.RowGroup { + return b.profiles.file.RowGroups() +} + +func (b *singleBlockQuerier) Index() IndexReader { + return b.index +} + +func (b *singleBlockQuerier) Meta() block.Meta { + if b.meta == nil { + return block.Meta{} + } + return *b.meta +} + func (b *singleBlockQuerier) Close() error { b.openLock.Lock() defer func() { @@ -933,9 +948,7 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params } } - var ( - buf [][]parquet.Value - ) + var buf [][]parquet.Value pIt := query.NewBinaryJoinIterator( 0, diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go new file mode 100644 index 000000000..433b851fc --- /dev/null +++ b/pkg/phlaredb/compact.go @@ -0,0 +1,536 @@ +package phlaredb + +import ( + "context" + "io/fs" + "math" + "os" + "path/filepath" + + "github.com/oklog/ulid" + "github.com/opentracing/opentracing-go" + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/segmentio/parquet-go" + + "github.com/grafana/phlare/pkg/iter" + phlaremodel "github.com/grafana/phlare/pkg/model" + phlareparquet "github.com/grafana/phlare/pkg/parquet" + "github.com/grafana/phlare/pkg/phlaredb/block" + schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1" + "github.com/grafana/phlare/pkg/phlaredb/tsdb/index" + "github.com/grafana/phlare/pkg/util" + "github.com/grafana/phlare/pkg/util/loser" +) + +type BlockReader interface { + Meta() block.Meta + Profiles() []parquet.RowGroup + Index() IndexReader + // todo symbdb +} + +func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Meta, err error) { + srcMetas := make([]block.Meta, len(src)) + ulids := make([]string, len(src)) + + for i, b := range src { + srcMetas[i] = b.Meta() + ulids[i] = b.Meta().ULID.String() + } + meta = compactMetas(srcMetas) + blockPath := filepath.Join(dst, meta.ULID.String()) + indexPath := filepath.Join(blockPath, block.IndexFilename) + profilePath := filepath.Join(blockPath, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) + + sp, ctx := opentracing.StartSpanFromContext(ctx, "Compact") + defer func() { + // todo: context propagation is not working through objstore + // This is because the BlockReader has no context. 
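+		// Tagging the span with the source ULIDs and the resulting block ID
+		// keeps compactions traceable despite the missing context propagation.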
+		sp.SetTag("src", ulids)
+		sp.SetTag("block_id", meta.ULID.String())
+		if err != nil {
+			sp.SetTag("error", err)
+		}
+		sp.Finish()
+	}()
+
+	if len(src) <= 1 {
+		return block.Meta{}, errors.New("not enough blocks to compact")
+	}
+	if err := os.MkdirAll(blockPath, 0o777); err != nil {
+		return block.Meta{}, err
+	}
+
+	indexw, err := prepareIndexWriter(ctx, indexPath, src)
+	if err != nil {
+		return block.Meta{}, err
+	}
+
+	profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644)
+	if err != nil {
+		return block.Meta{}, err
+	}
+	profileWriter := newProfileWriter(profileFile)
+
+	// todo new symbdb
+
+	rowsIt, err := newMergeRowProfileIterator(src)
+	if err != nil {
+		return block.Meta{}, err
+	}
+	seriesRewriter := newSeriesRewriter(rowsIt, indexw)
+	symbolsRewriter := newSymbolsRewriter(seriesRewriter)
+	reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symbolsRewriter))
+
+	total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount)
+	if err != nil {
+		return block.Meta{}, err
+	}
+
+	// flush the index file.
+	if err := indexw.Close(); err != nil {
+		return block.Meta{}, err
+	}
+
+	if err := profileWriter.Close(); err != nil {
+		return block.Meta{}, err
+	}
+
+	metaFiles, err := metaFilesFromDir(blockPath)
+	if err != nil {
+		return block.Meta{}, err
+	}
+	meta.Files = metaFiles
+	meta.Stats.NumProfiles = total
+	meta.Stats.NumSeries = seriesRewriter.NumSeries()
+	meta.Stats.NumSamples = symbolsRewriter.NumSamples()
+	if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil {
+		return block.Meta{}, err
+	}
+	return meta, nil
+}
+
+// metaFilesFromDir returns the list of file descriptions found in a block directory.
+func metaFilesFromDir(dir string) ([]block.File, error) {
+	var files []block.File
+	err := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error {
+		if err != nil {
+			return err
+		}
+		if info.IsDir() {
+			return nil
+		}
+		var f block.File
+		switch filepath.Ext(info.Name()) {
+		case block.ParquetSuffix:
+			f, err = parquetMetaFile(path, info.Size())
+			if err != nil {
+				return err
+			}
+		case filepath.Ext(block.IndexFilename):
+			f, err = tsdbMetaFile(path)
+			if err != nil {
+				return err
+			}
+		}
+		f.RelPath, err = filepath.Rel(dir, path)
+		if err != nil {
+			return err
+		}
+		f.SizeBytes = uint64(info.Size())
+		files = append(files, f)
+		return nil
+	})
+	return files, err
+}
+
+func tsdbMetaFile(filePath string) (block.File, error) {
+	idxReader, err := index.NewFileReader(filePath)
+	if err != nil {
+		return block.File{}, err
+	}
+
+	return idxReader.FileInfo(), nil
+}
+
+func parquetMetaFile(filePath string, size int64) (block.File, error) {
+	f, err := os.Open(filePath)
+	if err != nil {
+		return block.File{}, err
+	}
+	defer f.Close()
+
+	pqFile, err := parquet.OpenFile(f, size)
+	if err != nil {
+		return block.File{}, err
+	}
+	return block.File{
+		Parquet: &block.ParquetFile{
+			NumRowGroups: uint64(len(pqFile.RowGroups())),
+			NumRows:      uint64(pqFile.NumRows()),
+		},
+	}, nil
+}
+
+// todo write tests
+func compactMetas(src []block.Meta) block.Meta {
+	meta := block.NewMeta()
+	highestCompactionLevel := 0
+	ulids := make([]ulid.ULID, 0, len(src))
+	parents := make([]tsdb.BlockDesc, 0, len(src))
+	minTime, maxTime := model.Latest, model.Earliest
+	labels := make(map[string]string)
+	for _, b := range src {
+		if b.Compaction.Level > highestCompactionLevel {
+			highestCompactionLevel = b.Compaction.Level
+		}
+		ulids = append(ulids, b.ULID)
+		parents = append(parents, 
tsdb.BlockDesc{
+			ULID:    b.ULID,
+			MinTime: int64(b.MinTime),
+			MaxTime: int64(b.MaxTime),
+		})
+		if b.MinTime < minTime {
+			minTime = b.MinTime
+		}
+		if b.MaxTime > maxTime {
+			maxTime = b.MaxTime
+		}
+		for k, v := range b.Labels {
+			if k == block.HostnameLabel {
+				continue
+			}
+			labels[k] = v
+		}
+	}
+	if hostname, err := os.Hostname(); err == nil {
+		labels[block.HostnameLabel] = hostname
+	}
+	meta.Source = block.CompactorSource
+	meta.Compaction = tsdb.BlockMetaCompaction{
+		Deletable: meta.Stats.NumSamples == 0,
+		Level:     highestCompactionLevel + 1,
+		Sources:   ulids,
+		Parents:   parents,
+	}
+	meta.MaxTime = maxTime
+	meta.MinTime = minTime
+	meta.Labels = labels
+	return *meta
+}
+
+type profileRow struct {
+	timeNanos int64
+
+	seriesRef uint32
+	labels    phlaremodel.Labels
+	fp        model.Fingerprint
+	row       schemav1.ProfileRow
+}
+
+type profileRowIterator struct {
+	profiles    iter.Iterator[parquet.Row]
+	index       IndexReader
+	allPostings index.Postings
+	err         error
+
+	currentRow profileRow
+	chunks     []index.ChunkMeta
+}
+
+func newProfileRowIterator(reader parquet.RowReader, idx IndexReader) (*profileRowIterator, error) {
+	k, v := index.AllPostingsKey()
+	allPostings, err := idx.Postings(k, nil, v)
+	if err != nil {
+		return nil, err
+	}
+	return &profileRowIterator{
+		profiles:    phlareparquet.NewBufferedRowReaderIterator(reader, 1024),
+		index:       idx,
+		allPostings: allPostings,
+		currentRow: profileRow{
+			seriesRef: math.MaxUint32,
+		},
+		chunks: make([]index.ChunkMeta, 1),
+	}, nil
+}
+
+func (p *profileRowIterator) At() profileRow {
+	return p.currentRow
+}
+
+func (p *profileRowIterator) Next() bool {
+	if !p.profiles.Next() {
+		return false
+	}
+	p.currentRow.row = schemav1.ProfileRow(p.profiles.At())
+	seriesIndex := p.currentRow.row.SeriesIndex()
+	p.currentRow.timeNanos = p.currentRow.row.TimeNanos()
+	// fast path: the row belongs to the same series as the previous one.
+	if seriesIndex == p.currentRow.seriesRef {
+		return true
+	}
+	p.currentRow.seriesRef = seriesIndex
+	if !p.allPostings.Next() {
+		if err := p.allPostings.Err(); err != nil {
+			p.err = err
+			return false
+		}
+		p.err = errors.New("unexpected end of postings")
+		return false
+	}
+
+	fp, err := p.index.Series(p.allPostings.At(), &p.currentRow.labels, &p.chunks)
+	if err != nil {
+		p.err = err
+		return false
+	}
+	p.currentRow.fp = model.Fingerprint(fp)
+	return true
+}
+
+func (p *profileRowIterator) Err() error {
+	if p.err != nil {
+		return p.err
+	}
+	return p.profiles.Err()
+}
+
+func (p *profileRowIterator) Close() error {
+	return p.profiles.Close()
+}
+
+func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], error) {
+	its := make([]iter.Iterator[profileRow], len(src))
+	for i, s := range src {
+		// todo: maybe we could merge row groups in parallel, but that requires locking.
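+		// Each block contributes a single row stream over all of its row
+		// groups, already ordered by series labels and then by timestamp;
+		// the loser-tree merge below keeps that order across blocks, and the
+		// dedupeProfileRowIterator wrapping it drops exact duplicates
+		// (same fingerprint and timestamp).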
+ reader := parquet.MultiRowGroup(s.Profiles()...).Rows() + it, err := newProfileRowIterator( + reader, + s.Index(), + ) + if err != nil { + return nil, err + } + its[i] = it + } + return &dedupeProfileRowIterator{ + Iterator: iter.NewTreeIterator(loser.New( + its, + profileRow{ + timeNanos: math.MaxInt64, + }, + func(it iter.Iterator[profileRow]) profileRow { return it.At() }, + func(r1, r2 profileRow) bool { + // first handle max profileRow if it's either r1 or r2 + if r1.timeNanos == math.MaxInt64 { + return false + } + if r2.timeNanos == math.MaxInt64 { + return true + } + // then handle normal profileRows + if cmp := phlaremodel.CompareLabelPairs(r1.labels, r2.labels); cmp != 0 { + return cmp < 0 + } + return r1.timeNanos < r2.timeNanos + }, + func(it iter.Iterator[profileRow]) { _ = it.Close() }, + )), + }, nil +} + +type noopStacktraceRewriter struct{} + +func (noopStacktraceRewriter) RewriteStacktraces(src, dst []uint32) error { + copy(dst, src) + return nil +} + +type StacktraceRewriter interface { + RewriteStacktraces(src, dst []uint32) error +} + +type symbolsRewriter struct { + iter.Iterator[profileRow] + err error + + rewriter StacktraceRewriter + src, dst []uint32 + numSamples uint64 +} + +// todo remap symbols & ingest symbols +func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter { + return &symbolsRewriter{ + Iterator: it, + rewriter: noopStacktraceRewriter{}, + } +} + +func (s *symbolsRewriter) NumSamples() uint64 { + return s.numSamples +} + +func (s *symbolsRewriter) Next() bool { + if !s.Iterator.Next() { + return false + } + var err error + s.Iterator.At().row.ForStacktraceIDsValues(func(values []parquet.Value) { + s.numSamples += uint64(len(values)) + s.loadStacktracesID(values) + err = s.rewriter.RewriteStacktraces(s.src, s.dst) + if err != nil { + return + } + for i, v := range values { + values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) + } + }) + if err != nil { + s.err = err + return false + } + return true +} + +func (s *symbolsRewriter) Err() error { + if s.err != nil { + return s.err + } + return s.Iterator.Err() +} + +func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { + if cap(s.src) < len(values) { + s.src = make([]uint32, len(values)*2) + s.dst = make([]uint32, len(values)*2) + } + s.src = s.src[:len(values)] + s.dst = s.dst[:len(values)] + for i := range values { + s.src[i] = values[i].Uint32() + } +} + +type seriesRewriter struct { + iter.Iterator[profileRow] + + indexw *index.Writer + + seriesRef storage.SeriesRef + labels phlaremodel.Labels + previousFp model.Fingerprint + currentChunkMeta index.ChunkMeta + err error + + numSeries uint64 +} + +func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seriesRewriter { + return &seriesRewriter{ + Iterator: it, + indexw: indexw, + } +} + +func (s *seriesRewriter) NumSeries() uint64 { + return s.numSeries +} + +func (s *seriesRewriter) Next() bool { + if !s.Iterator.Next() { + if s.previousFp != 0 { + if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil { + s.err = err + return false + } + s.numSeries++ + } + return false + } + currentProfile := s.Iterator.At() + if s.previousFp != currentProfile.fp { + // store the previous series. 
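+		// The merged stream is ordered by labels first, so a fingerprint
+		// change means the previous series is complete and its index entry
+		// can be flushed with the accumulated chunk min/max timestamps.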
+		if s.previousFp != 0 {
+			if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil {
+				s.err = err
+				return false
+			}
+			s.numSeries++
+		}
+		s.seriesRef++
+		s.labels = currentProfile.labels.Clone()
+		s.previousFp = currentProfile.fp
+		s.currentChunkMeta.MinTime = currentProfile.timeNanos
+	}
+	s.currentChunkMeta.MaxTime = currentProfile.timeNanos
+	currentProfile.row.SetSeriesIndex(uint32(s.seriesRef))
+	return true
+}
+
+type rowsIterator struct {
+	iter.Iterator[profileRow]
+}
+
+func newRowsIterator(it iter.Iterator[profileRow]) *rowsIterator {
+	return &rowsIterator{
+		Iterator: it,
+	}
+}
+
+func (r *rowsIterator) At() parquet.Row {
+	return parquet.Row(r.Iterator.At().row)
+}
+
+type dedupeProfileRowIterator struct {
+	iter.Iterator[profileRow]
+
+	prevFP        model.Fingerprint
+	prevTimeNanos int64
+}
+
+func (it *dedupeProfileRowIterator) Next() bool {
+	for {
+		if !it.Iterator.Next() {
+			return false
+		}
+		currentProfile := it.Iterator.At()
+		if it.prevFP == currentProfile.fp && it.prevTimeNanos == currentProfile.timeNanos {
+			// skip duplicate profile
+			continue
+		}
+		it.prevFP = currentProfile.fp
+		it.prevTimeNanos = currentProfile.timeNanos
+		return true
+	}
+}
+
+func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) (*index.Writer, error) {
+	var symbols index.StringIter
+	indexw, err := index.NewWriter(ctx, path)
+	if err != nil {
+		return nil, err
+	}
+	for i, r := range readers {
+		if i == 0 {
+			symbols = r.Index().Symbols()
+			continue
+		}
+		symbols = tsdb.NewMergedStringIter(symbols, r.Index().Symbols())
+	}
+
+	for symbols.Next() {
+		if err := indexw.AddSymbol(symbols.At()); err != nil {
+			return nil, errors.Wrap(err, "add symbol")
+		}
+	}
+	if symbols.Err() != nil {
+		return nil, errors.Wrap(symbols.Err(), "next symbol")
+	}
+
+	return indexw, nil
+}
diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go
new file mode 100644
index 000000000..63a793cc4
--- /dev/null
+++ b/pkg/phlaredb/compact_test.go
@@ -0,0 +1,296 @@
+package phlaredb
+
+import (
+	"context"
+	"fmt"
+	"net/http"
+	"os"
+	"path/filepath"
+	"sort"
+	"sync"
+	"testing"
+	"time"
+
+	_ "net/http/pprof"
+
+	"github.com/prometheus/common/model"
+	"github.com/prometheus/prometheus/storage"
+	"github.com/segmentio/parquet-go"
+	"github.com/stretchr/testify/assert"
+	"github.com/stretchr/testify/require"
+	"golang.org/x/sync/errgroup"
+
+	typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1"
+	phlaremodel "github.com/grafana/phlare/pkg/model"
+	phlareobj "github.com/grafana/phlare/pkg/objstore"
+	"github.com/grafana/phlare/pkg/objstore/client"
+	"github.com/grafana/phlare/pkg/objstore/providers/filesystem"
+	"github.com/grafana/phlare/pkg/objstore/providers/gcs"
+	"github.com/grafana/phlare/pkg/phlaredb/block"
+	schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1"
+	"github.com/grafana/phlare/pkg/phlaredb/tsdb/index"
+)
+
+func init() {
+	go func() {
+		_ = http.ListenAndServe("localhost:6060", nil)
+	}()
+}
+
+func TestCompact(t *testing.T) {
+	ctx := context.Background()
+	bkt, err := client.NewBucket(ctx, client.Config{
+		StorageBackendConfig: client.StorageBackendConfig{
+			Backend: client.GCS,
+			GCS: gcs.Config{
+				BucketName: "dev-us-central-0-profiles-dev-001-data",
+			},
+		},
+		StoragePrefix: "1218/phlaredb/",
+	}, "test")
+	require.NoError(t, err)
+	now := time.Now()
+	var (
+		meta []*block.Meta
+		mtx  sync.Mutex
+	)
+
+	err = block.IterBlockMetas(ctx, bkt, now.Add(-24*time.Hour), now, func(m *block.Meta) {
+		
mtx.Lock()
+		defer mtx.Unlock()
+		meta = append(meta, m)
+	})
+	require.NoError(t, err)
+	dst := t.TempDir()
+
+	sort.Slice(meta, func(i, j int) bool {
+		return meta[i].MinTime.Before(meta[j].MinTime)
+	})
+
+	// only test the 4 latest blocks.
+	meta = meta[len(meta)-4:]
+	testCompact(t, meta, bkt, dst)
+}
+
+// to download the blocks:
+// gsutil -m cp -r \
+//	"gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H53WJEAB43S3GJ26XMSNRSJA" \
+//	"gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H5454JBEV80V2J7CKYHPCBG8" \
+//	"gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H54553SYKH43FNJN5BVR1M2H" \
+//	"gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H5457Q89WYYH9FCK8PZ6XG75" \
+//	.
+func TestCompactLocal(t *testing.T) {
+	ctx := context.Background()
+	bkt, err := client.NewBucket(ctx, client.Config{
+		StorageBackendConfig: client.StorageBackendConfig{
+			Backend: client.Filesystem,
+			Filesystem: filesystem.Config{
+				Directory: "/Users/cyril/work/phlare-data/",
+			},
+		},
+		StoragePrefix: "",
+	}, "test")
+	require.NoError(t, err)
+	var metas []*block.Meta
+
+	metaMap, err := block.ListBlocks("/Users/cyril/work/phlare-data/", time.Time{})
+	require.NoError(t, err)
+	for _, m := range metaMap {
+		metas = append(metas, m)
+	}
+	dst := t.TempDir()
+	testCompact(t, metas, bkt, dst)
+}
+
+func testCompact(t *testing.T, metas []*block.Meta, bkt phlareobj.Bucket, dst string) {
+	t.Helper()
+	g, ctx := errgroup.WithContext(context.Background())
+	var src []BlockReader
+	for i, m := range metas {
+		t.Log("src block(#", i, ")",
+			"ID", m.ULID.String(),
+			"minTime", m.MinTime.Time().Format(time.RFC3339Nano),
+			"maxTime", m.MaxTime.Time().Format(time.RFC3339Nano),
+			"numSeries", m.Stats.NumSeries,
+			"numProfiles", m.Stats.NumProfiles,
+			"numSamples", m.Stats.NumSamples)
+		b := NewSingleBlockQuerierFromMeta(ctx, bkt, m)
+		g.Go(func() error {
+			return b.Open(ctx)
+		})
+
+		src = append(src, b)
+	}
+
+	require.NoError(t, g.Wait())
+
+	start := time.Now()
+	compacted, err := Compact(context.Background(), src, dst)
+	require.NoError(t, err)
+	t.Log(compacted, dst)
+	t.Log("Compaction duration", time.Since(start))
+	t.Log("numSeries", compacted.Stats.NumSeries,
+		"numProfiles", compacted.Stats.NumProfiles,
+		"numSamples", compacted.Stats.NumSamples)
+}
+
+func TestProfileRowIterator(t *testing.T) {
+	filePath := t.TempDir() + "/index.tsdb"
+	idxw, err := index.NewWriter(context.Background(), filePath)
+	require.NoError(t, err)
+	require.NoError(t, idxw.AddSymbol("a"))
+	require.NoError(t, idxw.AddSymbol("b"))
+	require.NoError(t, idxw.AddSymbol("c"))
+	addSeries(t, idxw, 0, phlaremodel.Labels{
+		&typesv1.LabelPair{Name: "a", Value: "b"},
+	})
+	addSeries(t, idxw, 1, phlaremodel.Labels{
+		&typesv1.LabelPair{Name: "a", Value: "c"},
+	})
+	addSeries(t, idxw, 2, phlaremodel.Labels{
+		&typesv1.LabelPair{Name: "b", Value: "a"},
+	})
+	require.NoError(t, idxw.Close())
+	idxr, err := index.NewFileReader(filePath)
+	require.NoError(t, err)
+
+	it, err := newProfileRowIterator(schemav1.NewInMemoryProfilesRowReader(
+		[]schemav1.InMemoryProfile{
+			{SeriesIndex: 0, TimeNanos: 1},
+			{SeriesIndex: 1, TimeNanos: 2},
+			{SeriesIndex: 2, TimeNanos: 3},
+		},
+	), idxr)
+	require.NoError(t, err)
+
+	assert.True(t, it.Next())
+	require.Equal(t, it.At().labels, phlaremodel.Labels{
+		&typesv1.LabelPair{Name: "a", Value: "b"},
+	})
+	require.Equal(t, it.At().timeNanos, int64(1))
+	require.Equal(t, it.At().seriesRef, uint32(0))
+
+	assert.True(t, it.Next())
+	require.Equal(t, it.At().labels, 
phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + require.Equal(t, it.At().timeNanos, int64(2)) + require.Equal(t, it.At().seriesRef, uint32(1)) + + assert.True(t, it.Next()) + require.Equal(t, it.At().labels, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.Equal(t, it.At().timeNanos, int64(3)) + require.Equal(t, it.At().seriesRef, uint32(2)) + + assert.False(t, it.Next()) + require.NoError(t, it.Err()) + require.NoError(t, it.Close()) +} + +func addSeries(t *testing.T, idxw *index.Writer, idx int, labels phlaremodel.Labels) { + t.Helper() + require.NoError(t, idxw.AddSeries(storage.SeriesRef(idx), labels, model.Fingerprint(labels.Hash()), index.ChunkMeta{SeriesIndex: uint32(idx)})) +} + +func TestMetaFilesFromDir(t *testing.T) { + dst := t.TempDir() + generateParquetFile(t, filepath.Join(dst, "foo.parquet")) + generateParquetFile(t, filepath.Join(dst, "symbols", "bar.parquet")) + generateFile(t, filepath.Join(dst, "symbols", "index.symdb"), 100) + generateFile(t, filepath.Join(dst, "symbols", "stacktraces.symdb"), 200) + generateIndexFile(t, dst) + actual, err := metaFilesFromDir(dst) + + require.NoError(t, err) + require.Equal(t, 5, len(actual)) + require.Equal(t, []block.File{ + { + Parquet: &block.ParquetFile{ + NumRows: 100, + NumRowGroups: 10, + }, + RelPath: "foo.parquet", + SizeBytes: fileSize(t, filepath.Join(dst, "foo.parquet")), + }, + { + RelPath: block.IndexFilename, + SizeBytes: fileSize(t, filepath.Join(dst, block.IndexFilename)), + TSDB: &block.TSDBFile{ + NumSeries: 3, + }, + }, + { + Parquet: &block.ParquetFile{ + NumRows: 100, + NumRowGroups: 10, + }, + RelPath: filepath.Join("symbols", "bar.parquet"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "bar.parquet")), + }, + { + RelPath: filepath.Join("symbols", "index.symdb"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "index.symdb")), + }, + { + RelPath: filepath.Join("symbols", "stacktraces.symdb"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "stacktraces.symdb")), + }, + }, actual) +} + +func fileSize(t *testing.T, path string) uint64 { + t.Helper() + fi, err := os.Stat(path) + require.NoError(t, err) + return uint64(fi.Size()) +} + +func generateFile(t *testing.T, path string, size int) { + t.Helper() + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + f, err := os.Create(path) + require.NoError(t, err) + defer f.Close() + require.NoError(t, f.Truncate(int64(size))) +} + +func generateIndexFile(t *testing.T, dir string) { + t.Helper() + filePath := filepath.Join(dir, block.IndexFilename) + idxw, err := index.NewWriter(context.Background(), filePath) + require.NoError(t, err) + require.NoError(t, idxw.AddSymbol("a")) + require.NoError(t, idxw.AddSymbol("b")) + require.NoError(t, idxw.AddSymbol("c")) + addSeries(t, idxw, 0, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "b"}, + }) + addSeries(t, idxw, 1, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + addSeries(t, idxw, 2, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.NoError(t, idxw.Close()) +} + +func generateParquetFile(t *testing.T, path string) { + t.Helper() + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) + require.NoError(t, err) + defer file.Close() + + writer := parquet.NewGenericWriter[struct{ Name string }](file, parquet.MaxRowsPerRowGroup(10)) + defer writer.Close() + for i := 0; i < 100; i++ { + _, err := 
writer.Write([]struct{ Name string }{ + {Name: fmt.Sprintf("name-%d", i)}, + }) + require.NoError(t, err) + } +} diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index 79e44467a..9bcaea9e5 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -63,6 +63,14 @@ type profileStore struct { flushBufferLbs []phlaremodel.Labels } +func newProfileWriter(writer io.Writer) *parquet.GenericWriter[*schemav1.Profile] { + return parquet.NewGenericWriter[*schemav1.Profile](writer, (&schemav1.ProfilePersister{}).Schema(), + parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")), + parquet.CreatedBy("github.com/grafana/phlare/", build.Version, build.Revision), + parquet.PageBufferSize(3*1024*1024), + ) +} + func newProfileStore(phlarectx context.Context) *profileStore { s := &profileStore{ logger: phlarecontext.Logger(phlarectx), @@ -76,11 +84,7 @@ func newProfileStore(phlarectx context.Context) *profileStore { go s.cutRowGroupLoop() // Initialize writer on /dev/null // TODO: Reuse parquet.Writer beyond life time of the head. - s.writer = parquet.NewGenericWriter[*schemav1.Profile](io.Discard, s.persister.Schema(), - parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")), - parquet.CreatedBy("github.com/grafana/phlare/", build.Version, build.Revision), - parquet.PageBufferSize(3*1024*1024), - ) + s.writer = newProfileWriter(io.Discard) return s } diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go index 7fbe2288b..c3806ced9 100644 --- a/pkg/phlaredb/schemas/v1/profiles.go +++ b/pkg/phlaredb/schemas/v1/profiles.go @@ -42,9 +42,10 @@ var ( phlareparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))), }) - maxProfileRow parquet.Row - seriesIndexColIndex int - timeNanoColIndex int + maxProfileRow parquet.Row + seriesIndexColIndex int + stacktraceIDColIndex int + timeNanoColIndex int ) func init() { @@ -62,6 +63,11 @@ func init() { panic(fmt.Errorf("TimeNanos column not found")) } timeNanoColIndex = timeCol.ColumnIndex + stacktraceIDCol, ok := profilesSchema.Lookup("Samples", "list", "element", "StacktraceID") + if !ok { + panic(fmt.Errorf("StacktraceID column not found")) + } + stacktraceIDColIndex = stacktraceIDCol.ColumnIndex } type Sample struct { @@ -458,3 +464,43 @@ func lessProfileRows(r1, r2 parquet.Row) bool { } return ts1 < ts2 } + +type ProfileRow parquet.Row + +func (p ProfileRow) SeriesIndex() uint32 { + return p[seriesIndexColIndex].Uint32() +} + +func (p ProfileRow) TimeNanos() int64 { + var ts int64 + for i := len(p) - 1; i >= 0; i-- { + if p[i].Column() == timeNanoColIndex { + ts = p[i].Int64() + break + } + } + return ts +} + +func (p ProfileRow) SetSeriesIndex(v uint32) { + p[seriesIndexColIndex] = parquet.Int32Value(int32(v)).Level(0, 0, seriesIndexColIndex) +} + +func (p ProfileRow) ForStacktraceIDsValues(fn func([]parquet.Value)) { + start := -1 + var i int + for i = 0; i < len(p); i++ { + col := p[i].Column() + if col == stacktraceIDColIndex && p[i].DefinitionLevel() == 1 { + if start == -1 { + start = i + } + } + if col > stacktraceIDColIndex { + break + } + } + if start != -1 { + fn(p[start:i]) + } +} diff --git a/pkg/phlaredb/schemas/v1/profiles_test.go b/pkg/phlaredb/schemas/v1/profiles_test.go index ec94ffd2b..5de8046a7 100644 --- a/pkg/phlaredb/schemas/v1/profiles_test.go +++ b/pkg/phlaredb/schemas/v1/profiles_test.go @@ -207,6 +207,66 @@ func TestLessProfileRows(t *testing.T) { } } +func 
TestProfileRowStacktraceIDs(t *testing.T) { + for _, tc := range []struct { + name string + expected []uint32 + profile InMemoryProfile + }{ + {"empty", nil, InMemoryProfile{}}, + {"one sample", []uint32{1}, InMemoryProfile{ + SeriesIndex: 1, + StacktracePartition: 2, + TotalValue: 3, + Samples: Samples{StacktraceIDs: []uint32{1}, Values: []uint64{1}}, + }}, + {"many", []uint32{1, 1, 2, 3, 4}, InMemoryProfile{ + SeriesIndex: 1, + StacktracePartition: 2, + TotalValue: 3, + Samples: Samples{ + StacktraceIDs: []uint32{1, 1, 2, 3, 4}, + Values: []uint64{4, 2, 4, 5, 2}, + }, + }}, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + rows := generateProfileRow([]InMemoryProfile{tc.profile}) + var ids []uint32 + ProfileRow(rows[0]).ForStacktraceIDsValues(func(values []parquet.Value) { + for _, v := range values { + ids = append(ids, v.Uint32()) + } + }) + require.Equal(t, tc.expected, ids) + }) + } +} + +func TestProfileRowMutateValues(t *testing.T) { + row := ProfileRow(generateProfileRow([]InMemoryProfile{ + { + Samples: Samples{ + StacktraceIDs: []uint32{1, 1, 2, 3, 4}, + Values: []uint64{4, 2, 4, 5, 2}, + }, + }, + })[0]) + row.ForStacktraceIDsValues(func(values []parquet.Value) { + for i := range values { + values[i] = parquet.Int32Value(1).Level(0, 1, values[i].Column()) + } + }) + var ids []uint32 + row.ForStacktraceIDsValues(func(values []parquet.Value) { + for _, v := range values { + ids = append(ids, v.Uint32()) + } + }) + require.Equal(t, []uint32{1, 1, 1, 1, 1}, ids) +} + func BenchmarkProfileRows(b *testing.B) { a := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 1}})[0] a1 := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 2}})[0] diff --git a/pkg/phlaredb/tsdb/index/index.go b/pkg/phlaredb/tsdb/index/index.go index 7a7ef4fc7..583ac873f 100644 --- a/pkg/phlaredb/tsdb/index/index.go +++ b/pkg/phlaredb/tsdb/index/index.go @@ -1334,9 +1334,21 @@ func (r *Reader) Version() int { // FileInfo returns some general stats about the underlying file func (r *Reader) FileInfo() block.File { + k, v := AllPostingsKey() + postings, err := r.Postings(k, nil, v) + if err != nil { + panic(err) + } + var numSeries uint64 + for postings.Next() { + numSeries++ + } return block.File{ RelPath: block.IndexFilename, SizeBytes: uint64(r.Size()), + TSDB: &block.TSDBFile{ + NumSeries: numSeries, + }, } } diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 4a2c89034..3212fed91 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -2,7 +2,6 @@ package storegateway import ( "context" - "fmt" "os" "path" "path/filepath" @@ -17,8 +16,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/samber/lo" - "github.com/thanos-io/objstore" - "golang.org/x/sync/errgroup" phlareobj "github.com/grafana/phlare/pkg/objstore" "github.com/grafana/phlare/pkg/phlaredb" @@ -258,48 +255,25 @@ func (s *BucketStore) fetchBlocksMeta(ctx context.Context) (map[ulid.ULID]*block to = time.Now() from = to.Add(-time.Hour * 24 * 31) // todo make this configurable ) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(128) - allIDs, err := s.listAllBlockByPrefixes(ctx, from, to) - if err != nil { - return nil, err - } - totalMeta := 0 - for _, ids := range allIDs { - totalMeta += len(ids) - } - metas := make([]*block.Meta, totalMeta) - idx := 0 + var ( + metas []*block.Meta + mtx sync.Mutex + ) + start := time.Now() - level.Debug(s.logger).Log("msg", "fetching blocks meta", "total", totalMeta) + level.Debug(s.logger).Log("msg", 
"fetching blocks meta", "from", from, "to", to) defer func() { - level.Debug(s.logger).Log("msg", "fetched blocks meta", "total", totalMeta, "elapsed", time.Since(start)) + level.Debug(s.logger).Log("msg", "fetched blocks meta", "total", len(metas), "elapsed", time.Since(start)) }() - // fetch all meta.json - for _, ids := range allIDs { - for _, id := range ids { - id := id - currentIdx := idx - idx++ - g.Go(func() error { - r, err := s.bucket.Get(ctx, id+block.MetaFilename) - if err != nil { - return err - } - - m, err := block.Read(r) - if err != nil { - return err - } - metas[currentIdx] = m - return nil - }) - } - } - if err := g.Wait(); err != nil { - return nil, err + if err := block.IterBlockMetas(ctx, s.bucket, from, to, func(m *block.Meta) { + mtx.Lock() + defer mtx.Unlock() + metas = append(metas, m) + }); err != nil { + return nil, errors.Wrap(err, "iter block metas") } + metaMap := lo.SliceToMap(metas, func(item *block.Meta) (ulid.ULID, *block.Meta) { return item.ULID, item }) @@ -316,73 +290,6 @@ func (s *BucketStore) fetchBlocksMeta(ctx context.Context) (map[ulid.ULID]*block return metaMap, nil } -func (s *BucketStore) listAllBlockByPrefixes(ctx context.Context, from, to time.Time) ([][]string, error) { - // todo: We should cache prefixes listing per tenants. - blockPrefixes, err := blockPrefixesFromTo(from, to, 4) - if err != nil { - return nil, err - } - ids := make([][]string, len(blockPrefixes)) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(64) - - for i, prefix := range blockPrefixes { - prefix := prefix - i := i - g.Go(func() error { - level.Debug(s.logger).Log("msg", "listing blocks", "prefix", prefix, "i", i) - prefixIds := []string{} - err := s.bucket.Iter(ctx, prefix, func(name string) error { - if _, ok := block.IsBlockDir(name); ok { - prefixIds = append(prefixIds, name) - } - return nil - }, objstore.WithoutApendingDirDelim) - if err != nil { - return err - } - ids[i] = prefixIds - return nil - }) - } - if err := g.Wait(); err != nil { - return nil, err - } - return ids, nil -} - -// orderOfSplit is the number of bytes of the ulid id used for the split. The duration of the split is: -// 0: 1114y -// 1: 34.8y -// 2: 1y -// 3: 12.4d -// 4: 9h19m -// TODO: To needs to be adapted based on the MaxBlockDuration. -func blockPrefixesFromTo(from, to time.Time, orderOfSplit uint8) (prefixes []string, err error) { - var id ulid.ULID - - if orderOfSplit > 9 { - return nil, fmt.Errorf("order of split must be between 0 and 9") - } - - byteShift := (9 - orderOfSplit) * 5 - - ms := uint64(from.UnixMilli()) >> byteShift - ms = ms << byteShift - for ms <= uint64(to.UnixMilli()) { - if err := id.SetTime(ms); err != nil { - return nil, err - } - prefixes = append(prefixes, id.String()[:orderOfSplit+1]) - - ms = ms >> byteShift - ms += 1 - ms = ms << byteShift - } - - return prefixes, nil -} - // bucketBlockSet holds all blocks. 
type bucketBlockSet struct { mtx sync.RWMutex diff --git a/pkg/storegateway/gateway_blocks_http.go b/pkg/storegateway/gateway_blocks_http.go index 3cd5203d5..bce862c73 100644 --- a/pkg/storegateway/gateway_blocks_http.go +++ b/pkg/storegateway/gateway_blocks_http.go @@ -80,7 +80,7 @@ func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request) { } } - metasMap, err := block.ListBlock(filepath.Join(s.gatewayCfg.BucketStoreConfig.SyncDir, tenantID), time.Time{}) + metasMap, err := block.ListBlocks(filepath.Join(s.gatewayCfg.BucketStoreConfig.SyncDir, tenantID), time.Time{}) if err != nil { util.WriteTextResponse(w, fmt.Sprintf("Failed to read block metadata: %s", err)) return