
Commit

Perform initial owned series calculation before starting lifecycler (#7087)

* Perform initial owned series calculation before starting lifecycler

* Enable owned series tracking in integration tests

* Enable limiting based on owned series in e2e tests

* Run ingester limits e2e tests with owned series enabled and disabled

* Clean up

* Update CHANGELOG
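
The first bullet is the substantive change: the ingester now finishes its initial owned-series calculation before its ring lifecycler starts, presumably so that series-limit checks performed right after startup already reflect owned-series counts. The following is a minimal, hypothetical Go sketch of that startup ordering only; the type and method names (ownedSeriesService, updateAllTenants, ringLifecycler) are placeholders and do not correspond to Mimir's real internals.

    package main

    import (
    	"context"
    	"fmt"
    )

    // Hypothetical stand-ins for Mimir's owned-series service and ring lifecycler.
    // None of these types or method names match the real implementation.
    type ownedSeriesService struct{}

    func (s *ownedSeriesService) updateAllTenants(ctx context.Context) error {
    	// In the real ingester this would inspect the ring and count, per tenant,
    	// the series whose tokens this ingester owns.
    	fmt.Println("initial owned-series counts computed")
    	return nil
    }

    type ringLifecycler struct{}

    func (l *ringLifecycler) start(ctx context.Context) error {
    	// Joining the ring is what makes the ingester eligible to receive writes.
    	fmt.Println("lifecycler started, ingester joined the ring")
    	return nil
    }

    type ingester struct {
    	ownedSeries *ownedSeriesService
    	lifecycler  *ringLifecycler
    }

    // starting illustrates the ordering this commit enforces: the owned-series
    // calculation runs to completion before the lifecycler is started.
    func (i *ingester) starting(ctx context.Context) error {
    	if err := i.ownedSeries.updateAllTenants(ctx); err != nil {
    		return err
    	}
    	return i.lifecycler.start(ctx)
    }

    func main() {
    	i := &ingester{ownedSeries: &ownedSeriesService{}, lifecycler: &ringLifecycler{}}
    	if err := i.starting(context.Background()); err != nil {
    		panic(err)
    	}
    }

Computing before joining the ring presumably closes the window in which the ingester could accept writes while its owned-series counts are still empty.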
pr00se authored Jan 16, 2024
1 parent f299ba7 commit 81af14a
Showing 4 changed files with 208 additions and 183 deletions.
2 changes: 1 addition & 1 deletion CHANGELOG.md
@@ -164,7 +164,7 @@
* [FEATURE] Add the experimental `-<prefix>.s3.send-content-md5` flag (defaults to `false`) to configure S3 Put Object requests to send a `Content-MD5` header. Setting this flag is not recommended unless your object storage does not support checksums. #6622
* [FEATURE] Distributor: add an experimental flag `-distributor.reusable-ingester-push-worker` that can be used to pre-allocate a pool of workers to be used to send push requests to the ingesters. #6660
* [FEATURE] Distributor: Support enabling of automatically generated name suffixes for metrics ingested via OTLP, through the flag `-distributor.otel-metric-suffixes-enabled`. #6542
* [FEATURE] Ingester: ingester can now track which of the user's series the ingester actually owns according to the ring, and only consider owned series when checking for user series limit. This helps to avoid hitting the user's series limit when scaling up ingesters or changing user's ingester shard size. Feature is currently experimental, and disabled by default. It can be enabled by setting `-ingester.use-ingester-owned-series-for-limits` (to use owned series for limiting). This is currently limited to multi-zone ingester setup, with replication factor being equal to number of zones. #6718
* [FEATURE] Ingester: ingester can now track which of the user's series the ingester actually owns according to the ring, and only consider owned series when checking for user series limit. This helps to avoid hitting the user's series limit when scaling up ingesters or changing user's ingester shard size. Feature is currently experimental, and disabled by default. It can be enabled by setting `-ingester.use-ingester-owned-series-for-limits` (to use owned series for limiting). This is currently limited to multi-zone ingester setup, with replication factor being equal to number of zones. #6718 #7087
* [ENHANCEMENT] Query-frontend: don't treat cancel as an error. #4648
* [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845
* [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. #5879
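For quick reference before the test diff: enabling the feature involves two ingester flags used together — the `-ingester.use-ingester-owned-series-for-limits` flag named in the entry above, plus `-ingester.track-ingester-owned-series`, which the integration tests below also set. The small helper here only groups them for illustration; it is not part of the change.

    package main

    import (
    	"fmt"
    	"strconv"
    )

    // ownedSeriesFlags groups the two ingester flags the e2e tests below toggle
    // together. The flag names come from the changelog entry and the test diff;
    // this helper itself is illustrative only.
    func ownedSeriesFlags(enabled bool) map[string]string {
    	return map[string]string{
    		"-ingester.track-ingester-owned-series":          strconv.FormatBool(enabled),
    		"-ingester.use-ingester-owned-series-for-limits": strconv.FormatBool(enabled),
    	}
    }

    func main() {
    	fmt.Println(ownedSeriesFlags(true))
    }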
344 changes: 181 additions & 163 deletions integration/ingester_limits_test.go
@@ -42,103 +42,112 @@ func TestIngesterGlobalLimits(t *testing.T) {
},
}

ossTests := map[string]bool{
"ownedSeriesService enabled": true,
"ownedSeriesService disabled": false,
}

for testName, testData := range tests {
t.Run(testName, func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
)
flags["-ingester.ring.replication-factor"] = "1"
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)
flags["-ingester.max-global-series-per-user"] = strconv.Itoa(testData.maxGlobalSeriesPerTenant)
flags["-ingester.max-global-series-per-metric"] = strconv.Itoa(testData.maxGlobalSeriesPerMetric)
flags["-ingester.ring.heartbeat-period"] = "1s"

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags)
ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags)
ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))

// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until ingesters have heartbeated the ring after all ingesters were active,
// in order to update the number of instances. Since we have no metric, we have to
// rely on an ugly sleep.
time.Sleep(2 * time.Second)

now := time.Now()
client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

numSeriesWithSameMetricName := 0
numSeriesTotal := 0
maxErrorsBeforeStop := 100

// Try to push as many series with the same metric name as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _, _ := generateAlternatingSeries(i)("test_limit_per_metric", now, prompb.Label{
Name: "cardinality",
Value: strconv.Itoa(rand.Int()),
})
for ossName, ossEnabled := range ossTests {
t.Run(fmt.Sprintf("%s/%s", testName, ossName), func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

res, err := client.Push(series)
flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
)
flags["-ingester.ring.replication-factor"] = "1"
flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize)
flags["-ingester.max-global-series-per-user"] = strconv.Itoa(testData.maxGlobalSeriesPerTenant)
flags["-ingester.max-global-series-per-metric"] = strconv.Itoa(testData.maxGlobalSeriesPerMetric)
flags["-ingester.ring.heartbeat-period"] = "1s"
flags["-ingester.track-ingester-owned-series"] = strconv.FormatBool(ossEnabled)
flags["-ingester.use-ingester-owned-series-for-limits"] = strconv.FormatBool(ossEnabled)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags)
ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags)
ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags)
ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3))

// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

// Wait until ingesters have heartbeated the ring after all ingesters were active,
// in order to update the number of instances. Since we have no metric, we have to
// rely on an ugly sleep.
time.Sleep(2 * time.Second)

now := time.Now()
client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

if res.StatusCode == 200 {
numSeriesTotal++
numSeriesWithSameMetricName++
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
numSeriesWithSameMetricName := 0
numSeriesTotal := 0
maxErrorsBeforeStop := 100

// Try to push as many series with the same metric name as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _, _ := generateAlternatingSeries(i)("test_limit_per_metric", now, prompb.Label{
Name: "cardinality",
Value: strconv.Itoa(rand.Int()),
})

res, err := client.Push(series)
require.NoError(t, err)

if res.StatusCode == 200 {
numSeriesTotal++
numSeriesWithSameMetricName++
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
}
}
}
}

// Try to push as many series with different metric names as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _, _ := generateAlternatingSeries(i)(fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()), now)
res, err := client.Push(series)
require.NoError(t, err)
// Try to push as many series with different metric names as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _, _ := generateAlternatingSeries(i)(fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()), now)
res, err := client.Push(series)
require.NoError(t, err)

if res.StatusCode == 200 {
numSeriesTotal++
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
if res.StatusCode == 200 {
numSeriesTotal++
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
}
}
}
}

// We expect the number of series we've successfully pushed to be around
// the limit. Due to how the global limit implementation works (lack of centralised
// coordination) the actual number of written series could be slightly different
// than the global limit, so we allow a 10% difference.
delta := 0.1
assert.InDelta(t, testData.maxGlobalSeriesPerMetric, numSeriesWithSameMetricName, float64(testData.maxGlobalSeriesPerMetric)*delta)
assert.InDelta(t, testData.maxGlobalSeriesPerTenant, numSeriesTotal, float64(testData.maxGlobalSeriesPerTenant)*delta)

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
assertServiceMetricsPrefixes(t, Ingester, ingester1)
assertServiceMetricsPrefixes(t, Ingester, ingester2)
assertServiceMetricsPrefixes(t, Ingester, ingester3)
})
// We expect the number of series we've successfully pushed to be around
// the limit. Due to how the global limit implementation works (lack of centralised
// coordination) the actual number of written series could be slightly different
// than the global limit, so we allow a 10% difference.
delta := 0.1
assert.InDelta(t, testData.maxGlobalSeriesPerMetric, numSeriesWithSameMetricName, float64(testData.maxGlobalSeriesPerMetric)*delta)
assert.InDelta(t, testData.maxGlobalSeriesPerTenant, numSeriesTotal, float64(testData.maxGlobalSeriesPerTenant)*delta)

// Ensure no service-specific metrics prefix is used by the wrong service.
assertServiceMetricsPrefixes(t, Distributor, distributor)
assertServiceMetricsPrefixes(t, Ingester, ingester1)
assertServiceMetricsPrefixes(t, Ingester, ingester2)
assertServiceMetricsPrefixes(t, Ingester, ingester3)
})
}
}
}

@@ -173,96 +182,105 @@ overrides:
"histogram series": generateNHistogramSeries,
}

ossTests := map[string]bool{
"ownedSeriesService enabled": true,
"ownedSeriesService disabled": false,
}

for testName, testData := range tests {
for fName, generator := range generators {
t.Run(fmt.Sprintf("%s/%s", testName, fName), func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Write a blank overrides file, so we can check that it is updated later
require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte{}))

// Start Cortex in single binary mode, reading the config from file.
require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile))

flags := map[string]string{
"-runtime-config.reload-period": "100ms",
"-blocks-storage.backend": "filesystem",
"-blocks-storage.filesystem.dir": "/tmp",
"-blocks-storage.storage-prefix": "blocks",
"-ruler-storage.backend": "filesystem",
"-ruler-storage.local.directory": "/tmp", // Avoid warning "unable to list rules".
"-runtime-config.file": filepath.Join(e2e.ContainerSharedDir, overridesFile),
}
cortex1 := e2emimir.NewSingleBinary("cortex-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095))
require.NoError(t, s.StartAndWaitReady(cortex1))

// Populate the overrides we want, then wait long enough for it to be read.
// (max-exemplars will be taken from config on first sample received)
overrides := buildConfigFromTemplate(overridesTemplate, &testData)
require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte(overrides)))
time.Sleep(500 * time.Millisecond)

now := time.Now()
client, err := e2emimir.NewClient(cortex1.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

numSeriesWithSameMetricName := 0
numSeriesTotal := 0
maxErrorsBeforeStop := 1
for ossName, ossEnabled := range ossTests {
t.Run(fmt.Sprintf("%s/%s/%s", testName, fName, ossName), func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

// Write a blank overrides file, so we can check that it is updated later
require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte{}))

// Start Cortex in single binary mode, reading the config from file.
require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile))

flags := map[string]string{
"-runtime-config.reload-period": "100ms",
"-blocks-storage.backend": "filesystem",
"-blocks-storage.filesystem.dir": "/tmp",
"-blocks-storage.storage-prefix": "blocks",
"-ruler-storage.backend": "filesystem",
"-ruler-storage.local.directory": "/tmp", // Avoid warning "unable to list rules".
"-runtime-config.file": filepath.Join(e2e.ContainerSharedDir, overridesFile),
"-ingester.track-ingester-owned-series": strconv.FormatBool(ossEnabled),
"-ingester.use-ingester-owned-series-for-limits": strconv.FormatBool(ossEnabled),
}
cortex1 := e2emimir.NewSingleBinary("cortex-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095))
require.NoError(t, s.StartAndWaitReady(cortex1))

// Try to push as many series with the same metric name as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _ := generator(10, 1, func() string { return "test_limit_per_metric" }, now, func() []prompb.Label {
return []prompb.Label{{
Name: "cardinality",
Value: strconv.Itoa(rand.Int()),
}}
})
// Populate the overrides we want, then wait long enough for it to be read.
// (max-exemplars will be taken from config on first sample received)
overrides := buildConfigFromTemplate(overridesTemplate, &testData)
require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte(overrides)))
time.Sleep(500 * time.Millisecond)

res, err := client.Push(series)
now := time.Now()
client, err := e2emimir.NewClient(cortex1.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

if res.StatusCode == 200 {
numSeriesTotal += 10
numSeriesWithSameMetricName += 10
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
numSeriesWithSameMetricName := 0
numSeriesTotal := 0
maxErrorsBeforeStop := 1

// Try to push as many series with the same metric name as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _ := generator(10, 1, func() string { return "test_limit_per_metric" }, now, func() []prompb.Label {
return []prompb.Label{{
Name: "cardinality",
Value: strconv.Itoa(rand.Int()),
}}
})

res, err := client.Push(series)
require.NoError(t, err)

if res.StatusCode == 200 {
numSeriesTotal += 10
numSeriesWithSameMetricName += 10
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
}
}
}
}

// Try to push as many series with different metric names as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _ := generator(10, 1, func() string { return fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()) }, now, nil)
res, err := client.Push(series)
require.NoError(t, err)

if res.StatusCode == 200 {
numSeriesTotal += 10
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
// Try to push as many series with different metric names as we can.
for i, errs := 0, 0; i < 10000; i++ {
series, _ := generator(10, 1, func() string { return fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()) }, now, nil)
res, err := client.Push(series)
require.NoError(t, err)

if res.StatusCode == 200 {
numSeriesTotal += 10
} else {
require.Equal(t, 400, res.StatusCode)
if errs++; errs >= maxErrorsBeforeStop {
break
}
}
}
}

// With just one ingester we expect to hit the limit exactly
assert.Equal(t, testData.MaxGlobalSeriesPerMetric, numSeriesWithSameMetricName)
assert.Equal(t, testData.MaxGlobalSeriesPerTenant, numSeriesTotal)
// With just one ingester we expect to hit the limit exactly
assert.Equal(t, testData.MaxGlobalSeriesPerMetric, numSeriesWithSameMetricName)
assert.Equal(t, testData.MaxGlobalSeriesPerTenant, numSeriesTotal)

// Check metrics
metricNumSeries, err := cortex1.SumMetrics([]string{"cortex_ingester_memory_series"})
require.NoError(t, err)
assert.Equal(t, testData.MaxGlobalSeriesPerTenant, int(e2e.SumValues(metricNumSeries)))
metricNumExemplars, err := cortex1.SumMetrics([]string{"cortex_ingester_tsdb_exemplar_exemplars_in_storage"})
require.NoError(t, err)
assert.Equal(t, testData.MaxGlobalExemplarsPerUser, int(e2e.SumValues(metricNumExemplars)))
})
// Check metrics
metricNumSeries, err := cortex1.SumMetrics([]string{"cortex_ingester_memory_series"})
require.NoError(t, err)
assert.Equal(t, testData.MaxGlobalSeriesPerTenant, int(e2e.SumValues(metricNumSeries)))
metricNumExemplars, err := cortex1.SumMetrics([]string{"cortex_ingester_tsdb_exemplar_exemplars_in_storage"})
require.NoError(t, err)
assert.Equal(t, testData.MaxGlobalExemplarsPerUser, int(e2e.SumValues(metricNumExemplars)))
})
}
}
}
}