[WIP] store-gateway-binpacking-hackathon-scratch #10800

Draft · wants to merge 1 commit into base: main
116 changes: 116 additions & 0 deletions pkg/storegateway/bucket.go
@@ -296,6 +296,122 @@ func (s *BucketStore) Stats() BucketStoreStats {
}
}

// TenantIndexHeaderMap is a thread-safe nested map structure.
// It acts as a synchronized map[string]map[ulid.ULID]objstore.ObjectAttributes,
// keyed by tenant ID and then by block ID. The objstore.ObjectAttributes are
// used to get the index header size.
type TenantIndexHeaderMap struct {
	blockIndexHeaderMap sync.Map // map[string]*sync.Map; inner maps are map[ulid.ULID]objstore.ObjectAttributes
}

func (t *TenantIndexHeaderMap) String() string {
	s := "TenantIndexHeaderMap: "
	t.blockIndexHeaderMap.Range(func(tenantKey, tenantVal interface{}) bool {
		tenantID := tenantKey.(string)
		indexHeaders := tenantVal.(*sync.Map)
		indexHeaders.Range(func(blockKey, blockVal interface{}) bool {
			blockID := blockKey.(ulid.ULID)
			attrs := blockVal.(objstore.ObjectAttributes)
			s += fmt.Sprintf("tenantID: %s, blockID: %s, attrs: %v --- ", tenantID, blockID, attrs)
			return true
		})
		return true
	})
	return s
}

func (t *TenantIndexHeaderMap) Contains(tenantID string, blockID ulid.ULID) bool {
indexHeadersVal, ok := t.blockIndexHeaderMap.Load(tenantID)
if !ok {
return false
}
indexHeaders := indexHeadersVal.(*sync.Map)
_, ok = indexHeaders.Load(blockID)
return ok
}

func (t *TenantIndexHeaderMap) Add(
	tenantID string,
	blockID ulid.ULID,
	indexHeaderAttrs objstore.ObjectAttributes,
) error {
	// LoadOrStore avoids a race where two goroutines discovering blocks for the
	// same tenant would each create and store a fresh inner map, losing entries.
	indexHeadersVal, _ := t.blockIndexHeaderMap.LoadOrStore(tenantID, &sync.Map{}) // map[ulid.ULID]objstore.ObjectAttributes
	indexHeaders := indexHeadersVal.(*sync.Map)
	if _, loaded := indexHeaders.LoadOrStore(blockID, indexHeaderAttrs); loaded {
		// Callers skip already-known blocks via Contains() first, so this should never happen.
		return fmt.Errorf("block %s already exists in the set", blockID)
	}
	return nil
}
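
For illustration, a minimal usage sketch of TenantIndexHeaderMap; the tenant ID, block ULID, and size below are made-up values, not data from this PR (assumes the oklog/ulid and objstore imports this file already uses):

// Hypothetical usage sketch; not part of the change.
func ExampleTenantIndexHeaderMap() {
	m := &TenantIndexHeaderMap{}
	blockID := ulid.MustNew(ulid.Now(), nil) // nil entropy reader is acceptable for a sketch
	attrs := objstore.ObjectAttributes{Size: 4096}
	if !m.Contains("tenant-1", blockID) {
		_ = m.Add("tenant-1", blockID, attrs)
	}
	fmt.Println(m.String())
}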

// DiscoverIndexHeaders records, in tenantIndexHeaderMap, the object attributes
// of the index of every block belonging to this store's tenant.
func (s *BucketStore) DiscoverIndexHeaders(ctx context.Context, tenantIndexHeaderMap *TenantIndexHeaderMap) error {
return s.discoverIndexHeaders(ctx, tenantIndexHeaderMap)
}

func (s *BucketStore) discoverIndexHeaders(ctx context.Context, tenantIndexHeaderMap *TenantIndexHeaderMap) error {
	metas, _, metaFetchErr := s.fetcher.Fetch(ctx)
	// If the fetch failed but returned a partial view, still process the metas
	// we got so at least the newly discovered blocks are added; metaFetchErr is
	// returned below once processing completes.
	if metaFetchErr != nil && metas == nil {
		return metaFetchErr
	}

errGroup, ctx := errgroup.WithContext(ctx)
blockc := make(chan *block.Meta)

for i := 0; i < s.blockSyncConcurrency; i++ {
errGroup.Go(func() error {
for meta := range blockc {
attrs, err := s.blockIndexHeaderAttrs(ctx, meta.ULID)
if err != nil {
return err
}
err = tenantIndexHeaderMap.Add(s.userID, meta.ULID, attrs)
if err != nil {
return err
}
}
return nil
})
}

producer:
	for blockID, meta := range metas {
		if tenantIndexHeaderMap.Contains(s.userID, blockID) {
			continue
		}
		select {
		case <-ctx.Done():
			// ctx is canceled when a worker fails; stop producing and let
			// errGroup.Wait() below surface the error.
			break producer
		case blockc <- meta:
		}
	}

close(blockc)
err := errGroup.Wait()
if err != nil {
return err
}

if metaFetchErr != nil {
return metaFetchErr
}

return nil
}
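
discoverIndexHeaders follows a common errgroup producer/consumer shape. Below is a distilled, generic sketch of that pattern with a placeholder item type and callback; it assumes the golang.org/x/sync/errgroup import this file already uses and is not code from this PR:

func processConcurrently(ctx context.Context, items []string, workers int, fn func(context.Context, string) error) error {
	g, ctx := errgroup.WithContext(ctx)
	ch := make(chan string)
	for i := 0; i < workers; i++ {
		g.Go(func() error {
			for item := range ch {
				if err := fn(ctx, item); err != nil {
					return err // cancels ctx, which unblocks the producer below
				}
			}
			return nil
		})
	}
produce:
	for _, item := range items {
		select {
		case <-ctx.Done(): // a worker failed; stop producing
			break produce
		case ch <- item:
		}
	}
	close(ch)
	return g.Wait()
}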

func (s *BucketStore) blockIndexHeaderAttrs(ctx context.Context, blockID ulid.ULID) (objstore.ObjectAttributes, error) {
	indexFilepath := filepath.Join(blockID.String(), block.IndexFilename)
	attrs, err := s.bkt.Attributes(ctx, indexFilepath)
	if err != nil {
		// Return the wrapped error instead of discarding it.
		return attrs, errors.Wrapf(err, "get object attributes of %s", indexFilepath)
	}
	return attrs, nil
}
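
Given the bin-packing goal in the PR title, a hypothetical helper (not part of this change) could aggregate a tenant's total index bytes from the collected attributes:

// TenantIndexBytes sums the recorded index sizes for one tenant.
// Hypothetical helper for illustration; not part of this change.
func (t *TenantIndexHeaderMap) TenantIndexBytes(tenantID string) int64 {
	var total int64
	if v, ok := t.blockIndexHeaderMap.Load(tenantID); ok {
		v.(*sync.Map).Range(func(_, attrsVal interface{}) bool {
			total += attrsVal.(objstore.ObjectAttributes).Size
			return true
		})
	}
	return total
}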

// SyncBlocks synchronizes the store's state with the bucket.
// It will reuse disk space as a persistent cache, based on the s.dir param.
func (s *BucketStore) SyncBlocks(ctx context.Context) error {
229 changes: 225 additions & 4 deletions pkg/storegateway/bucket_stores.go
@@ -192,24 +192,79 @@ func (u *BucketStores) stopBucketStores(error) error {

// initialSync does an initial synchronization of blocks for all users.
func (u *BucketStores) initialSync(ctx context.Context) error {
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
start := time.Now()
level.Info(u.logger).Log("msg", "discovering TSDB index headers for all users")
tenantBlockIndexHeaders, err := u.DiscoverAllIndexHeaders(ctx)
if err != nil {
level.Warn(u.logger).Log("msg", "failed to discover TSDB index headers", "err", err)
return fmt.Errorf("initial discovery with bucket: %w", err)
}

level.Info(u.logger).Log(
"msg", "successfully discovered TSDB index headers for all users",
"tenantBlockIndexHeaders", tenantBlockIndexHeaders.String(),
"duration", time.Since(start).String(),
)

start = time.Now()
level.Info(u.logger).Log("msg", "synchronizing TSDB blocks for all users")
if err := u.syncUsersBlocksWithRetries(ctx, func(ctx context.Context, store *BucketStore) error {
return store.InitialSync(ctx)
}); err != nil {
level.Warn(u.logger).Log("msg", "failed to synchronize TSDB blocks", "err", err)
return fmt.Errorf("initial synchronisation with bucket: %w", err)
}

level.Info(u.logger).Log("msg", "successfully synchronized TSDB blocks for all users")
level.Info(u.logger).Log(
"msg", "successfully synchronized TSDB blocks for all users",
"duration", time.Since(start).String(),
)
return nil
}

// SyncBlocks synchronizes the store's state with the bucket store for every user.
func (u *BucketStores) SyncBlocks(ctx context.Context) error {
	f := func(ctx context.Context, store *BucketStore) error {
		return store.SyncBlocks(ctx)
	}
	return u.syncUsersBlocksWithRetries(ctx, f)
}

// DiscoverAllIndexHeaders discovers index headers for every allowed tenant and
// returns their object attributes grouped per tenant.
func (u *BucketStores) DiscoverAllIndexHeaders(ctx context.Context) (*TenantIndexHeaderMap, error) {
tenantBlockIndexHeaders := &TenantIndexHeaderMap{}
discoverFunc := func(ctx context.Context, store *BucketStore) error {
return store.DiscoverIndexHeaders(ctx, tenantBlockIndexHeaders)
}
err := u.discoverAllIndexHeadersWithRetries(ctx, discoverFunc)
return tenantBlockIndexHeaders, err
}

func (u *BucketStores) discoverAllIndexHeadersWithRetries(
ctx context.Context,
discoverFunc func(context.Context, *BucketStore) error,
) error {
retries := backoff.New(ctx, u.syncBackoffConfig)

var lastErr error
for retries.Ongoing() {
userIDs, err := u.allowedUsers(ctx)
if err != nil {
retries.Wait()
continue
}
lastErr = u.discoverAllowedTenantsIndexHeaders(ctx, userIDs, discoverFunc)
if lastErr == nil {
return nil
}

retries.Wait()
}

if lastErr == nil {
return retries.Err()
}

return lastErr
}
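
The loop above is dskit's usual backoff-retry shape. For reference, a minimal standalone sketch of the same pattern; the backoff.Config values are illustrative, not this PR's configuration:

func retryWithBackoff(ctx context.Context, do func(context.Context) error) error {
	retries := backoff.New(ctx, backoff.Config{
		MinBackoff: time.Second,
		MaxBackoff: 10 * time.Second,
		MaxRetries: 3,
	})
	var lastErr error
	for retries.Ongoing() {
		if lastErr = do(ctx); lastErr == nil {
			return nil
		}
		retries.Wait()
	}
	if lastErr == nil {
		// No attempt ever ran (e.g. the context was canceled up front).
		return retries.Err()
	}
	return lastErr
}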

func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(context.Context, *BucketStore) error) error {
@@ -237,6 +292,15 @@ func (u *BucketStores) syncUsersBlocksWithRetries(ctx context.Context, f func(co
return lastErr
}

// allowedUsers returns all users discovered in the bucket, without applying
// the shard-ownership filter used by ownedUsers.
func (u *BucketStores) allowedUsers(ctx context.Context) ([]string, error) {
userIDs, err := u.scanUsers(ctx)
if err != nil {
return nil, err
}
u.tenantsDiscovered.Set(float64(len(userIDs)))
return userIDs, nil
}

func (u *BucketStores) ownedUsers(ctx context.Context) ([]string, error) {
userIDs, err := u.scanUsers(ctx)
if err != nil {
@@ -252,6 +316,83 @@ func (u *BucketStores) ownedUsers(ctx context.Context) ([]string, error) {
return ownedUserIDs, nil
}

func (u *BucketStores) discoverAllowedTenantsIndexHeaders(
ctx context.Context,
allowedTenantIDs []string,
discoverFunc func(context.Context, *BucketStore) error,
) (returnErr error) {
defer func(start time.Time) {
u.syncTimes.Observe(time.Since(start).Seconds())
if returnErr == nil {
u.syncLastSuccess.SetToCurrentTime()
}
}(time.Now())

type job struct {
tenantID string
store *BucketStore
}

wg := &sync.WaitGroup{}
jobs := make(chan job)
errs := tsdb_errors.NewMulti()
errsMx := sync.Mutex{}

u.tenantsSynced.Set(float64(len(allowedTenantIDs)))

	// Create a pool of workers which will discover index headers. The pool size
	// is limited in order to avoid concurrently processing a large number of
	// tenants in a big cluster.
for i := 0; i < u.cfg.BucketStore.TenantSyncConcurrency; i++ {
wg.Add(1)
go func() {
defer wg.Done()

for job := range jobs {
if err := discoverFunc(ctx, job.store); err != nil {
errsMx.Lock()
errs.Add(errors.Wrapf(err, "failed to discover TSDB index header for user %s", job.tenantID))
errsMx.Unlock()
}
}
}()
}

	// Lazily create a bucket store for each new user found
	// and submit a discovery job for each of them.
for _, userID := range allowedTenantIDs {
bs, err := u.getOrCreateStoreNoShardingFilter(ctx, userID)
if err != nil {
errsMx.Lock()
errs.Add(err)
errsMx.Unlock()

continue
}

select {
case jobs <- job{tenantID: userID, store: bs}:
// Nothing to do. Will loop to push more jobs.
case <-ctx.Done():
			// Wait until all workers have finished, so the goroutine leak detector
			// doesn't report any issue. This is expected to be quick, since the done
			// ctx is used by the worker callback function too.
close(jobs)
wg.Wait()

return ctx.Err()
}
}

	// Wait until all workers have completed.
close(jobs)
wg.Wait()

	// TODO: u.closeBucketStoreAndDeleteLocalFilesForExcludedTenants(allowedTenantIDs)

return errs.Err()
}
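
Worker failures are collected through tsdb_errors.NewMulti, which is not safe for concurrent use on its own, hence the errsMx mutex. A distilled sketch of that collection pattern, with a placeholder task type (not code from this PR):

func gatherErrors(tasks []func() error) error {
	errs := tsdb_errors.NewMulti()
	var mu sync.Mutex
	var wg sync.WaitGroup
	for _, task := range tasks {
		wg.Add(1)
		go func(run func() error) {
			defer wg.Done()
			if err := run(); err != nil {
				mu.Lock()
				errs.Add(err)
				mu.Unlock()
			}
		}(task)
	}
	wg.Wait()
	return errs.Err() // nil if no task failed
}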

func (u *BucketStores) syncUsersBlocks(ctx context.Context, includeUserIDs []string, f func(context.Context, *BucketStore) error) (returnErr error) {
defer func(start time.Time) {
u.syncTimes.Observe(time.Since(start).Seconds())
@@ -477,6 +618,86 @@ func (t timeoutGate) Done() {
t.delegate.Done()
}

func (u *BucketStores) getOrCreateStoreNoShardingFilter(ctx context.Context, userID string) (*BucketStore, error) {
	// Key the store under a distinct name so it doesn't collide with the
	// sharding-filtered store that getOrCreateStore registers for the same user.
	userStoreKey := userID + "-nofilter"
// Check if the store already exists.
bs := u.getStore(userStoreKey)
if bs != nil {
return bs, nil
}

u.storesMu.Lock()
defer u.storesMu.Unlock()

// Check again for the store in the event it was created in-between locks.
bs = u.stores[userStoreKey]
if bs != nil {
return bs, nil
}

userLogger := util_log.WithUserID(userID, u.logger)

level.Info(userLogger).Log("msg", "creating user bucket store")

userBkt := bucket.NewUserBucketClient(userID, u.bucket, u.limits)
fetcherReg := prometheus.NewRegistry()

	// Unlike getOrCreateStore, the sharding strategy filter is intentionally
	// omitted here, so blocks are discovered regardless of shard ownership.
	filters := []block.MetadataFilter{
newMinTimeMetaFilter(u.cfg.BucketStore.IgnoreBlocksWithin),
// Use our own custom implementation.
NewIgnoreDeletionMarkFilter(userLogger, userBkt, u.cfg.BucketStore.IgnoreDeletionMarksInStoreGatewayDelay, u.cfg.BucketStore.MetaSyncConcurrency),
		// The duplicate filter has been intentionally omitted because it could cause
		// trouble with the consistency check done on the querier. The duplicate filter
		// removes redundant blocks, but if the store-gateway removes them before the
		// querier discovers them, the consistency check on the querier will fail.
}
fetcher := NewBucketIndexMetadataFetcher(
userID,
u.bucket,
u.limits,
u.logger,
fetcherReg,
filters,
)
bucketStoreOpts := []BucketStoreOption{
WithLogger(userLogger),
WithIndexCache(u.indexCache),
WithQueryGate(u.queryGate),
WithLazyLoadingGate(u.lazyLoadingGate),
}

bs, err := NewBucketStore(
userID,
userBkt,
fetcher,
u.syncDirForUser(userID),
u.cfg.BucketStore,
worstCaseFetchedDataStrategy{postingListActualSizeFactor: u.cfg.BucketStore.SeriesFetchPreference},
NewChunksLimiterFactory(func() uint64 {
return uint64(u.limits.MaxChunksPerQuery(userID))
}),
NewSeriesLimiterFactory(func() uint64 {
return uint64(u.limits.MaxFetchedSeriesPerQuery(userID))
}),
u.partitioners,
u.seriesHashCache,
u.bucketStoreMetrics,
bucketStoreOpts...,
)
if err != nil {
return nil, err
}
if err = services.StartAndAwaitRunning(ctx, bs); err != nil {
return nil, fmt.Errorf("starting bucket store for tenant %s: %w", userID, err)
}

u.stores[userStoreKey] = bs
u.metaFetcherMetrics.AddUserRegistry(userID, fetcherReg)

return bs, nil
}
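
Like getOrCreateStore, this function uses the check / lock / re-check idiom so two concurrent callers don't build the same store twice. The idiom in isolation, with a placeholder cache type (not code from this PR):

type storeCache struct {
	mu     sync.RWMutex
	stores map[string]*BucketStore
}

func (c *storeCache) getOrCreate(key string, create func() (*BucketStore, error)) (*BucketStore, error) {
	c.mu.RLock()
	bs := c.stores[key]
	c.mu.RUnlock()
	if bs != nil {
		return bs, nil // fast path, no write lock needed
	}

	c.mu.Lock()
	defer c.mu.Unlock()
	// Re-check: another goroutine may have created the store between the locks.
	if bs = c.stores[key]; bs != nil {
		return bs, nil
	}
	bs, err := create()
	if err != nil {
		return nil, err
	}
	c.stores[key] = bs
	return bs, nil
}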

func (u *BucketStores) getOrCreateStore(ctx context.Context, userID string) (*BucketStore, error) {
// Check if the store already exists.
bs := u.getStore(userID)