diff --git a/pkg/storegateway/bucket.go b/pkg/storegateway/bucket.go
index 1fd9dd7275..3764df4c98 100644
--- a/pkg/storegateway/bucket.go
+++ b/pkg/storegateway/bucket.go
@@ -296,6 +296,122 @@ func (s *BucketStore) Stats() BucketStoreStats {
 	}
 }
 
+// TenantIndexHeaderMap is a thread-safe nested map structure.
+// It behaves as a synchronized map[string]map[ulid.ULID]objstore.ObjectAttributes.
+// The stored objstore.ObjectAttributes are used to determine the index-header size.
+type TenantIndexHeaderMap struct {
+	blockIndexHeaderMap sync.Map // map[string]*sync.Map, inner map is map[ulid.ULID]objstore.ObjectAttributes
+}
+
+func (t *TenantIndexHeaderMap) String() string {
+	s := "TenantIndexHeaderMap: "
+	t.blockIndexHeaderMap.Range(func(key, value interface{}) bool {
+		tenantID := key.(string)
+		indexHeaders := value.(*sync.Map)
+		indexHeaders.Range(func(key, value interface{}) bool {
+			blockID := key.(ulid.ULID)
+			attrs := value.(objstore.ObjectAttributes)
+			s += fmt.Sprintf("tenantID: %s, blockID: %s, attrs: %v --- ", tenantID, blockID, attrs)
+			return true
+		})
+		return true
+	})
+	return s
+}
+
+// Contains reports whether attributes have already been recorded for the given tenant and block.
+func (t *TenantIndexHeaderMap) Contains(tenantID string, blockID ulid.ULID) bool {
+	indexHeadersVal, ok := t.blockIndexHeaderMap.Load(tenantID)
+	if !ok {
+		return false
+	}
+	indexHeaders := indexHeadersVal.(*sync.Map)
+	_, ok = indexHeaders.Load(blockID)
+	return ok
+}
+
+// Add records the index object attributes for the given tenant and block.
+// It returns an error if the block was already present.
+func (t *TenantIndexHeaderMap) Add(
+	tenantID string,
+	blockID ulid.ULID,
+	indexHeaderAttrs objstore.ObjectAttributes,
+) error {
+	indexHeadersVal, ok := t.blockIndexHeaderMap.Load(tenantID)
+	if !ok {
+		// Use LoadOrStore rather than Store so that two goroutines adding the first
+		// blocks of the same tenant concurrently can't overwrite each other's inner
+		// map and silently drop a block.
+		indexHeadersVal, _ = t.blockIndexHeaderMap.LoadOrStore(tenantID, &sync.Map{}) // map[ulid.ULID]objstore.ObjectAttributes
+	}
+	indexHeaders := indexHeadersVal.(*sync.Map)
+	_, loaded := indexHeaders.LoadOrStore(blockID, indexHeaderAttrs)
+	if loaded {
+		// Callers check Contains() before calling Add(), so this is not expected to happen.
+		return fmt.Errorf("block %s already exists in the set", blockID)
+	}
+	return nil
+}
+
+// DiscoverIndexHeaders records the index object attributes of every block of this
+// store's tenant that is not yet present in tenantIndexHeaderMap.
+func (s *BucketStore) DiscoverIndexHeaders(ctx context.Context, tenantIndexHeaderMap *TenantIndexHeaderMap) error {
+	return s.discoverIndexHeaders(ctx, tenantIndexHeaderMap)
+}
+
+func (s *BucketStore) discoverIndexHeaders(ctx context.Context, tenantIndexHeaderMap *TenantIndexHeaderMap) error {
+	metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
+	// On a partial view of the bucket, keep going so the blocks we did fetch can still
+	// be discovered; the fetch error is returned at the end.
+	if metaFetchErr != nil && metas == nil {
+		return metaFetchErr
+	}
+
+	errGroup, ctx := errgroup.WithContext(ctx)
+	blockc := make(chan *block.Meta)
+
+	for i := 0; i < s.blockSyncConcurrency; i++ {
+		errGroup.Go(func() error {
+			for meta := range blockc {
+				attrs, err := s.blockIndexHeaderAttrs(ctx, meta.ULID)
+				if err != nil {
+					return err
+				}
+				err = tenantIndexHeaderMap.Add(s.userID, meta.ULID, attrs)
+				if err != nil {
+					return err
+				}
+			}
+			return nil
+		})
+	}
+
+	for blockID, meta := range metas {
+		if tenantIndexHeaderMap.Contains(s.userID, blockID) {
+			continue
+		}
+		select {
+		case <-ctx.Done():
+		case blockc <- meta:
+		}
+	}
+
+	close(blockc)
+	err := errGroup.Wait()
+	if err != nil {
+		return err
+	}
+
+	if metaFetchErr != nil {
+		return metaFetchErr
+	}
+
+	return nil
+}
+
+func (s *BucketStore) blockIndexHeaderAttrs(ctx context.Context, blockID ulid.ULID) (objstore.ObjectAttributes, error) {
+	indexFilepath := filepath.Join(blockID.String(), block.IndexFilename)
+	attrs, err := s.bkt.Attributes(ctx, indexFilepath)
+	if err != nil {
+		return attrs, errors.Wrapf(err, "get object attributes of %s", indexFilepath)
+	}
+	return attrs, nil
+}
+
 // SyncBlocks synchronizes the stores state with the Bucket bucket.
 // It will reuse disk space as persistent cache based on s.dir param.
 func (s *BucketStore) SyncBlocks(ctx context.Context) error {
diff --git a/pkg/storegateway/bucket_stores.go b/pkg/storegateway/bucket_stores.go
index a5bb52ef49..e0836f5a1d 100644
--- a/pkg/storegateway/bucket_stores.go
+++ b/pkg/storegateway/bucket_stores.go
@@ -192,8 +192,22 @@ func (u *BucketStores) stopBucketStores(error) error {
 
 // initialSync does an initial synchronization of blocks for all users.
 func (u *BucketStores) initialSync(ctx context.Context) error {
-	level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
+	start := time.Now()
+	level.Info(u.logger).Log("msg", "discovering TSDB index headers for all users")
 
+	tenantBlockIndexHeaders, err := u.DiscoverAllIndexHeaders(ctx)
+	if err != nil {
+		level.Warn(u.logger).Log("msg", "failed to discover TSDB index headers", "err", err)
+		return fmt.Errorf("initial discovery with bucket: %w", err)
+	}
+	level.Info(u.logger).Log(
+		"msg", "successfully discovered TSDB index headers for all users",
+		"tenantBlockIndexHeaders", tenantBlockIndexHeaders.String(),
+		"duration", time.Since(start).String(),
+	)
+
+	start = time.Now()
+	level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
 	if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, store *BucketStore) error {
 		return store.InitialSync(ctx)
 	}); err != nil {
@@ -201,15 +215,56 @@ func (u *BucketStores) initialSync(ctx context.Context) error {
 		return fmt.Errorf("initial synchronisation with bucket: %w", err)
 	}
 
-	level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users")
+	level.Info(u.logger).Log(
+		"msg", "successfully synchronized TSDB blocks for all users",
+		"duration", time.Since(start).String(),
+	)
 	return nil
 }
 
 // SyncBlocks synchronizes the stores state with the Bucket store for every user.
 func (u *BucketStores) SyncBlocks(ctx context.Context) error {
-	return u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, store *BucketStore) error {
+	f := func(ctx context.Context, store *BucketStore) error {
 		return store.SyncBlocks(ctx)
-	})
+	}
+	return u.syncUsersBlocksWithRetries(ctx, f)
+}
+
+// DiscoverAllIndexHeaders discovers the index object attributes for the blocks of every
+// tenant found in the bucket, without applying the sharding filter.
+func (u *BucketStores) DiscoverAllIndexHeaders(ctx context.Context) (*TenantIndexHeaderMap, error) {
+	tenantBlockIndexHeaders := &TenantIndexHeaderMap{}
+	discoverFunc := func(ctx context.Context, store *BucketStore) error {
+		return store.DiscoverIndexHeaders(ctx, tenantBlockIndexHeaders)
+	}
+	err := u.discoverAllIndexHeadersWithRetries(ctx, discoverFunc)
+	return tenantBlockIndexHeaders, err
+}
+
+func (u *BucketStores) discoverAllIndexHeadersWithRetries(
+	ctx context.Context,
+	discoverFunc func(context.Context, *BucketStore) error,
+) error {
+	retries := backoff.New(ctx, u.syncBackoffConfig)
+
+	var lastErr error
+	for retries.Ongoing() {
+		userIDs, err := u.allowedUsers(ctx)
+		if err != nil {
+			retries.Wait()
+			continue
+		}
+		lastErr = u.discoverAllowedTenantsIndexHeaders(ctx, userIDs, discoverFunc)
+		if lastErr == nil {
+			return nil
+		}
+
+		retries.Wait()
+	}
+
+	if lastErr == nil {
+		return retries.Err()
+	}
+
+	return lastErr
 }
 
 func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *BucketStore) error) error {
@@ -237,6 +292,15 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co
 	return lastErr
 }
 
+// allowedUsers returns all users found in the bucket and updates the tenants-discovered
+// metric. Unlike ownedUsers, it does not filter users by the sharding strategy.
+func (u *BucketStores) allowedUsers(ctx context.Context) ([]string, error) {
+	userIDs, err := u.scanUsers(ctx)
+	if err != nil {
+		return nil, err
+	}
+	u.tenantsDiscovered.Set(float64(len(userIDs)))
+	return userIDs, nil
+}
+
 func (u *BucketStores) ownedUsers(ctx context.Context) ([]string, error) {
 	userIDs, err := u.scanUsers(ctx)
 	if err != nil {
@@ -252,6 +316,83 @@ func (u *BucketStores) ownedUsers(ctx context.Context) ([]string, error) {
 	return ownedUserIDs, nil
 }
 
+func (u *BucketStores) discoverAllowedTenantsIndexHeaders(
+	ctx context.Context,
+	allowedTenantIDs []string,
+	discoverFunc func(context.Context, *BucketStore) error,
+) (returnErr error) {
+	defer func(start time.Time) {
+		u.syncTimes.Observe(time.Since(start).Seconds())
+		if returnErr == nil {
+			u.syncLastSuccess.SetToCurrentTime()
+		}
+	}(time.Now())
+
+	type job struct {
+		tenantID string
+		store    *BucketStore
+	}
+
+	wg := &sync.WaitGroup{}
+	jobs := make(chan job)
+	errs := tsdb_errors.NewMulti()
+	errsMx := sync.Mutex{}
+
+	u.tenantsSynced.Set(float64(len(allowedTenantIDs)))
+
+	// Create a pool of workers which will discover index headers. The pool size
+	// is limited to avoid concurrently hitting the bucket for a lot of tenants
+	// in a large cluster.
+	for i := 0; i < u.cfg.BucketStore.TenantSyncConcurrency; i++ {
+		wg.Add(1)
+		go func() {
+			defer wg.Done()
+
+			for job := range jobs {
+				if err := discoverFunc(ctx, job.store); err != nil {
+					errsMx.Lock()
+					errs.Add(errors.Wrapf(err, "failed to discover TSDB index headers for user %s", job.tenantID))
+					errsMx.Unlock()
+				}
+			}
+		}()
+	}
+
+	// Lazily create a bucket store for each user found
+	// and submit a discovery job for it.
+	for _, userID := range allowedTenantIDs {
+		bs, err := u.getOrCreateStoreNoShardingFilter(ctx, userID)
+		if err != nil {
+			errsMx.Lock()
+			errs.Add(err)
+			errsMx.Unlock()
+
+			continue
+		}
+
+		select {
+		case jobs <- job{tenantID: userID, store: bs}:
+			// Nothing to do. Will loop to push more jobs.
+		case <-ctx.Done():
+			// Wait until all workers are done, so the goroutines leak detector doesn't
+			// report any issue. This is expected to be quick, considering the done ctx
+			// is used by the worker callback function too.
+			close(jobs)
+			wg.Wait()
+
+			return ctx.Err()
+		}
+	}
+
+	// Wait until all workers completed.
+	close(jobs)
+	wg.Wait()
+
+	// TODO: close bucket stores and delete local files for tenants which are no longer allowed.
+
+	return errs.Err()
+}
+
 func (u *BucketStores) syncUsersBlocks(ctx context.Context, includeUserIDs []string, f func(context.Context, *BucketStore) error) (returnErr error) {
 	defer func(start time.Time) {
 		u.syncTimes.Observe(time.Since(start).Seconds())
@@ -477,6 +618,86 @@ func (t timeoutGate) Done() {
 	t.delegate.Done()
 }
 
+// getOrCreateStoreNoShardingFilter is like getOrCreateStore, but the store's metadata
+// fetcher intentionally omits the sharding strategy filter, so index headers are
+// discovered for all blocks of the tenant regardless of which store-gateway owns them.
+// The store is cached under a separate "-nofilter" key so it doesn't collide with the
+// regular per-tenant store.
+func (u *BucketStores) getOrCreateStoreNoShardingFilter(ctx context.Context, userID string) (*BucketStore, error) {
+	userStoreKey := userID + "-nofilter"
+	// Check if the store already exists.
+	bs := u.getStore(userStoreKey)
+	if bs != nil {
+		return bs, nil
+	}
+
+	u.storesMu.Lock()
+	defer u.storesMu.Unlock()
+
+	// Check again for the store in the event it was created in-between locks.
+	bs = u.stores[userStoreKey]
+	if bs != nil {
+		return bs, nil
+	}
+
+	userLogger := util_log.WithUserID(userID, u.logger)
+
+	level.Info(userLogger).Log("msg", "creating user bucket store")
+
+	userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
+	fetcherReg := prometheus.NewRegistry()
+
+	// Unlike getOrCreateStore, no sharding strategy filter is applied here (see the
+	// method comment above).
+	filters := []block.MetadataFilter{
+		newMinTimeMetaFilter(u.cfg.BucketStore.IgnoreBlocksWithin),
+		// Use our own custom implementation.
+		NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksInStoreGatewayDelay, u.cfg.BucketStore.MetaSyncConcurrency),
+		// The duplicate filter has been intentionally omitted because it could cause troubles with
+		// the consistency check done on the querier. The duplicate filter removes redundant blocks
+		// but if the store-gateway removes redundant blocks before the querier discovers them, the
+		// consistency check on the querier will fail.
+	}
+	fetcher := NewBucketIndexMetadataFetcher(
+		userID,
+		u.bucket,
+		u.limits,
+		u.logger,
+		fetcherReg,
+		filters,
+	)
+	bucketStoreOpts := []BucketStoreOption{
+		WithLogger(userLogger),
+		WithIndexCache(u.indexCache),
+		WithQueryGate(u.queryGate),
+		WithLazyLoadingGate(u.lazyLoadingGate),
+	}
+
+	bs, err := NewBucketStore(
+		userID,
+		userBkt,
+		fetcher,
+		u.syncDirForUser(userID),
+		u.cfg.BucketStore,
+		worstCaseFetchedDataStrategy{postingListActualSizeFactor: u.cfg.BucketStore.SeriesFetchPreference},
+		NewChunksLimiterFactory(func() uint64 {
+			return uint64(u.limits.MaxChunksPerQuery(userID))
+		}),
+		NewSeriesLimiterFactory(func() uint64 {
+			return uint64(u.limits.MaxFetchedSeriesPerQuery(userID))
+		}),
+		u.partitioners,
+		u.seriesHashCache,
+		u.bucketStoreMetrics,
+		bucketStoreOpts...,
+	)
+	if err != nil {
+		return nil, err
+	}
+	if err = services.StartAndAwaitRunning(ctx, bs); err != nil {
+		return nil, fmt.Errorf("starting bucket store for tenant %s: %w", userID, err)
+	}
+
+	u.stores[userStoreKey] = bs
+	u.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg)
+
+	return bs, nil
+}
+
 func (u *BucketStores) getOrCreateStore(ctx context.Context, userID string) (*BucketStore, error) {
 	// Check if the store already exists.
 	bs := u.getStore(userID)
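Illustrative sketch (not part of the patch): a minimal, self-contained example of how TenantIndexHeaderMap is meant to be used by the discovery path above. The tenant ID, block ID, and size are made up, the example function name is hypothetical, and it is assumed to live in the same storegateway package so it can reference the type directly.

	package storegateway

	import (
		"fmt"

		"github.com/oklog/ulid"
		"github.com/thanos-io/objstore"
	)

	func exampleTenantIndexHeaderMapUsage() {
		m := &TenantIndexHeaderMap{}

		// A ULID with zero entropy is enough for illustration purposes.
		blockID := ulid.MustNew(ulid.Now(), nil)

		// Discovery workers check Contains before calling Add, mirroring discoverIndexHeaders.
		if !m.Contains("tenant-1", blockID) {
			_ = m.Add("tenant-1", blockID, objstore.ObjectAttributes{Size: 4 << 20})
		}

		// Adding the same (tenant, block) pair twice returns an error.
		if err := m.Add("tenant-1", blockID, objstore.ObjectAttributes{Size: 4 << 20}); err != nil {
			fmt.Println("duplicate add rejected:", err)
		}

		fmt.Println(m.String())
	}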