From ee8a92e04839d243bb0d040bdbcdabdfb22a3b2e Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 6 Jul 2023 16:59:56 +0200 Subject: [PATCH 01/15] Add first draft of block compaction --- pkg/iter/tree.go | 31 +++ pkg/parquet/row_reader.go | 43 ++-- pkg/parquet/row_reader_test.go | 23 +++ pkg/parquet/row_writer.go | 4 +- pkg/parquet/row_writer_test.go | 2 +- pkg/phlaredb/compact.go | 301 ++++++++++++++++++++++++++++ pkg/phlaredb/profile_store.go | 14 +- pkg/phlaredb/schemas/v1/profiles.go | 21 ++ pkg/phlaredb/tsdb/builder.go | 21 ++ 9 files changed, 436 insertions(+), 24 deletions(-) create mode 100644 pkg/iter/tree.go create mode 100644 pkg/phlaredb/compact.go create mode 100644 pkg/phlaredb/tsdb/builder.go diff --git a/pkg/iter/tree.go b/pkg/iter/tree.go new file mode 100644 index 000000000..4f6695087 --- /dev/null +++ b/pkg/iter/tree.go @@ -0,0 +1,31 @@ +package iter + +import ( + "github.com/grafana/phlare/pkg/util/loser" +) + +var _ Iterator[interface{}] = &TreeIterator[interface{}]{} + +type TreeIterator[T any] struct { + *loser.Tree[T, Iterator[T]] +} + +// NewTreeIterator returns an Iterator that iterates over the given loser tree iterator. 
+func NewTreeIterator[T any](tree *loser.Tree[T, Iterator[T]]) *TreeIterator[T] { + return &TreeIterator[T]{ + Tree: tree, + } +} + +func (it TreeIterator[T]) At() T { + return it.Tree.Winner().At() +} + +func (it *TreeIterator[T]) Err() error { + return it.Tree.Winner().Err() +} + +func (it *TreeIterator[T]) Close() error { + it.Tree.Close() + return nil +} diff --git a/pkg/parquet/row_reader.go b/pkg/parquet/row_reader.go index 89a786f6a..80e0d2551 100644 --- a/pkg/parquet/row_reader.go +++ b/pkg/parquet/row_reader.go @@ -16,7 +16,7 @@ const ( var ( _ parquet.RowReader = (*emptyRowReader)(nil) _ parquet.RowReader = (*ErrRowReader)(nil) - _ parquet.RowReader = (*MergeRowReader)(nil) + _ parquet.RowReader = (*IteratorRowReader)(nil) _ iter.Iterator[parquet.Row] = (*BufferedRowReaderIterator)(nil) EmptyRowReader = &emptyRowReader{} @@ -32,10 +32,6 @@ func NewErrRowReader(err error) *ErrRowReader { return &ErrRowReader{err: err} } func (e ErrRowReader) ReadRows(rows []parquet.Row) (int, error) { return 0, e.err } -type MergeRowReader struct { - tree *loser.Tree[parquet.Row, iter.Iterator[parquet.Row]] -} - // NewMergeRowReader returns a RowReader that k-way merges the given readers using the less function. // Each reader must be sorted according to the less function already. 
func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less func(parquet.Row, parquet.Row) bool) parquet.RowReader { @@ -50,18 +46,31 @@ func NewMergeRowReader(readers []parquet.RowReader, maxValue parquet.Row, less f its[i] = NewBufferedRowReaderIterator(readers[i], defaultRowBufferSize) } - return &MergeRowReader{ - tree: loser.New( - its, - maxValue, - func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() }, - less, - func(it iter.Iterator[parquet.Row]) { it.Close() }, + return NewIteratorRowReader( + iter.NewTreeIterator[parquet.Row]( + loser.New( + its, + maxValue, + func(it iter.Iterator[parquet.Row]) parquet.Row { return it.At() }, + less, + func(it iter.Iterator[parquet.Row]) { _ = it.Close() }, + ), ), + ) +} + +type IteratorRowReader struct { + iter.Iterator[parquet.Row] +} + +// NewIteratorRowReader returns a RowReader that reads rows from the given iterator. +func NewIteratorRowReader(it iter.Iterator[parquet.Row]) *IteratorRowReader { + return &IteratorRowReader{ + Iterator: it, } } -func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) { +func (it *IteratorRowReader) ReadRows(rows []parquet.Row) (int, error) { var n int if len(rows) == 0 { return 0, nil @@ -70,11 +79,13 @@ func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) { if n == len(rows) { break } - if !s.tree.Next() { - s.tree.Close() + if !it.Next() { + if err := it.Close(); err != nil { + return n, err + } return n, io.EOF } - rows[n] = s.tree.Winner().At() + rows[n] = it.At() n++ } return n, nil diff --git a/pkg/parquet/row_reader_test.go b/pkg/parquet/row_reader_test.go index 075d44bc7..48d84d5bc 100644 --- a/pkg/parquet/row_reader_test.go +++ b/pkg/parquet/row_reader_test.go @@ -141,3 +141,26 @@ func TestNewMergeRowReader(t *testing.T) { }) } } + +func TestIteratorRowReader(t *testing.T) { + it := NewIteratorRowReader( + NewBufferedRowReaderIterator(NewBatchReader([][]parquet.Row{ + {{parquet.Int32Value(1)}, {parquet.Int32Value(2)}, 
{parquet.Int32Value(3)}}, + {{parquet.Int32Value(4)}, {parquet.Int32Value(5)}, {parquet.Int32Value(6)}}, + {{parquet.Int32Value(7)}, {parquet.Int32Value(8)}, {parquet.Int32Value(9)}}, + }), 4), + ) + actual, err := ReadAllWithBufferSize(it, 3) + require.NoError(t, err) + require.Equal(t, []parquet.Row{ + {parquet.Int32Value(1)}, + {parquet.Int32Value(2)}, + {parquet.Int32Value(3)}, + {parquet.Int32Value(4)}, + {parquet.Int32Value(5)}, + {parquet.Int32Value(6)}, + {parquet.Int32Value(7)}, + {parquet.Int32Value(8)}, + {parquet.Int32Value(9)}, + }, actual) +} diff --git a/pkg/parquet/row_writer.go b/pkg/parquet/row_writer.go index eebc76590..54bf3abbd 100644 --- a/pkg/parquet/row_writer.go +++ b/pkg/parquet/row_writer.go @@ -6,7 +6,7 @@ import ( "github.com/segmentio/parquet-go" ) -type RowGroupWriter interface { +type RowWriterFlusher interface { parquet.RowWriter Flush() error } @@ -14,7 +14,7 @@ type RowGroupWriter interface { // CopyAsRowGroups copies row groups to dst from src and flush a rowgroup per rowGroupNumCount read. // It returns the total number of rows copied and the number of row groups written. // Flush is called to create a new row group. 
-func CopyAsRowGroups(dst RowGroupWriter, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) { +func CopyAsRowGroups(dst RowWriterFlusher, src parquet.RowReader, rowGroupNumCount int) (total uint64, rowGroupCount uint64, err error) { if rowGroupNumCount <= 0 { panic("rowGroupNumCount must be positive") } diff --git a/pkg/parquet/row_writer_test.go b/pkg/parquet/row_writer_test.go index 8b6ddb52e..285cc6ab9 100644 --- a/pkg/parquet/row_writer_test.go +++ b/pkg/parquet/row_writer_test.go @@ -7,7 +7,7 @@ import ( "github.com/stretchr/testify/require" ) -var _ RowGroupWriter = (*TestRowGroupWriter)(nil) +var _ RowWriterFlusher = (*TestRowGroupWriter)(nil) type TestRowGroupWriter struct { RowGroups [][]parquet.Row diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go new file mode 100644 index 000000000..696659556 --- /dev/null +++ b/pkg/phlaredb/compact.go @@ -0,0 +1,301 @@ +package phlaredb + +import ( + "context" + "math" + "os" + "path/filepath" + + "github.com/pkg/errors" + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" + "github.com/prometheus/prometheus/tsdb" + "github.com/segmentio/parquet-go" + + "github.com/grafana/phlare/pkg/iter" + phlaremodel "github.com/grafana/phlare/pkg/model" + phlareparquet "github.com/grafana/phlare/pkg/parquet" + "github.com/grafana/phlare/pkg/phlaredb/block" + schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1" + "github.com/grafana/phlare/pkg/phlaredb/tsdb/index" + "github.com/grafana/phlare/pkg/util" + "github.com/grafana/phlare/pkg/util/loser" +) + +type BlockReader interface { + Profiles() []parquet.RowGroup + Index() IndexReader + // Symbols() SymbolReader +} + +type SymbolReader interface { + // todo +} + +func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) { + meta := block.NewMeta() + blockPath := filepath.Join(dst, meta.ULID.String()) + if err := os.MkdirAll(blockPath, 0o777); err != nil { + return 
block.Meta{}, err + } + indexPath := filepath.Join(blockPath, block.IndexFilename) + indexw, err := prepareIndexWriter(ctx, indexPath, src) + if err != nil { + return block.Meta{}, err + } + profilePath := filepath.Join(blockPath, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) + profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) + if err != nil { + return block.Meta{}, err + } + profileWriter := newProfileWriter(profileFile) + + // todo new symbdb + + rowsIt := newMergeRowProfileIterator(src) + rowsIt = newSeriesRewriter(rowsIt, indexw) + rowsIt = newSymbolsRewriter(rowsIt) + reader := phlareparquet.NewIteratorRowReader(newRowsIterator(rowsIt)) + + // todo size of rowgroups. + _, _, err = phlareparquet.CopyAsRowGroups(profileWriter, reader, 1024) + if err != nil { + return block.Meta{}, err + } + + // flush the index file. + if err := indexw.Close(); err != nil { + return block.Meta{}, err + } + + if err := profileWriter.Close(); err != nil { + return block.Meta{}, err + } + // todo: block meta + if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { + return block.Meta{}, err + } + return *meta, nil +} + +type profileRow struct { + timeNanos int64 + + seriesRef uint32 + labels phlaremodel.Labels + fp model.Fingerprint + row schemav1.ProfileRow +} + +type profileRowIterator struct { + profiles iter.Iterator[parquet.Row] + index IndexReader + err error + + currentRow profileRow + chunks []index.ChunkMeta +} + +func newProfileRowIterator(profiles iter.Iterator[parquet.Row], idx IndexReader) *profileRowIterator { + return &profileRowIterator{ + profiles: profiles, + index: idx, + currentRow: profileRow{ + seriesRef: math.MaxUint32, + }, + chunks: make([]index.ChunkMeta, 1), + } +} + +func (p *profileRowIterator) At() profileRow { + return p.currentRow +} + +func (p *profileRowIterator) Next() bool { + if !p.profiles.Next() { + return false + } + p.currentRow.row = schemav1.ProfileRow(p.profiles.At()) + 
seriesIndex := p.currentRow.row.SeriesIndex() + p.currentRow.timeNanos = p.currentRow.row.TimeNanos() + // do we have a new series? + if seriesIndex == p.currentRow.seriesRef { + return true + } + p.currentRow.seriesRef = seriesIndex + fp, err := p.index.Series(storage.SeriesRef(p.currentRow.seriesRef), &p.currentRow.labels, &p.chunks) + if err != nil { + p.err = err + return false + } + p.currentRow.fp = model.Fingerprint(fp) + return true +} + +func (p *profileRowIterator) Err() error { + if p.err != nil { + return p.err + } + return p.profiles.Err() +} + +func (p *profileRowIterator) Close() error { + return p.profiles.Close() +} + +func newMergeRowProfileIterator(src []BlockReader) iter.Iterator[profileRow] { + its := make([]iter.Iterator[profileRow], len(src)) + for i, s := range src { + // todo: may be we could merge rowgroups in parallel but that requires locking. + reader := parquet.MultiRowGroup(s.Profiles()...).Rows() + its[i] = newProfileRowIterator( + phlareparquet.NewBufferedRowReaderIterator(reader, 1024), + s.Index(), + ) + } + return &dedupeProfileRowIterator{ + Iterator: iter.NewTreeIterator(loser.New( + its, + profileRow{ + timeNanos: math.MaxInt64, + }, + func(it iter.Iterator[profileRow]) profileRow { return it.At() }, + func(r1, r2 profileRow) bool { + // first handle max profileRow if it's either r1 or r2 + if r1.timeNanos == math.MaxInt64 { + return false + } + if r2.timeNanos == math.MaxInt64 { + return true + } + // then handle normal profileRows + if cmp := phlaremodel.CompareLabelPairs(r1.labels, r2.labels); cmp != 0 { + return cmp < 0 + } + return r1.timeNanos < r2.timeNanos + }, + func(it iter.Iterator[profileRow]) { _ = it.Close() }, + )), + } +} + +type symbolsRewriter struct { + iter.Iterator[profileRow] +} + +// todo remap symbols & ingest symbols +func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter { + return &symbolsRewriter{ + Iterator: it, + } +} + +type seriesRewriter struct { + iter.Iterator[profileRow] + + 
indexw *index.Writer + + seriesRef storage.SeriesRef + labels phlaremodel.Labels + previousFp model.Fingerprint + currentChunkMeta index.ChunkMeta + err error +} + +func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seriesRewriter { + return &seriesRewriter{ + Iterator: it, + indexw: indexw, + } +} + +func (s *seriesRewriter) Next() bool { + if !s.Iterator.Next() { + if s.previousFp != 0 { + if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil { + s.err = err + return false + } + } + return false + } + currentProfile := s.Iterator.At() + if s.previousFp != currentProfile.fp { + // store the previous series. + if s.previousFp != 0 { + if err := s.indexw.AddSeries(s.seriesRef, s.labels, s.previousFp, s.currentChunkMeta); err != nil { + s.err = err + return false + } + } + s.seriesRef++ + s.labels = currentProfile.labels.Clone() + s.previousFp = currentProfile.fp + s.currentChunkMeta.MinTime = currentProfile.timeNanos + } + s.currentChunkMeta.MaxTime = currentProfile.timeNanos + currentProfile.row.SetSeriesIndex(uint32(s.seriesRef)) + return true +} + +type rowsIterator struct { + iter.Iterator[profileRow] +} + +func newRowsIterator(it iter.Iterator[profileRow]) *rowsIterator { + return &rowsIterator{ + Iterator: it, + } +} + +func (r *rowsIterator) At() parquet.Row { + return parquet.Row(r.Iterator.At().row) +} + +type dedupeProfileRowIterator struct { + iter.Iterator[profileRow] + + prevFP model.Fingerprint + prevTimeNanos int64 +} + +func (it *dedupeProfileRowIterator) Next() bool { + for { + if !it.Iterator.Next() { + return false + } + currentProfile := it.Iterator.At() + if it.prevFP == currentProfile.fp && it.prevTimeNanos == currentProfile.timeNanos { + // skip duplicate profile + continue + } + it.prevFP = currentProfile.fp + it.prevTimeNanos = currentProfile.timeNanos + return true + } +} + +func prepareIndexWriter(ctx context.Context, path string, readers []BlockReader) (*index.Writer, 
error) { + var symbols index.StringIter + indexw, err := index.NewWriter(ctx, path) + if err != nil { + return nil, err + } + for i, r := range readers { + if i == 0 { + symbols = r.Index().Symbols() + } + symbols = tsdb.NewMergedStringIter(symbols, r.Index().Symbols()) + } + + for symbols.Next() { + if err := indexw.AddSymbol(symbols.At()); err != nil { + return nil, errors.Wrap(err, "add symbol") + } + } + if symbols.Err() != nil { + return nil, errors.Wrap(symbols.Err(), "next symbol") + } + + return indexw, nil +} diff --git a/pkg/phlaredb/profile_store.go b/pkg/phlaredb/profile_store.go index 5d21f7590..6a0648b4e 100644 --- a/pkg/phlaredb/profile_store.go +++ b/pkg/phlaredb/profile_store.go @@ -63,6 +63,14 @@ type profileStore struct { flushBufferLbs []phlaremodel.Labels } +func newProfileWriter(writer io.Writer) *parquet.GenericWriter[*schemav1.Profile] { + return parquet.NewGenericWriter[*schemav1.Profile](writer, (&schemav1.ProfilePersister{}).Schema(), + parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")), + parquet.CreatedBy("github.com/grafana/phlare/", build.Version, build.Revision), + parquet.PageBufferSize(3*1024*1024), + ) +} + func newProfileStore(phlarectx context.Context) *profileStore { s := &profileStore{ logger: phlarecontext.Logger(phlarectx), @@ -76,11 +84,7 @@ func newProfileStore(phlarectx context.Context) *profileStore { go s.cutRowGroupLoop() // Initialize writer on /dev/null // TODO: Reuse parquet.Writer beyond life time of the head. 
- s.writer = parquet.NewGenericWriter[*schemav1.Profile](io.Discard, s.persister.Schema(), - parquet.ColumnPageBuffers(parquet.NewFileBufferPool(os.TempDir(), "phlaredb-parquet-buffers*")), - parquet.CreatedBy("github.com/grafana/phlare/", build.Version, build.Revision), - parquet.PageBufferSize(3*1024*1024), - ) + s.writer = newProfileWriter(io.Discard) return s } diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go index 7fbe2288b..e18cf45ec 100644 --- a/pkg/phlaredb/schemas/v1/profiles.go +++ b/pkg/phlaredb/schemas/v1/profiles.go @@ -458,3 +458,24 @@ func lessProfileRows(r1, r2 parquet.Row) bool { } return ts1 < ts2 } + +type ProfileRow parquet.Row + +func (p ProfileRow) SeriesIndex() uint32 { + return p[seriesIndexColIndex].Uint32() +} + +func (p ProfileRow) TimeNanos() int64 { + var ts int64 + for i := len(p) - 1; i >= 0; i-- { + if p[i].Column() == timeNanoColIndex { + ts = p[i].Int64() + break + } + } + return ts +} + +func (p ProfileRow) SetSeriesIndex(v uint32) { + p[seriesIndexColIndex] = parquet.Int32Value(int32(v)).Level(0, 0, seriesIndexColIndex) +} diff --git a/pkg/phlaredb/tsdb/builder.go b/pkg/phlaredb/tsdb/builder.go new file mode 100644 index 000000000..0125c8696 --- /dev/null +++ b/pkg/phlaredb/tsdb/builder.go @@ -0,0 +1,21 @@ +package tsdb + +import ( + phlaremodel "github.com/grafana/phlare/pkg/model" + "github.com/prometheus/common/model" +) + +type Series struct { + labels phlaremodel.Labels + fp model.Fingerprint +} + +type Builder struct { + series map[string]Series +} + +func NewBuilder() *Builder { + return &Builder{ + series: map[string]Series{}, + } +} From 271be4884640b4b27d6b62747a4b4f3f0ef66ef4 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 6 Jul 2023 17:01:26 +0200 Subject: [PATCH 02/15] Removes unused file --- pkg/phlaredb/tsdb/builder.go | 21 --------------------- 1 file changed, 21 deletions(-) delete mode 100644 pkg/phlaredb/tsdb/builder.go diff --git a/pkg/phlaredb/tsdb/builder.go 
b/pkg/phlaredb/tsdb/builder.go deleted file mode 100644 index 0125c8696..000000000 --- a/pkg/phlaredb/tsdb/builder.go +++ /dev/null @@ -1,21 +0,0 @@ -package tsdb - -import ( - phlaremodel "github.com/grafana/phlare/pkg/model" - "github.com/prometheus/common/model" -) - -type Series struct { - labels phlaremodel.Labels - fp model.Fingerprint -} - -type Builder struct { - series map[string]Series -} - -func NewBuilder() *Builder { - return &Builder{ - series: map[string]Series{}, - } -} From f2a31504d4fcb661f558b181e235ab20122ef7f8 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 6 Jul 2023 17:05:57 +0200 Subject: [PATCH 03/15] Correct row count --- pkg/phlaredb/compact.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 696659556..2a104502d 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -57,8 +57,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er rowsIt = newSymbolsRewriter(rowsIt) reader := phlareparquet.NewIteratorRowReader(newRowsIterator(rowsIt)) - // todo size of rowgroups. - _, _, err = phlareparquet.CopyAsRowGroups(profileWriter, reader, 1024) + _, _, err = phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) if err != nil { return block.Meta{}, err } From 43d0326ad545ca2b9ebf2bee93ad938849ff4ea0 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jul 2023 01:06:36 +0200 Subject: [PATCH 04/15] WIP: Testing against dev. 
--- pkg/phlaredb/block/list.go | 108 +++++++++++++++++++++++- pkg/phlaredb/compact.go | 3 + pkg/phlaredb/compact_test.go | 44 ++++++++++ pkg/storegateway/gateway_blocks_http.go | 2 +- 4 files changed, 155 insertions(+), 2 deletions(-) create mode 100644 pkg/phlaredb/compact_test.go diff --git a/pkg/phlaredb/block/list.go b/pkg/phlaredb/block/list.go index c8081fd95..09caf1cca 100644 --- a/pkg/phlaredb/block/list.go +++ b/pkg/phlaredb/block/list.go @@ -1,15 +1,24 @@ package block import ( + "context" + "fmt" "os" "path/filepath" "sort" "time" + "github.com/go-kit/log/level" "github.com/oklog/ulid" + "github.com/thanos-io/objstore" + "github.com/thanos-io/thanos/pkg/block" + "golang.org/x/sync/errgroup" + + phlareobj "github.com/grafana/phlare/pkg/objstore" + "github.com/grafana/phlare/pkg/util" ) -func ListBlock(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) { +func ListBlocks(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) { result := make(map[ulid.ULID]*Meta) entries, err := os.ReadDir(path) if err != nil { @@ -32,6 +41,103 @@ func ListBlock(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) return result, nil } +func IterBlockMetas(ctx context.Context, bkt phlareobj.Bucket, from, to time.Time, fn func(*Meta)) error { + allIDs, err := listAllBlockByPrefixes(ctx, bkt, from, to) + if err != nil { + return err + } + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(128) + + // fetch all meta.json + for _, ids := range allIDs { + for _, id := range ids { + id := id + g.Go(func() error { + r, err := bkt.Get(ctx, id+block.MetaFilename) + if err != nil { + return err + } + + m, err := Read(r) + if err != nil { + return err + } + fn(m) + return nil + }) + } + } + return g.Wait() +} + +func listAllBlockByPrefixes(ctx context.Context, bkt phlareobj.Bucket, from, to time.Time) ([][]string, error) { + // todo: We should cache prefixes listing per tenants. 
+ blockPrefixes, err := blockPrefixesFromTo(from, to, 4) + if err != nil { + return nil, err + } + ids := make([][]string, len(blockPrefixes)) + g, ctx := errgroup.WithContext(ctx) + g.SetLimit(64) + + for i, prefix := range blockPrefixes { + prefix := prefix + i := i + g.Go(func() error { + level.Debug(util.Logger).Log("msg", "listing blocks", "prefix", prefix, "i", i) + prefixIds := []string{} + err := bkt.Iter(ctx, prefix, func(name string) error { + if _, ok := block.IsBlockDir(name); ok { + prefixIds = append(prefixIds, name) + } + return nil + }, objstore.WithoutApendingDirDelim) + if err != nil { + return err + } + ids[i] = prefixIds + return nil + }) + } + if err := g.Wait(); err != nil { + return nil, err + } + return ids, nil +} + +// orderOfSplit is the number of bytes of the ulid id used for the split. The duration of the split is: +// 0: 1114y +// 1: 34.8y +// 2: 1y +// 3: 12.4d +// 4: 9h19m +// TODO: To needs to be adapted based on the MaxBlockDuration. +func blockPrefixesFromTo(from, to time.Time, orderOfSplit uint8) (prefixes []string, err error) { + var id ulid.ULID + + if orderOfSplit > 9 { + return nil, fmt.Errorf("order of split must be between 0 and 9") + } + + byteShift := (9 - orderOfSplit) * 5 + + ms := uint64(from.UnixMilli()) >> byteShift + ms = ms << byteShift + for ms <= uint64(to.UnixMilli()) { + if err := id.SetTime(ms); err != nil { + return nil, err + } + prefixes = append(prefixes, id.String()[:orderOfSplit+1]) + + ms = ms >> byteShift + ms += 1 + ms = ms << byteShift + } + + return prefixes, nil +} + func SortBlocks(metas map[ulid.ULID]*Meta) []*Meta { var blocks []*Meta diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 2a104502d..6493f45b8 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -33,6 +33,9 @@ type SymbolReader interface { } func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) { + if len(src) <= 1 { + return block.Meta{}, errors.New("not enough blocks 
to compact") + } meta := block.NewMeta() blockPath := filepath.Join(dst, meta.ULID.String()) if err := os.MkdirAll(blockPath, 0o777); err != nil { diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go new file mode 100644 index 000000000..b88306049 --- /dev/null +++ b/pkg/phlaredb/compact_test.go @@ -0,0 +1,44 @@ +package phlaredb + +import ( + "context" + "sync" + "testing" + "time" + + "github.com/stretchr/testify/require" + + "github.com/grafana/phlare/pkg/objstore/client" + "github.com/grafana/phlare/pkg/objstore/providers/gcs" + "github.com/grafana/phlare/pkg/phlaredb/block" +) + +func TestCompact(t *testing.T) { + ctx := context.Background() + bkt, err := client.NewBucket(ctx, client.Config{ + StorageBackendConfig: client.StorageBackendConfig{ + Backend: client.GCS, + GCS: gcs.Config{ + BucketName: "dev-us-central-0-profiles-dev-001-data", + }, + }, + StoragePrefix: "1218/phlaredb/", + }, "test") + require.NoError(t, err) + now := time.Now() + var ( + src []BlockReader + mtx sync.Mutex + ) + + err = block.IterBlockMetas(ctx, bkt, now.Add(-6*time.Hour), now, func(m *block.Meta) { + mtx.Lock() + defer mtx.Unlock() + // todo: meta to blockreader + // src = append(src, NewSingleBlockQuerierFromMeta(ctx, bkt, m)) + }) + require.NoError(t, err) + new, err := Compact(ctx, src, "test/") + require.NoError(t, err) + t.Log(new) +} diff --git a/pkg/storegateway/gateway_blocks_http.go b/pkg/storegateway/gateway_blocks_http.go index 3cd5203d5..bce862c73 100644 --- a/pkg/storegateway/gateway_blocks_http.go +++ b/pkg/storegateway/gateway_blocks_http.go @@ -80,7 +80,7 @@ func (s *StoreGateway) BlocksHandler(w http.ResponseWriter, req *http.Request) { } } - metasMap, err := block.ListBlock(filepath.Join(s.gatewayCfg.BucketStoreConfig.SyncDir, tenantID), time.Time{}) + metasMap, err := block.ListBlocks(filepath.Join(s.gatewayCfg.BucketStoreConfig.SyncDir, tenantID), time.Time{}) if err != nil { util.WriteTextResponse(w, fmt.Sprintf("Failed to read block 
metadata: %s", err)) return From df94d87674e15825674196506bb6d3fb13e14f7b Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Fri, 7 Jul 2023 18:06:26 +0200 Subject: [PATCH 05/15] WIP: Testing against dev. --- pkg/phlaredb/block_querier.go | 8 ++++++++ pkg/phlaredb/compact_test.go | 14 +++++++++----- 2 files changed, 17 insertions(+), 5 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 989e95a1a..5984e548d 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -429,6 +429,14 @@ func newStacktraceResolverV2(bucketReader phlareobj.Bucket) StacktraceDB { } } +func (b *singleBlockQuerier) Profiles() []parquet.RowGroup { + return b.profiles.file.RowGroups() +} + +func (b *singleBlockQuerier) Index() IndexReader { + return b.index +} + func (b *singleBlockQuerier) Close() error { b.openLock.Lock() defer func() { diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index b88306049..ae8ed1609 100644 --- a/pkg/phlaredb/compact_test.go +++ b/pkg/phlaredb/compact_test.go @@ -14,6 +14,7 @@ import ( ) func TestCompact(t *testing.T) { + t.TempDir() ctx := context.Background() bkt, err := client.NewBucket(ctx, client.Config{ StorageBackendConfig: client.StorageBackendConfig{ @@ -31,14 +32,17 @@ func TestCompact(t *testing.T) { mtx sync.Mutex ) - err = block.IterBlockMetas(ctx, bkt, now.Add(-6*time.Hour), now, func(m *block.Meta) { + err = block.IterBlockMetas(ctx, bkt, now.Add(-24*time.Hour), now, func(m *block.Meta) { mtx.Lock() defer mtx.Unlock() - // todo: meta to blockreader - // src = append(src, NewSingleBlockQuerierFromMeta(ctx, bkt, m)) + b := NewSingleBlockQuerierFromMeta(ctx, bkt, m) + err := b.Open(ctx) + require.NoError(t, err) + src = append(src, b) }) require.NoError(t, err) - new, err := Compact(ctx, src, "test/") + dst := t.TempDir() + new, err := Compact(ctx, src, dst) require.NoError(t, err) - t.Log(new) + t.Log(new, dst) } From 858d707c134ba0f25d1bf8a21c663ab024e88170 Mon 
Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 11 Jul 2023 15:12:07 +0200 Subject: [PATCH 06/15] Fixes Profiles Iteration with Labels --- pkg/parquet/row_reader.go | 1 + pkg/phlaredb/compact.go | 49 ++++++++++++++++------ pkg/phlaredb/compact_test.go | 80 ++++++++++++++++++++++++++++++++++++ 3 files changed, 117 insertions(+), 13 deletions(-) diff --git a/pkg/parquet/row_reader.go b/pkg/parquet/row_reader.go index 22cb2d7a4..80379eefc 100644 --- a/pkg/parquet/row_reader.go +++ b/pkg/parquet/row_reader.go @@ -116,6 +116,7 @@ func (r *BufferedRowReaderIterator) Next() bool { return true } + // todo this seems to do allocations on every call since cap is always smaller if cap(r.buff) < r.bufferSize { r.buff = make([]parquet.Row, r.bufferSize) } diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 6493f45b8..4dd37e775 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -55,7 +55,10 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er // todo new symbdb - rowsIt := newMergeRowProfileIterator(src) + rowsIt, err := newMergeRowProfileIterator(src) + if err != nil { + return block.Meta{}, err + } rowsIt = newSeriesRewriter(rowsIt, indexw) rowsIt = newSymbolsRewriter(rowsIt) reader := phlareparquet.NewIteratorRowReader(newRowsIterator(rowsIt)) @@ -90,23 +93,30 @@ type profileRow struct { } type profileRowIterator struct { - profiles iter.Iterator[parquet.Row] - index IndexReader - err error + profiles iter.Iterator[parquet.Row] + index IndexReader + allPostings index.Postings + err error currentRow profileRow chunks []index.ChunkMeta } -func newProfileRowIterator(profiles iter.Iterator[parquet.Row], idx IndexReader) *profileRowIterator { +func newProfileRowIterator(reader parquet.RowReader, idx IndexReader) (*profileRowIterator, error) { + k, v := index.AllPostingsKey() + allPostings, err := idx.Postings(k, nil, v) + if err != nil { + return nil, err + } return &profileRowIterator{ - profiles: profiles, - 
index: idx, + profiles: phlareparquet.NewBufferedRowReaderIterator(reader, 1024), + index: idx, + allPostings: allPostings, currentRow: profileRow{ seriesRef: math.MaxUint32, }, chunks: make([]index.ChunkMeta, 1), - } + }, nil } func (p *profileRowIterator) At() profileRow { @@ -125,7 +135,16 @@ func (p *profileRowIterator) Next() bool { return true } p.currentRow.seriesRef = seriesIndex - fp, err := p.index.Series(storage.SeriesRef(p.currentRow.seriesRef), &p.currentRow.labels, &p.chunks) + if !p.allPostings.Next() { + if err := p.allPostings.Err(); err != nil { + p.err = err + return false + } + p.err = errors.New("unexpected end of postings") + return false + } + + fp, err := p.index.Series(p.allPostings.At(), &p.currentRow.labels, &p.chunks) if err != nil { p.err = err return false @@ -145,15 +164,19 @@ func (p *profileRowIterator) Close() error { return p.profiles.Close() } -func newMergeRowProfileIterator(src []BlockReader) iter.Iterator[profileRow] { +func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], error) { its := make([]iter.Iterator[profileRow], len(src)) for i, s := range src { // todo: may be we could merge rowgroups in parallel but that requires locking. 
reader := parquet.MultiRowGroup(s.Profiles()...).Rows() - its[i] = newProfileRowIterator( - phlareparquet.NewBufferedRowReaderIterator(reader, 1024), + it, err := newProfileRowIterator( + reader, s.Index(), ) + if err != nil { + return nil, err + } + its[i] = it } return &dedupeProfileRowIterator{ Iterator: iter.NewTreeIterator(loser.New( @@ -178,7 +201,7 @@ func newMergeRowProfileIterator(src []BlockReader) iter.Iterator[profileRow] { }, func(it iter.Iterator[profileRow]) { _ = it.Close() }, )), - } + }, nil } type symbolsRewriter struct { diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index ae8ed1609..8ed790ebe 100644 --- a/pkg/phlaredb/compact_test.go +++ b/pkg/phlaredb/compact_test.go @@ -2,17 +2,33 @@ package phlaredb import ( "context" + "net/http" "sync" "testing" "time" + _ "net/http/pprof" + + "github.com/prometheus/common/model" + "github.com/prometheus/prometheus/storage" + "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1" + phlaremodel "github.com/grafana/phlare/pkg/model" "github.com/grafana/phlare/pkg/objstore/client" "github.com/grafana/phlare/pkg/objstore/providers/gcs" "github.com/grafana/phlare/pkg/phlaredb/block" + schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1" + "github.com/grafana/phlare/pkg/phlaredb/tsdb/index" ) +func init() { + go func() { + _ = http.ListenAndServe("localhost:6060", nil) + }() +} + func TestCompact(t *testing.T) { t.TempDir() ctx := context.Background() @@ -35,6 +51,10 @@ func TestCompact(t *testing.T) { err = block.IterBlockMetas(ctx, bkt, now.Add(-24*time.Hour), now, func(m *block.Meta) { mtx.Lock() defer mtx.Unlock() + // only test on the 3 latest blocks. 
+ if len(src) >= 3 { + return + } b := NewSingleBlockQuerierFromMeta(ctx, bkt, m) err := b.Open(ctx) require.NoError(t, err) @@ -46,3 +66,63 @@ func TestCompact(t *testing.T) { require.NoError(t, err) t.Log(new, dst) } + +func TestProfileRowIterator(t *testing.T) { + filePath := t.TempDir() + "/index.tsdb" + idxw, err := index.NewWriter(context.Background(), filePath) + require.NoError(t, err) + require.NoError(t, idxw.AddSymbol("a")) + require.NoError(t, idxw.AddSymbol("b")) + require.NoError(t, idxw.AddSymbol("c")) + addSeries(t, idxw, 0, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "b"}, + }) + addSeries(t, idxw, 1, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + addSeries(t, idxw, 2, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.NoError(t, idxw.Close()) + idxr, err := index.NewFileReader(filePath) + require.NoError(t, err) + + it, err := newProfileRowIterator(schemav1.NewInMemoryProfilesRowReader( + []schemav1.InMemoryProfile{ + {SeriesIndex: 0, TimeNanos: 1}, + {SeriesIndex: 1, TimeNanos: 2}, + {SeriesIndex: 2, TimeNanos: 3}, + }, + ), idxr) + require.NoError(t, err) + + assert.True(t, it.Next()) + require.Equal(t, it.At().labels, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "b"}, + }) + require.Equal(t, it.At().timeNanos, int64(1)) + require.Equal(t, it.At().seriesRef, uint32(0)) + + assert.True(t, it.Next()) + require.Equal(t, it.At().labels, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + require.Equal(t, it.At().timeNanos, int64(2)) + require.Equal(t, it.At().seriesRef, uint32(1)) + + assert.True(t, it.Next()) + require.Equal(t, it.At().labels, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.Equal(t, it.At().timeNanos, int64(3)) + require.Equal(t, it.At().seriesRef, uint32(2)) + + assert.False(t, it.Next()) + require.NoError(t, it.Err()) + require.NoError(t, it.Close()) +} + +func addSeries(t *testing.T, idxw 
*index.Writer, idx int, labels phlaremodel.Labels) { + t.Helper() + require.NoError(t, idxw.AddSeries(storage.SeriesRef(idx), labels, model.Fingerprint(labels.Hash()), index.ChunkMeta{SeriesIndex: uint32(idx)})) +} From eb76c58f1a1459d0bbcbeb4eca72094b1d6bafc5 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 11 Jul 2023 15:33:21 +0200 Subject: [PATCH 07/15] Correctly saves the buffer capacity in BufferedRowReaderIterator --- pkg/parquet/row_reader.go | 19 +++++++++++-------- pkg/parquet/row_reader_test.go | 22 ++++++++++++++++++++++ 2 files changed, 33 insertions(+), 8 deletions(-) diff --git a/pkg/parquet/row_reader.go b/pkg/parquet/row_reader.go index 893e3036c..56ee26b17 100644 --- a/pkg/parquet/row_reader.go +++ b/pkg/parquet/row_reader.go @@ -84,7 +84,10 @@ func (s *MergeRowReader) ReadRows(rows []parquet.Row) (int, error) { } type BufferedRowReaderIterator struct { - reader parquet.RowReader + reader parquet.RowReader + bufferedRows []parquet.Row + + // buff keep the original slice capacity to avoid allocations buff []parquet.Row bufferSize int err error @@ -100,16 +103,16 @@ func NewBufferedRowReaderIterator(reader parquet.RowReader, bufferSize int) *Buf } func (r *BufferedRowReaderIterator) Next() bool { - if len(r.buff) > 1 { - r.buff = r.buff[1:] + if len(r.bufferedRows) > 1 { + r.bufferedRows = r.bufferedRows[1:] return true } if cap(r.buff) < r.bufferSize { r.buff = make([]parquet.Row, r.bufferSize) } - r.buff = r.buff[:r.bufferSize] - n, err := r.reader.ReadRows(r.buff) + r.bufferedRows = r.buff[:r.bufferSize] + n, err := r.reader.ReadRows(r.bufferedRows) if err != nil && err != io.EOF { r.err = err return false @@ -118,15 +121,15 @@ func (r *BufferedRowReaderIterator) Next() bool { return false } - r.buff = r.buff[:n] + r.bufferedRows = r.bufferedRows[:n] return true } func (r *BufferedRowReaderIterator) At() parquet.Row { - if len(r.buff) == 0 { + if len(r.bufferedRows) == 0 { return parquet.Row{} } - return r.buff[0] + return r.bufferedRows[0] 
} func (r *BufferedRowReaderIterator) Err() error { diff --git a/pkg/parquet/row_reader_test.go b/pkg/parquet/row_reader_test.go index 075d44bc7..ba92cc4ab 100644 --- a/pkg/parquet/row_reader_test.go +++ b/pkg/parquet/row_reader_test.go @@ -141,3 +141,25 @@ func TestNewMergeRowReader(t *testing.T) { }) } } + +type SomeRow struct { + Col1 int +} + +func BenchmarkBufferedRowReader(b *testing.B) { + buff := parquet.NewGenericBuffer[SomeRow]() + for i := 0; i < 1000000; i++ { + _, err := buff.Write([]SomeRow{{Col1: (i)}}) + if err != nil { + b.Fatal(err) + } + } + reader := NewBufferedRowReaderIterator(buff.Rows(), 100) + defer reader.Close() + b.ResetTimer() + for i := 0; i < b.N; i++ { + for reader.Next() { + _ = reader.At() + } + } +} From 206f6361cf004dc8e537cf75cd60489f241e10e6 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Tue, 11 Jul 2023 16:02:06 +0200 Subject: [PATCH 08/15] remove files --- afte.txt | 12 ------------ before.txt | 12 ------------ 2 files changed, 24 deletions(-) delete mode 100644 afte.txt delete mode 100644 before.txt diff --git a/afte.txt b/afte.txt deleted file mode 100644 index b8b2f0c44..000000000 --- a/afte.txt +++ /dev/null @@ -1,12 +0,0 @@ -goos: darwin -goarch: amd64 -pkg: github.com/grafana/phlare/pkg/parquet -cpu: Intel(R) Core(TM) i9-9900K CPU @ 3.60GHz -BenchmarkBufferedRowReader -BenchmarkBufferedRowReader-16 3533784 308.4 ns/op 0 B/op 0 allocs/op -BenchmarkBufferedRowReader-16 3675910 302.3 ns/op 0 B/op 0 allocs/op -BenchmarkBufferedRowReader-16 3494991 302.9 ns/op 0 B/op 0 allocs/op -BenchmarkBufferedRowReader-16 3506053 298.5 ns/op 0 B/op 0 allocs/op -BenchmarkBufferedRowReader-16 3651944 316.2 ns/op 0 B/op 0 allocs/op -PASS -ok github.com/grafana/phlare/pkg/parquet 7.258s diff --git a/before.txt b/before.txt deleted file mode 100644 index e65947d8a..000000000 --- a/before.txt +++ /dev/null @@ -1,12 +0,0 @@ -goos: darwin -goarch: amd64 -pkg: github.com/grafana/phlare/pkg/parquet -cpu: Intel(R) Core(TM) i9-9900K CPU @ 3.60GHz 
-BenchmarkBufferedRowReader -BenchmarkBufferedRowReader-16 1656484 807.3 ns/op 2718 B/op 1 allocs/op -BenchmarkBufferedRowReader-16 1669701 674.6 ns/op 2718 B/op 1 allocs/op -BenchmarkBufferedRowReader-16 1710991 679.3 ns/op 2717 B/op 1 allocs/op -BenchmarkBufferedRowReader-16 1737392 692.4 ns/op 2717 B/op 1 allocs/op -BenchmarkBufferedRowReader-16 1665608 690.9 ns/op 2718 B/op 1 allocs/op -PASS -ok github.com/grafana/phlare/pkg/parquet 14.778s From eca7ab61e9c7bf2c574042bfa37b9bf606e6ba24 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 12 Jul 2023 10:35:48 +0200 Subject: [PATCH 09/15] Fixes a bug and test compact locally --- pkg/parquet/row_reader.go | 5 ++- pkg/phlaredb/compact.go | 3 +- pkg/phlaredb/compact_test.go | 80 +++++++++++++++++++++++++++++++----- 3 files changed, 75 insertions(+), 13 deletions(-) diff --git a/pkg/parquet/row_reader.go b/pkg/parquet/row_reader.go index 73ad93531..850ee3d4d 100644 --- a/pkg/parquet/row_reader.go +++ b/pkg/parquet/row_reader.go @@ -88,7 +88,10 @@ func (it *IteratorRowReader) ReadRows(rows []parquet.Row) (int, error) { } return n, io.EOF } - rows[n] = it.At() + rows[n] = rows[n][:0] + for _, c := range it.At() { + rows[n] = append(rows[n], c.Clone()) + } n++ } return n, nil diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 4dd37e775..d1c3f24e3 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -63,7 +63,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er rowsIt = newSymbolsRewriter(rowsIt) reader := phlareparquet.NewIteratorRowReader(newRowsIterator(rowsIt)) - _, _, err = phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) + total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) if err != nil { return block.Meta{}, err } @@ -77,6 +77,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er return block.Meta{}, err } // 
todo: block meta + meta.Stats.NumProfiles = total if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { return block.Meta{}, err } diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index 8ed790ebe..e4d840d49 100644 --- a/pkg/phlaredb/compact_test.go +++ b/pkg/phlaredb/compact_test.go @@ -3,6 +3,7 @@ package phlaredb import ( "context" "net/http" + "sort" "sync" "testing" "time" @@ -13,10 +14,13 @@ import ( "github.com/prometheus/prometheus/storage" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" + "golang.org/x/sync/errgroup" typesv1 "github.com/grafana/phlare/api/gen/proto/go/types/v1" phlaremodel "github.com/grafana/phlare/pkg/model" + phlareobj "github.com/grafana/phlare/pkg/objstore" "github.com/grafana/phlare/pkg/objstore/client" + "github.com/grafana/phlare/pkg/objstore/providers/filesystem" "github.com/grafana/phlare/pkg/objstore/providers/gcs" "github.com/grafana/phlare/pkg/phlaredb/block" schemav1 "github.com/grafana/phlare/pkg/phlaredb/schemas/v1" @@ -44,27 +48,81 @@ func TestCompact(t *testing.T) { require.NoError(t, err) now := time.Now() var ( - src []BlockReader - mtx sync.Mutex + meta []*block.Meta + mtx sync.Mutex ) err = block.IterBlockMetas(ctx, bkt, now.Add(-24*time.Hour), now, func(m *block.Meta) { mtx.Lock() defer mtx.Unlock() - // only test on the 3 latest blocks. - if len(src) >= 3 { - return - } - b := NewSingleBlockQuerierFromMeta(ctx, bkt, m) - err := b.Open(ctx) - require.NoError(t, err) - src = append(src, b) + meta = append(meta, m) }) require.NoError(t, err) dst := t.TempDir() - new, err := Compact(ctx, src, dst) + + sort.Slice(meta, func(i, j int) bool { + return meta[i].MinTime.Before(meta[j].MinTime) + }) + + // only test on the 4 latest blocks. 
+ meta = meta[len(meta)-4:] + testCompact(t, meta, bkt, dst) +} + +func TestCompactLocal(t *testing.T) { + t.TempDir() + ctx := context.Background() + bkt, err := client.NewBucket(ctx, client.Config{ + StorageBackendConfig: client.StorageBackendConfig{ + Backend: client.Filesystem, + Filesystem: filesystem.Config{ + Directory: "/Users/cyril/work/phlare-data/", + }, + }, + StoragePrefix: "", + }, "test") + require.NoError(t, err) + var metas []*block.Meta + + metaMap, err := block.ListBlocks("/Users/cyril/work/phlare-data/", time.Time{}) + require.NoError(t, err) + for _, m := range metaMap { + metas = append(metas, m) + } + dst := t.TempDir() + testCompact(t, metas, bkt, dst) +} + +func testCompact(t *testing.T, metas []*block.Meta, bkt phlareobj.Bucket, dst string) { + t.Helper() + g, ctx := errgroup.WithContext(context.Background()) + var src []BlockReader + now := time.Now() + for i, m := range metas { + t.Log("src block(#", i, ")", + "ID", m.ULID.String(), + "minTime", m.MinTime.Time().Format(time.RFC3339Nano), + "maxTime", m.MaxTime.Time().Format(time.RFC3339Nano), + "numSeries", m.Stats.NumSeries, + "numProfiles", m.Stats.NumProfiles, + "numSamples", m.Stats.NumSamples) + b := NewSingleBlockQuerierFromMeta(ctx, bkt, m) + g.Go(func() error { + return b.Open(ctx) + }) + + src = append(src, b) + } + + require.NoError(t, g.Wait()) + + new, err := Compact(context.Background(), src, dst) require.NoError(t, err) t.Log(new, dst) + t.Log("Compaction duration", time.Since(now)) + t.Log("numSeries", new.Stats.NumSeries, + "numProfiles", new.Stats.NumProfiles, + "numSamples", new.Stats.NumSamples) } func TestProfileRowIterator(t *testing.T) { From ff89057cccca752da58d7ffc78483e108f628da1 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 12 Jul 2023 11:11:39 +0200 Subject: [PATCH 10/15] add tests instructions --- pkg/parquet/row_reader.go | 2 +- pkg/phlaredb/compact_test.go | 7 +++++++ 2 files changed, 8 insertions(+), 1 deletion(-) diff --git 
a/pkg/parquet/row_reader.go b/pkg/parquet/row_reader.go index 850ee3d4d..e5db329ee 100644 --- a/pkg/parquet/row_reader.go +++ b/pkg/parquet/row_reader.go @@ -3,9 +3,9 @@ package parquet import ( "io" + "github.com/grafana/dskit/runutil" "github.com/segmentio/parquet-go" - "github.com/grafana/dskit/runutil" "github.com/grafana/phlare/pkg/iter" "github.com/grafana/phlare/pkg/util" "github.com/grafana/phlare/pkg/util/loser" diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index e4d840d49..213ddac24 100644 --- a/pkg/phlaredb/compact_test.go +++ b/pkg/phlaredb/compact_test.go @@ -69,6 +69,13 @@ func TestCompact(t *testing.T) { testCompact(t, meta, bkt, dst) } +// to download the blocks: +// gsutil -m cp -r \ +// "gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H53WJEAB43S3GJ26XMSNRSJA" \ +// "gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H5454JBEV80V2J7CKYHPCBG8" \ +// "gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H54553SYKH43FNJN5BVR1M2H" \ +// "gs://dev-us-central-0-profiles-dev-001-data/1218/phlaredb/01H5457Q89WYYH9FCK8PZ6XG75" \ +// . func TestCompactLocal(t *testing.T) { t.TempDir() ctx := context.Background() From 2e3aa7dc2c887e6d34a66d788fc3d49f6a742ef3 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 12 Jul 2023 13:08:14 +0200 Subject: [PATCH 11/15] Refactor IterBlockMetas --- pkg/phlaredb/block/list.go | 6 ++ pkg/storegateway/bucket.go | 121 +++++-------------------------------- 2 files changed, 20 insertions(+), 107 deletions(-) diff --git a/pkg/phlaredb/block/list.go b/pkg/phlaredb/block/list.go index 09caf1cca..abde0100c 100644 --- a/pkg/phlaredb/block/list.go +++ b/pkg/phlaredb/block/list.go @@ -41,6 +41,12 @@ func ListBlocks(path string, ulidMinTime time.Time) (map[ulid.ULID]*Meta, error) return result, nil } +// IterBlockMetas iterates over all block metas in the given time range. +// It calls the given function for each block meta. 
+// It returns the first error encountered during iteration (fn itself cannot return one). +// It returns nil if all calls succeed. +// The function is called concurrently. +// Currently doesn't work with filesystem bucket. func IterBlockMetas(ctx context.Context, bkt phlareobj.Bucket, from, to time.Time, fn func(*Meta)) error { allIDs, err := listAllBlockByPrefixes(ctx, bkt, from, to) if err != nil { diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go index 4a2c89034..3212fed91 100644 --- a/pkg/storegateway/bucket.go +++ b/pkg/storegateway/bucket.go @@ -2,7 +2,6 @@ package storegateway import ( "context" - "fmt" "os" "path" "path/filepath" @@ -17,8 +16,6 @@ import ( "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/samber/lo" - "github.com/thanos-io/objstore" - "golang.org/x/sync/errgroup" phlareobj "github.com/grafana/phlare/pkg/objstore" "github.com/grafana/phlare/pkg/phlaredb" @@ -258,48 +255,25 @@ func (s *BucketStore) fetchBlocksMeta(ctx context.Context) (map[ulid.ULID]*block to = time.Now() from = to.Add(-time.Hour * 24 * 31) // todo make this configurable ) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(128) - allIDs, err := s.listAllBlockByPrefixes(ctx, from, to) - if err != nil { - return nil, err - } - totalMeta := 0 - for _, ids := range allIDs { - totalMeta += len(ids) - } - metas := make([]*block.Meta, totalMeta) - idx := 0 + var ( + metas []*block.Meta + mtx sync.Mutex + ) + start := time.Now() - level.Debug(s.logger).Log("msg", "fetching blocks meta", "total", totalMeta) + level.Debug(s.logger).Log("msg", "fetching blocks meta", "from", from, "to", to) defer func() { - level.Debug(s.logger).Log("msg", "fetched blocks meta", "total", totalMeta, "elapsed", time.Since(start)) + level.Debug(s.logger).Log("msg", "fetched blocks meta", "total", len(metas), "elapsed", time.Since(start)) }() - // fetch all meta.json - for _, ids := range allIDs { - for _, id := range ids { - id := id - currentIdx := idx - idx++ - g.Go(func() error { - r, err :=
s.bucket.Get(ctx, id+block.MetaFilename) - if err != nil { - return err - } - - m, err := block.Read(r) - if err != nil { - return err - } - metas[currentIdx] = m - return nil - }) - } - } - if err := g.Wait(); err != nil { - return nil, err + if err := block.IterBlockMetas(ctx, s.bucket, from, to, func(m *block.Meta) { + mtx.Lock() + defer mtx.Unlock() + metas = append(metas, m) + }); err != nil { + return nil, errors.Wrap(err, "iter block metas") } + metaMap := lo.SliceToMap(metas, func(item *block.Meta) (ulid.ULID, *block.Meta) { return item.ULID, item }) @@ -316,73 +290,6 @@ func (s *BucketStore) fetchBlocksMeta(ctx context.Context) (map[ulid.ULID]*block return metaMap, nil } -func (s *BucketStore) listAllBlockByPrefixes(ctx context.Context, from, to time.Time) ([][]string, error) { - // todo: We should cache prefixes listing per tenants. - blockPrefixes, err := blockPrefixesFromTo(from, to, 4) - if err != nil { - return nil, err - } - ids := make([][]string, len(blockPrefixes)) - g, ctx := errgroup.WithContext(ctx) - g.SetLimit(64) - - for i, prefix := range blockPrefixes { - prefix := prefix - i := i - g.Go(func() error { - level.Debug(s.logger).Log("msg", "listing blocks", "prefix", prefix, "i", i) - prefixIds := []string{} - err := s.bucket.Iter(ctx, prefix, func(name string) error { - if _, ok := block.IsBlockDir(name); ok { - prefixIds = append(prefixIds, name) - } - return nil - }, objstore.WithoutApendingDirDelim) - if err != nil { - return err - } - ids[i] = prefixIds - return nil - }) - } - if err := g.Wait(); err != nil { - return nil, err - } - return ids, nil -} - -// orderOfSplit is the number of bytes of the ulid id used for the split. The duration of the split is: -// 0: 1114y -// 1: 34.8y -// 2: 1y -// 3: 12.4d -// 4: 9h19m -// TODO: To needs to be adapted based on the MaxBlockDuration. 
-func blockPrefixesFromTo(from, to time.Time, orderOfSplit uint8) (prefixes []string, err error) { - var id ulid.ULID - - if orderOfSplit > 9 { - return nil, fmt.Errorf("order of split must be between 0 and 9") - } - - byteShift := (9 - orderOfSplit) * 5 - - ms := uint64(from.UnixMilli()) >> byteShift - ms = ms << byteShift - for ms <= uint64(to.UnixMilli()) { - if err := id.SetTime(ms); err != nil { - return nil, err - } - prefixes = append(prefixes, id.String()[:orderOfSplit+1]) - - ms = ms >> byteShift - ms += 1 - ms = ms << byteShift - } - - return prefixes, nil -} - // bucketBlockSet holds all blocks. type bucketBlockSet struct { mtx sync.RWMutex From 75655470d1e6f11a1a12e9aa4f263cdc3b24d172 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 12 Jul 2023 17:41:49 +0200 Subject: [PATCH 12/15] Add more meta informations and rewrite stacktraceIDs --- pkg/phlaredb/block_querier.go | 11 +- pkg/phlaredb/compact.go | 136 +++++++++++++++++++++-- pkg/phlaredb/schemas/v1/profiles.go | 31 +++++- pkg/phlaredb/schemas/v1/profiles_test.go | 60 ++++++++++ 4 files changed, 223 insertions(+), 15 deletions(-) diff --git a/pkg/phlaredb/block_querier.go b/pkg/phlaredb/block_querier.go index 39dcd8042..e431f14f5 100644 --- a/pkg/phlaredb/block_querier.go +++ b/pkg/phlaredb/block_querier.go @@ -435,6 +435,13 @@ func (b *singleBlockQuerier) Index() IndexReader { return b.index } +func (b *singleBlockQuerier) Meta() block.Meta { + if b.meta == nil { + return block.Meta{} + } + return *b.meta +} + func (b *singleBlockQuerier) Close() error { b.openLock.Lock() defer func() { @@ -941,9 +948,7 @@ func (b *singleBlockQuerier) SelectMatchingProfiles(ctx context.Context, params } } - var ( - buf [][]parquet.Value - ) + var buf [][]parquet.Value pIt := query.NewBinaryJoinIterator( 0, diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index d1c3f24e3..1469faaa1 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -6,6 +6,7 @@ import ( "os" "path/filepath" + 
"github.com/oklog/ulid" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" @@ -23,20 +24,17 @@ import ( ) type BlockReader interface { + Meta() block.Meta Profiles() []parquet.RowGroup Index() IndexReader - // Symbols() SymbolReader -} - -type SymbolReader interface { - // todo + // todo symbdb } func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) { if len(src) <= 1 { return block.Meta{}, errors.New("not enough blocks to compact") } - meta := block.NewMeta() + meta := compactedMeta(src) blockPath := filepath.Join(dst, meta.ULID.String()) if err := os.MkdirAll(blockPath, 0o777); err != nil { return block.Meta{}, err @@ -59,9 +57,9 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er if err != nil { return block.Meta{}, err } - rowsIt = newSeriesRewriter(rowsIt, indexw) - rowsIt = newSymbolsRewriter(rowsIt) - reader := phlareparquet.NewIteratorRowReader(newRowsIterator(rowsIt)) + seriesRewriter := newSeriesRewriter(rowsIt, indexw) + symbolsRewriter := newSymbolsRewriter(seriesRewriter) + reader := phlareparquet.NewIteratorRowReader(newRowsIterator(symbolsRewriter)) total, _, err := phlareparquet.CopyAsRowGroups(profileWriter, reader, defaultParquetConfig.MaxBufferRowCount) if err != nil { @@ -78,12 +76,61 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er } // todo: block meta meta.Stats.NumProfiles = total + meta.Stats.NumSeries = seriesRewriter.NumSeries() + meta.Stats.NumSamples = symbolsRewriter.NumSamples() + if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { return block.Meta{}, err } return *meta, nil } +func compactedMeta(src []BlockReader) *block.Meta { + meta := block.NewMeta() + highestCompactionLevel := 0 + ulids := make([]ulid.ULID, len(src)) + parents := make([]tsdb.BlockDesc, len(src)) + minTime, maxTime := model.Latest, model.Earliest + labels := make(map[string]string) + for _, b := 
range src { + if b.Meta().Compaction.Level > highestCompactionLevel { + highestCompactionLevel = b.Meta().Compaction.Level + } + ulids = append(ulids, b.Meta().ULID) + parents = append(parents, tsdb.BlockDesc{ + ULID: b.Meta().ULID, + MinTime: int64(b.Meta().MinTime), + MaxTime: int64(b.Meta().MaxTime), + }) + if b.Meta().MinTime < minTime { + minTime = b.Meta().MinTime + } + if b.Meta().MaxTime > maxTime { + maxTime = b.Meta().MaxTime + } + for k, v := range b.Meta().Labels { + if k == block.HostnameLabel { + continue + } + labels[k] = v + } + } + if hostname, err := os.Hostname(); err == nil { + labels[block.HostnameLabel] = hostname + } + meta.Source = block.CompactorSource + meta.Compaction = tsdb.BlockMetaCompaction{ + Deletable: meta.Stats.NumSamples == 0, + Level: highestCompactionLevel + 1, + Sources: ulids, + Parents: parents, + } + meta.MaxTime = maxTime + meta.MinTime = minTime + meta.Labels = labels + return meta +} + type profileRow struct { timeNanos int64 @@ -205,14 +252,77 @@ func newMergeRowProfileIterator(src []BlockReader) (iter.Iterator[profileRow], e }, nil } +type noopStacktraceRewriter struct{} + +func (noopStacktraceRewriter) RewriteStacktraces(src, dst []uint32) error { + copy(dst, src) + return nil +} + +type StacktraceRewriter interface { + RewriteStacktraces(src, dst []uint32) error +} + type symbolsRewriter struct { iter.Iterator[profileRow] + err error + + rewriter StacktraceRewriter + src, dst []uint32 + numSamples uint64 } // todo remap symbols & ingest symbols func newSymbolsRewriter(it iter.Iterator[profileRow]) *symbolsRewriter { return &symbolsRewriter{ Iterator: it, + rewriter: noopStacktraceRewriter{}, + } +} + +func (s *symbolsRewriter) NumSamples() uint64 { + return s.numSamples +} + +func (s *symbolsRewriter) Next() bool { + if !s.Iterator.Next() { + return false + } + var err error + s.Iterator.At().row.ForStacktraceIDsValues(func(values []parquet.Value) { + s.numSamples += uint64(len(values)) + s.loadStacktracesID(values) 
+ err = s.rewriter.RewriteStacktraces(s.src, s.dst) + if err != nil { + return + } + for i, v := range values { + values[i] = parquet.Int64Value(int64(s.dst[i])).Level(v.RepetitionLevel(), v.DefinitionLevel(), v.Column()) + } + }) + if err != nil { + s.err = err + return false + } + return true +} + +func (s *symbolsRewriter) Err() error { + if s.err != nil { + return s.err + } + return s.Iterator.Err() +} + +func (s *symbolsRewriter) loadStacktracesID(values []parquet.Value) { + if cap(s.src) < len(values) { + s.src = make([]uint32, len(values)*2) + s.dst = make([]uint32, len(values)*2) + } + s.src = s.src[:len(values)] + s.dst = s.dst[:len(values)] + for i := range values { + s.src[i] = values[i].Uint32() } } @@ -226,6 +336,8 @@ type seriesRewriter struct { previousFp model.Fingerprint currentChunkMeta index.ChunkMeta err error + + numSeries uint64 } func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seriesRewriter { @@ -235,6 +347,10 @@ func newSeriesRewriter(it iter.Iterator[profileRow], indexw *index.Writer) *seri } } +func (s *seriesRewriter) NumSeries() uint64 { + return s.numSeries +} + func (s *seriesRewriter) Next() bool { if !s.Iterator.Next() { if s.previousFp != 0 { @@ -242,6 +358,7 @@ func (s *seriesRewriter) Next() bool { s.err = err return false } + s.numSeries++ } return false } @@ -253,6 +370,7 @@ func (s *seriesRewriter) Next() bool { s.err = err return false } + s.numSeries++ } s.seriesRef++ s.labels = currentProfile.labels.Clone() diff --git a/pkg/phlaredb/schemas/v1/profiles.go b/pkg/phlaredb/schemas/v1/profiles.go index e18cf45ec..c3806ced9 100644 --- a/pkg/phlaredb/schemas/v1/profiles.go +++ b/pkg/phlaredb/schemas/v1/profiles.go @@ -42,9 +42,10 @@ var ( phlareparquet.NewGroupField("DefaultSampleType", parquet.Optional(parquet.Int(64))), }) - maxProfileRow parquet.Row - seriesIndexColIndex int - timeNanoColIndex int + maxProfileRow parquet.Row + seriesIndexColIndex int + stacktraceIDColIndex int + timeNanoColIndex int 
) func init() { @@ -62,6 +63,11 @@ func init() { panic(fmt.Errorf("TimeNanos column not found")) } timeNanoColIndex = timeCol.ColumnIndex + stacktraceIDCol, ok := profilesSchema.Lookup("Samples", "list", "element", "StacktraceID") + if !ok { + panic(fmt.Errorf("StacktraceID column not found")) + } + stacktraceIDColIndex = stacktraceIDCol.ColumnIndex } type Sample struct { @@ -479,3 +485,22 @@ func (p ProfileRow) TimeNanos() int64 { func (p ProfileRow) SetSeriesIndex(v uint32) { p[seriesIndexColIndex] = parquet.Int32Value(int32(v)).Level(0, 0, seriesIndexColIndex) } + +func (p ProfileRow) ForStacktraceIDsValues(fn func([]parquet.Value)) { + start := -1 + var i int + for i = 0; i < len(p); i++ { + col := p[i].Column() + if col == stacktraceIDColIndex && p[i].DefinitionLevel() == 1 { + if start == -1 { + start = i + } + } + if col > stacktraceIDColIndex { + break + } + } + if start != -1 { + fn(p[start:i]) + } +} diff --git a/pkg/phlaredb/schemas/v1/profiles_test.go b/pkg/phlaredb/schemas/v1/profiles_test.go index ec94ffd2b..5de8046a7 100644 --- a/pkg/phlaredb/schemas/v1/profiles_test.go +++ b/pkg/phlaredb/schemas/v1/profiles_test.go @@ -207,6 +207,66 @@ func TestLessProfileRows(t *testing.T) { } } +func TestProfileRowStacktraceIDs(t *testing.T) { + for _, tc := range []struct { + name string + expected []uint32 + profile InMemoryProfile + }{ + {"empty", nil, InMemoryProfile{}}, + {"one sample", []uint32{1}, InMemoryProfile{ + SeriesIndex: 1, + StacktracePartition: 2, + TotalValue: 3, + Samples: Samples{StacktraceIDs: []uint32{1}, Values: []uint64{1}}, + }}, + {"many", []uint32{1, 1, 2, 3, 4}, InMemoryProfile{ + SeriesIndex: 1, + StacktracePartition: 2, + TotalValue: 3, + Samples: Samples{ + StacktraceIDs: []uint32{1, 1, 2, 3, 4}, + Values: []uint64{4, 2, 4, 5, 2}, + }, + }}, + } { + tc := tc + t.Run(tc.name, func(t *testing.T) { + rows := generateProfileRow([]InMemoryProfile{tc.profile}) + var ids []uint32 + ProfileRow(rows[0]).ForStacktraceIDsValues(func(values 
[]parquet.Value) { + for _, v := range values { + ids = append(ids, v.Uint32()) + } + }) + require.Equal(t, tc.expected, ids) + }) + } +} + +func TestProfileRowMutateValues(t *testing.T) { + row := ProfileRow(generateProfileRow([]InMemoryProfile{ + { + Samples: Samples{ + StacktraceIDs: []uint32{1, 1, 2, 3, 4}, + Values: []uint64{4, 2, 4, 5, 2}, + }, + }, + })[0]) + row.ForStacktraceIDsValues(func(values []parquet.Value) { + for i := range values { + values[i] = parquet.Int32Value(1).Level(0, 1, values[i].Column()) + } + }) + var ids []uint32 + row.ForStacktraceIDsValues(func(values []parquet.Value) { + for _, v := range values { + ids = append(ids, v.Uint32()) + } + }) + require.Equal(t, []uint32{1, 1, 1, 1, 1}, ids) +} + func BenchmarkProfileRows(b *testing.B) { a := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 1}})[0] a1 := generateProfileRow([]InMemoryProfile{{SeriesIndex: 1, TimeNanos: 2}})[0] From 465ab18a6c473de40f4337bf908d9755a681f2e1 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Wed, 12 Jul 2023 17:43:44 +0200 Subject: [PATCH 13/15] nit todo --- pkg/phlaredb/compact.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 1469faaa1..7d1f57370 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -74,7 +74,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er if err := profileWriter.Close(); err != nil { return block.Meta{}, err } - // todo: block meta + // todo: block meta files. 
meta.Stats.NumProfiles = total meta.Stats.NumSeries = seriesRewriter.NumSeries() meta.Stats.NumSamples = symbolsRewriter.NumSamples() From be3e6d8afadc2fb9d26edbd96f30f333a26a7ab4 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Thu, 13 Jul 2023 07:53:09 +0200 Subject: [PATCH 14/15] Refactoring code --- pkg/phlaredb/compact.go | 95 ++++++++++++++++++++++++++++++++--------- 1 file changed, 74 insertions(+), 21 deletions(-) diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 7d1f57370..2249f50c0 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -2,11 +2,14 @@ package phlaredb import ( "context" + "io/fs" "math" "os" "path/filepath" + "strings" "github.com/oklog/ulid" + "github.com/opentracing/opentracing-go" "github.com/pkg/errors" "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" @@ -30,21 +33,43 @@ type BlockReader interface { // todo symbdb } -func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, error) { +func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Meta, err error) { + srcMetas := make([]block.Meta, len(src)) + ulids := make([]string, len(src)) + + for i, b := range src { + srcMetas[i] = b.Meta() + ulids[i] = b.Meta().ULID.String() + } + meta = compactMetas(srcMetas) + blockPath := filepath.Join(dst, meta.ULID.String()) + indexPath := filepath.Join(blockPath, block.IndexFilename) + profilePath := filepath.Join(blockPath, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) + + sp, ctx := opentracing.StartSpanFromContext(ctx, "Compact") + defer func() { + // todo: context propagation is not working through objstore + // This is because the BlockReader has no context. 
+ sp.SetTag("src", ulids) + sp.SetTag("block_id", meta.ULID.String()) + if err != nil { + sp.SetTag("error", err) + } + sp.Finish() + }() + if len(src) <= 1 { return block.Meta{}, errors.New("not enough blocks to compact") } - meta := compactedMeta(src) - blockPath := filepath.Join(dst, meta.ULID.String()) if err := os.MkdirAll(blockPath, 0o777); err != nil { return block.Meta{}, err } - indexPath := filepath.Join(blockPath, block.IndexFilename) + indexw, err := prepareIndexWriter(ctx, indexPath, src) if err != nil { return block.Meta{}, err } - profilePath := filepath.Join(blockPath, (&schemav1.ProfilePersister{}).Name()+block.ParquetSuffix) + profileFile, err := os.OpenFile(profilePath, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) if err != nil { return block.Meta{}, err @@ -74,18 +99,46 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (block.Meta, er if err := profileWriter.Close(); err != nil { return block.Meta{}, err } - // todo: block meta files. + + metaFiles, err := metaFilesFromDir(blockPath) + if err != nil { + return block.Meta{}, err + } + meta.Files = metaFiles meta.Stats.NumProfiles = total meta.Stats.NumSeries = seriesRewriter.NumSeries() meta.Stats.NumSamples = symbolsRewriter.NumSamples() - if _, err := meta.WriteToFile(util.Logger, blockPath); err != nil { return block.Meta{}, err } - return *meta, nil + return meta, nil +} + +// todo implement and tests +func metaFilesFromDir(dir string) ([]block.File, error) { + var files []block.File + err := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error { + if err != nil { + return err + } + if info.IsDir() { + return nil + } + switch filepath.Ext(info.Name()) { + case strings.TrimPrefix(block.ParquetSuffix, "."): + // todo parquet file + case filepath.Ext(block.IndexFilename): + // todo tsdb index file + default: + // todo other files + } + return nil + }) + return files, err } -func compactedMeta(src []BlockReader) *block.Meta { +// todo write tests +func 
compactMetas(src []block.Meta) block.Meta { meta := block.NewMeta() highestCompactionLevel := 0 ulids := make([]ulid.ULID, len(src)) @@ -93,22 +146,22 @@ func compactedMeta(src []BlockReader) *block.Meta { minTime, maxTime := model.Latest, model.Earliest labels := make(map[string]string) for _, b := range src { - if b.Meta().Compaction.Level > highestCompactionLevel { - highestCompactionLevel = b.Meta().Compaction.Level + if b.Compaction.Level > highestCompactionLevel { + highestCompactionLevel = b.Compaction.Level } - ulids = append(ulids, b.Meta().ULID) + ulids = append(ulids, b.ULID) parents = append(parents, tsdb.BlockDesc{ - ULID: b.Meta().ULID, - MinTime: int64(b.Meta().MinTime), - MaxTime: int64(b.Meta().MaxTime), + ULID: b.ULID, + MinTime: int64(b.MinTime), + MaxTime: int64(b.MaxTime), }) - if b.Meta().MinTime < minTime { - minTime = b.Meta().MinTime + if b.MinTime < minTime { + minTime = b.MinTime } - if b.Meta().MaxTime > maxTime { - maxTime = b.Meta().MaxTime + if b.MaxTime > maxTime { + maxTime = b.MaxTime } - for k, v := range b.Meta().Labels { + for k, v := range b.Labels { if k == block.HostnameLabel { continue } @@ -128,7 +181,7 @@ func compactedMeta(src []BlockReader) *block.Meta { meta.MaxTime = maxTime meta.MinTime = minTime meta.Labels = labels - return meta + return *meta } type profileRow struct { From 6ac527f8ee258853fbc87519a21a8ce54ee007f2 Mon Sep 17 00:00:00 2001 From: Cyril Tovena Date: Mon, 17 Jul 2023 14:11:37 +0200 Subject: [PATCH 15/15] Adds meta files information to dst meta --- pkg/phlaredb/compact.go | 52 +++++++++++++--- pkg/phlaredb/compact_test.go | 103 +++++++++++++++++++++++++++++++ pkg/phlaredb/tsdb/index/index.go | 12 ++++ 3 files changed, 160 insertions(+), 7 deletions(-) diff --git a/pkg/phlaredb/compact.go b/pkg/phlaredb/compact.go index 2249f50c0..433b851fc 100644 --- a/pkg/phlaredb/compact.go +++ b/pkg/phlaredb/compact.go @@ -6,7 +6,6 @@ import ( "math" "os" "path/filepath" - "strings" "github.com/oklog/ulid" 
"github.com/opentracing/opentracing-go" @@ -114,7 +113,7 @@ func Compact(ctx context.Context, src []BlockReader, dst string) (meta block.Met return meta, nil } -// todo implement and tests +// metaFilesFromDir returns a list of block files description from a directory. func metaFilesFromDir(dir string) ([]block.File, error) { var files []block.File err := filepath.Walk(dir, func(path string, info fs.FileInfo, err error) error { @@ -124,19 +123,58 @@ func metaFilesFromDir(dir string) ([]block.File, error) { if info.IsDir() { return nil } + var f block.File switch filepath.Ext(info.Name()) { - case strings.TrimPrefix(block.ParquetSuffix, "."): - // todo parquet file + case block.ParquetSuffix: + f, err = parquetMetaFile(path, info.Size()) + if err != nil { + return err + } case filepath.Ext(block.IndexFilename): - // todo tsdb index file - default: - // todo other files + f, err = tsdbMetaFile(path) + if err != nil { + return err + } + } + f.RelPath, err = filepath.Rel(dir, path) + if err != nil { + return err } + f.SizeBytes = uint64(info.Size()) + files = append(files, f) return nil }) return files, err } +func tsdbMetaFile(filePath string) (block.File, error) { + idxReader, err := index.NewFileReader(filePath) + if err != nil { + return block.File{}, err + } + + return idxReader.FileInfo(), nil +} + +func parquetMetaFile(filePath string, size int64) (block.File, error) { + f, err := os.Open(filePath) + if err != nil { + return block.File{}, err + } + defer f.Close() + + pqFile, err := parquet.OpenFile(f, size) + if err != nil { + return block.File{}, err + } + return block.File{ + Parquet: &block.ParquetFile{ + NumRowGroups: uint64(len(pqFile.RowGroups())), + NumRows: uint64(pqFile.NumRows()), + }, + }, nil +} + // todo write tests func compactMetas(src []block.Meta) block.Meta { meta := block.NewMeta() diff --git a/pkg/phlaredb/compact_test.go b/pkg/phlaredb/compact_test.go index 213ddac24..63a793cc4 100644 --- a/pkg/phlaredb/compact_test.go +++ 
b/pkg/phlaredb/compact_test.go @@ -2,7 +2,10 @@ package phlaredb import ( "context" + "fmt" "net/http" + "os" + "path/filepath" "sort" "sync" "testing" @@ -12,6 +15,7 @@ import ( "github.com/prometheus/common/model" "github.com/prometheus/prometheus/storage" + "github.com/segmentio/parquet-go" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "golang.org/x/sync/errgroup" @@ -191,3 +195,102 @@ func addSeries(t *testing.T, idxw *index.Writer, idx int, labels phlaremodel.Lab t.Helper() require.NoError(t, idxw.AddSeries(storage.SeriesRef(idx), labels, model.Fingerprint(labels.Hash()), index.ChunkMeta{SeriesIndex: uint32(idx)})) } + +func TestMetaFilesFromDir(t *testing.T) { + dst := t.TempDir() + generateParquetFile(t, filepath.Join(dst, "foo.parquet")) + generateParquetFile(t, filepath.Join(dst, "symbols", "bar.parquet")) + generateFile(t, filepath.Join(dst, "symbols", "index.symdb"), 100) + generateFile(t, filepath.Join(dst, "symbols", "stacktraces.symdb"), 200) + generateIndexFile(t, dst) + actual, err := metaFilesFromDir(dst) + + require.NoError(t, err) + require.Equal(t, 5, len(actual)) + require.Equal(t, []block.File{ + { + Parquet: &block.ParquetFile{ + NumRows: 100, + NumRowGroups: 10, + }, + RelPath: "foo.parquet", + SizeBytes: fileSize(t, filepath.Join(dst, "foo.parquet")), + }, + { + RelPath: block.IndexFilename, + SizeBytes: fileSize(t, filepath.Join(dst, block.IndexFilename)), + TSDB: &block.TSDBFile{ + NumSeries: 3, + }, + }, + { + Parquet: &block.ParquetFile{ + NumRows: 100, + NumRowGroups: 10, + }, + RelPath: filepath.Join("symbols", "bar.parquet"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "bar.parquet")), + }, + { + RelPath: filepath.Join("symbols", "index.symdb"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "index.symdb")), + }, + { + RelPath: filepath.Join("symbols", "stacktraces.symdb"), + SizeBytes: fileSize(t, filepath.Join(dst, "symbols", "stacktraces.symdb")), + }, + }, actual) +} + +func 
fileSize(t *testing.T, path string) uint64 { + t.Helper() + fi, err := os.Stat(path) + require.NoError(t, err) + return uint64(fi.Size()) +} + +func generateFile(t *testing.T, path string, size int) { + t.Helper() + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + f, err := os.Create(path) + require.NoError(t, err) + defer f.Close() + require.NoError(t, f.Truncate(int64(size))) +} + +func generateIndexFile(t *testing.T, dir string) { + t.Helper() + filePath := filepath.Join(dir, block.IndexFilename) + idxw, err := index.NewWriter(context.Background(), filePath) + require.NoError(t, err) + require.NoError(t, idxw.AddSymbol("a")) + require.NoError(t, idxw.AddSymbol("b")) + require.NoError(t, idxw.AddSymbol("c")) + addSeries(t, idxw, 0, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "b"}, + }) + addSeries(t, idxw, 1, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "a", Value: "c"}, + }) + addSeries(t, idxw, 2, phlaremodel.Labels{ + &typesv1.LabelPair{Name: "b", Value: "a"}, + }) + require.NoError(t, idxw.Close()) +} + +func generateParquetFile(t *testing.T, path string) { + t.Helper() + require.NoError(t, os.MkdirAll(filepath.Dir(path), 0o755)) + file, err := os.OpenFile(path, os.O_RDWR|os.O_CREATE|os.O_EXCL, 0o644) + require.NoError(t, err) + defer file.Close() + + writer := parquet.NewGenericWriter[struct{ Name string }](file, parquet.MaxRowsPerRowGroup(10)) + defer writer.Close() + for i := 0; i < 100; i++ { + _, err := writer.Write([]struct{ Name string }{ + {Name: fmt.Sprintf("name-%d", i)}, + }) + require.NoError(t, err) + } +} diff --git a/pkg/phlaredb/tsdb/index/index.go b/pkg/phlaredb/tsdb/index/index.go index 7a7ef4fc7..583ac873f 100644 --- a/pkg/phlaredb/tsdb/index/index.go +++ b/pkg/phlaredb/tsdb/index/index.go @@ -1334,9 +1334,21 @@ func (r *Reader) Version() int { // FileInfo returns some general stats about the underlying file func (r *Reader) FileInfo() block.File { + k, v := AllPostingsKey() + postings, err := r.Postings(k, nil, 
v) + if err != nil { + panic(err) + } + var numSeries uint64 + for postings.Next() { + numSeries++ + } return block.File{ RelPath: block.IndexFilename, SizeBytes: uint64(r.Size()), + TSDB: &block.TSDBFile{ + NumSeries: numSeries, + }, } }