From 81af14aa19e532913730acfb9c3bd367a33502ff Mon Sep 17 00:00:00 2001 From: Patryk Prus Date: Tue, 16 Jan 2024 10:14:58 -0500 Subject: [PATCH] Perform initial owned series calculation before starting lifecycler (#7087) * Perform initial owned series calculation before starting lifecycler * Enable owned series tracking in integration tests * Enable limiting based on owned series in e2e tests * Run ingester limits e2e tests with owned series enabled and disabled * Clean up * Update CHANGELOG --- CHANGELOG.md | 2 +- integration/ingester_limits_test.go | 344 +++++++++++++++------------- pkg/ingester/ingester.go | 25 ++ pkg/ingester/owned_series.go | 20 +- 4 files changed, 208 insertions(+), 183 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 2d4d55d1c4e..fd0d483309e 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -164,7 +164,7 @@ * [FEATURE] Add the experimental `-.s3.send-content-md5` flag (defaults to `false`) to configure S3 Put Object requests to send a `Content-MD5` header. Setting this flag is not recommended unless your object storage does not support checksums. #6622 * [FEATURE] Distributor: add an experimental flag `-distributor.reusable-ingester-push-worker` that can be used to pre-allocate a pool of workers to be used to send push requests to the ingesters. #6660 * [FEATURE] Distributor: Support enabling of automatically generated name suffixes for metrics ingested via OTLP, through the flag `-distributor.otel-metric-suffixes-enabled`. #6542 -* [FEATURE] Ingester: ingester can now track which of the user's series the ingester actually owns according to the ring, and only consider owned series when checking for user series limit. This helps to avoid hitting the user's series limit when scaling up ingesters or changing user's ingester shard size. Feature is currently experimental, and disabled by default. It can be enabled by setting `-ingester.use-ingester-owned-series-for-limits` (to use owned series for limiting). This is currently limited to multi-zone ingester setup, with replication factor being equal to number of zones. #6718 +* [FEATURE] Ingester: ingester can now track which of the user's series the ingester actually owns according to the ring, and only consider owned series when checking for user series limit. This helps to avoid hitting the user's series limit when scaling up ingesters or changing user's ingester shard size. Feature is currently experimental, and disabled by default. It can be enabled by setting `-ingester.use-ingester-owned-series-for-limits` (to use owned series for limiting). This is currently limited to multi-zone ingester setup, with replication factor being equal to number of zones. #6718 #7087 * [ENHANCEMENT] Query-frontend: don't treat cancel as an error. #4648 * [ENHANCEMENT] Ingester: exported summary `cortex_ingester_inflight_push_requests_summary` tracking total number of inflight requests in percentile buckets. #5845 * [ENHANCEMENT] Query-scheduler: add `cortex_query_scheduler_enqueue_duration_seconds` metric that records the time taken to enqueue or reject a query request. 
#5879 diff --git a/integration/ingester_limits_test.go b/integration/ingester_limits_test.go index 42e4e7c17be..4b8d7e46e91 100644 --- a/integration/ingester_limits_test.go +++ b/integration/ingester_limits_test.go @@ -42,103 +42,112 @@ func TestIngesterGlobalLimits(t *testing.T) { }, } + ossTests := map[string]bool{ + "ownedSeriesService enabled": true, + "ownedSeriesService disabled": false, + } + for testName, testData := range tests { - t.Run(testName, func(t *testing.T) { - s, err := e2e.NewScenario(networkName) - require.NoError(t, err) - defer s.Close() - - flags := mergeFlags( - BlocksStorageFlags(), - BlocksStorageS3Flags(), - ) - flags["-ingester.ring.replication-factor"] = "1" - flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize) - flags["-ingester.max-global-series-per-user"] = strconv.Itoa(testData.maxGlobalSeriesPerTenant) - flags["-ingester.max-global-series-per-metric"] = strconv.Itoa(testData.maxGlobalSeriesPerMetric) - flags["-ingester.ring.heartbeat-period"] = "1s" - - // Start dependencies. - consul := e2edb.NewConsul() - minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) - require.NoError(t, s.StartAndWaitReady(consul, minio)) - - // Start Mimir components. - distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) - ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags) - ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags) - ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags) - require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3)) - - // Wait until distributor has updated the ring. - require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( - labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), - labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) - - // Wait until ingesters have heartbeated the ring after all ingesters were active, - // in order to update the number of instances. Since we have no metric, we have to - // rely on a ugly sleep. - time.Sleep(2 * time.Second) - - now := time.Now() - client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", userID) - require.NoError(t, err) - - numSeriesWithSameMetricName := 0 - numSeriesTotal := 0 - maxErrorsBeforeStop := 100 - - // Try to push as many series with the same metric name as we can. 
- for i, errs := 0, 0; i < 10000; i++ { - series, _, _ := generateAlternatingSeries(i)("test_limit_per_metric", now, prompb.Label{ - Name: "cardinality", - Value: strconv.Itoa(rand.Int()), - }) + for ossName, ossEnabled := range ossTests { + t.Run(fmt.Sprintf("%s/%s", testName, ossName), func(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() - res, err := client.Push(series) + flags := mergeFlags( + BlocksStorageFlags(), + BlocksStorageS3Flags(), + ) + flags["-ingester.ring.replication-factor"] = "1" + flags["-distributor.ingestion-tenant-shard-size"] = strconv.Itoa(testData.tenantShardSize) + flags["-ingester.max-global-series-per-user"] = strconv.Itoa(testData.maxGlobalSeriesPerTenant) + flags["-ingester.max-global-series-per-metric"] = strconv.Itoa(testData.maxGlobalSeriesPerMetric) + flags["-ingester.ring.heartbeat-period"] = "1s" + flags["-ingester.track-ingester-owned-series"] = strconv.FormatBool(ossEnabled) + flags["-ingester.use-ingester-owned-series-for-limits"] = strconv.FormatBool(ossEnabled) + + // Start dependencies. + consul := e2edb.NewConsul() + minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"]) + require.NoError(t, s.StartAndWaitReady(consul, minio)) + + // Start Mimir components. + distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), flags) + ingester1 := e2emimir.NewIngester("ingester-1", consul.NetworkHTTPEndpoint(), flags) + ingester2 := e2emimir.NewIngester("ingester-2", consul.NetworkHTTPEndpoint(), flags) + ingester3 := e2emimir.NewIngester("ingester-3", consul.NetworkHTTPEndpoint(), flags) + require.NoError(t, s.StartAndWaitReady(distributor, ingester1, ingester2, ingester3)) + + // Wait until distributor has updated the ring. + require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(3), []string{"cortex_ring_members"}, e2e.WithLabelMatchers( + labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"), + labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE")))) + + // Wait until ingesters have heartbeated the ring after all ingesters were active, + // in order to update the number of instances. Since we have no metric, we have to + // rely on a ugly sleep. + time.Sleep(2 * time.Second) + + now := time.Now() + client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", userID) require.NoError(t, err) - if res.StatusCode == 200 { - numSeriesTotal++ - numSeriesWithSameMetricName++ - } else { - require.Equal(t, 400, res.StatusCode) - if errs++; errs >= maxErrorsBeforeStop { - break + numSeriesWithSameMetricName := 0 + numSeriesTotal := 0 + maxErrorsBeforeStop := 100 + + // Try to push as many series with the same metric name as we can. + for i, errs := 0, 0; i < 10000; i++ { + series, _, _ := generateAlternatingSeries(i)("test_limit_per_metric", now, prompb.Label{ + Name: "cardinality", + Value: strconv.Itoa(rand.Int()), + }) + + res, err := client.Push(series) + require.NoError(t, err) + + if res.StatusCode == 200 { + numSeriesTotal++ + numSeriesWithSameMetricName++ + } else { + require.Equal(t, 400, res.StatusCode) + if errs++; errs >= maxErrorsBeforeStop { + break + } } } - } - // Try to push as many series with the different metric name as we can. - for i, errs := 0, 0; i < 10000; i++ { - series, _, _ := generateAlternatingSeries(i)(fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()), now) - res, err := client.Push(series) - require.NoError(t, err) + // Try to push as many series with the different metric name as we can. 
+ for i, errs := 0, 0; i < 10000; i++ { + series, _, _ := generateAlternatingSeries(i)(fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()), now) + res, err := client.Push(series) + require.NoError(t, err) - if res.StatusCode == 200 { - numSeriesTotal++ - } else { - require.Equal(t, 400, res.StatusCode) - if errs++; errs >= maxErrorsBeforeStop { - break + if res.StatusCode == 200 { + numSeriesTotal++ + } else { + require.Equal(t, 400, res.StatusCode) + if errs++; errs >= maxErrorsBeforeStop { + break + } } } - } - // We expect the number of series we've been successfully pushed to be around - // the limit. Due to how the global limit implementation works (lack of centralised - // coordination) the actual number of written series could be slightly different - // than the global limit, so we allow a 10% difference. - delta := 0.1 - assert.InDelta(t, testData.maxGlobalSeriesPerMetric, numSeriesWithSameMetricName, float64(testData.maxGlobalSeriesPerMetric)*delta) - assert.InDelta(t, testData.maxGlobalSeriesPerTenant, numSeriesTotal, float64(testData.maxGlobalSeriesPerTenant)*delta) - - // Ensure no service-specific metrics prefix is used by the wrong service. - assertServiceMetricsPrefixes(t, Distributor, distributor) - assertServiceMetricsPrefixes(t, Ingester, ingester1) - assertServiceMetricsPrefixes(t, Ingester, ingester2) - assertServiceMetricsPrefixes(t, Ingester, ingester3) - }) + // We expect the number of series we've been successfully pushed to be around + // the limit. Due to how the global limit implementation works (lack of centralised + // coordination) the actual number of written series could be slightly different + // than the global limit, so we allow a 10% difference. + delta := 0.1 + assert.InDelta(t, testData.maxGlobalSeriesPerMetric, numSeriesWithSameMetricName, float64(testData.maxGlobalSeriesPerMetric)*delta) + assert.InDelta(t, testData.maxGlobalSeriesPerTenant, numSeriesTotal, float64(testData.maxGlobalSeriesPerTenant)*delta) + + // Ensure no service-specific metrics prefix is used by the wrong service. + assertServiceMetricsPrefixes(t, Distributor, distributor) + assertServiceMetricsPrefixes(t, Ingester, ingester1) + assertServiceMetricsPrefixes(t, Ingester, ingester2) + assertServiceMetricsPrefixes(t, Ingester, ingester3) + }) + } } } @@ -173,96 +182,105 @@ overrides: "histogram series": generateNHistogramSeries, } + ossTests := map[string]bool{ + "ownedSeriesService enabled": true, + "ownedSeriesService disabled": false, + } + for testName, testData := range tests { for fName, generator := range generators { - t.Run(fmt.Sprintf("%s/%s", testName, fName), func(t *testing.T) { - s, err := e2e.NewScenario(networkName) - require.NoError(t, err) - defer s.Close() - - // Write blank overrides file, so we can check if they are updated later - require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte{})) - - // Start Cortex in single binary mode, reading the config from file. - require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile)) - - flags := map[string]string{ - "-runtime-config.reload-period": "100ms", - "-blocks-storage.backend": "filesystem", - "-blocks-storage.filesystem.dir": "/tmp", - "-blocks-storage.storage-prefix": "blocks", - "-ruler-storage.backend": "filesystem", - "-ruler-storage.local.directory": "/tmp", // Avoid warning "unable to list rules". 
- "-runtime-config.file": filepath.Join(e2e.ContainerSharedDir, overridesFile), - } - cortex1 := e2emimir.NewSingleBinary("cortex-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095)) - require.NoError(t, s.StartAndWaitReady(cortex1)) - - // Populate the overrides we want, then wait long enough for it to be read. - // (max-exemplars will be taken from config on first sample received) - overrides := buildConfigFromTemplate(overridesTemplate, &testData) - require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte(overrides))) - time.Sleep(500 * time.Millisecond) - - now := time.Now() - client, err := e2emimir.NewClient(cortex1.HTTPEndpoint(), "", "", "", userID) - require.NoError(t, err) - - numSeriesWithSameMetricName := 0 - numSeriesTotal := 0 - maxErrorsBeforeStop := 1 + for ossName, ossEnabled := range ossTests { + t.Run(fmt.Sprintf("%s/%s/%s", testName, fName, ossName), func(t *testing.T) { + s, err := e2e.NewScenario(networkName) + require.NoError(t, err) + defer s.Close() + + // Write blank overrides file, so we can check if they are updated later + require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte{})) + + // Start Cortex in single binary mode, reading the config from file. + require.NoError(t, copyFileToSharedDir(s, "docs/configurations/single-process-config-blocks.yaml", mimirConfigFile)) + + flags := map[string]string{ + "-runtime-config.reload-period": "100ms", + "-blocks-storage.backend": "filesystem", + "-blocks-storage.filesystem.dir": "/tmp", + "-blocks-storage.storage-prefix": "blocks", + "-ruler-storage.backend": "filesystem", + "-ruler-storage.local.directory": "/tmp", // Avoid warning "unable to list rules". + "-runtime-config.file": filepath.Join(e2e.ContainerSharedDir, overridesFile), + "-ingester.track-ingester-owned-series": strconv.FormatBool(ossEnabled), + "-ingester.use-ingester-owned-series-for-limits": strconv.FormatBool(ossEnabled), + } + cortex1 := e2emimir.NewSingleBinary("cortex-1", flags, e2emimir.WithConfigFile(mimirConfigFile), e2emimir.WithPorts(9009, 9095)) + require.NoError(t, s.StartAndWaitReady(cortex1)) - // Try to push as many series with the same metric name as we can. - for i, errs := 0, 0; i < 10000; i++ { - series, _ := generator(10, 1, func() string { return "test_limit_per_metric" }, now, func() []prompb.Label { - return []prompb.Label{{ - Name: "cardinality", - Value: strconv.Itoa(rand.Int()), - }} - }) + // Populate the overrides we want, then wait long enough for it to be read. + // (max-exemplars will be taken from config on first sample received) + overrides := buildConfigFromTemplate(overridesTemplate, &testData) + require.NoError(t, writeFileToSharedDir(s, overridesFile, []byte(overrides))) + time.Sleep(500 * time.Millisecond) - res, err := client.Push(series) + now := time.Now() + client, err := e2emimir.NewClient(cortex1.HTTPEndpoint(), "", "", "", userID) require.NoError(t, err) - if res.StatusCode == 200 { - numSeriesTotal += 10 - numSeriesWithSameMetricName += 10 - } else { - require.Equal(t, 400, res.StatusCode) - if errs++; errs >= maxErrorsBeforeStop { - break + numSeriesWithSameMetricName := 0 + numSeriesTotal := 0 + maxErrorsBeforeStop := 1 + + // Try to push as many series with the same metric name as we can. 
+ for i, errs := 0, 0; i < 10000; i++ { + series, _ := generator(10, 1, func() string { return "test_limit_per_metric" }, now, func() []prompb.Label { + return []prompb.Label{{ + Name: "cardinality", + Value: strconv.Itoa(rand.Int()), + }} + }) + + res, err := client.Push(series) + require.NoError(t, err) + + if res.StatusCode == 200 { + numSeriesTotal += 10 + numSeriesWithSameMetricName += 10 + } else { + require.Equal(t, 400, res.StatusCode) + if errs++; errs >= maxErrorsBeforeStop { + break + } } } - } - // Try to push as many series with the different metric name as we can. - for i, errs := 0, 0; i < 10000; i++ { - series, _ := generator(10, 1, func() string { return fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()) }, now, nil) - res, err := client.Push(series) - require.NoError(t, err) - - if res.StatusCode == 200 { - numSeriesTotal += 10 - } else { - require.Equal(t, 400, res.StatusCode) - if errs++; errs >= maxErrorsBeforeStop { - break + // Try to push as many series with the different metric name as we can. + for i, errs := 0, 0; i < 10000; i++ { + series, _ := generator(10, 1, func() string { return fmt.Sprintf("test_limit_per_tenant_%d", rand.Int()) }, now, nil) + res, err := client.Push(series) + require.NoError(t, err) + + if res.StatusCode == 200 { + numSeriesTotal += 10 + } else { + require.Equal(t, 400, res.StatusCode) + if errs++; errs >= maxErrorsBeforeStop { + break + } } } - } - // With just one ingester we expect to hit the limit exactly - assert.Equal(t, testData.MaxGlobalSeriesPerMetric, numSeriesWithSameMetricName) - assert.Equal(t, testData.MaxGlobalSeriesPerTenant, numSeriesTotal) + // With just one ingester we expect to hit the limit exactly + assert.Equal(t, testData.MaxGlobalSeriesPerMetric, numSeriesWithSameMetricName) + assert.Equal(t, testData.MaxGlobalSeriesPerTenant, numSeriesTotal) - // Check metrics - metricNumSeries, err := cortex1.SumMetrics([]string{"cortex_ingester_memory_series"}) - require.NoError(t, err) - assert.Equal(t, testData.MaxGlobalSeriesPerTenant, int(e2e.SumValues(metricNumSeries))) - metricNumExemplars, err := cortex1.SumMetrics([]string{"cortex_ingester_tsdb_exemplar_exemplars_in_storage"}) - require.NoError(t, err) - assert.Equal(t, testData.MaxGlobalExemplarsPerUser, int(e2e.SumValues(metricNumExemplars))) - }) + // Check metrics + metricNumSeries, err := cortex1.SumMetrics([]string{"cortex_ingester_memory_series"}) + require.NoError(t, err) + assert.Equal(t, testData.MaxGlobalSeriesPerTenant, int(e2e.SumValues(metricNumSeries))) + metricNumExemplars, err := cortex1.SumMetrics([]string{"cortex_ingester_tsdb_exemplar_exemplars_in_storage"}) + require.NoError(t, err) + assert.Equal(t, testData.MaxGlobalExemplarsPerUser, int(e2e.SumValues(metricNumExemplars))) + }) + } } } } diff --git a/pkg/ingester/ingester.go b/pkg/ingester/ingester.go index 7f7364d0ca4..97d412cf029 100644 --- a/pkg/ingester/ingester.go +++ b/pkg/ingester/ingester.go @@ -468,6 +468,31 @@ func (i *Ingester) starting(ctx context.Context) (err error) { return errors.Wrap(err, "opening existing TSDBs") } + if i.ownedSeriesService != nil { + // We need to perform the initial computation of owned series after the TSDBs are opened but before the ingester becomes + // ACTIVE in the ring and starts to accept requests. However, because the ingester still uses the Lifecycler (rather + // than BasicLifecycler) there is no deterministic way to delay the ACTIVE state until we finish the calculations. 
+ // + // Since we don't actually need to be ACTIVE in the ring to calculate owned series (just present, with tokens) we instead + // calculate owned series before we start the lifecycler at all. Once we move the ingester to the BasicLifecycler and + // have better control over the ring states, we could perform the calculation while in JOINING state, and move to + // ACTIVE once we finish. + + oss := i.ownedSeriesService + + // Fetch and cache current ring state + _, err := oss.checkRingForChanges() + switch { + case errors.Is(err, ring.ErrEmptyRing): + level.Warn(i.logger).Log("msg", "ingester ring is empty") + case err != nil: + return fmt.Errorf("can't read ring: %v", err) + default: + // We pass ringChanged=true, but all TSDBs at this point (after opening TSDBs, but before ingester switched to Running state) also have "new user" trigger set anyway. + oss.updateAllTenants(ctx, true) + } + } + // Important: we want to keep lifecycler running until we ask it to stop, so we need to give it independent context if err := i.lifecycler.StartAsync(context.Background()); err != nil { return errors.Wrap(err, "failed to start lifecycler") diff --git a/pkg/ingester/owned_series.go b/pkg/ingester/owned_series.go index 73e88a7e4b1..5a34eca750f 100644 --- a/pkg/ingester/owned_series.go +++ b/pkg/ingester/owned_series.go @@ -5,7 +5,6 @@ package ingester import ( "context" "errors" - "fmt" "time" "github.com/go-kit/log" @@ -65,27 +64,10 @@ func newOwnedSeriesService(interval time.Duration, instanceID string, ingesterRi }), } - oss.Service = services.NewTimerService(interval, oss.starting, oss.onPeriodicCheck, nil) + oss.Service = services.NewTimerService(interval, nil, oss.onPeriodicCheck, nil) return oss } -// This is Starting function for ownedSeries service. Service is only started after all TSDBs are opened. -// Pushes are not allowed yet when this function runs. -func (oss *ownedSeriesService) starting(ctx context.Context) error { - err := ring.WaitInstanceState(ctx, oss.ingestersRing, oss.instanceID, ring.ACTIVE) - if err != nil { - return err - } - - if _, err := oss.checkRingForChanges(); err != nil { - return fmt.Errorf("can't read ring: %v", err) - } - - // We pass ringChanged=true, but all TSDBs at this point (after opening TSDBs, but before ingester switched to Running state) also have "new user" trigger set anyway. - oss.updateAllTenants(ctx, true) - return nil -} - // This function runs periodically. It checks if ring has changed, and updates number of owned series for any // user that requires it (due to ring change, compaction, shard size change, ...). func (oss *ownedSeriesService) onPeriodicCheck(ctx context.Context) error {
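Reviewer note: the core of this patch is the ordering change in (*Ingester).starting shown in the pkg/ingester/ingester.go hunk above — the initial owned-series computation now happens after the TSDBs are opened but before the lifecycler is started, instead of inside the owned-series service's own starting hook. A condensed sketch of the resulting flow follows. It is not a standalone program: it assumes the Ingester fields visible in the diff context (ownedSeriesService, lifecycler, logger), and the openExistingTSDB helper name is inferred from the "opening existing TSDBs" wrap message rather than shown in the hunk.

func (i *Ingester) starting(ctx context.Context) error {
	// Open existing TSDBs first, as before. (openExistingTSDB is an assumed helper name,
	// inferred from the "opening existing TSDBs" error message in the diff context.)
	if err := i.openExistingTSDB(ctx); err != nil {
		return errors.Wrap(err, "opening existing TSDBs")
	}

	if i.ownedSeriesService != nil {
		// Compute the initial owned-series counts while the ingester is registered in the
		// ring (with tokens) but before the lifecycler is started, so the instance cannot
		// yet become ACTIVE and receive write requests.
		_, err := i.ownedSeriesService.checkRingForChanges()
		switch {
		case errors.Is(err, ring.ErrEmptyRing):
			level.Warn(i.logger).Log("msg", "ingester ring is empty")
		case err != nil:
			return fmt.Errorf("can't read ring: %v", err)
		default:
			// ringChanged=true forces a recount; all TSDBs also carry the "new user"
			// trigger at this point, as noted in the diff.
			i.ownedSeriesService.updateAllTenants(ctx, true)
		}
	}

	// Only now start the lifecycler; the owned-series timer service (whose starting
	// callback is now nil) keeps the counts up to date once the ingester is running.
	if err := i.lifecycler.StartAsync(context.Background()); err != nil {
		return errors.Wrap(err, "failed to start lifecycler")
	}
	// ... remainder of starting() unchanged.
	return nil
}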
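On the test side, both ingester-limits end-to-end tests now run each case twice, once with the owned-series service enabled and once with it disabled, by toggling the two experimental flags together. A minimal sketch of the pattern used in integration/ingester_limits_test.go follows; the loop body is elided because it is unchanged apart from indentation, and the elided comments only summarize steps already present in the diff.

ossTests := map[string]bool{
	"ownedSeriesService enabled":  true,
	"ownedSeriesService disabled": false,
}

for testName, testData := range tests {
	for ossName, ossEnabled := range ossTests {
		t.Run(fmt.Sprintf("%s/%s", testName, ossName), func(t *testing.T) {
			flags := mergeFlags(BlocksStorageFlags(), BlocksStorageS3Flags())
			// Tracking and limit enforcement are switched together so each run is
			// internally consistent.
			flags["-ingester.track-ingester-owned-series"] = strconv.FormatBool(ossEnabled)
			flags["-ingester.use-ingester-owned-series-for-limits"] = strconv.FormatBool(ossEnabled)
			// ... start Consul, Minio, the distributor and three ingesters with these flags,
			// push series until 400 responses appear, and assert the per-metric and
			// per-tenant counts against testData exactly as before.
		})
	}
}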