Skip to content

Commit

Permalink
userTSDB: Avoid race condition when setting db (#9708)
Browse files Browse the repository at this point in the history
* userTSDB: Avoid race condition when setting db

---------

Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 authored Oct 28, 2024
1 parent 11337c6 commit d14d52c
Show file tree
Hide file tree
Showing 2 changed files with 12 additions and 1 deletion.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -56,6 +56,7 @@
* [BUGFIX] Fix issue where active series requests error when encountering a stale posting. #9580
* [BUGFIX] Fix pooling buffer reuse logic when `-distributor.max-request-pool-buffer-size` is set. #9666
* [BUGFIX] Fix issue when using the experimental `-ruler.max-independent-rule-evaluation-concurrency` feature, where the ruler could panic as it updates a running ruleset or shutdowns. #9726
* [BUGFIX] Ingester: Fix race condition in per-tenant TSDB creation. #9708

### Mixin

Expand Down
12 changes: 11 additions & 1 deletion pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -30,6 +30,7 @@ import (
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/services"
"github.com/grafana/dskit/tenant"
"github.com/oklog/ulid"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"
Expand Down Expand Up @@ -2666,6 +2667,14 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
}
userDB.triggerRecomputeOwnedSeries(recomputeOwnedSeriesReasonNewUser)

userDBHasDB := atomic.NewBool(false)
blocksToDelete := func(blocks []*tsdb.Block) map[ulid.ULID]struct{} {
if !userDBHasDB.Load() {
return nil
}
return userDB.blocksToDelete(blocks)
}

oooTW := i.limits.OutOfOrderTimeWindow(userID)
// Create a new user database
db, err := tsdb.Open(udir, userLogger, tsdbPromReg, &tsdb.Options{
Expand All @@ -2680,7 +2689,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
WALSegmentSize: i.cfg.BlocksStorageConfig.TSDB.WALSegmentSizeBytes,
WALReplayConcurrency: walReplayConcurrency,
SeriesLifecycleCallback: userDB,
BlocksToDelete: userDB.blocksToDelete,
BlocksToDelete: blocksToDelete,
EnableExemplarStorage: true, // enable for everyone so we can raise the limit later
MaxExemplars: int64(i.limiter.maxExemplarsPerUser(userID)),
SeriesHashCache: i.seriesHashCache,
Expand Down Expand Up @@ -2721,6 +2730,7 @@ func (i *Ingester) createTSDB(userID string, walReplayConcurrency int) (*userTSD
}

userDB.db = db
userDBHasDB.Store(true)
// We set the limiter here because we don't want to limit
// series during WAL replay.
userDB.limiter = i.limiter
Expand Down

0 comments on commit d14d52c

Please sign in to comment.