Skip to content

Commit

Permalink
Include out-of-order label in blocks queried metric (#7262)
Browse files Browse the repository at this point in the history
Add an `out_of_order` label to the count of blocks queried by level and
source. This helps with confirming the reason for non-compacted blocks
being queried by the store-gateways.

Signed-off-by: Nick Pillitteri <[email protected]>
  • Loading branch information
56quarters authored Jan 31, 2024
1 parent 3bb7b7f commit 8bbb883
Show file tree
Hide file tree
Showing 4 changed files with 37 additions and 33 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,7 +56,7 @@
* [ENHANCEMENT] Distributor: set `-distributor.reusable-ingester-push-workers=2000` by default and mark feature as `advanced`. #7128
* [ENHANCEMENT] All: set `-server.grpc.num-workers=100` by default and mark feature as `advanced`. #7131
* [ENHANCEMENT] Distributor: invalid metric name error message gets cleaned up to not include non-ascii strings. #7146
* [ENHANCEMENT] Store-gateway: add `source` and `level` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by source and compaction level. #7112
* [ENHANCEMENT] Store-gateway: add `source`, `level`, and `out_of_order` to `cortex_bucket_store_series_blocks_queried` metric that indicates the number of blocks that were queried from store gateways by block metadata. #7112 #7262
* [BUGFIX] Ingester: don't ignore errors encountered while iterating through chunks or samples in response to a query request. #6451
* [BUGFIX] Fix issue where queries can fail or omit OOO samples if OOO head compaction occurs between creating a querier and reading chunks #6766
* [BUGFIX] Fix issue where concatenatingChunkIterator can obscure errors #6766
Expand Down
38 changes: 16 additions & 22 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -1088,11 +1088,11 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
strategy seriesIteratorStrategy,
) (seriesChunkRefsSetIterator, error) {
var (
mtx = sync.Mutex{}
batches = make([]seriesChunkRefsSetIterator, 0, len(blocks))
g, _ = errgroup.WithContext(ctx)
begin = time.Now()
blocksQueriedBySourceAndLevel = make(map[blockSourceAndLevel]int)
mtx = sync.Mutex{}
batches = make([]seriesChunkRefsSetIterator, 0, len(blocks))
g, _ = errgroup.WithContext(ctx)
begin = time.Now()
blocksQueriedByBlockMeta = make(map[blockQueriedMeta]int)
)
for i, b := range blocks {
b := b
Expand Down Expand Up @@ -1139,10 +1139,7 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(
return nil
})

blocksQueriedBySourceAndLevel[blockSourceAndLevel{
source: b.meta.Thanos.Source,
level: b.meta.Compaction.Level,
}]++
blocksQueriedByBlockMeta[newBlockQueriedMeta(b.meta)]++
}

