Skip to content

Commit

Permalink
distributor: add temporary metric to see if we're dropping nh samples (
Browse files Browse the repository at this point in the history
…#10760)

* distributor: add temporary metric to see if we're dropping nh samples

Measure how many native histogram samples we're dropping silently.
Once native histograms are stable and enabled by default, this metric
will no longer be needed.

Signed-off-by: György Krajcsovits <[email protected]>
  • Loading branch information
krajorama authored Mar 4, 2025
1 parent 7c38e09 commit 2a3eb85
Show file tree
Hide file tree
Showing 3 changed files with 82 additions and 0 deletions.
1 change: 1 addition & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -64,6 +64,7 @@
* `-server.cluster-validation.grpc.soft-validation`
* [ENHANCEMENT] All: Add `cortex_client_request_invalid_cluster_validation_labels_total` metrics, that is used by Mimir's gRPC clients to track invalid cluster validations. #10767
* [ENHANCEMENT] Ingester client: Add support to configure cluster validation for ingester clients. Failed cluster validations are tracked by `cortex_client_request_invalid_cluster_validation_labels_total` with label `client=ingester`. #10767
* [ENHANCEMENT] Add experimental metric `cortex_distributor_dropped_native_histograms_total` to measure native histograms silently dropped when native histograms are disabled for a tenant. #10760
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
Expand Down
22 changes: 22 additions & 0 deletions pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -152,6 +152,9 @@ type Distributor struct {
labelValuesWithNewlinesPerUser *prometheus.CounterVec
hashCollisionCount prometheus.Counter

// Metric for silently dropped native histogram samples
droppedNativeHistograms *prometheus.CounterVec

// Metrics for data rejected for hitting per-tenant limits
discardedSamplesTooManyHaClusters *prometheus.CounterVec
discardedSamplesRateLimited *prometheus.CounterVec
Expand Down Expand Up @@ -467,6 +470,11 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
Help: "Total number of label values with newlines seen at ingestion time.",
}, []string{"user"}),

droppedNativeHistograms: promauto.With(reg).NewCounterVec(prometheus.CounterOpts{
Name: "cortex_distributor_dropped_native_histograms_total",
Help: "The total number of native histograms that were silently dropped because native histograms ingestion is disabled.",
}, []string{"user"}),

