Skip to content

Commit

Permalink
store-gateway: avoid unnecessarily writing lazy-loaded blocks snapshot (
Browse files Browse the repository at this point in the history
#10740)

* store-gateway: avoid unnecessarily writing lazy-loaded blocks snapshot

### Summary

This PR introduces two key optimizations to the `indexheader.Snapshotter` component:

SHA-256 Checksum Mechanism: Added a checksum-based optimization to avoid persisting unchanged JSON data, reducing unnecessary IOPS.

Simplified Snapshot Format: Changed the snapshot format from `map[ulid.ULID]int64` to `map[ulid.ULID]struct{}`, eliminating unused timestamp data.

### Motivation

In clusters with many tenants, the `Snapshotter` component can generate significant disk I/O by repeatedly writing identical JSON data to disk. This is particularly problematic on systems with low-performance disks. Additionally, the previous implementation stored timestamps that were never actually used in practice.

### Compatibility Considerations

#### Backward Compatibility

This approach maintains backward compatibility. `RestoreLoadedBlocks()` still reads the old format with timestamps and converts it to the new format. The JSON structure remains the same. This ensures that existing snapshots continue to work without requiring any migration.

#### Forward Compatibility

Forward compatibility is maintained because we continue to write the same JSON structure, just with simplified content. The `indexHeaderLastUsedTime` field is still present in the JSON, ensuring that older code can still parse the structure. Since older code only cares about the keys (block IDs) and not the values (timestamps), the change in value type doesn't affect functionality.

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Fix comments

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Use sha256

Signed-off-by: Dimitar Dimitrov <[email protected]>

* Change BlocksLoader to return slice instead of map

Signed-off-by: Dimitar Dimitrov <[email protected]>

---------

Signed-off-by: Dimitar Dimitrov <[email protected]>
  • Loading branch information
dimitarvdimitrov authored Mar 1, 2025
1 parent 0a1509f commit b65fb5a
Show file tree
Hide file tree
Showing 6 changed files with 157 additions and 50 deletions.
4 changes: 2 additions & 2 deletions pkg/storegateway/bucket.go
Original file line number Diff line number Diff line change
Expand Up @@ -383,7 +383,7 @@ func (s *BucketStore) InitialSync(ctx context.Context) error {
return nil
}

func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]int64 {
func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]struct{} {
previouslyLoadedBlocks, err := indexheader.RestoreLoadedBlocks(s.dir)
if err != nil {
level.Warn(s.logger).Log(
Expand All @@ -397,7 +397,7 @@ func (s *BucketStore) tryRestoreLoadedBlocksSet() map[ulid.ULID]int64 {
return previouslyLoadedBlocks
}

func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]int64) {
func (s *BucketStore) loadBlocks(ctx context.Context, blocks map[ulid.ULID]struct{}) {
// This is not happening during a request so we can ignore the stats.
ignoredStats := newSafeQueryStats()
// We ignore the time the block was used because it can only be in the map if it was still loaded before the shutdown
Expand Down
39 changes: 14 additions & 25 deletions pkg/storegateway/bucket_e2e_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -687,49 +687,37 @@ func TestBucketStore_EagerLoading(t *testing.T) {
testCases := map[string]struct {
eagerLoadReaderEnabled bool
expectedEagerLoadedBlocks int
createLoadedBlocksSnapshotFn func([]ulid.ULID) map[ulid.ULID]int64
createLoadedBlocksSnapshotFn func([]ulid.ULID) []ulid.ULID
}{
"block is present in pre-shutdown loaded blocks and eager-loading is disabled": {
eagerLoadReaderEnabled: false,
expectedEagerLoadedBlocks: 0,
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
snapshot := make(map[ulid.ULID]int64)
for _, blockID := range blockIDs {
snapshot[blockID] = time.Now().UnixMilli()
}
return snapshot
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) []ulid.ULID {
return blockIDs
},
},
"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header during initial sync": {
eagerLoadReaderEnabled: true,
expectedEagerLoadedBlocks: 6,
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
snapshot := make(map[ulid.ULID]int64)
for _, blockID := range blockIDs {
snapshot[blockID] = time.Now().UnixMilli()
}
return snapshot
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) []ulid.ULID {
return blockIDs
},
},
"block is present in pre-shutdown loaded blocks and eager-loading is enabled, loading index header after initial sync": {
eagerLoadReaderEnabled: true,
expectedEagerLoadedBlocks: 6,
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) map[ulid.ULID]int64 {
snapshot := make(map[ulid.ULID]int64)
for _, blockID := range blockIDs {
snapshot[blockID] = time.Now().UnixMilli()
}
return snapshot
createLoadedBlocksSnapshotFn: func(blockIDs []ulid.ULID) []ulid.ULID {
return blockIDs
},
},
"block is not present in pre-shutdown loaded blocks snapshot and eager-loading is enabled": {
eagerLoadReaderEnabled: true,
expectedEagerLoadedBlocks: 0, // although eager loading is enabled, this test will not do eager loading because the block ID is not in the lazy loaded file.
createLoadedBlocksSnapshotFn: func(_ []ulid.ULID) map[ulid.ULID]int64 {
createLoadedBlocksSnapshotFn: func(_ []ulid.ULID) []ulid.ULID {
// let's create a random fake blockID to be stored in lazy loaded headers file
fakeBlockID := ulid.MustNew(ulid.Now(), nil)
// this snapshot will refer to fake block, hence eager load wouldn't be executed for the real block that we test
return map[ulid.ULID]int64{fakeBlockID: time.Now().UnixMilli()}
return []ulid.ULID{fakeBlockID}
},
},
"pre-shutdown loaded blocks snapshot doesn't exist and eager-loading is enabled": {
Expand Down Expand Up @@ -764,7 +752,8 @@ func TestBucketStore_EagerLoading(t *testing.T) {
if testData.createLoadedBlocksSnapshotFn != nil {
// Create the snapshot manually so that we don't rely on the periodic snapshotting.
loadedBlocks := store.store.blockSet.openBlocksULIDs()
staticLoader := staticLoadedBlocks(testData.createLoadedBlocksSnapshotFn(loadedBlocks))
blocksSlice := testData.createLoadedBlocksSnapshotFn(loadedBlocks)
staticLoader := staticLoadedBlocks(blocksSlice)
snapshotter := indexheader.NewSnapshotter(cfg.logger, indexheader.SnapshotterConfig{
PersistInterval: time.Hour,
Path: cfg.tempDir,
Expand Down Expand Up @@ -794,7 +783,7 @@ func TestBucketStore_PersistsLazyLoadedBlocks(t *testing.T) {
cfg.bucketStoreConfig.IndexHeader.EagerLoadingStartupEnabled = true
cfg.bucketStoreConfig.IndexHeader.LazyLoadingIdleTimeout = persistInterval * 3
ctx := context.Background()
readBlocksInSnapshot := func() map[ulid.ULID]int64 {
readBlocksInSnapshot := func() map[ulid.ULID]struct{} {
blocks, err := indexheader.RestoreLoadedBlocks(cfg.tempDir)
assert.NoError(t, err)
return blocks
Expand Down Expand Up @@ -825,9 +814,9 @@ func TestBucketStore_PersistsLazyLoadedBlocks(t *testing.T) {
}, persistInterval*5, persistInterval/2)
}

type staticLoadedBlocks map[ulid.ULID]int64
type staticLoadedBlocks []ulid.ULID

func (b staticLoadedBlocks) LoadedBlocks() map[ulid.ULID]int64 {
func (b staticLoadedBlocks) LoadedBlocks() []ulid.ULID {
return b
}

Expand Down
7 changes: 3 additions & 4 deletions pkg/storegateway/indexheader/reader_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,16 +152,15 @@ func (p *ReaderPool) onLazyReaderClosed(r *LazyBinaryReader) {
delete(p.lazyReaders, r)
}

// LoadedBlocks returns a new map of lazy-loaded block IDs and the last time they were used in milliseconds.
func (p *ReaderPool) LoadedBlocks() map[ulid.ULID]int64 {
func (p *ReaderPool) LoadedBlocks() []ulid.ULID {
p.lazyReadersMx.Lock()
defer p.lazyReadersMx.Unlock()

blocks := make(map[ulid.ULID]int64, len(p.lazyReaders))
blocks := make([]ulid.ULID, 0, len(p.lazyReaders))
for r := range p.lazyReaders {
usedAt := r.usedAt.Load()
if usedAt != 0 {
blocks[r.blockID] = usedAt / int64(time.Millisecond)
blocks = append(blocks, r.blockID)
}
}

Expand Down
3 changes: 2 additions & 1 deletion pkg/storegateway/indexheader/reader_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -138,7 +138,8 @@ func TestReaderPool_LoadedBlocks(t *testing.T) {
lazyReaderEnabled: true,
lazyReaders: map[*LazyBinaryReader]struct{}{&lb: {}},
}
require.Equal(t, map[ulid.ULID]int64{id: usedAt.UnixMilli()}, rp.LoadedBlocks())
loadedBlocks := rp.LoadedBlocks()
require.Equal(t, []ulid.ULID{id}, loadedBlocks)
}

func prepareReaderPool(t *testing.T) (context.Context, string, *filesystem.Bucket, ulid.ULID, *ReaderPoolMetrics) {
Expand Down
49 changes: 43 additions & 6 deletions pkg/storegateway/indexheader/snapshotter.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,6 +5,7 @@ package indexheader
import (
"bytes"
"context"
"crypto/sha256"
"encoding/json"
"fmt"
"os"
Expand Down Expand Up @@ -37,6 +38,11 @@ type Snapshotter struct {
conf SnapshotterConfig

bl BlocksLoader

// lastChecksum stores the checksum of the last persisted JSON data
// to avoid writing the same data repeatedly, reducing IOPS.
// This is useful when running with many tenants on low-performance disks.
lastChecksum [sha256.Size]byte
}

func NewSnapshotter(logger log.Logger, conf SnapshotterConfig, bl BlocksLoader) *Snapshotter {
Expand All @@ -50,7 +56,7 @@ func NewSnapshotter(logger log.Logger, conf SnapshotterConfig, bl BlocksLoader)
}

// BlocksLoader reports the IDs of the blocks that are currently lazy-loaded.
type BlocksLoader interface {
	LoadedBlocks() []ulid.ULID
}

func (s *Snapshotter) persist(context.Context) error {
Expand All @@ -64,20 +70,42 @@ func (s *Snapshotter) persist(context.Context) error {
}

// PersistLoadedBlocks writes the current set of lazy-loaded block IDs to the
// snapshot file. To reduce IOPS on systems with many tenants it skips the
// write when the serialized snapshot is byte-identical to the last snapshot
// that was successfully persisted.
func (s *Snapshotter) PersistLoadedBlocks() error {
	loadedBlocks := s.bl.LoadedBlocks()

	// Convert to the format we store on disk for backward compatibility.
	indexHeaderLastUsedTime := make(map[ulid.ULID]int64, len(loadedBlocks))
	for _, blockID := range loadedBlocks {
		// We use a constant timestamp since we no longer care about the actual timestamp.
		indexHeaderLastUsedTime[blockID] = 1
	}

	snapshot := &indexHeadersSnapshot{
		IndexHeaderLastUsedTime: indexHeaderLastUsedTime,
		UserID:                  s.conf.UserID,
	}

	data, err := json.Marshal(snapshot)
	if err != nil {
		return err
	}

	// The json marshalling is deterministic (encoding/json sorts map keys),
	// so the checksum will be the same for the same map contents.
	checksum := sha256.Sum256(data)
	if checksum == s.lastChecksum {
		level.Debug(s.logger).Log("msg", "skipping persistence of index headers snapshot as data hasn't changed", "user", s.conf.UserID)
		return nil
	}

	finalPath := filepath.Join(s.conf.Path, lazyLoadedHeadersListFileName)
	err = atomicfs.CreateFile(finalPath, bytes.NewReader(data))
	if err == nil {
		// Only update the checksum if the write was successful, so a failed
		// write is retried on the next call instead of being skipped.
		s.lastChecksum = checksum
	}
	return err
}

func RestoreLoadedBlocks(directory string) (map[ulid.ULID]int64, error) {
func RestoreLoadedBlocks(directory string) (map[ulid.ULID]struct{}, error) {
var (
snapshot indexHeadersSnapshot
multiErr = multierror.MultiError{}
Expand All @@ -99,11 +127,20 @@ func RestoreLoadedBlocks(directory string) (map[ulid.ULID]int64, error) {
multiErr.Add(fmt.Errorf("removing the lazy-loaded index-header snapshot: %w", err))
}
}
return snapshot.IndexHeaderLastUsedTime, multiErr.Err()

// Snapshots used to be stored with their last-used timestamp. But that wasn't used and lead to constant file churn, so we removed it.
result := make(map[ulid.ULID]struct{}, len(snapshot.IndexHeaderLastUsedTime))
for blockID := range snapshot.IndexHeaderLastUsedTime {
result[blockID] = struct{}{}
}

return result, multiErr.Err()
}

// indexHeadersSnapshot is the on-disk JSON format of the lazy-loaded blocks snapshot.
type indexHeadersSnapshot struct {
	// IndexHeaderLastUsedTime is map of index header ulid.ULID to the number 1.
	// The number used to be the last-used timestamp of each block.
	// We keep this format for backward compatibility, but we no longer care about the timestamps.
	IndexHeaderLastUsedTime map[ulid.ULID]int64 `json:"index_header_last_used_time"`
	UserID                  string              `json:"user_id"`
}
Expand Down
105 changes: 93 additions & 12 deletions pkg/storegateway/indexheader/snapshotter_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -20,13 +20,11 @@ import (
func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) {
tmpDir := t.TempDir()

usedAt := time.Now()
testBlockID := ulid.MustNew(ulid.Now(), rand.Reader)

origBlocks := map[ulid.ULID]int64{
testBlockID: usedAt.UnixMilli(),
}
testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]int64 { return origBlocks })
origBlocks := []ulid.ULID{testBlockID}

testBlocksLoader := testBlocksLoaderFunc(func() []ulid.ULID { return origBlocks })

config := SnapshotterConfig{
Path: tmpDir,
Expand All @@ -42,22 +40,105 @@ func TestSnapshotter_PersistAndRestoreLoadedBlocks(t *testing.T) {
data, err := os.ReadFile(persistedFile)
require.NoError(t, err)

expected := fmt.Sprintf(`{"index_header_last_used_time":{"%s":%d},"user_id":"anonymous"}`, testBlockID, usedAt.UnixMilli())
expected := fmt.Sprintf(`{"index_header_last_used_time":{"%s":1},"user_id":"anonymous"}`, testBlockID)
require.JSONEq(t, expected, string(data))

restoredBlocks, err := RestoreLoadedBlocks(config.Path)
require.Equal(t, origBlocks, restoredBlocks)
require.Equal(t, map[ulid.ULID]struct{}{testBlockID: {}}, restoredBlocks)
require.NoError(t, err)
}

// TestSnapshotter_ChecksumOptimization verifies that PersistLoadedBlocks skips
// rewriting the snapshot file when the loaded-blocks set is unchanged (file
// mod time stays the same), and writes it again once the set changes.
func TestSnapshotter_ChecksumOptimization(t *testing.T) {
	tmpDir := t.TempDir()

	firstBlockID := ulid.MustNew(ulid.Now(), rand.Reader)

	origBlocks := []ulid.ULID{firstBlockID}
	testBlocksLoader := testBlocksLoaderFunc(func() []ulid.ULID { return origBlocks })

	config := SnapshotterConfig{
		Path:   tmpDir,
		UserID: "anonymous",
	}

	// Create snapshotter and persist data
	s := NewSnapshotter(log.NewNopLogger(), config, testBlocksLoader)

	err := s.PersistLoadedBlocks()
	require.NoError(t, err)

	// Verify the content of the file using RestoreLoadedBlocks
	restoredBlocks, err := RestoreLoadedBlocks(config.Path)
	require.NoError(t, err)
	require.Equal(t, map[ulid.ULID]struct{}{firstBlockID: {}}, restoredBlocks, "Restored blocks should match original blocks")

	// Get file info after first write
	persistedFile := filepath.Join(tmpDir, lazyLoadedHeadersListFileName)
	firstStat, err := os.Stat(persistedFile)
	require.NoError(t, err)
	firstModTime := firstStat.ModTime()

	// Wait a moment to ensure modification time would be different if file is written
	// NOTE(review): assumes the filesystem records mod times with finer than
	// 10ms granularity — confirm this holds on the CI filesystems.
	time.Sleep(10 * time.Millisecond)

	// Call persist again with the same data
	err = s.PersistLoadedBlocks()
	require.NoError(t, err)

	// Get file info after second write attempt
	secondStat, err := os.Stat(persistedFile)
	require.NoError(t, err)
	secondModTime := secondStat.ModTime()

	// File should not have been modified since the data hasn't changed
	require.Equal(t, firstModTime, secondModTime, "File was modified even though data hasn't changed")

	// Verify the content has not changed using RestoreLoadedBlocks
	restoredBlocksAfterSecondPersist, err := RestoreLoadedBlocks(config.Path)
	require.NoError(t, err)
	require.Equal(t, map[ulid.ULID]struct{}{firstBlockID: {}}, restoredBlocksAfterSecondPersist, "Restored blocks should match original blocks")

	// Now change the data and persist again
	secondBlockID := ulid.MustNew(ulid.Now(), rand.Reader)
	newBlocks := []ulid.ULID{firstBlockID, secondBlockID}

	// Create a new loader with updated data
	updatedBlocksLoader := testBlocksLoaderFunc(func() []ulid.ULID { return newBlocks })
	s.bl = updatedBlocksLoader

	// Wait a moment to ensure modification time would be different if file is written
	time.Sleep(10 * time.Millisecond)

	// Persist the new data
	err = s.PersistLoadedBlocks()
	require.NoError(t, err)

	// Get file info after third write
	thirdStat, err := os.Stat(persistedFile)
	require.NoError(t, err)
	thirdModTime := thirdStat.ModTime()

	// File should have been modified since the data has changed
	require.NotEqual(t, secondModTime, thirdModTime, "File was not modified even though data has changed")

	// Verify the content has changed using RestoreLoadedBlocks
	restoredBlocksAfterThirdPersist, err := RestoreLoadedBlocks(config.Path)
	require.NoError(t, err)
	expectedBlocks := map[ulid.ULID]struct{}{
		firstBlockID:  {},
		secondBlockID: {},
	}
	require.Equal(t, expectedBlocks, restoredBlocksAfterThirdPersist, "Restored blocks should match new blocks")
}

func TestSnapshotter_StartStop(t *testing.T) {
t.Run("stop after start", func(t *testing.T) {
tmpDir := t.TempDir()

testBlocksLoader := testBlocksLoaderFunc(func() map[ulid.ULID]int64 {
testBlocksLoader := testBlocksLoaderFunc(func() []ulid.ULID {
// We don't care about the content of the index header in this test.
return map[ulid.ULID]int64{
ulid.MustNew(ulid.Now(), rand.Reader): time.Now().UnixMilli(),
return []ulid.ULID{
ulid.MustNew(ulid.Now(), rand.Reader),
}
})

Expand All @@ -78,8 +159,8 @@ func TestSnapshotter_StartStop(t *testing.T) {
})
}

type testBlocksLoaderFunc func() map[ulid.ULID]int64
type testBlocksLoaderFunc func() []ulid.ULID

func (f testBlocksLoaderFunc) LoadedBlocks() map[ulid.ULID]int64 {
func (f testBlocksLoaderFunc) LoadedBlocks() []ulid.ULID {
return f()
}

0 comments on commit b65fb5a

Please sign in to comment.