err := g.Wait()
Expand All @@ -1152,8 +1149,8 @@ func (s *BucketStore) getSeriesIteratorFromBlocks(

stats.update(func(stats *queryStats) {
stats.blocksQueried = len(batches)
for sl, count := range blocksQueriedBySourceAndLevel {
stats.blocksQueriedBySourceAndLevel[sl] = count
for sl, count := range blocksQueriedByBlockMeta {
stats.blocksQueriedByBlockMeta[sl] = count
}
stats.streamingSeriesExpandPostingsDuration += time.Since(begin)
})
Expand Down Expand Up @@ -1184,8 +1181,8 @@ func (s *BucketStore) recordSeriesCallResult(safeStats *safeQueryStats) {
s.metrics.seriesDataFetched.WithLabelValues("chunks", "refetched").Observe(float64(stats.chunksRefetched))
s.metrics.seriesDataSizeFetched.WithLabelValues("chunks", "refetched").Observe(float64(stats.chunksRefetchedSizeSum))

for sl, count := range stats.blocksQueriedBySourceAndLevel {
s.metrics.seriesBlocksQueried.WithLabelValues(string(sl.source), strconv.Itoa(sl.level)).Observe(float64(count))
for m, count := range stats.blocksQueriedByBlockMeta {
s.metrics.seriesBlocksQueried.WithLabelValues(string(m.source), strconv.Itoa(m.level), strconv.FormatBool(m.outOfOrder)).Observe(float64(count))
}

s.metrics.seriesDataTouched.WithLabelValues("chunks", "processed").Observe(float64(stats.chunksTouched))
Expand All @@ -1205,8 +1202,8 @@ func (s *BucketStore) recordLabelNamesCallResult(safeStats *safeQueryStats) {
s.recordSeriesHashCacheStats(stats)
s.recordStreamingSeriesStats(stats)

for sl, count := range stats.blocksQueriedBySourceAndLevel {
s.metrics.seriesBlocksQueried.WithLabelValues(string(sl.source), strconv.Itoa(sl.level)).Observe(float64(count))
for m, count := range stats.blocksQueriedByBlockMeta {
s.metrics.seriesBlocksQueried.WithLabelValues(string(m.source), strconv.Itoa(m.level), strconv.FormatBool(m.outOfOrder)).Observe(float64(count))
}
}

Expand Down Expand Up @@ -1335,7 +1332,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

var mtx sync.Mutex
var sets [][]string
var blocksQueriedBySourceAndLevel = make(map[blockSourceAndLevel]int)
var blocksQueriedByBlockMeta = make(map[blockQueriedMeta]int)
seriesLimiter := s.seriesLimiterFactory(s.metrics.queriesDropped.WithLabelValues("series"))

for _, b := range s.blocks {
Expand All @@ -1348,10 +1345,7 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq
}

resHints.AddQueriedBlock(b.meta.ULID)
blocksQueriedBySourceAndLevel[blockSourceAndLevel{
source: b.meta.Thanos.Source,
level: b.meta.Compaction.Level,
}]++
blocksQueriedByBlockMeta[newBlockQueriedMeta(b.meta)]++

indexr := b.loadedIndexReader(gctx, s.postingsStrategy, stats)

Expand Down Expand Up @@ -1385,8 +1379,8 @@ func (s *BucketStore) LabelNames(ctx context.Context, req *storepb.LabelNamesReq

stats.update(func(stats *queryStats) {
stats.blocksQueried = len(sets)
for sl, count := range blocksQueriedBySourceAndLevel {
stats.blocksQueriedBySourceAndLevel[sl] = count
for sl, count := range blocksQueriedByBlockMeta {
stats.blocksQueriedByBlockMeta[sl] = count
}
})

Expand Down
2 changes: 1 addition & 1 deletion pkg/storegateway/bucket_store_metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -95,7 +95,7 @@ func NewBucketStoreMetrics(reg prometheus.Registerer) *BucketStoreMetrics {
m.seriesBlocksQueried = promauto.With(reg).NewSummaryVec(prometheus.SummaryOpts{
Name: "cortex_bucket_store_series_blocks_queried",
Help: "Number of blocks in a bucket store that were touched to satisfy a query.",
}, []string{"source", "level"})
}, []string{"source", "level", "out_of_order"})
m.seriesRefetches = promauto.With(reg).NewCounter(prometheus.CounterOpts{
Name: "cortex_bucket_store_series_refetches_total",
Help: "Total number of cases where the built-in max series size was not enough to fetch series from index, resulting in refetch.",
Expand Down
28 changes: 19 additions & 9 deletions pkg/storegateway/stats.go
Original file line number Diff line number Diff line change
Expand Up @@ -14,8 +14,8 @@ import (

// queryStats holds query statistics. This data structure is NOT concurrency safe.
type queryStats struct {
blocksQueried int
blocksQueriedBySourceAndLevel map[blockSourceAndLevel]int
blocksQueried int
blocksQueriedByBlockMeta map[blockQueriedMeta]int

postingsTouched int
postingsTouchedSizeSum int
Expand Down Expand Up @@ -81,20 +81,30 @@ type queryStats struct {

func newQueryStats() *queryStats {
return &queryStats{
blocksQueriedBySourceAndLevel: make(map[blockSourceAndLevel]int),
blocksQueriedByBlockMeta: make(map[blockQueriedMeta]int),
}
}

// blockSourceAndLevel encapsulate a block's thanos source and compaction level
type blockSourceAndLevel struct {
source block.SourceType
level int
// blockQueriedMeta encapsulates a block's Thanos source, compaction level, and whether it
// was created from out-of-order samples. It is used as a map key when counting queried
// blocks, so all fields must remain comparable.
type blockQueriedMeta struct {
	source     block.SourceType
	level      int
	outOfOrder bool
}

// newBlockQueriedMeta builds the blockQueriedMeta key for a queried block,
// capturing its Thanos source, compaction level, and out-of-order flag from
// the block's metadata.
func newBlockQueriedMeta(meta *block.Meta) blockQueriedMeta {
	var m blockQueriedMeta
	m.source = meta.Thanos.Source
	m.level = meta.Compaction.Level
	m.outOfOrder = meta.OutOfOrder
	return m
}

func (s queryStats) merge(o *queryStats) *queryStats {
s.blocksQueried += o.blocksQueried
for sl, count := range o.blocksQueriedBySourceAndLevel {
s.blocksQueriedBySourceAndLevel[sl] += count
for m, count := range o.blocksQueriedByBlockMeta {
s.blocksQueriedByBlockMeta[m] += count
}

s.postingsTouched += o.postingsTouched
Expand Down

0 comments on commit 8bbb883

Please sign in to comment.