discardedSamplesTooManyHaClusters: validation.DiscardedSamplesCounter(reg, reasonTooManyHAClusters),
discardedSamplesRateLimited: validation.DiscardedSamplesCounter(reg, reasonRateLimited),
discardedRequestsRateLimited: validation.DiscardedRequestsCounter(reg, reasonRateLimited),
Expand Down Expand Up @@ -730,6 +738,8 @@ func (d *Distributor) cleanupInactiveUser(userID string) {

d.PushMetrics.deleteUserMetrics(userID)

d.droppedNativeHistograms.DeleteLabelValues(userID)

filter := prometheus.Labels{"user": userID}
d.dedupedSamples.DeletePartialMatch(filter)
d.discardedSamplesTooManyHaClusters.DeletePartialMatch(filter)
Expand Down Expand Up @@ -1206,6 +1216,10 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
// Enforce the creation grace period on exemplars too.
maxExemplarTS := now.Add(d.limits.CreationGracePeriod(userID)).UnixMilli()

// Are we going to drop native histograms? If yes, let's count and report them.
countDroppedNativeHistograms := !d.limits.NativeHistogramsIngestionEnabled(userID)
var droppedNativeHistograms int

var firstPartialErr error
var removeIndexes []int
totalSamples, totalExemplars := 0, 0
Expand All @@ -1227,6 +1241,10 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
// Note that validateSeries may drop some data in ts.
shouldRemove, validationErr := d.validateSeries(now, &req.Timeseries[tsIdx], userID, group, skipLabelValidation, skipLabelCountValidation, minExemplarTS, maxExemplarTS)

if countDroppedNativeHistograms {
droppedNativeHistograms += len(ts.Histograms)
}

// Errors in validation are considered non-fatal, as one series in a request may contain
// invalid data but all the remaining series could be perfectly valid.
if validationErr != nil {
Expand All @@ -1245,6 +1263,10 @@ func (d *Distributor) prePushValidationMiddleware(next PushFunc) PushFunc {
labelValuesWithNewlines += d.labelValuesWithNewlines(ts.Labels)
}

if droppedNativeHistograms > 0 {
d.droppedNativeHistograms.WithLabelValues(userID).Add(float64(droppedNativeHistograms))
}

d.incomingSamplesPerRequest.WithLabelValues(userID).Observe(float64(totalSamples))
d.incomingExemplarsPerRequest.WithLabelValues(userID).Observe(float64(totalExemplars))
if labelValuesWithNewlines > 0 {
Expand Down
59 changes: 59 additions & 0 deletions pkg/distributor/distributor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -362,6 +362,7 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
"cortex_distributor_non_ha_samples_received_total",
"cortex_distributor_latest_seen_sample_timestamp_seconds",
"cortex_distributor_label_values_with_newlines_total",
"cortex_distributor_dropped_native_histograms_total",
}

d.receivedSamples.WithLabelValues("userA").Add(5)
Expand All @@ -377,12 +378,17 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
d.dedupedSamples.WithLabelValues("userA", "cluster1").Inc() // We cannot clean this metric
d.latestSeenSampleTimestampPerUser.WithLabelValues("userA").Set(1111)
d.labelValuesWithNewlinesPerUser.WithLabelValues("userA").Inc()
d.droppedNativeHistograms.WithLabelValues("userA").Inc()

require.NoError(t, testutil.GatherAndCompare(reg, strings.NewReader(`
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
# TYPE cortex_distributor_deduped_samples_total counter
cortex_distributor_deduped_samples_total{cluster="cluster1",user="userA"} 1
# HELP cortex_distributor_dropped_native_histograms_total The total number of native histograms that were silently dropped because native histograms ingestion is disabled.
# TYPE cortex_distributor_dropped_native_histograms_total counter
cortex_distributor_dropped_native_histograms_total{user="userA"} 1
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
cortex_distributor_latest_seen_sample_timestamp_seconds{user="userA"} 1111
Expand Down Expand Up @@ -429,6 +435,9 @@ func TestDistributor_MetricsCleanup(t *testing.T) {
# HELP cortex_distributor_deduped_samples_total The total number of deduplicated samples.
# TYPE cortex_distributor_deduped_samples_total counter
# HELP cortex_distributor_dropped_native_histograms_total The total number of native histograms that were silently dropped because native histograms ingestion is disabled.
# TYPE cortex_distributor_dropped_native_histograms_total counter
# HELP cortex_distributor_latest_seen_sample_timestamp_seconds Unix timestamp of latest received sample per user.
# TYPE cortex_distributor_latest_seen_sample_timestamp_seconds gauge
Expand Down Expand Up @@ -1468,6 +1477,56 @@ func TestDistributor_Push_HistogramValidation(t *testing.T) {
}
}

// TestDistributor_Push_CountDroppedNativeHistograms pushes a histogram write
// request through a single distributor and verifies that the
// cortex_distributor_dropped_native_histograms_total counter is exposed with a
// value of 1 when native histograms ingestion is disabled in the limits, and
// is not exposed at all when ingestion is enabled.
func TestDistributor_Push_CountDroppedNativeHistograms(t *testing.T) {
	const metricName = "cortex_distributor_dropped_native_histograms_total"

	type scenario struct {
		ingestionEnabled    bool
		expectCounterExists bool
	}

	scenarios := map[string]scenario{
		"native histograms ingestion enabled":  {ingestionEnabled: true, expectCounterExists: false},
		"native histograms ingestion disabled": {ingestionEnabled: false, expectCounterExists: true},
	}

	for name, sc := range scenarios {
		t.Run(name, func(t *testing.T) {
			pushCtx := user.InjectOrgID(context.Background(), "user")
			writeReq := makeWriteRequestHistogram([]string{model.MetricNameLabel, "test"}, 1000, generateTestHistogram(0))

			tenantLimits := prepareDefaultLimits()
			tenantLimits.NativeHistogramsIngestionEnabled = sc.ingestionEnabled

			distributors, _, registries, _ := prepare(t, prepConfig{
				numIngesters:    2,
				happyIngesters:  2,
				numDistributors: 1,
				limits:          tenantLimits,
			})

			// Pre-condition check: exactly one distributor with its own registry.
			require.Len(t, distributors, 1)
			require.Len(t, registries, 1)

			resp, err := distributors[0].Push(pushCtx, writeReq)
			require.NoError(t, err)
			require.Equal(t, emptyResponse, resp)

			// An empty expectation asserts that the metric has no series at all.
			expectedMetrics := ``
			if sc.expectCounterExists {
				expectedMetrics = `
# HELP cortex_distributor_dropped_native_histograms_total The total number of native histograms that were silently dropped because native histograms ingestion is disabled.
# TYPE cortex_distributor_dropped_native_histograms_total counter
cortex_distributor_dropped_native_histograms_total{user="user"} 1
`
			}

			assert.NoError(t, testutil.GatherAndCompare(registries[0], strings.NewReader(expectedMetrics), metricName))
		})
	}
}

func TestDistributor_SampleDuplicateTimestamp(t *testing.T) {
labels := []string{labels.MetricName, "series", "job", "job", "service", "service"}

Expand Down

0 comments on commit 2a3eb85

Please sign in to comment.