From 2c29161fe6e31566ffc77e08d3b9e35fe2e08f57 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Sun, 9 Mar 2025 10:38:29 +0100 Subject: [PATCH] Distributor: Return specific error message when burst size limit exceeded Signed-off-by: Arve Knudsen --- CHANGELOG.md | 7 +++--- pkg/distributor/distributor.go | 6 +++++ pkg/distributor/distributor_test.go | 2 +- pkg/distributor/errors.go | 34 +++++++++++++++++++++++++++-- 4 files changed, 43 insertions(+), 6 deletions(-) diff --git a/CHANGELOG.md b/CHANGELOG.md index 1a7f4252619..6e08829b549 100644 --- a/CHANGELOG.md +++ b/CHANGELOG.md @@ -4,9 +4,6 @@ ### Grafana Mimir -* [FEATURE] Ingester/Distributor: Add support for exporting cost attribution metrics (`cortex_ingester_attributed_active_series`, `cortex_distributor_received_attributed_samples_total`, and `cortex_discarded_attributed_samples_total`) with labels specified by customers to a custom Prometheus registry. This feature enables more flexible billing data tracking. #10269 #10702 -* [FEATURE] Ruler: Added `/ruler/tenants` endpoints to list the discovered tenants with rule groups. #10738 -* [FEATURE] Distributor: Add experimental Influx handler. #10153 * [CHANGE] Querier: pass context to queryable `IsApplicable` hook. #10451 * [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236 * [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256 @@ -19,6 +16,10 @@ `cortex_ruler_write_requests_total` and `cortex_ruler_queries_total` metrics. #10536 * [CHANGE] Querier / Query-frontend: Remove experimental `-querier.promql-experimental-functions-enabled` and `-query-frontend.block-promql-experimental-functions` CLI flags and respective YAML configuration options to enable experimental PromQL functions. Instead access to experimental PromQL functions is always blocked. 
You can enable them using the per-tenant setting `enabled_promql_experimental_functions`. #10660 #10712 * [CHANGE] Store-gateway: Include posting sampling rate in sparse index headers. When the sampling rate isn't set in a sparse index header, store gateway will rebuild the sparse header with the configured `blocks-storage.bucket-store.posting-offsets-in-mem-sampling` value. If the sparse header's sampling rate is set, but doesn't match the configured rate, store gateway will either rebuild the sparse header or downsample to the configured sampling rate. #10684 +* [CHANGE] Distributor: Return specific error message when burst size limit exceeded. #10835 +* [FEATURE] Ingester/Distributor: Add support for exporting cost attribution metrics (`cortex_ingester_attributed_active_series`, `cortex_distributor_received_attributed_samples_total`, and `cortex_discarded_attributed_samples_total`) with labels specified by customers to a custom Prometheus registry. This feature enables more flexible billing data tracking. #10269 #10702 +* [FEATURE] Ruler: Added `/ruler/tenants` endpoints to list the discovered tenants with rule groups. #10738 +* [FEATURE] Distributor: Add experimental Influx handler. #10153 * [ENHANCEMENT] Compactor: Expose `cortex_bucket_index_last_successful_update_timestamp_seconds` for all tenants assigned to the compactor before starting the block cleanup job. #10569 * [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103 * [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. 
#10168 diff --git a/pkg/distributor/distributor.go b/pkg/distributor/distributor.go index 47e2599305b..07853add009 100644 --- a/pkg/distributor/distributor.go +++ b/pkg/distributor/distributor.go @@ -1311,6 +1311,12 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc { d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars)) d.discardedMetadataRateLimited.WithLabelValues(userID).Add(float64(validatedMetadata)) + // Determine whether limiter burst size was exceeded. + limiterBurst := d.ingestionRateLimiter.Burst(now, userID) + if totalN > limiterBurst { + return newIngestionBurstSizeLimitedError(limiterBurst, totalN) + } + burstSize := d.limits.IngestionBurstSize(userID) if d.limits.IngestionBurstFactor(userID) > 0 { burstSize = int(d.limits.IngestionRate(userID) * d.limits.IngestionBurstFactor(userID)) diff --git a/pkg/distributor/distributor_test.go b/pkg/distributor/distributor_test.go index 0d9e2365715..6b54db7da53 100644 --- a/pkg/distributor/distributor_test.go +++ b/pkg/distributor/distributor_test.go @@ -197,7 +197,7 @@ func TestDistributor_Push(t *testing.T) { happyIngesters: 3, samples: samplesIn{num: 25, startTimestampMs: 123456789000}, metadata: 5, - expectedGRPCError: status.New(codes.ResourceExhausted, newIngestionRateLimitedError(20, 20).Error()), + expectedGRPCError: status.New(codes.ResourceExhausted, newIngestionBurstSizeLimitedError(20, 55).Error()), expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED}, metricNames: []string{lastSeenTimestamp}, expectedMetrics: ` diff --git a/pkg/distributor/errors.go b/pkg/distributor/errors.go index e0eb803dd75..46d05ab6533 100644 --- a/pkg/distributor/errors.go +++ b/pkg/distributor/errors.go @@ -41,6 +41,11 @@ var ( validation.IngestionBurstSizeFlag, ) + ingestionBurstSizeLimitedMsgFormat = globalerror.IngestionRateLimited.MessageWithPerTenantLimitConfig( + "the request has been rejected because the tenant exceeded the 
ingestion burst size limit, set to %d, with %d items. This limit is applied on the total number of samples, exemplars and metadata received across all distributors", + validation.IngestionBurstSizeFlag, + ) + requestRateLimitedMsgFormat = globalerror.RequestRateLimited.MessageWithPerTenantLimitConfig( "the request has been rejected because the tenant exceeded the request rate limit, set to %v requests/s across all distributors with a maximum allowed burst of %d", validation.RequestRateFlag, @@ -124,7 +129,7 @@ type ingestionRateLimitedError struct { burst int } -// newIngestionRateLimitedError creates a ingestionRateLimitedError error containing the given error message. +// newIngestionRateLimitedError creates an ingestionRateLimitedError error containing the given error message. func newIngestionRateLimitedError(limit float64, burst int) ingestionRateLimitedError { return ingestionRateLimitedError{ limit: limit, @@ -143,6 +148,31 @@ func (e ingestionRateLimitedError) Cause() mimirpb.ErrorCause { // Ensure that ingestionRateLimitedError implements Error. var _ Error = ingestionRateLimitedError{} +// ingestionBurstSizeLimitedError represents the ingestion burst size limited error. +type ingestionBurstSizeLimitedError struct { + burst int + items int +} + +// newIngestionBurstSizeLimitedError creates an ingestionBurstSizeLimitedError error for the given burst size limit and number of received items. +func newIngestionBurstSizeLimitedError(burst, items int) ingestionBurstSizeLimitedError { + return ingestionBurstSizeLimitedError{ + burst: burst, + items: items, + } +} + +func (e ingestionBurstSizeLimitedError) Error() string { + return fmt.Sprintf(ingestionBurstSizeLimitedMsgFormat, e.burst, e.items) +} + +func (e ingestionBurstSizeLimitedError) Cause() mimirpb.ErrorCause { + return mimirpb.INGESTION_RATE_LIMITED +} + +// Ensure that ingestionBurstSizeLimitedError implements Error. 
+var _ Error = ingestionBurstSizeLimitedError{} + // requestRateLimitedError is an error used to represent the request rate limited error. type requestRateLimitedError struct { limit float64 @@ -174,7 +204,7 @@ type ingesterPushError struct { cause mimirpb.ErrorCause } -// newIngesterPushError creates a ingesterPushError error representing the given status object. +// newIngesterPushError creates an ingesterPushError error representing the given status object. func newIngesterPushError(stat *status.Status, ingesterID string) ingesterPushError { errorCause := mimirpb.UNKNOWN_CAUSE details := stat.Details()