Skip to content

Commit

Permalink
Make ingesters return gRPC errors only (#7151)
Browse files Browse the repository at this point in the history
* Make ingesters return gRPC errors only

Signed-off-by: Yuri Nikolic <[email protected]>

* Denote deprecated feature in about-versioning.md

Signed-off-by: Yuri Nikolic <[email protected]>

* Adding notes about backwards compatibility

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Jan 18, 2024
1 parent 409da4d commit 3fb6280
Show file tree
Hide file tree
Showing 9 changed files with 41 additions and 29 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -23,6 +23,7 @@
* [CHANGE] All: set `-server.report-grpc-codes-in-instrumentation-label-enabled` to `true` by default, which enables reporting gRPC status codes as `status_code` labels in the `cortex_request_duration_seconds` metric. #7144
* [CHANGE] Distributor: report gRPC status codes as `status_code` labels in the `cortex_ingester_client_request_duration_seconds` metric by default. #7144
* [CHANGE] Distributor: CLI flag `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled` has been deprecated, and its default value is set to `true`. #7144
* [CHANGE] Ingester: CLI flag `-ingester.return-only-grpc-errors` has been deprecated, and its default value is set to `true`. To ensure backwards compatibility, during a migration from a version prior to 2.11.0 to 2.12 or later, `-ingester.return-only-grpc-errors` should be set to `false`. Once all the components are migrated, the flag can be removed. #7151
* [FEATURE] Introduce `-tenant-federation.max-tenants` option to limit the max number of tenants allowed for requests when federation is enabled. #6959
* [FEATURE] Cardinality API: added a new `count_method` parameter which enables counting active label values. #7085
* [FEATURE] Querier / query-frontend: added `-querier.promql-experimental-functions-enabled` CLI flag (and respective YAML config option) to enable experimental PromQL functions. The experimental functions introduced are: `mad_over_time()`, `sort_by_label()` and `sort_by_label_desc()`. #7057
Expand Down
4 changes: 2 additions & 2 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -3038,10 +3038,10 @@
"required": false,
"desc": "When enabled only gRPC errors will be returned by the ingester.",
"fieldValue": null,
"fieldDefaultValue": false,
"fieldDefaultValue": true,
"fieldFlag": "ingester.return-only-grpc-errors",
"fieldType": "boolean",
"fieldCategory": "experimental"
"fieldCategory": "deprecated"
},
{
"kind": "field",
Expand Down
2 changes: 1 addition & 1 deletion cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -1362,7 +1362,7 @@ Usage of ./cmd/mimir/mimir:
-ingester.read-path-memory-utilization-limit uint
[experimental] Memory limit, in bytes, for CPU/memory utilization based read request limiting. Use 0 to disable it.
-ingester.return-only-grpc-errors
[experimental] When enabled only gRPC errors will be returned by the ingester.
[deprecated] When enabled only gRPC errors will be returned by the ingester. (default true)
-ingester.ring.consul.acl-token string
ACL Token used to interact with Consul.
-ingester.ring.consul.cas-retry-delay duration
Expand Down
8 changes: 7 additions & 1 deletion docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -108,7 +108,6 @@ The following features are currently experimental:
- `ingester.ring.token-generation-strategy`
- `ingester.ring.spread-minimizing-zones`
- `ingester.ring.spread-minimizing-join-ring-in-order`
- Allow ingester's `Push()` to return gRPC errors only: `-ingester.return-only-grpc-errors`.
- Ingester client
- Per-ingester circuit breaking based on requests timing out or hitting per-instance limits
- `-ingester.client.circuit-breaker.enabled`
Expand Down Expand Up @@ -176,3 +175,10 @@ The following features or configuration parameters are currently deprecated and

- Logging
- `-log.buffered`

The following features or configuration parameters are currently deprecated and will be **removed in Mimir 2.14**:

- Ingester
- `-ingester.return-only-grpc-errors`
- Ingester client
- `-ingester.client.report-grpc-codes-in-instrumentation-label-enabled`
Original file line number Diff line number Diff line change
Expand Up @@ -1172,9 +1172,9 @@ instance_limits:
# CLI flag: -ingester.error-sample-rate
[error_sample_rate: <int> | default = 0]
# (experimental) When enabled only gRPC errors will be returned by the ingester.
# (deprecated) When enabled only gRPC errors will be returned by the ingester.
# CLI flag: -ingester.return-only-grpc-errors
[return_only_grpc_errors: <boolean> | default = false]
[return_only_grpc_errors: <boolean> | default = true]
# (experimental) When enabled, only series currently owned by ingester according
# to the ring are used when checking user per-tenant series limit.
Expand Down
21 changes: 16 additions & 5 deletions pkg/ingester/ingester.go
Original file line number Diff line number Diff line change
Expand Up @@ -116,6 +116,8 @@ const (
// Value used to track the limit between sequential and concurrent TSDB opernings.
// Below this value, TSDBs of different tenants are opened sequentially, otherwise concurrently.
maxTSDBOpenWithoutConcurrency = 10

deprecatedReturnOnlyGRPCErrorsFlag = "ingester.return-only-grpc-errors" // Deprecated. TODO: Remove in Mimir 2.14.
)

var (
Expand Down Expand Up @@ -176,7 +178,7 @@ type Config struct {

ErrorSampleRate int64 `yaml:"error_sample_rate" json:"error_sample_rate" category:"experimental"`

ReturnOnlyGRPCErrors bool `yaml:"return_only_grpc_errors" json:"return_only_grpc_errors" category:"experimental"`
DeprecatedReturnOnlyGRPCErrors bool `yaml:"return_only_grpc_errors" json:"return_only_grpc_errors" category:"deprecated"`

UseIngesterOwnedSeriesForLimits bool `yaml:"use_ingester_owned_series_for_limits" category:"experimental"`
UpdateIngesterOwnedSeries bool `yaml:"track_ingester_owned_series" category:"experimental"`
Expand All @@ -202,17 +204,26 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
f.BoolVar(&cfg.LogUtilizationBasedLimiterCPUSamples, "ingester.log-utilization-based-limiter-cpu-samples", false, "Enable logging of utilization based limiter CPU samples.")
f.BoolVar(&cfg.LimitInflightRequestsUsingGrpcMethodLimiter, "ingester.limit-inflight-requests-using-grpc-method-limiter", false, "Use experimental method of limiting push requests.")
f.Int64Var(&cfg.ErrorSampleRate, "ingester.error-sample-rate", 0, "Each error will be logged once in this many times. Use 0 to log all of them.")
f.BoolVar(&cfg.ReturnOnlyGRPCErrors, "ingester.return-only-grpc-errors", false, "When enabled only gRPC errors will be returned by the ingester.")
f.BoolVar(&cfg.UseIngesterOwnedSeriesForLimits, "ingester.use-ingester-owned-series-for-limits", false, "When enabled, only series currently owned by ingester according to the ring are used when checking user per-tenant series limit.")
f.BoolVar(&cfg.UpdateIngesterOwnedSeries, "ingester.track-ingester-owned-series", false, "This option enables tracking of ingester-owned series based on ring state, even if -ingester.use-ingester-owned-series-for-limits is disabled.")
f.DurationVar(&cfg.OwnedSeriesUpdateInterval, "ingester.owned-series-update-interval", 15*time.Second, "How often to check for ring changes and possibly recompute owned series as a result of detected change.")

// The ingester.return-only-grpc-errors flag has been deprecated.
// According to the migration plan (https://github.com/grafana/mimir/issues/6008#issuecomment-1854320098)
// the default behaviour of Mimir should be as this flag were set to true.
// TODO: Remove in Mimir 2.14.0
f.BoolVar(&cfg.DeprecatedReturnOnlyGRPCErrors, deprecatedReturnOnlyGRPCErrorsFlag, true, "When enabled only gRPC errors will be returned by the ingester.")
}

func (cfg *Config) Validate() error {
func (cfg *Config) Validate(logger log.Logger) error {
if cfg.ErrorSampleRate < 0 {
return fmt.Errorf("error sample rate cannot be a negative number")
}

if !cfg.DeprecatedReturnOnlyGRPCErrors {
util.WarnDeprecatedConfig(deprecatedReturnOnlyGRPCErrorsFlag, logger)
}

return cfg.IngesterRing.Validate()
}

Expand Down Expand Up @@ -3361,7 +3372,7 @@ func (i *Ingester) Push(ctx context.Context, req *mimirpb.WriteRequest) (*mimirp
}

func (i *Ingester) mapPushErrorToErrorWithStatus(err error) error {
if i.cfg.ReturnOnlyGRPCErrors {
if i.cfg.DeprecatedReturnOnlyGRPCErrors {
return mapPushErrorToErrorWithStatus(err)
}
return mapPushErrorToErrorWithHTTPOrGRPCStatus(err)
Expand All @@ -3376,7 +3387,7 @@ func (i *Ingester) mapReadErrorToErrorWithStatus(err error) error {
return err
}

if i.cfg.ReturnOnlyGRPCErrors {
if i.cfg.DeprecatedReturnOnlyGRPCErrors {
return mapReadErrorToErrorWithStatus(err)
}
return mapReadErrorToErrorWithHTTPOrGRPCStatus(err)
Expand Down
22 changes: 8 additions & 14 deletions pkg/ingester/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,7 +15,6 @@ import (
"io"
"math"
"net"
"net/http"
"net/http/httptest"
"net/url"
"os"
Expand Down Expand Up @@ -1795,7 +1794,6 @@ func TestIngester_Push(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.IngesterRing.ReplicationFactor = 1
cfg.ActiveSeriesMetrics.Enabled = !testData.disableActiveSeries
cfg.ReturnOnlyGRPCErrors = true
limits := defaultLimitsTestConfig()
limits.MaxGlobalExemplarsPerUser = testData.maxExemplars
limits.MaxGlobalMetricsWithMetadataPerUser = testData.maxMetadataPerUser
Expand Down Expand Up @@ -2530,7 +2528,7 @@ func Test_Ingester_LabelNames(t *testing.T) {
_, err := i.LabelNames(ctx, &client.LabelNamesRequest{})
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -2594,7 +2592,7 @@ func Test_Ingester_LabelValues(t *testing.T) {
_, err := i.LabelValues(ctx, &client.LabelValuesRequest{})
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -2795,7 +2793,7 @@ func TestIngester_LabelNamesAndValues(t *testing.T) {
err := i.LabelNamesAndValues(&client.LabelNamesAndValuesRequest{}, nil)
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -2915,7 +2913,7 @@ func TestIngester_LabelValuesCardinality(t *testing.T) {
err := i.LabelValuesCardinality(&client.LabelValuesCardinalityRequest{}, nil)
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -3447,7 +3445,7 @@ func Test_Ingester_MetricsForLabelMatchers(t *testing.T) {
_, err := i.MetricsForLabelMatchers(ctx, &client.MetricsForLabelMatchersRequest{})
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -3845,7 +3843,7 @@ func TestIngester_QueryStream(t *testing.T) {
err = i.QueryStream(&client.QueryRequest{}, nil)
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -4304,7 +4302,7 @@ func TestIngester_QueryExemplars(t *testing.T) {
_, err := i.QueryExemplars(ctx, &client.ExemplarQueryRequest{})
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -5619,7 +5617,7 @@ func Test_Ingester_UserStats(t *testing.T) {
_, err := i.UserStats(ctx, &client.UserStatsRequest{})
stat, ok := status.FromError(err)
require.True(t, ok)
require.Equal(t, http.StatusServiceUnavailable, int(stat.Code()))
require.Equal(t, codes.ResourceExhausted, stat.Code())
require.Equal(t, tooBusyErrorMsg, stat.Message())
verifyUtilizationLimitedRequestsMetric(t, registry)
})
Expand Down Expand Up @@ -6268,7 +6266,6 @@ func TestIngesterWithShippingDisabledDeletesBlocksOnlyAfterRetentionExpires(t *t

func TestIngesterPushErrorDuringForcedCompaction(t *testing.T) {
cfg := defaultIngesterTestConfig(t)
cfg.ReturnOnlyGRPCErrors = true
i, err := prepareIngesterWithBlocksStorage(t, cfg, nil)
require.NoError(t, err)

Expand Down Expand Up @@ -7307,7 +7304,6 @@ func TestIngesterUserLimitExceeded(t *testing.T) {
// Set RF=1 here to ensure the series and metadata limits
// are actually set to 1 instead of 3.
cfg.IngesterRing.ReplicationFactor = 1
cfg.ReturnOnlyGRPCErrors = true
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, dataDir, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
Expand Down Expand Up @@ -7411,7 +7407,6 @@ func TestIngesterMetricLimitExceeded(t *testing.T) {
// Set RF=1 here to ensure the series and metadata limits
// are actually set to 1 instead of 3.
cfg.IngesterRing.ReplicationFactor = 1
cfg.ReturnOnlyGRPCErrors = true
ing, err := prepareIngesterWithBlocksStorageAndLimits(t, cfg, limits, dataDir, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), ing))
Expand Down Expand Up @@ -9376,7 +9371,6 @@ func TestIngester_PushWithSampledErrors(t *testing.T) {
ingesterCfg := defaultIngesterTestConfig(t)
ingesterCfg.IngesterRing.ReplicationFactor = 1
ingesterCfg.ErrorSampleRate = int64(errorSampleRate)
ingesterCfg.ReturnOnlyGRPCErrors = true
limits := defaultLimitsTestConfig()
limits.MaxGlobalExemplarsPerUser = testData.maxExemplars
limits.NativeHistogramsIngestionEnabled = testData.nativeHistograms
Expand Down
4 changes: 2 additions & 2 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -240,7 +240,7 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.IngestStorage.Validate(); err != nil {
return errors.Wrap(err, "invalid ingest storage config")
}
if c.isAnyModuleEnabled(Ingester, Write, All) && c.IngestStorage.Enabled && !c.Ingester.ReturnOnlyGRPCErrors {
if c.isAnyModuleEnabled(Ingester, Write, All) && c.IngestStorage.Enabled && !c.Ingester.DeprecatedReturnOnlyGRPCErrors {
return errors.New("to use ingest storage (-ingest-storage.enabled) also enable -ingester.return-only-grpc-errors")
}
if err := c.BlocksStorage.Validate(c.Ingester.ActiveSeriesMetrics, log); err != nil {
Expand All @@ -259,7 +259,7 @@ func (c *Config) Validate(log log.Logger) error {
if err := c.IngesterClient.Validate(log); err != nil {
return errors.Wrap(err, "invalid ingester_client config")
}
if err := c.Ingester.Validate(); err != nil {
if err := c.Ingester.Validate(log); err != nil {
return errors.Wrap(err, "invalid ingester config")
}
if err := c.Worker.Validate(); err != nil {
Expand Down
4 changes: 2 additions & 2 deletions pkg/mimir/mimir_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -428,7 +428,7 @@ func TestConfigValidation(t *testing.T) {
cfg.IngestStorage.Enabled = true
cfg.IngestStorage.KafkaConfig.Address = "localhost:123"
cfg.IngestStorage.KafkaConfig.Topic = "topic"
cfg.Ingester.ReturnOnlyGRPCErrors = false
cfg.Ingester.DeprecatedReturnOnlyGRPCErrors = false

return cfg
},
Expand All @@ -442,7 +442,7 @@ func TestConfigValidation(t *testing.T) {
cfg.IngestStorage.Enabled = true
cfg.IngestStorage.KafkaConfig.Address = "localhost:123"
cfg.IngestStorage.KafkaConfig.Topic = "topic"
cfg.Ingester.ReturnOnlyGRPCErrors = false
cfg.Ingester.DeprecatedReturnOnlyGRPCErrors = false

return cfg
},
Expand Down

0 comments on commit 3fb6280

Please sign in to comment.