block-builder: Add cortex_blockbuilder_blocks_produced_total metric (#10538)

* block-builder: Add cortex_blockbuilder_blocks_produced_total metric

Signed-off-by: Ganesh Vernekar <[email protected]>

* Fix tests

Signed-off-by: Ganesh Vernekar <[email protected]>

* Fix comments

Signed-off-by: Ganesh Vernekar <[email protected]>

* Fix race

Signed-off-by: Ganesh Vernekar <[email protected]>

---------

Signed-off-by: Ganesh Vernekar <[email protected]>
codesome authored Feb 6, 2025
1 parent fd72e41 commit b45851c
Showing 5 changed files with 109 additions and 39 deletions.
57 changes: 38 additions & 19 deletions pkg/blockbuilder/blockbuilder.go
@@ -18,6 +18,7 @@ import (
otgrpc "github.com/opentracing-contrib/go-grpc"
"github.com/opentracing/opentracing-go"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/prometheus/tsdb"
"github.com/thanos-io/objstore"
"github.com/twmb/franz-go/pkg/kadm"
"github.com/twmb/franz-go/pkg/kgo"
@@ -74,10 +75,6 @@ func newWithSchedulerClient(
limits *validation.Overrides,
schedulerClient schedulerpb.SchedulerClient,
) (*BlockBuilder, error) {
if cfg.NoPartiallyConsumedRegion {
// We should not have a large buffer if we are putting all the records into a block.
cfg.ConsumeIntervalBuffer = 5 * time.Minute
}

b := &BlockBuilder{
cfg: cfg,
@@ -489,7 +486,7 @@ func (b *BlockBuilder) consumePartitionSection(
return state, nil
}

var numBlocks int
var blockMetas []tsdb.BlockMeta
defer func(t time.Time, startState PartitionState) {
// No need to log or track time of the unfinished section. Just bail out.
if errors.Is(retErr, context.Canceled) {
@@ -507,7 +504,7 @@
level.Info(logger).Log("msg", "done consuming", "duration", dur,
"last_block_end", startState.LastBlockEnd, "curr_block_end", blockEnd,
"last_seen_offset", startState.LastSeenOffset, "curr_seen_offset", retState.LastSeenOffset,
"num_blocks", numBlocks)
"num_blocks", len(blockMetas))
}(time.Now(), state)

// We always rewind the partition's offset to the commit offset by reassigning the partition to the client (this triggers partition assignment).
@@ -610,11 +607,16 @@ consumerLoop:
}

var err error
numBlocks, err = builder.CompactAndUpload(ctx, b.uploadBlocks)
blockMetas, err = builder.CompactAndUpload(ctx, b.uploadBlocks)
if err != nil {
return state, err
}

prev, curr, next := getBlockCategoryCount(sectionEndTime, blockMetas)
b.blockBuilderMetrics.blockCounts.WithLabelValues("previous").Add(float64(prev))
b.blockBuilderMetrics.blockCounts.WithLabelValues("current").Add(float64(curr))
b.blockBuilderMetrics.blockCounts.WithLabelValues("next").Add(float64(next))

// We should take the max of last seen offsets. If the partition was lagging due to some record not being processed
// because of a future sample, we might be coming back to the same consume cycle again.
lastSeenOffset := max(lastRec.Offset, state.LastSeenOffset)
@@ -643,6 +645,26 @@ consumerLoop:
return newState, nil
}

func getBlockCategoryCount(sectionEndTime time.Time, blockMetas []tsdb.BlockMeta) (prev, curr, next int) {
// Doing -30m takes care of a ConsumeIntervalBuffer of up to 30 mins.
// For a sectionEndTime of 13:15, the current 2-hour block is 12:00-14:00.
// For a sectionEndTime of 14:45, the current 2-hour block is 14:00-16:00.
currHour := sectionEndTime.Add(-30 * time.Minute).Truncate(2 * time.Hour).Hour()
for _, m := range blockMetas {
// The min and max time can be aligned to the 2hr mark. The MaxTime is exclusive of the last sample.
// So taking average of both will remove any edge cases.
hour := time.UnixMilli(m.MinTime/2 + m.MaxTime/2).Truncate(2 * time.Hour).Hour()
if hour < currHour {
prev++
} else if hour > currHour {
next++
} else {
curr++
}
}
return
}

type stateCommitter interface {
commitState(context.Context, *BlockBuilder, log.Logger, string, PartitionState) error
}
@@ -684,21 +706,18 @@ func (c *noOpCommitter) commitState(_ context.Context, _ *BlockBuilder, _ log.Lo

var _ stateCommitter = &noOpCommitter{}

func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, blockIDs []string) error {
func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error {
buc := bucket.NewUserBucketClient(tenantID, b.bucket, b.limits)
for _, bid := range blockIDs {
blockDir := path.Join(dbDir, bid)
meta, err := block.ReadMetaFromDir(blockDir)
if err != nil {
return fmt.Errorf("read block meta for block %s (tenant %s): %w", bid, tenantID, err)
}

if meta.Stats.NumSamples == 0 {
for _, m := range metas {
if m.Stats.NumSamples == 0 {
// No need to upload empty block.
level.Info(b.logger).Log("msg", "skip uploading empty block", "tenant", tenantID, "block", bid)
level.Info(b.logger).Log("msg", "skip uploading empty block", "tenant", tenantID, "block", m.ULID.String())
return nil
}

meta := &block.Meta{BlockMeta: m}
blockDir := path.Join(dbDir, meta.ULID.String())

meta.Thanos.Source = block.BlockBuilderSource
meta.Thanos.SegmentFiles = block.GetSegmentFiles(blockDir)

@@ -717,11 +736,11 @@ func (b *BlockBuilder) uploadBlocks(ctx context.Context, tenantID, dbDir string,
if err == nil {
break
}
level.Warn(b.logger).Log("msg", "failed to upload block; will retry", "err", err, "block", bid, "tenant", tenantID)
level.Warn(b.logger).Log("msg", "failed to upload block; will retry", "err", err, "block", meta.ULID.String(), "tenant", tenantID)
boff.Wait()
}
if err := boff.ErrCause(); err != nil {
return fmt.Errorf("upload block %s (tenant %s): %w", bid, tenantID, err)
return fmt.Errorf("upload block %s (tenant %s): %w", meta.ULID.String(), tenantID, err)
}
}
return nil
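
The -30m adjustment in getBlockCategoryCount above can be sanity-checked in isolation. The following standalone Go sketch is illustrative only and not part of this commit (the helper name and sample times are assumptions); it reproduces the same truncation arithmetic for two example section end times.

package main

import (
	"fmt"
	"time"
)

// currentBlockStartHour mirrors the bucketing in getBlockCategoryCount:
// subtract 30 minutes to absorb the consume-interval buffer, then truncate
// to the 2-hour block boundary.
func currentBlockStartHour(sectionEndTime time.Time) int {
	return sectionEndTime.Add(-30 * time.Minute).Truncate(2 * time.Hour).Hour()
}

func main() {
	for _, end := range []time.Time{
		time.Date(2025, 2, 6, 13, 15, 0, 0, time.UTC), // lands in the 12:00-14:00 block
		time.Date(2025, 2, 6, 14, 45, 0, 0, time.UTC), // lands in the 14:00-16:00 block
	} {
		fmt.Printf("sectionEndTime=%s -> current 2h block starts at %02d:00\n",
			end.Format("15:04"), currentBlockStartHour(end))
	}
}

In getBlockCategoryCount, a block whose MinTime/MaxTime midpoint truncates to an earlier hour than this is counted as "previous", a later one as "next", and everything else as "current".
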
48 changes: 45 additions & 3 deletions pkg/blockbuilder/blockbuilder_test.go
@@ -1144,6 +1144,7 @@ func TestNoPartiallyConsumedRegions(t *testing.T) {

cfg, overrides := blockBuilderConfig(t, kafkaAddr)
cfg.NoPartiallyConsumedRegion = true
cfg.ConsumeIntervalBuffer = 5 * time.Minute

// Set up a hook to track commits from block-builder to kafka. Those indicate the end of a cycle.
kafkaCommits := atomic.NewInt32(0)
@@ -1155,9 +1156,6 @@
bb, err := New(cfg, test.NewTestingLogger(t), prometheus.NewPedanticRegistry(), overrides)
require.NoError(t, err)

// NoPartiallyConsumedRegion changes the buffer to 5 mins.
require.Equal(t, 5*time.Minute, bb.cfg.ConsumeIntervalBuffer)

require.NoError(t, bb.starting(ctx))
t.Cleanup(func() {
require.NoError(t, bb.stoppingStandaloneMode(nil))
@@ -1212,6 +1210,50 @@ func TestNoPartiallyConsumedRegions(t *testing.T) {
require.Equal(t, len(producedSamples)-1, int(state.LastSeenOffset))
}

// This is a temporary test for quick iteration on the new way of consuming partition.
// TODO: integrate it with other tests.
func TestGetBlockCategoryCount(t *testing.T) {
evenHrBounary := time.UnixMilli(10 * time.Hour.Milliseconds())
oddHrBounary := time.UnixMilli(9 * time.Hour.Milliseconds())
cases := []struct {
sectionEndTime time.Time
metas []tsdb.BlockMeta
prev, curr, next int
}{
{
sectionEndTime: evenHrBounary,
metas: []tsdb.BlockMeta{
{MinTime: evenHrBounary.Add(-4 * time.Hour).UnixMilli(), MaxTime: evenHrBounary.Add(-2 * time.Hour).UnixMilli()},
{MinTime: evenHrBounary.Add(-2 * time.Hour).UnixMilli(), MaxTime: evenHrBounary.UnixMilli()},
{MinTime: evenHrBounary.UnixMilli(), MaxTime: evenHrBounary.Add(5 * time.Minute).UnixMilli()},
},
prev: 1, curr: 1, next: 1,
},
{
sectionEndTime: oddHrBounary,
metas: []tsdb.BlockMeta{
{MinTime: oddHrBounary.Add(-3 * time.Hour).UnixMilli(), MaxTime: oddHrBounary.Add(-1 * time.Hour).UnixMilli()},
{MinTime: oddHrBounary.Add(-1 * time.Hour).UnixMilli(), MaxTime: oddHrBounary.Add(time.Hour).UnixMilli()},
// Although this is after the sectionEndTime, it is still the same 2h block.
{MinTime: oddHrBounary.UnixMilli(), MaxTime: oddHrBounary.Add(time.Hour).UnixMilli()},
{MinTime: oddHrBounary.Add(time.Hour).UnixMilli(), MaxTime: oddHrBounary.Add(2 * time.Hour).UnixMilli()},
},
prev: 1, curr: 2, next: 1,
},
}
for i, c := range cases {
// Buffer to add to sectionEndTime.
for _, buffer := range []time.Duration{0, 5 * time.Minute, 10 * time.Minute, 15 * time.Minute} {
t.Run(fmt.Sprintf("%d,buffer=%s", i, buffer.String()), func(t *testing.T) {
prev, curr, next := getBlockCategoryCount(c.sectionEndTime.Add(buffer), c.metas)
require.Equal(t, c.prev, prev)
require.Equal(t, c.curr, curr)
require.Equal(t, c.next, next)
})
}
}
}

func blockBuilderPullModeConfig(t *testing.T, addr string) (Config, *validation.Overrides) {
cfg, overrides := blockBuilderConfig(t, addr)
cfg.SchedulerConfig = SchedulerConfig{
9 changes: 9 additions & 0 deletions pkg/blockbuilder/metrics.go
@@ -12,6 +12,7 @@ type blockBuilderMetrics struct {
processPartitionDuration *prometheus.HistogramVec
fetchErrors *prometheus.CounterVec
consumerLagRecords *prometheus.GaugeVec
blockCounts *prometheus.CounterVec
}

func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
@@ -40,6 +41,14 @@ func newBlockBuilderMetrics(reg prometheus.Registerer) blockBuilderMetrics {
Help: "The per-topic-partition number of records, instance needs to work through each cycle.",
}, []string{"partition"})

// block_time can be "next", "current" or "previous".
// If the block belongs to the current 2h block range, it goes in "current".
// "next" or "previous" are used for the blocks that are not in the current 2h block range.
m.blockCounts = promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_blockbuilder_blocks_produced_total",
Help: "Total number of blocks produced for specific block ranges (next, current, previous).",
}, []string{"block_time"})

return m
}

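
For completeness, the new counter can be exercised on its own with client_golang's testutil helpers. The sketch below is illustrative and not taken from this commit; it registers a counter with the same name and label, increments the per-category children the way consumePartitionSection does after CompactAndUpload, and reads one child back.

package main

import (
	"fmt"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promauto"
	"github.com/prometheus/client_golang/prometheus/testutil"
)

func main() {
	reg := prometheus.NewPedanticRegistry()

	// Same metric name and label as added in this commit.
	blockCounts := promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
		Name: "cortex_blockbuilder_blocks_produced_total",
		Help: "Total number of blocks produced for specific block ranges (next, current, previous).",
	}, []string{"block_time"})

	// Pretend one consume cycle produced 1 "previous" and 2 "current" blocks.
	blockCounts.WithLabelValues("previous").Add(1)
	blockCounts.WithLabelValues("current").Add(2)

	// Read a single child back; prints 2.
	fmt.Println(testutil.ToFloat64(blockCounts.WithLabelValues("current")))
}
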
26 changes: 13 additions & 13 deletions pkg/blockbuilder/tsdb.go
@@ -22,7 +22,6 @@ import (
"github.com/prometheus/prometheus/tsdb"
"github.com/prometheus/prometheus/tsdb/chunks"
"github.com/twmb/franz-go/pkg/kgo"
"go.uber.org/atomic"
"golang.org/x/sync/errgroup"

"github.com/grafana/mimir/pkg/mimirpb"
@@ -311,15 +310,16 @@ func (b *TSDBBuilder) newTSDB(tenant tsdbTenant) (*userTSDB, error) {
}

// Function to upload the blocks.
type blockUploader func(_ context.Context, tenantID, dbDir string, blockIDs []string) error
type blockUploader func(_ context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error

// CompactAndUpload compacts the blocks of all the TSDBs and uploads them.
// uploadBlocks is a function that uploads the blocks to the required storage.
// All the DBs are closed and directories cleared irrespective of success or failure of this function.
func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (_ int, err error) {
func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUploader) (metas []tsdb.BlockMeta, err error) {
var (
closedDBsMu sync.Mutex
closedDBs = make(map[*userTSDB]bool)
closedDBsMu, metasMu sync.Mutex

closedDBs = make(map[*userTSDB]bool)
)

b.tsdbsMu.Lock()
Expand All @@ -343,11 +343,9 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
level.Info(b.logger).Log("msg", "compacting and uploading blocks", "num_tsdb", len(b.tsdbs))

if len(b.tsdbs) == 0 {
return 0, nil
return nil, nil
}

numBlocks := atomic.NewInt64(0)

eg, ctx := errgroup.WithContext(ctx)
if b.blocksStorageCfg.TSDB.ShipConcurrency > 0 {
eg.SetLimit(b.blocksStorageCfg.TSDB.ShipConcurrency)
@@ -371,12 +369,14 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
return err
}

var blockIDs []string
dbDir := db.Dir()
var localMetas []tsdb.BlockMeta
for _, b := range db.Blocks() {
blockIDs = append(blockIDs, b.Meta().ULID.String())
localMetas = append(localMetas, b.Meta())
}
numBlocks.Add(int64(len(blockIDs)))
metasMu.Lock()
metas = append(metas, localMetas...)
metasMu.Unlock()

if err := db.Close(); err != nil {
return err
@@ -386,7 +386,7 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
closedDBs[db] = true
closedDBsMu.Unlock()

if err := uploadBlocks(ctx, tenant.tenantID, dbDir, blockIDs); err != nil {
if err := uploadBlocks(ctx, tenant.tenantID, dbDir, localMetas); err != nil {
return err
}

@@ -395,7 +395,7 @@ func (b *TSDBBuilder) CompactAndUpload(ctx context.Context, uploadBlocks blockUp
})
}
err = eg.Wait()
return int(numBlocks.Load()), err
return metas, err
}

// Close closes all DBs and deletes their data directories.
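
With the signature change above, a blockUploader implementation now receives full tsdb.BlockMeta values rather than block ID strings. A minimal conforming uploader might look like the sketch below (an assumption for illustration, not code from this commit): it skips empty blocks and derives each block's directory from the meta's ULID, the same way uploadBlocks and the test's mockUploaderFunc do.

package blockbuilder

import (
	"context"
	"fmt"
	"path"

	"github.com/prometheus/prometheus/tsdb"
)

// loggingUploader satisfies the blockUploader signature. It only prints what
// it would upload; a real implementation would ship the block directory to
// object storage.
func loggingUploader(_ context.Context, tenantID, dbDir string, metas []tsdb.BlockMeta) error {
	for _, m := range metas {
		if m.Stats.NumSamples == 0 {
			continue // nothing worth uploading in an empty block
		}
		blockDir := path.Join(dbDir, m.ULID.String())
		fmt.Printf("tenant=%s: would upload block %s from %s\n", tenantID, m.ULID.String(), blockDir)
	}
	return nil
}

Wired up against the builder, this would be invoked as metas, err := builder.CompactAndUpload(ctx, loggingUploader), matching the updated return value shown in the diff.
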
8 changes: 4 additions & 4 deletions pkg/blockbuilder/tsdb_test.go
@@ -299,7 +299,7 @@ func TestTSDBBuilder_CompactAndUpload_fail(t *testing.T) {
require.NoError(t, err)

errUploadFailed := fmt.Errorf("upload failed")
_, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []string) error {
_, err = builder.CompactAndUpload(context.Background(), func(_ context.Context, _, _ string, _ []tsdb.BlockMeta) error {
return errUploadFailed
})
require.ErrorIs(t, err, errUploadFailed)
@@ -364,9 +364,9 @@ func compareQuery(t *testing.T, db *tsdb.DB, expSamples []mimirpb.Sample, expHis
}

func mockUploaderFunc(t *testing.T, destDir string) blockUploader {
return func(_ context.Context, _, dbDir string, blockIDs []string) error {
for _, bid := range blockIDs {
blockDir := path.Join(dbDir, bid)
return func(_ context.Context, _, dbDir string, metas []tsdb.BlockMeta) error {
for _, meta := range metas {
blockDir := path.Join(dbDir, meta.ULID.String())
err := os.Rename(blockDir, path.Join(destDir, path.Base(blockDir)))
require.NoError(t, err)
}
