Skip to content

Commit

Permalink
Distributor: Return specific error message when burst size limit exce…
Browse files Browse the repository at this point in the history
…eded

Signed-off-by: Arve Knudsen <[email protected]>
  • Loading branch information
aknuds1 committed Mar 9, 2025
1 parent d65b32c commit 2c29161
Show file tree
Hide file tree
Showing 4 changed files with 43 additions and 6 deletions.
7 changes: 4 additions & 3 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -4,9 +4,6 @@

### Grafana Mimir

* [FEATURE] Ingester/Distributor: Add support for exporting cost attribution metrics (`cortex_ingester_attributed_active_series`, `cortex_distributor_received_attributed_samples_total`, and `cortex_discarded_attributed_samples_total`) with labels specified by customers to a custom Prometheus registry. This feature enables more flexible billing data tracking. #10269 #10702
* [FEATURE] Ruler: Added `/ruler/tenants` endpoints to list the discovered tenants with rule groups. #10738
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [CHANGE] Querier: pass context to queryable `IsApplicable` hook. #10451
* [CHANGE] Distributor: OTLP and push handler replace all non-UTF8 characters with the unicode replacement character `\uFFFD` in error messages before propagating them. #10236
* [CHANGE] Querier: pass query matchers to queryable `IsApplicable` hook. #10256
Expand All @@ -19,6 +16,10 @@
`cortex_ruler_write_requests_total` and `cortex_ruler_queries_total` metrics. #10536
* [CHANGE] Querier / Query-frontend: Remove experimental `-querier.promql-experimental-functions-enabled` and `-query-frontend.block-promql-experimental-functions` CLI flags and respective YAML configuration options to enable experimental PromQL functions. Instead access to experimental PromQL functions is always blocked. You can enable them using the per-tenant setting `enabled_promql_experimental_functions`. #10660 #10712
* [CHANGE] Store-gateway: Include posting sampling rate in sparse index headers. When the sampling rate isn't set in a sparse index header, store gateway will rebuild the sparse header with the configured `blocks-storage.bucket-store.posting-offsets-in-mem-sampling` value. If the sparse header's sampling rate is set, but doesn't match the configured rate, store gateway will either rebuild the sparse header or downsample to the configured sampling rate. #10684
* [CHANGE] Distributor: Return a specific error message when the ingestion burst size limit is exceeded. #10835
* [FEATURE] Ingester/Distributor: Add support for exporting cost attribution metrics (`cortex_ingester_attributed_active_series`, `cortex_distributor_received_attributed_samples_total`, and `cortex_discarded_attributed_samples_total`) with labels specified by customers to a custom Prometheus registry. This feature enables more flexible billing data tracking. #10269 #10702
* [FEATURE] Ruler: Added `/ruler/tenants` endpoints to list the discovered tenants with rule groups. #10738
* [FEATURE] Distributor: Add experimental Influx handler. #10153
* [ENHANCEMENT] Compactor: Expose `cortex_bucket_index_last_successful_update_timestamp_seconds` for all tenants assigned to the compactor before starting the block cleanup job. #10569
* [ENHANCEMENT] Query Frontend: Return server-side `samples_processed` statistics. #10103
* [ENHANCEMENT] Distributor: OTLP receiver now converts also metric metadata. See also https://github.com/prometheus/prometheus/pull/15416. #10168
Expand Down
6 changes: 6 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -1311,6 +1311,12 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
d.discardedExemplarsRateLimited.WithLabelValues(userID).Add(float64(validatedExemplars))
d.discardedMetadataRateLimited.WithLabelValues(userID).Add(float64(validatedMetadata))

// Determine whether limiter burst size was exceeded.
limiterBurst := d.ingestionRateLimiter.Burst(now, userID)
if totalN > limiterBurst {
return newIngestionBurstSizeLimitedError(limiterBurst, totalN)
}

