diff --git a/ring/ring.go b/ring/ring.go index 947f3290f..0c54bb1c5 100644 --- a/ring/ring.go +++ b/ring/ring.go @@ -75,6 +75,9 @@ type ReadRing interface { // CleanupShuffleShardCache should delete cached shuffle-shard subrings for given identifier. CleanupShuffleShardCache(identifier string) + + // GetTokenRangesForInstance returns the token ranges owned by an instance in the ring + GetTokenRangesForInstance(instanceID string) (TokenRanges, error) } var ( @@ -360,6 +363,26 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, return ReplicationSet{}, ErrEmptyRing } + instances, err := r.findInstancesForKey(key, op, bufDescs, bufHosts, bufZones, nil) + if err != nil { + return ReplicationSet{}, err + } + + healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) + if err != nil { + return ReplicationSet{}, err + } + + return ReplicationSet{ + Instances: healthyInstances, + MaxErrors: maxFailure, + }, nil +} + +// Returns instances for given key and operation. Instances are not filtered through ReplicationStrategy. +// InstanceFilter can ignore uninteresting instances that would otherwise be part of the output, and can also stop search early. +// This function needs to be called with read lock on the ring. +func (r *Ring) findInstancesForKey(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts []string, bufZones []string, instanceFilter func(instanceID string) (include, keepGoing bool)) ([]InstanceDesc, error) { var ( n = r.cfg.ReplicationFactor instances = bufDescs[:0] @@ -382,7 +405,7 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, info, ok := r.ringInstanceByToken[token] if !ok { // This should never happen unless a bug in the ring code. - return ReplicationSet{}, ErrInconsistentTokensInfo + return nil, ErrInconsistentTokensInfo } // We want n *distinct* instances && distinct zones. @@ -410,18 +433,18 @@ func (r *Ring) Get(key uint32, op Operation, bufDescs []InstanceDesc, bufHosts, distinctZones = append(distinctZones, info.Zone) } - instances = append(instances, instance) - } - - healthyInstances, maxFailure, err := r.strategy.Filter(instances, op, r.cfg.ReplicationFactor, r.cfg.HeartbeatTimeout, r.cfg.ZoneAwarenessEnabled) - if err != nil { - return ReplicationSet{}, err + include, keepGoing := true, true + if instanceFilter != nil { + include, keepGoing = instanceFilter(info.InstanceID) + } + if include { + instances = append(instances, instance) + } + if !keepGoing { + break + } } - - return ReplicationSet{ - Instances: healthyInstances, - MaxErrors: maxFailure, - }, nil + return instances, nil } // GetAllHealthy implements ReadRing. @@ -1078,3 +1101,36 @@ func (op Operation) ShouldExtendReplicaSetOnState(s InstanceState) bool { // All states are healthy, no states extend replica set. var allStatesRingOperation = Operation(0x0000ffff) + +// numberOfKeysOwnedByInstance returns how many of the supplied keys are owned by given instance. +func (r *Ring) numberOfKeysOwnedByInstance(keys []uint32, op Operation, instanceID string, bufDescs []InstanceDesc, bufHosts []string, bufZones []string) (int, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + if r.ringDesc == nil || len(r.ringTokens) == 0 { + return 0, ErrEmptyRing + } + + // Instance is not in this ring, it can't own any key. + if _, ok := r.ringDesc.Ingesters[instanceID]; !ok { + return 0, nil + } + + owned := 0 + for _, tok := range keys { + i, err := r.findInstancesForKey(tok, op, bufDescs, bufHosts, bufZones, func(foundInstanceID string) (include, keepGoing bool) { + if foundInstanceID == instanceID { + // If we've found our instance, we can stop. + return true, false + } + return false, true + }) + if err != nil { + return 0, err + } + if len(i) > 0 { + owned++ + } + } + return owned, nil +} diff --git a/ring/ring_test.go b/ring/ring_test.go index e3fcf0990..940d0d1bd 100644 --- a/ring/ring_test.go +++ b/ring/ring_test.go @@ -1257,7 +1257,7 @@ func TestRing_ShuffleShard_Stability(t *testing.T) { ) // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)} + ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), numInstances, numZones, 128)} ring := Ring{ cfg: Config{ HeartbeatTimeout: time.Hour, @@ -1290,6 +1290,12 @@ func TestRing_ShuffleShard_Stability(t *testing.T) { } } +func initTokenGenerator(t testing.TB) TokenGenerator { + seed := time.Now().UnixNano() + t.Log("token generator seed:", seed) + return NewRandomTokenGeneratorWithSeed(seed) +} + func TestRing_ShuffleShard_Shuffling(t *testing.T) { var ( numTenants = 1000 @@ -1423,8 +1429,9 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { for _, s := range scenarios { t.Run(s.name, func(t *testing.T) { + gen := initTokenGenerator(t) // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(s.numInstances, s.numZones, 128)} + ringDesc := &Desc{Ingesters: generateRingInstances(gen, s.numInstances, s.numZones, 128)} ring := Ring{ cfg: Config{ HeartbeatTimeout: time.Hour, @@ -1449,7 +1456,7 @@ func TestRing_ShuffleShard_Consistency(t *testing.T) { // Update the ring. switch s.ringChange { case add: - newID, newDesc := generateRingInstance(s.numInstances+1, 0, 128) + newID, newDesc := generateRingInstance(gen, s.numInstances+1, 0, 128) ringDesc.Ingesters[newID] = newDesc case remove: // Remove the first one. @@ -1483,7 +1490,7 @@ func TestRing_ShuffleShard_ConsistencyOnShardSizeChanged(t *testing.T) { // Create 30 instances in 3 zones. ringInstances := map[string]InstanceDesc{} for i := 0; i < 30; i++ { - name, desc := generateRingInstance(i, i%3, 128) + name, desc := generateRingInstance(initTokenGenerator(t), i, i%3, 128) ringInstances[name] = desc } @@ -1560,7 +1567,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { // Create 20 instances in 2 zones. ringInstances := map[string]InstanceDesc{} for i := 0; i < 20; i++ { - name, desc := generateRingInstance(i, i%2, 128) + name, desc := generateRingInstance(initTokenGenerator(t), i, i%2, 128) ringInstances[name] = desc } @@ -1599,7 +1606,7 @@ func TestRing_ShuffleShard_ConsistencyOnZonesChanged(t *testing.T) { // Scale up cluster, adding 10 instances in 1 new zone. for i := 20; i < 30; i++ { - name, desc := generateRingInstance(i, 2, 128) + name, desc := generateRingInstance(initTokenGenerator(t), i, 2, 128) ringInstances[name] = desc } @@ -1934,7 +1941,7 @@ func TestRing_ShuffleShardWithLookback_CorrectnessWithFuzzy(t *testing.T) { t.Log("random generator seed:", seed) // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, 128)} + ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), numInstances, numZones, 128)} ring := Ring{ cfg: Config{ HeartbeatTimeout: time.Hour, @@ -2570,7 +2577,7 @@ func BenchmarkRing_ShuffleShard_LargeShardSize(b *testing.B) { func benchmarkShuffleSharding(b *testing.B, numInstances, numZones, numTokens, shardSize int, cache bool) { // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(numInstances, numZones, numTokens)} + ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(b), numInstances, numZones, numTokens)} ring := Ring{ cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: !cache}, ringDesc: ringDesc, @@ -2620,7 +2627,7 @@ func BenchmarkRing_Get(b *testing.B) { for benchName, benchCase := range benchCases { // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(benchCase.numInstances, benchCase.numZones, numTokens)} + ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(b), benchCase.numInstances, benchCase.numZones, numTokens)} ring := Ring{ cfg: Config{ HeartbeatTimeout: time.Hour, @@ -2658,7 +2665,7 @@ func BenchmarkRing_Get(b *testing.B) { func TestRing_Get_NoMemoryAllocations(t *testing.T) { // Initialise the ring. - ringDesc := &Desc{Ingesters: generateRingInstances(3, 3, 128)} + ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), 3, 3, 128)} ring := Ring{ cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: 3}, ringDesc: ringDesc, @@ -2697,22 +2704,22 @@ func generateTokensLinear(instanceID, numInstances, numTokens int) []uint32 { return tokens } -func generateRingInstances(numInstances, numZones, numTokens int) map[string]InstanceDesc { +func generateRingInstances(gen TokenGenerator, numInstances, numZones, numTokens int) map[string]InstanceDesc { instances := make(map[string]InstanceDesc, numInstances) for i := 1; i <= numInstances; i++ { - id, desc := generateRingInstance(i, i%numZones, numTokens) + id, desc := generateRingInstance(gen, i, i%numZones, numTokens) instances[id] = desc } return instances } -func generateRingInstance(id, zone, numTokens int) (string, InstanceDesc) { +func generateRingInstance(gen TokenGenerator, id, zone, numTokens int) (string, InstanceDesc) { instanceID := fmt.Sprintf("instance-%d", id) zoneID := fmt.Sprintf("zone-%d", zone) - return instanceID, generateRingInstanceWithInfo(instanceID, zoneID, GenerateTokens(numTokens, nil), time.Now()) + return instanceID, generateRingInstanceWithInfo(instanceID, zoneID, gen.GenerateTokens(numTokens, nil), time.Now()) } func generateRingInstanceWithInfo(addr, zone string, tokens []uint32, registeredAt time.Time) InstanceDesc { diff --git a/ring/token_generator.go b/ring/token_generator.go index 159d9ffd6..93f029954 100644 --- a/ring/token_generator.go +++ b/ring/token_generator.go @@ -3,6 +3,7 @@ package ring import ( "math/rand" "sort" + "sync" "time" ) @@ -21,10 +22,17 @@ type TokenGenerator interface { CanJoinEnabled() bool } -type RandomTokenGenerator struct{} +type RandomTokenGenerator struct { + m sync.Mutex + r *rand.Rand +} func NewRandomTokenGenerator() *RandomTokenGenerator { - return &RandomTokenGenerator{} + return &RandomTokenGenerator{r: rand.New(rand.NewSource(time.Now().UnixNano()))} +} + +func NewRandomTokenGeneratorWithSeed(seed int64) *RandomTokenGenerator { + return &RandomTokenGenerator{r: rand.New(rand.NewSource(seed))} } // GenerateTokens generates at most requestedTokensCount unique random tokens, none of which clashes with @@ -35,8 +43,6 @@ func (t *RandomTokenGenerator) GenerateTokens(requestedTokensCount int, allTaken return []uint32{} } - r := rand.New(rand.NewSource(time.Now().UnixNano())) - used := make(map[uint32]bool, len(allTakenTokens)) for _, v := range allTakenTokens { used[v] = true @@ -44,7 +50,10 @@ func (t *RandomTokenGenerator) GenerateTokens(requestedTokensCount int, allTaken tokens := make([]uint32, 0, requestedTokensCount) for i := 0; i < requestedTokensCount; { - candidate := r.Uint32() + t.m.Lock() + candidate := t.r.Uint32() + t.m.Unlock() + if used[candidate] { continue } diff --git a/ring/token_range.go b/ring/token_range.go new file mode 100644 index 000000000..232ae24ea --- /dev/null +++ b/ring/token_range.go @@ -0,0 +1,158 @@ +package ring + +import ( + "math" + + "github.com/pkg/errors" + "golang.org/x/exp/slices" // using exp/slices until moving to go 1.21. +) + +// TokenRanges describes token ranges owned by an instance. +// It consists of [start, end] pairs, where both start and end are inclusive. +type TokenRanges []uint32 + +func (tr TokenRanges) IncludesKey(key uint32) bool { + switch { + case len(tr) == 0: + return false + case key < tr[0]: + // key comes before the first range + return false + case key > tr[len(tr)-1]: + // key comes after the last range + return false + } + + index, found := slices.BinarySearch(tr, key) + switch { + case found: + // ranges are closed + return true + case index%2 == 1: + // hash would be inserted after the start of a range (even index) + return true + default: + return false + } +} + +// GetTokenRangesForInstance returns the token ranges owned by an instance in the ring. +// +// Current implementation only works with multizone setup, where number of zones is equal to replication factor. +func (r *Ring) GetTokenRangesForInstance(instanceID string) (TokenRanges, error) { + r.mtx.RLock() + defer r.mtx.RUnlock() + + instance, ok := r.ringDesc.Ingesters[instanceID] + if !ok { + return nil, ErrInstanceNotFound + } + if instance.Zone == "" { + return nil, errors.New("zone not set") + } + + rf := r.cfg.ReplicationFactor + numZones := len(r.ringTokensByZone) + + // To simplify computation of token ranges, we currently only support case where zone-awareness is enabled, + // and replicaction factor is equal to number of zones. + if !r.cfg.ZoneAwarenessEnabled || rf != numZones { + // if zoneAwareness is disabled we need to treat the whole ring as one big zone, and we would + // need to walk the ring backwards looking for RF-1 tokens from other instances to determine the range. + return nil, errors.New("can't use ring configuration for computing token ranges") + } + + // at this point zone-aware replication is enabled, and rf == numZones + // this means that we will write to one replica in each zone, so we can just consider the zonal ring for our instance + subringTokens, ok := r.ringTokensByZone[instance.Zone] + if !ok || len(subringTokens) == 0 { + return nil, errors.New("no tokens for zone") + } + + ranges := make([]uint32, 0, 2*(len(instance.Tokens)+1)) // 1 range (2 values) per token + one additional if we need to split the rollover range + + tokenInfo, ok := r.ringInstanceByToken[subringTokens[0]] + if !ok { + // This should never happen unless there's a bug in the ring code. + return nil, ErrInconsistentTokensInfo + } + firstTokenOwned := tokenInfo.InstanceID == instanceID + + addMaxSingleton := false + currIndex := 0 + rangeStart := uint32(0) + rangeEnd := uint32(math.MaxUint32) + for { + var token uint32 + // We are looking for the highest token not owned by instanceID, + // to add it as the next sub-range start. + for currIndex < len(subringTokens) { + token = subringTokens[currIndex] + tokenInfo, ok := r.ringInstanceByToken[token] + if !ok { + // This should never happen unless there's a bug in the ring code. + return nil, ErrInconsistentTokensInfo + } + if tokenInfo.InstanceID == instanceID { + break + } + rangeStart = token + currIndex++ + } + + if currIndex == len(subringTokens) { + if firstTokenOwned { + // If we reach the end of the ring without finding other tokens owned by instanceID, + // another sub-range is added if the first token is owned by instanceID. + ranges = append(ranges, rangeStart, math.MaxUint32) + } + break + } + + // At this point we have the next sub-range start, and we are looking for the next sub-range end. + if token != 0 { + rangeEnd = token - 1 + } + currIndex++ + // We are looking for the highest token owned by instanceID, + // to add it as the next sub-range end. + for currIndex < len(subringTokens) { + token = subringTokens[currIndex] + tokenInfo, ok := r.ringInstanceByToken[token] + if !ok { + // This should never happen unless there's a bug in the ring code. + return nil, ErrInconsistentTokensInfo + } + if tokenInfo.InstanceID != instanceID { + break + } + rangeEnd = token - 1 + currIndex++ + } + + // If we reached the end of the ring having only the tokens owned by instanceID, + // we add the last sub-range and terminate. + if currIndex == len(subringTokens) { + if firstTokenOwned { + rangeEnd = math.MaxUint32 + } + ranges = append(ranges, rangeStart, rangeEnd) + break + } + + // At this point we have the next sub-range end. + if rangeEnd == math.MaxUint32 { + addMaxSingleton = true + } else { + ranges = append(ranges, rangeStart, rangeEnd) + } + rangeStart = token + currIndex++ + } + + if addMaxSingleton && ranges[len(ranges)-1] != math.MaxUint32 { + ranges = append(ranges, math.MaxUint32, math.MaxUint32) + } + + return ranges, nil +} diff --git a/ring/token_range_test.go b/ring/token_range_test.go new file mode 100644 index 000000000..b640b12b0 --- /dev/null +++ b/ring/token_range_test.go @@ -0,0 +1,327 @@ +package ring + +import ( + "fmt" + "math" + "testing" + "time" + + "github.com/stretchr/testify/assert" + "github.com/stretchr/testify/require" +) + +func TestKeyInTokenRanges(t *testing.T) { + ranges := TokenRanges{4, 8, 12, 16} + + require.False(t, ranges.IncludesKey(0)) + require.True(t, ranges.IncludesKey(4)) + require.True(t, ranges.IncludesKey(6)) + require.True(t, ranges.IncludesKey(8)) + require.False(t, ranges.IncludesKey(10)) + require.False(t, ranges.IncludesKey(20)) + + ranges = TokenRanges{0, 4, 8, 12, 16, 18} + + require.True(t, ranges.IncludesKey(0)) + require.True(t, ranges.IncludesKey(4)) + require.False(t, ranges.IncludesKey(6)) + require.True(t, ranges.IncludesKey(8)) + require.True(t, ranges.IncludesKey(10)) + require.False(t, ranges.IncludesKey(20)) +} + +func TestGetTokenRangesForInstance(t *testing.T) { + numZones := 3 + + tests := map[string]struct { + zoneTokens map[string][]uint32 + expected map[string]TokenRanges + }{ + "single instance in zone": { + zoneTokens: map[string][]uint32{ + "instance-0-0": GenerateTokens(512, nil), + }, + expected: map[string]TokenRanges{ + "instance-0-0": {0, math.MaxUint32}, + }, + }, + "simple ranges": { + zoneTokens: map[string][]uint32{ + "instance-0-0": {0, 25, 75}, + "instance-0-1": {10, 50, 100}, + }, + expected: map[string]TokenRanges{ + "instance-0-0": {10, 24, 50, 74, 100, math.MaxUint32}, + "instance-0-1": {0, 9, 25, 49, 75, 99}, + }, + }, + "mixed ranges": { + zoneTokens: map[string][]uint32{ + "instance-0-0": {25, 27, 75}, + "instance-0-1": {10, 26, 50, 100}, + }, + expected: map[string]TokenRanges{ + "instance-0-0": {10, 24, 26, 26, 50, 74}, + "instance-0-1": {0, 9, 25, 25, 27, 49, 75, math.MaxUint32}, + }, + }, + "grouped tokens": { + zoneTokens: map[string][]uint32{ + "instance-0-0": {10, 20, 30, 40, 50}, + "instance-0-1": {1000, 2000, 3000, 4000}, + }, + expected: map[string]TokenRanges{ + "instance-0-0": {0, 49, 4000, math.MaxUint32}, + "instance-0-1": {50, 3999}, + }, + }, + "consecutive tokens": { + zoneTokens: map[string][]uint32{ + "instance-0-0": {99}, + "instance-0-1": {100}, + }, + expected: map[string]TokenRanges{ + "instance-0-0": {0, 98, 100, math.MaxUint32}, + "instance-0-1": {99, 99}, + }, + }, + "extremes": { + zoneTokens: map[string][]uint32{ + "instance-0-0": {0}, + "instance-0-1": {math.MaxUint32}, + }, + expected: map[string]TokenRanges{ + "instance-0-0": {math.MaxUint32, math.MaxUint32}, + "instance-0-1": {0, math.MaxUint32 - 1}, + }, + }, + } + + for testName, testData := range tests { + t.Run(testName, func(t *testing.T) { + instances := map[string]InstanceDesc{} + allTokens := []uint32{} + + // generate test zone + for id, tokens := range testData.zoneTokens { + instances[id] = generateRingInstanceWithInfo(id, "zone-0", tokens, time.Now()) + allTokens = append(allTokens, tokens...) + } + + // generate other zones + for z := 1; z < numZones; z++ { + for i := 0; i < len(testData.zoneTokens); i++ { + id := fmt.Sprintf("instance-%d-%d", z, i) + tokens := GenerateTokens(512, allTokens) + instances[id] = generateRingInstanceWithInfo(id, fmt.Sprintf("zone-%d", z), tokens, time.Now()) + allTokens = append(allTokens, tokens...) + } + } + + // Initialise the ring. + ringDesc := &Desc{Ingesters: instances} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: numZones}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), + } + + for id, exp := range testData.expected { + ranges, err := ring.GetTokenRangesForInstance(id) + require.NoError(t, err) + assert.Equal(t, exp, ranges) + + // validate that the endpoints of the ranges map to the expected instances + for _, token := range ranges { + zoneTokens := ring.ringTokensByZone["zone-0"] + i := searchToken(zoneTokens, token) + assert.Equal(t, id, ring.ringInstanceByToken[zoneTokens[i]].InstanceID) + } + } + }) + } +} + +func BenchmarkGetTokenRangesForInstance(b *testing.B) { + instancesPerZone := []int{1, 3, 9, 27, 81, 243, 729} + + for _, n := range instancesPerZone { + b.Run(fmt.Sprintf("%d_instancesperzone", n), func(b *testing.B) { + benchmarkGetTokenRangesForInstance(b, n) + }) + } +} + +func benchmarkGetTokenRangesForInstance(b *testing.B, instancesPerZone int) { + numZones := 3 + numTokens := 512 + + gen := initTokenGenerator(b) + + b.ResetTimer() + + for n := 0; n < b.N; n++ { + b.StopTimer() + // Initialise the ring. + ringDesc := &Desc{Ingesters: generateRingInstances(gen, instancesPerZone*numZones, numZones, numTokens)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: true, ReplicationFactor: numZones}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), + } + b.StartTimer() + + _, _ = ring.GetTokenRangesForInstance("instance-1") + } +} + +func TestCheckingOfKeyOwnership(t *testing.T) { + const instancesPerZone = 100 + const numZones = 3 + const numTokens = 512 + const replicationFactor = numZones // This is the only config supported by GetTokenRangesForInstance right now. + + // Generate users with different number of tokens + userTokens := map[string][]uint32{} + shardSizes := map[string]int{} + for _, cnt := range []int{1000, 5000, 10000, 25000, 50000, 100000, 250000, 500000} { + uid := fmt.Sprintf("%dk", cnt/1000) + userTokens[uid] = GenerateTokens(cnt, nil) + + shardSize := cnt / 7500 + shardSize = (shardSize / numZones) * numZones // round down to numZones + if shardSize < numZones { + shardSize = numZones + } + shardSizes[uid] = shardSize + } + + // Generate ring + ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(t), instancesPerZone*numZones, numZones, numTokens)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: false, ReplicationFactor: replicationFactor}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), + } + + for uid, tokens := range userTokens { + shardSize := shardSizes[uid] + + subRing := ring.ShuffleShard(uid, shardSize) + sr := subRing.(*Ring) + + // find some instance in subring + var instanceID string + for id := range sr.ringDesc.Ingesters { + instanceID = id + break + } + + // Compute owned tokens by using token ranges. + ranges, err := subRing.GetTokenRangesForInstance(instanceID) + require.NoError(t, err) + + cntViaTokens := 0 + for _, t := range tokens { + if ranges.IncludesKey(t) { + cntViaTokens++ + } + } + + // Compute owned tokens using numberOfKeysOwnedByInstance. + bufDescs := make([]InstanceDesc, 5) + bufHosts := make([]string, 5) + bufZones := make([]string, numZones) + + cntViaGet, err := sr.numberOfKeysOwnedByInstance(tokens, WriteNoExtend, instanceID, bufDescs, bufHosts, bufZones) + require.NoError(t, err) + + assert.Equal(t, cntViaTokens, cntViaGet) + } +} + +func BenchmarkCompareCountingOfSeriesViaRingAndTokenRanges(b *testing.B) { + const instancesPerZone = 100 + const numZones = 3 + const numTokens = 512 + const userTokens = 500000 + const userShardsize = 60 + + seriesTokens := initTokenGenerator(b).GenerateTokens(userTokens, nil) + + // Generate ring + ringDesc := &Desc{Ingesters: generateRingInstances(initTokenGenerator(b), instancesPerZone*numZones, numZones, numTokens)} + ring := Ring{ + cfg: Config{HeartbeatTimeout: time.Hour, ZoneAwarenessEnabled: true, SubringCacheDisabled: false, ReplicationFactor: numZones}, + ringDesc: ringDesc, + ringTokens: ringDesc.GetTokens(), + ringTokensByZone: ringDesc.getTokensByZone(), + ringInstanceByToken: ringDesc.getTokensInfo(), + ringZones: getZones(ringDesc.getTokensByZone()), + shuffledSubringCache: map[subringCacheKey]*Ring{}, + strategy: NewDefaultReplicationStrategy(), + lastTopologyChange: time.Now(), + } + + // compute and cache subrings for each user + subRing := ring.ShuffleShard("user", userShardsize) + sr := subRing.(*Ring) + + // find some instance in subring + var instanceID string + for id := range sr.ringDesc.Ingesters { + instanceID = id + break + } + + b.Run("GetTokenRangesForInstance", func(b *testing.B) { + tokenRange, err := subRing.GetTokenRangesForInstance(instanceID) + require.NoError(b, err) + + b.ResetTimer() + for i := 0; i < b.N; i++ { + cntViaTokens := 0 + for _, t := range seriesTokens { + if tokenRange.IncludesKey(t) { + cntViaTokens++ + } + } + if cntViaTokens <= 0 { + b.Fatal("no owned tokens found!") + } + } + }) + + b.Run("numberOfKeysOwnedByInstance", func(b *testing.B) { + bufDescs := make([]InstanceDesc, 5) + bufHosts := make([]string, 5) + bufZones := make([]string, numZones) + + for i := 0; i < b.N; i++ { + cntViaGet, err := sr.numberOfKeysOwnedByInstance(seriesTokens, WriteNoExtend, instanceID, bufDescs, bufHosts, bufZones) + require.NoError(b, err) + + if cntViaGet <= 0 { + b.Fatal("no owned tokens found!") + } + } + }) +} diff --git a/ring/util_test.go b/ring/util_test.go index 7c18364cd..0f4ceb08a 100644 --- a/ring/util_test.go +++ b/ring/util_test.go @@ -67,6 +67,10 @@ func (r *RingMock) HasInstance(_ string) bool { func (r *RingMock) CleanupShuffleShardCache(_ string) {} +func (r *RingMock) GetTokenRangesForInstance(_ string) (TokenRanges, error) { + return []uint32{0, math.MaxUint32}, nil +} + func createStartingRing() *Ring { // Init the ring. ringDesc := &Desc{Ingesters: map[string]InstanceDesc{