Skip to content

Commit

Permalink
distributor: Run ingestion limiter after custom push wrappers
Browse files Browse the repository at this point in the history
I was chasing an issue where a tenant was limited on a 10k/s limit, with a ~200/s received rate.

Turns out that the ingestion limit was applied and then custom push wrappers were removing timeseries and samples. The number of series recorded in the metrics was the one after custom push wrappers

Ingestion rate limit should apply after all push wrappers, since its purpose is to limit what we send to ingesters

With this bug, the `Mimir / Tenants` dashboard is also wrong because the limit is shown against the number of received samples
  • Loading branch information
julienduchesne committed Feb 26, 2025
1 parent 34cdbbf commit b9d0e60
Show file tree
Hide file tree
Showing 2 changed files with 112 additions and 7 deletions.
47 changes: 40 additions & 7 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -968,6 +968,8 @@ func (d *Distributor) wrapPushWithMiddlewares(next PushFunc) PushFunc {
middlewares = append(middlewares, d.prePushSortAndFilterMiddleware)
middlewares = append(middlewares, d.prePushValidationMiddleware)
middlewares = append(middlewares, d.cfg.PushWrappers...)
// Apply ingestion rate limiting last, after all other middleware have potentially modified the request
middlewares = append(middlewares, d.ingestionRateLimitMiddleware)

for ix := len(middlewares) - 1; ix >= 0; ix-- {
next = middlewares[ix](next)
Expand Down Expand Up @@ -1280,6 +1282,43 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
return firstPartialErr
}

err = next(ctx, pushReq)
if err != nil {
// Errors resulting from the pushing to the ingesters have priority over validation errors.
return err
}

return firstPartialErr
}
}

func (d *Distributor) ingestionRateLimitMiddleware(next PushFunc) PushFunc {
return func(ctx context.Context, pushReq *Request) error {
next, maybeCleanup := NextOrCleanup(next, pushReq)
defer maybeCleanup()

req, err := pushReq.WriteRequest()
if err != nil {
return err
}

userID, err := tenant.TenantID(ctx)
if err != nil {
return err
}

now := mtime.Now()
group := validation.GroupLabel(d.limits, userID, req.Timeseries)

validatedSamples := 0
validatedExemplars := 0
validatedMetadata := 0
for _, ts := range req.Timeseries {
validatedSamples += len(ts.Samples) + len(ts.Histograms)
validatedExemplars += len(ts.Exemplars)
}
validatedMetadata = len(req.Metadata)

totalN := validatedSamples + validatedExemplars + validatedMetadata
if !d.ingestionRateLimiter.AllowN(now, userID, totalN) {
if len(req.Timeseries) > 0 {
Expand All @@ -1299,13 +1338,7 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
// totalN included samples, exemplars and metadata. Ingester follows this pattern when computing its ingestion rate.
d.ingestionRate.Add(int64(totalN))

err = next(ctx, pushReq)
if err != nil {
// Errors resulting from the pushing to the ingesters have priority over validation errors.
return err
}

return firstPartialErr
return next(ctx, pushReq)
}
}

Expand Down
72 changes: 72 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -656,6 +656,78 @@ func TestDistributor_PushIngestionRateLimiter(t *testing.T) {
}
}

func TestDistributor_IngestionRateLimiterMiddlewareWithPushWrapper(t *testing.T) {
// Create a push wrapper that removes all but one sample from each request
sampleFilterWrapper := func(next PushFunc) PushFunc {
return func(ctx context.Context, pushReq *Request) error {
req, err := pushReq.WriteRequest()
if err != nil {
return err
}

// Keep only the first timeseries and sample
req.Timeseries = req.Timeseries[:1]
req.Timeseries[0].Samples = req.Timeseries[0].Samples[:1]
return next(ctx, pushReq)
}
}

limits := prepareDefaultLimits()
limits.IngestionRate = 3
limits.IngestionBurstSize = 3

cfg := Config{
PushWrappers: []PushWrapper{sampleFilterWrapper},
}
distributors, _, regs, _ := prepare(t, prepConfig{
numIngesters: 3,
happyIngesters: 3,
numDistributors: 1,
configure: func(config *Config) {
config.PushWrappers = cfg.PushWrappers
},
limits: limits,
})
d := distributors[0]

ctx := user.InjectOrgID(context.Background(), "test")

// Create requests with many samples that would normally exceed the rate limit
reqs := []*mimirpb.WriteRequest{
makeWriteRequest(0, 20, 0, false, false, "test1"), // 20 samples, wrapper will reduce to 1
makeWriteRequest(0, 15, 0, false, false, "test2"), // 15 samples, wrapper will reduce to 1
makeWriteRequest(0, 30, 0, false, false, "test3"), // 30 samples, wrapper will reduce to 1
}

// Execute requests - they should all succeed since wrapper reduces sample count
for _, req := range reqs {
resp, err := d.Push(ctx, req)
require.NoError(t, err)
require.Equal(t, emptyResponse, resp)
}

// Check ingestion rate is based on actual samples sent (3 total) not original count (65 total)
require.Less(t, d.ingestionRate.Rate(), float64(5))

// Check metrics using GatherAndCompare
require.NoError(t, testutil.GatherAndCompare(regs[0], strings.NewReader(`
# HELP cortex_distributor_samples_in_total The total number of samples that have come in to the distributor, including rejected or deduped samples.
# TYPE cortex_distributor_samples_in_total counter
cortex_distributor_samples_in_total{user="test"} 65
# HELP cortex_distributor_received_samples_total The total number of received samples, excluding rejected and deduped samples.
# TYPE cortex_distributor_received_samples_total counter
cortex_distributor_received_samples_total{user="test"} 3
# HELP cortex_distributor_requests_in_total The total number of requests that have come in to the distributor, including rejected or deduped requests.
# TYPE cortex_distributor_requests_in_total counter
cortex_distributor_requests_in_total{user="test"} 3
# HELP cortex_discarded_samples_total The total number of samples that have been rate limited.
# TYPE cortex_discarded_samples_total counter
`), "cortex_distributor_samples_in_total", "cortex_distributor_received_samples_total", "cortex_distributor_requests_in_total", "cortex_discarded_samples_total"))
}

func TestDistributor_PushInstanceLimits(t *testing.T) {
type testPush struct {
samples int
Expand Down

0 comments on commit b9d0e60

Please sign in to comment.