add discovery of index header sizes for all tenants, all blocks - only on startup for now, no loading of any already existing on disk
francoposa committed Mar 5, 2025
1 parent 17c4b74 commit c4b04db
Showing 2 changed files with 341 additions and 4 deletions.
116 changes: 116 additions & 0 deletions pkg/storegateway/bucket.go
@@ -296,6 +296,122 @@ func (s *BucketStore) Stats() BucketStoreStats {
}
}

// TenantIndexHeaderMap is a thread-safe nested map structure,
// behaving as a synchronized map[string]map[ulid.ULID]objstore.ObjectAttributes.
// The objstore.ObjectAttributes are used to get the index header size.
type TenantIndexHeaderMap struct {
	blockIndexHeaderMap sync.Map // map[string]*sync.Map of map[ulid.ULID]objstore.ObjectAttributes
}

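// String renders every tenant's block index attributes as a single flat string, intended for logging.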
func (t *TenantIndexHeaderMap) String() string {
s := "TenantIndexHeaderMap: "
t.blockIndexHeaderMap.Range(func(key, value interface{}) bool {
tenantID := key.(string)
indexHeaders := value.(*sync.Map)
indexHeaders.Range(func(key, value interface{}) bool {
blockID := key.(ulid.ULID)
attrs := value.(objstore.ObjectAttributes)
s += fmt.Sprintf("tenantID: %s, blockID: %s, attrs: %v --- ", tenantID, blockID, attrs)
return true
})
return true
})
return s
}

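// Contains reports whether attributes have already been recorded for the given tenant's block.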
func (t *TenantIndexHeaderMap) Contains(tenantID string, blockID ulid.ULID) bool {
indexHeadersVal, ok := t.blockIndexHeaderMap.Load(tenantID)
if !ok {
return false
}
indexHeaders := indexHeadersVal.(*sync.Map)
_, ok = indexHeaders.Load(blockID)
return ok
}

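// Add records the index object attributes for the given tenant's block, creating the
// tenant's inner map on first use. It returns an error if the block was already recorded.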
func (t *TenantIndexHeaderMap) Add(
tenantID string,
blockID ulid.ULID,
indexHeaderAttrs objstore.ObjectAttributes,
) error {
	// LoadOrStore avoids losing entries when concurrent workers add blocks for a
	// tenant which is not present in the map yet.
	indexHeadersVal, _ := t.blockIndexHeaderMap.LoadOrStore(tenantID, &sync.Map{}) // map[ulid.ULID]objstore.ObjectAttributes
	indexHeaders := indexHeadersVal.(*sync.Map)
	if _, loaded := indexHeaders.LoadOrStore(blockID, indexHeaderAttrs); loaded {
		// This should never happen: blocks already in the map are skipped before discovery.
		return fmt.Errorf("block %s already exists in the set", blockID)
	}
	return nil
}

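// DiscoverIndexHeaders discovers the index object attributes of this store's tenant
// and records them into the shared tenantIndexHeaderMap.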
func (s *BucketStore) DiscoverIndexHeaders(ctx context.Context, tenantIndexHeaderMap *TenantIndexHeaderMap) error {
return s.discoverIndexHeaders(ctx, tenantIndexHeaderMap)
}

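// discoverIndexHeaders fetches the tenant's block metas and, for each block not already
// present in tenantIndexHeaderMap, looks up the index object attributes using up to
// blockSyncConcurrency concurrent workers.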
func (s *BucketStore) discoverIndexHeaders(ctx context.Context, tenantIndexHeaderMap *TenantIndexHeaderMap) error {
metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
	// With only a partial view of the bucket, still discover index headers for the blocks we did fetch.
if metaFetchErr != nil && metas == nil {
return metaFetchErr
}

errGroup, ctx := errgroup.WithContext(ctx)
blockc := make(chan *block.Meta)

for i := 0; i < s.blockSyncConcurrency; i++ {
errGroup.Go(func() error {
for meta := range blockc {
attrs, err := s.blockIndexHeaderAttrs(ctx, meta.ULID)
if err != nil {
return err
}
err = tenantIndexHeaderMap.Add(s.userID, meta.ULID, attrs)
if err != nil {
return err
}
}
return nil
})
}

for blockID, meta := range metas {
if tenantIndexHeaderMap.Contains(s.userID, blockID) {
continue
}
select {
case <-ctx.Done():
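			// The errgroup context is cancelled when any worker fails; without this case
			// the send could block forever once the workers stop consuming.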
case blockc <- meta:
}
}

close(blockc)
err := errGroup.Wait()
if err != nil {
return err
}

if metaFetchErr != nil {
return metaFetchErr
}

return nil
}

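// blockIndexHeaderAttrs looks up the object storage attributes (notably the size)
// of the block's index file.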
func (s *BucketStore) blockIndexHeaderAttrs(ctx context.Context, blockID ulid.ULID) (objstore.ObjectAttributes, error) {
indexFilepath := filepath.Join(blockID.String(), block.IndexFilename)
attrs, err := s.bkt.Attributes(ctx, indexFilepath)
	if err != nil {
		return attrs, errors.Wrapf(err, "get object attributes of %s", indexFilepath)
	}
	return attrs, nil
}

// SyncBlocks synchronizes the store's state with the bucket.
// It will reuse disk space as persistent cache based on s.dir param.
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
229 changes: 225 additions & 4 deletions pkg/storegateway/bucket_stores.go
@@ -192,24 +192,79 @@ func (u *BucketStores) stopBucketStores(error) error {

// initialSync does an initial discovery of index headers and synchronization of blocks for all users.
func (u *BucketStores) initialSync(ctx context.Context) error {
	start := time.Now()
	level.Info(u.logger).Log("msg", "discovering TSDB index headers for all users")
tenantBlockIndexHeaders, err := u.DiscoverAllIndexHeaders(ctx)
if err != nil {
level.Warn(u.logger).Log("msg", "failed to discover TSDB index headers", "err", err)
return fmt.Errorf("initial discovery with bucket: %w", err)
}

level.Info(u.logger).Log(
"msg", "successfully discovered TSDB index headers for all users",
"tenantBlockIndexHeaders", tenantBlockIndexHeaders.String(),
"duration", time.Since(start).String(),
)

start = time.Now()
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, store *BucketStore) error {
return store.InitialSync(ctx)
}); err != nil {
level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
return fmt.Errorf("initial synchronisation with bucket: %w", err)
}

	level.Info(u.logger).Log(
		"msg", "successfully synchronized TSDB blocks for all users",
		"duration", time.Since(start).String(),
	)
return nil
}

// SyncBlocks synchronizes the stores state with the Bucket store for every user.
func (u *BucketStores) SyncBlocks(ctx context.Context) error {
	f := func(ctx context.Context, store *BucketStore) error {
		return store.SyncBlocks(ctx)
	}
	return u.syncUsersBlocksWithRetries(ctx, f)
}

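// DiscoverAllIndexHeaders discovers the index header attributes of every block for all
// allowed tenants and aggregates them into a single TenantIndexHeaderMap.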
func (u *BucketStores) DiscoverAllIndexHeaders(ctx context.Context) (*TenantIndexHeaderMap, error) {
tenantBlockIndexHeaders := &TenantIndexHeaderMap{}
discoverFunc := func(ctx context.Context, store *BucketStore) error {
return store.DiscoverIndexHeaders(ctx, tenantBlockIndexHeaders)
}
err := u.discoverAllIndexHeadersWithRetries(ctx, discoverFunc)
return tenantBlockIndexHeaders, err
}

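// discoverAllIndexHeadersWithRetries runs discoverFunc for every allowed tenant,
// retrying with the configured sync backoff until it succeeds or the backoff gives up.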
func (u *BucketStores) discoverAllIndexHeadersWithRetries(
ctx context.Context,
discoverFunc func(context.Context, *BucketStore) error,
) error {
retries := backoff.New(ctx, u.syncBackoffConfig)

var lastErr error
for retries.Ongoing() {
userIDs, err := u.allowedUsers(ctx)
if err != nil {
retries.Wait()
continue
}
lastErr = u.discoverAllowedTenantsIndexHeaders(ctx, userIDs, discoverFunc)
if lastErr == nil {
return nil
}

retries.Wait()
}

if lastErr == nil {
return retries.Err()
}

return lastErr
}

func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *BucketStore) error) error {
@@ -237,6 +292,15 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co
return lastErr
}

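// allowedUsers returns all tenants found in the bucket, without the shard-ownership
// filtering done by ownedUsers, and updates the tenantsDiscovered metric.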
func (u *BucketStores) allowedUsers(ctx context.Context) ([]string, error) {
userIDs, err := u.scanUsers(ctx)
if err != nil {
return nil, err
}
u.tenantsDiscovered.Set(float64(len(userIDs)))
return userIDs, nil
}

func (u *BucketStores) ownedUsers(ctx context.Context) ([]string, error) {
userIDs, err := u.scanUsers(ctx)
if err != nil {
@@ -252,6 +316,83 @@ func (u *BucketStores) ownedUsers(ctx context.Context) ([]string, error) {
return ownedUserIDs, nil
}

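// discoverAllowedTenantsIndexHeaders runs discoverFunc against a per-tenant bucket store
// for each allowed tenant, using a worker pool bounded by TenantSyncConcurrency.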
func (u *BucketStores) discoverAllowedTenantsIndexHeaders(
ctx context.Context,
allowedTenantIDs []string,
discoverFunc func(context.Context, *BucketStore) error,
) (returnErr error) {
defer func(start time.Time) {
u.syncTimes.Observe(time.Since(start).Seconds())
if returnErr == nil {
u.syncLastSuccess.SetToCurrentTime()
}
}(time.Now())

type job struct {
tenantID string
store *BucketStore
}

wg := &sync.WaitGroup{}
jobs := make(chan job)
errs := tsdb_errors.NewMulti()
errsMx := sync.Mutex{}

u.tenantsSynced.Set(float64(len(allowedTenantIDs)))

	// Create a pool of workers which will discover index headers. The pool size
	// is limited in order to avoid concurrently discovering index headers for a
	// large number of tenants in a large cluster.
for i := 0; i < u.cfg.BucketStore.TenantSyncConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for job := range jobs {
if err := discoverFunc(ctx, job.store); err != nil {
errsMx.Lock()
errs.Add(errors.Wrapf(err, "failed to discover TSDB index header for user %s", job.tenantID))
errsMx.Unlock()
}
}
}()
}

	// Lazily create a bucket store for each new user found
	// and submit a discovery job for each user.
for _, userID := range allowedTenantIDs {
bs, err := u.getOrCreateStoreNoShardingFilter(ctx, userID)
if err != nil {
errsMx.Lock()
errs.Add(err)
errsMx.Unlock()

continue
}

select {
case jobs <- job{tenantID: userID, store: bs}:
// Nothing to do. Will loop to push more jobs.
case <-ctx.Done():
			// Wait until all workers are done, so the goroutine leak detector doesn't
			// report any issue. This is expected to be quick, since the done ctx is
			// also used by the worker callback function.
close(jobs)
wg.Wait()

return ctx.Err()
}
}

// Wait until all workers completed.
close(jobs)
wg.Wait()

//u.closeBucketStoreAndDeleteLocalFilesForExcludedTenants(UserIDs)

return errs.Err()
}

func (u *BucketStores) syncUsersBlocks(ctx context.Context, includeUserIDs []string, f func(context.Context, *BucketStore) error) (returnErr error) {
defer func(start time.Time) {
u.syncTimes.Observe(time.Since(start).Seconds())
@@ -477,6 +618,86 @@ func (t timeoutGate) Done() {
t.delegate.Done()
}

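// getOrCreateStoreNoShardingFilter is like getOrCreateStore, but the returned store's
// metadata fetcher applies no sharding strategy filter, so it sees all of the tenant's
// blocks. The store is cached under a separate "<userID>-nofilter" key.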
func (u *BucketStores) getOrCreateStoreNoShardingFilter(ctx context.Context, userID string) (*BucketStore, error) {
userStoreKey := userID + "-nofilter"
// Check if the store already exists.
bs := u.getStore(userStoreKey)
if bs != nil {
return bs, nil
}

u.storesMu.Lock()
defer u.storesMu.Unlock()

// Check again for the store in the event it was created in-between locks.
bs = u.stores[userStoreKey]
if bs != nil {
return bs, nil
}

userLogger := util_log.WithUserID(userID, u.logger)

level.Info(userLogger).Log("msg", "creating user bucket store")

userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
fetcherReg := prometheus.NewRegistry()

	// Unlike getOrCreateStore, no sharding strategy filter is applied here,
	// so this store sees all of the tenant's blocks.
filters := []block.MetadataFilter{
newMinTimeMetaFilter(u.cfg.BucketStore.IgnoreBlocksWithin),
// Use our own custom implementation.
NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksInStoreGatewayDelay, u.cfg.BucketStore.MetaSyncConcurrency),
// The duplicate filter has been intentionally omitted because it could cause troubles with
// the consistency check done on the querier. The duplicate filter removes redundant blocks
// but if the store-gateway removes redundant blocks before the querier discovers them, the
// consistency check on the querier will fail.
}
fetcher := NewBucketIndexMetadataFetcher(
userID,
u.bucket,
u.limits,
u.logger,
fetcherReg,
filters,
)
bucketStoreOpts := []BucketStoreOption{
WithLogger(userLogger),
WithIndexCache(u.indexCache),
WithQueryGate(u.queryGate),
WithLazyLoadingGate(u.lazyLoadingGate),
}

bs, err := NewBucketStore(
userID,
userBkt,
fetcher,
u.syncDirForUser(userID),
u.cfg.BucketStore,
worstCaseFetchedDataStrategy{postingListActualSizeFactor: u.cfg.BucketStore.SeriesFetchPreference},
NewChunksLimiterFactory(func() uint64 {
return uint64(u.limits.MaxChunksPerQuery(userID))
}),
NewSeriesLimiterFactory(func() uint64 {
return uint64(u.limits.MaxFetchedSeriesPerQuery(userID))
}),
u.partitioners,
u.seriesHashCache,
u.bucketStoreMetrics,
bucketStoreOpts...,
)
if err != nil {
return nil, err
}
if err = services.StartAndAwaitRunning(ctx, bs); err != nil {
return nil, fmt.Errorf("starting bucket store for tenant %s: %w", userID, err)
}

u.stores[userStoreKey] = bs
u.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg)

return bs, nil
}

func (u *BucketStores) getOrCreateStore(ctx context.Context, userID string) (*BucketStore, error) {
// Check if the store already exists.
bs := u.getStore(userID)