burstSize := d.limits.IngestionBurstSize(userID)
if d.limits.IngestionBurstFactor(userID) > 0 {
burstSize = int(d.limits.IngestionRate(userID) * d.limits.IngestionBurstFactor(userID))
Expand Down
2 changes: 1 addition & 1 deletion pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -197,7 +197,7 @@ func TestDistributor_Push(t *testing.T) {
happyIngesters: 3,
samples: samplesIn{num: 25, startTimestampMs: 123456789000},
metadata: 5,
expectedGRPCError: status.New(codes.ResourceExhausted, newIngestionRateLimitedError(20, 20).Error()),
expectedGRPCError: status.New(codes.ResourceExhausted, newIngestionBurstSizeLimitedError(20, 55).Error()),
expectedErrorDetails: &mimirpb.ErrorDetails{Cause: mimirpb.INGESTION_RATE_LIMITED},
metricNames: []string{lastSeenTimestamp},
expectedMetrics: `
Expand Down
34 changes: 32 additions & 2 deletions pkg/distributor/errors.go
Original file line number Diff line number Diff line change
Expand Up @@ -41,6 +41,11 @@ var (
validation.IngestionBurstSizeFlag,
)

ingestionBurstSizeLimitedMsgFormat = globalerror.IngestionRateLimited.MessageWithPerTenantLimitConfig(
"the request has been rejected because the tenant exceeded the ingestion burst size limit, set to %d, with %d items. This limit is applied on the total number of samples, exemplars and metadata received across all distributors",
validation.IngestionBurstSizeFlag,
)

requestRateLimitedMsgFormat = globalerror.RequestRateLimited.MessageWithPerTenantLimitConfig(
"the request has been rejected because the tenant exceeded the request rate limit, set to %v requests/s across all distributors with a maximum allowed burst of %d",
validation.RequestRateFlag,
Expand Down Expand Up @@ -124,7 +129,7 @@ type ingestionRateLimitedError struct {
burst int
}

// newIngestionRateLimitedError creates a ingestionRateLimitedError error containing the given error message.
// newIngestionRateLimitedError creates an ingestionRateLimitedError error containing the given error message.
func newIngestionRateLimitedError(limit float64, burst int) ingestionRateLimitedError {
return ingestionRateLimitedError{
limit: limit,
Expand All @@ -143,6 +148,31 @@ func (e ingestionRateLimitedError) Cause() mimirpb.ErrorCause {
// Ensure that ingestionRateLimitedError implements Error.
var _ Error = ingestionRateLimitedError{}

// ingestionBurstSizeLimitedError is the error returned when a request is
// rejected for exceeding the ingestion burst size limit.
type ingestionBurstSizeLimitedError struct {
	// burst is the burst size limit reported in the error message;
	// items is the item count reported alongside it.
	burst, items int
}

// newIngestionBurstSizeLimitedError builds an ingestionBurstSizeLimitedError
// that records the given burst size limit and item count.
func newIngestionBurstSizeLimitedError(burst, items int) ingestionBurstSizeLimitedError {
	var e ingestionBurstSizeLimitedError
	e.burst = burst
	e.items = items
	return e
}

// Error renders the error message by substituting the burst size limit and the
// item count, in that order, into ingestionBurstSizeLimitedMsgFormat.
func (e ingestionBurstSizeLimitedError) Error() string {
	limit, received := e.burst, e.items
	return fmt.Sprintf(ingestionBurstSizeLimitedMsgFormat, limit, received)
}

// Cause returns mimirpb.INGESTION_RATE_LIMITED as the cause of this error.
func (e ingestionBurstSizeLimitedError) Cause() mimirpb.ErrorCause {
	cause := mimirpb.INGESTION_RATE_LIMITED
	return cause
}

// Compile-time check that ingestionBurstSizeLimitedError values satisfy the
// Error interface; the blank identifier discards the value.
var _ Error = ingestionBurstSizeLimitedError{}

// requestRateLimitedError is an error used to represent the request rate limited error.
type requestRateLimitedError struct {
limit float64
Expand Down Expand Up @@ -174,7 +204,7 @@ type ingesterPushError struct {
cause mimirpb.ErrorCause
}

// newIngesterPushError creates a ingesterPushError error representing the given status object.
// newIngesterPushError creates an ingesterPushError error representing the given status object.
func newIngesterPushError(stat *status.Status, ingesterID string) ingesterPushError {
errorCause := mimirpb.UNKNOWN_CAUSE
details := stat.Details()
Expand Down

0 comments on commit 2c29161

Please sign in to comment.