Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

distributor: Run ingestion limiter after custom push wrappers #10754

Open
wants to merge 3 commits into
base: main
Choose a base branch
from
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -84,6 +84,7 @@
* [BUGFIX] Distributor: Report partially converted OTLP requests with status 400 Bad Request. #10588
* [BUGFIX] Ruler: fix issue where rule evaluations could be missed while shutting down a ruler instance if that instance owns many rule groups. prometheus/prometheus#15804 #10762
* [BUGFIX] Ingester: Add additional check on reactive limiter queue sizes. #10722
* [BUGFIX] Distributor: Apply ingestion rate limit after custom PushWrappers have run. This ensures that the `ingestion_rate` limit is properly applied against the same value recorded in the `cortex_distributor_received_samples_total` metric. #10754

### Mixin

Expand Down
47 changes: 40 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,8 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc {
middlewares = append(middlewares, d.prePushSortAndFilterMiddleware)
middlewares = append(middlewares, d.prePushValidationMiddleware)
middlewares = append(middlewares, d.cfg.PushWrappers...)
// Apply ingestion rate limiting last, after all other middlewares have had a chance to modify the request.
middlewares = append(middlewares, d.ingestionRateLimitMiddleware)

for ix := len(middlewares) - 1; ix >= 0; ix-- {
next = middlewares[ix](next)
Expand Down Expand Up @@ -1280,6 +1282,43 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
return firstPartialErr
}

err = next(ctx, pushReq)
if err != nil {
// Errors resulting from the pushing to the ingesters have priority over validation errors.
return err
}

return firstPartialErr
}
}

func (d *Distributor) ingestionRateLimitMiddleware(next PushFunc) PushFunc {
return func(ctx context.Context, pushReq *Request) error {
next, maybeCleanup := NextOrCleanup(next, pushReq)
defer maybeCleanup()

req, err := pushReq.WriteRequest()
if err != nil {
return err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

now := mtime.Now()
group := validation.GroupLabel(d.limits, userID, req.Timeseries)

validatedSamples := 0
validatedExemplars := 0
validatedMetadata := 0
for _, ts := range req.Timeseries {
validatedSamples += len(ts.Samples) + len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
}
validatedMetadata = len(req.Metadata)

totalN := validatedSamples + validatedExemplars + validatedMetadata
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
if len(req.Timeseries) > 0 {
Expand All @@ -1299,13 +1338,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
// totalN includes samples, exemplars and metadata. Ingester follows this pattern when computing its ingestion rate.
d.ingestionRate.Add(int64(totalN))

err = next(ctx, pushReq)
if err != nil {
// Errors resulting from the pushing to the ingesters have priority over validation errors.
return err
}

return firstPartialErr
return next(ctx, pushReq)
}
}

Expand Down
72 changes: 72 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,78 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

// TestDistributor_IngestionRateLimiterMiddlewareWithPushWrapper verifies that
// ingestion rate limiting is evaluated against the request as it looks after
// custom push wrappers have run. A wrapper trims every request down to a single
// series with a single sample, the tenant's ingestion rate and burst are set to
// 3, and three requests that are each far larger than the limit before trimming
// are pushed. All three pushes are expected to succeed.
func TestDistributor_IngestionRateLimiterMiddlewareWithPushWrapper(t *testing.T) {
// Create a push wrapper that removes all but one sample from each request
sampleFilterWrapper := func(next PushFunc) PushFunc {
return func(ctx context.Context, pushReq *Request) error {
req, err := pushReq.WriteRequest()
if err != nil {
return err
}

// Keep only the first timeseries and sample.
// NOTE(review): assumes every request carries at least one series with at
// least one sample, which holds for the requests built below — confirm if
// this wrapper is reused elsewhere.
req.Timeseries = req.Timeseries[:1]
req.Timeseries[0].Samples = req.Timeseries[0].Samples[:1]
return next(ctx, pushReq)
}
}

// Rate and burst of 3: enough for the three trimmed requests (one sample each),
// far too small for any of the untrimmed ones.
limits := prepareDefaultLimits()
limits.IngestionRate = 3
limits.IngestionBurstSize = 3

// cfg only carries the wrapper list; it is copied into the distributor's
// config through the configure callback passed to prepare below.
cfg := Config{
PushWrappers: []PushWrapper{sampleFilterWrapper},
}
distributors, _, regs, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
configure: func(config *Config) {
config.PushWrappers = cfg.PushWrappers
},
limits: limits,
})
d := distributors[0]

// All pushes are made on behalf of tenant "test".
ctx := user.InjectOrgID(context.Background(), "test")

// Create requests with many samples that would normally exceed the rate limit
reqs := []*mimirpb.WriteRequest{
makeWriteRequest(0, 20, 0, false, false, "test1"), // 20 samples, wrapper will reduce to 1
makeWriteRequest(0, 15, 0, false, false, "test2"), // 15 samples, wrapper will reduce to 1
makeWriteRequest(0, 30, 0, false, false, "test3"), // 30 samples, wrapper will reduce to 1
}

// Execute requests - they should all succeed since wrapper reduces sample count
for _, req := range reqs {
resp, err := d.Push(ctx, req)
require.NoError(t, err)
require.Equal(t, emptyResponse, resp)
}

// Check ingestion rate is based on actual samples sent (3 total) not original count (65 total).
// The bound of 5 is deliberately loose; it only needs to separate 3 from 65.
require.Less(t, d.ingestionRate.Rate(), float64(5))

// Check metrics using GatherAndCompare.
// Expected values: samples_in_total reflects the original 65 samples,
// received_samples_total reflects the 3 samples left after the wrapper, and
// requests_in_total counts the 3 pushes. cortex_discarded_samples_total is
// listed with HELP/TYPE only and no series, i.e. nothing is expected to have
// been discarded.
require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(`
# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
# TYPE cortex_distributor_samples_in_total counter
cortex_distributor_samples_in_total{user="test"} 65

# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="test"} 3

# HELP cortex_distributor_requests_in_total The total number of requests that have come in to the distributor, including rejected or deduped requests.
# TYPE cortex_distributor_requests_in_total counter
cortex_distributor_requests_in_total{user="test"} 3

# HELP cortex_discarded_samples_total The total number of samples that have been rate limited.
# TYPE cortex_discarded_samples_total counter
`), "cortex_distributor_samples_in_total", "cortex_distributor_received_samples_total", "cortex_distributor_requests_in_total", "cortex_discarded_samples_total"))
}

func TestDistributor_PushInstanceLimits(t *testing.T) {
type testPush struct {
samples int
Expand Down
Loading