Skip to content

Commit

Permalink
Allow ingester clients to use middleware.ClusterUnaryClientInterceptor (
Browse files Browse the repository at this point in the history
#10767)

* Allow ingester clients to use middleware.ClusterUnaryClientInterceptor

Signed-off-by: Yuri Nikolic <[email protected]>

* Adding documentation and CHANGELOG

Signed-off-by: Yuri Nikolic <[email protected]>

* Adding the missing header in pkg/util/metrics.go

Signed-off-by: Yuri Nikolic <[email protected]>

* Fixing review findings

Signed-off-by: Yuri Nikolic <[email protected]>

---------

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic authored Mar 3, 2025
1 parent 7eb9252 commit 6b51b44
Show file tree
Hide file tree
Showing 14 changed files with 230 additions and 28 deletions.
6 changes: 6 additions & 0 deletions CHANGELOG.md
Original file line number Diff line number Diff line change
Expand Up @@ -58,6 +58,12 @@
* `go_cpu_classes_gc_total_cpu_seconds_total`
* `go_cpu_classes_total_cpu_seconds_total`
* `go_cpu_classes_idle_cpu_seconds_total`
* [ENHANCEMENT] All: Add support for cluster validation in gRCP calls. When it is enabled, gRPC server verifies if a request coming from a gRPC client comes from an expected cluster. This validation can be configured by the following experimental configuration options: #10767
* `-server.cluster-validation.label`
* `-server.cluster-validation.grpc.enabled`
* `-server.cluster-validation.grpc.soft-validation`
* [ENHANCEMENT] All: Add `cortex_client_request_invalid_cluster_validation_labels_total` metrics, that is used by Mimir's gRPC clients to track invalid cluster validations. #10767
* [ENHANCEMENT] Ingester client: Add support to configure cluster validation for ingester clients. Failed cluster validations are tracked by `cortex_client_request_invalid_cluster_validation_labels_total` with label `client=ingester`. #10767
* [BUGFIX] Distributor: Use a boolean to track changes while merging the ReplicaDesc components, rather than comparing the objects directly. #10185
* [BUGFIX] Querier: fix timeout responding to query-frontend when response size is very close to `-querier.frontend-client.grpc-max-send-msg-size`. #10154
* [BUGFIX] Query-frontend and querier: show warning/info annotations in some cases where they were missing (if a lazy querier was used). #10277
Expand Down
9 changes: 6 additions & 3 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -730,7 +730,8 @@
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "server.cluster-validation.label",
"fieldType": "string"
"fieldType": "string",
"fieldCategory": "experimental"
},
{
"kind": "block",
Expand All @@ -746,7 +747,8 @@
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "server.cluster-validation.grpc.enabled",
"fieldType": "boolean"
"fieldType": "boolean",
"fieldCategory": "experimental"
},
{
"kind": "field",
Expand All @@ -756,7 +758,8 @@
"fieldValue": null,
"fieldDefaultValue": false,
"fieldFlag": "server.cluster-validation.grpc.soft-validation",
"fieldType": "boolean"
"fieldType": "boolean",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
Expand Down
6 changes: 3 additions & 3 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -3162,11 +3162,11 @@ Usage of ./cmd/mimir/mimir:
-runtime-config.reload-period duration
How often to check runtime config files. (default 10s)
-server.cluster-validation.grpc.enabled
When enabled, cluster label validation will be executed.
[experimental] When enabled, cluster label validation will be executed.
-server.cluster-validation.grpc.soft-validation
When enabled, soft cluster label validation will be executed. Can be enabled only together with server.cluster-validation.grpc.enabled
[experimental] When enabled, soft cluster label validation will be executed. Can be enabled only together with server.cluster-validation.grpc.enabled
-server.cluster-validation.label string
Optionally define server's cluster validation label.
[experimental] Optionally define server's cluster validation label.
-server.graceful-shutdown-timeout duration
Timeout for graceful shutdowns (default 30s)
-server.grpc-conn-limit int
Expand Down
6 changes: 0 additions & 6 deletions cmd/mimir/help.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -771,12 +771,6 @@ Usage of ./cmd/mimir/mimir:
The tenant's shard size when sharding is used by ruler. Value of 0 disables shuffle sharding for the tenant, and tenant rules will be sharded across all ruler replicas.
-runtime-config.file comma-separated-list-of-strings
Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right.
-server.cluster-validation.grpc.enabled
When enabled, cluster label validation will be executed.
-server.cluster-validation.grpc.soft-validation
When enabled, soft cluster label validation will be executed. Can be enabled only together with server.cluster-validation.grpc.enabled
-server.cluster-validation.label string
Optionally define server's cluster validation label.
-server.grpc-listen-address string
gRPC server listen address.
-server.grpc-listen-port int
Expand Down
4 changes: 4 additions & 0 deletions docs/sources/mimir/configure/about-versioning.md
Original file line number Diff line number Diff line change
Expand Up @@ -253,6 +253,10 @@ The following features are currently experimental:
- Server
- [PROXY protocol](https://www.haproxy.org/download/2.3/doc/proxy-protocol.txt) support
- `-server.proxy-protocol-enabled`
- Cross-cluster validation support for gRPC communications
- `-server.cluster-validation.label`
- `-server.cluster-validation.grpc.enabled`
- `-server.cluster-validation.grpc.soft-validation`
- Kafka-based ingest storage
- `-ingest-storage.*`
- `-ingester.partition-ring.*`
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -757,17 +757,18 @@ grpc_tls_config:
[http_path_prefix: <string> | default = ""]
cluster_validation:
# Optionally define server's cluster validation label.
# (experimental) Optionally define server's cluster validation label.
# CLI flag: -server.cluster-validation.label
[label: <string> | default = ""]
grpc:
# When enabled, cluster label validation will be executed.
# (experimental) When enabled, cluster label validation will be executed.
# CLI flag: -server.cluster-validation.grpc.enabled
[enabled: <boolean> | default = false]
# When enabled, soft cluster label validation will be executed. Can be
# enabled only together with server.cluster-validation.grpc.enabled
# (experimental) When enabled, soft cluster label validation will be
# executed. Can be enabled only together with
# server.cluster-validation.grpc.enabled
# CLI flag: -server.cluster-validation.grpc.soft-validation
[softvalidation: <boolean> | default = false]
```
Expand Down
152 changes: 152 additions & 0 deletions integration/ingester_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -4,6 +4,7 @@
package integration

import (
"fmt"
"net/http"
"testing"
"time"
Expand Down Expand Up @@ -783,3 +784,154 @@ func TestIngesterReportGRPCStatusCodes(t *testing.T) {
require.Equalf(t, 1.0, successfulQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
require.Equalf(t, 0.0, cancelledQueryRequests, "got %v query requests (%v successful, %v cancelled)", totalQueryRequests, successfulQueryRequests, cancelledQueryRequests)
}

func TestInvalidClusterValidationLabel(t *testing.T) {
testCases := map[string]struct {
distributorClusterLabel string
ingesterClusterLabel string
softValidation bool
expectedResponseStatus int
expectedIngesterServerStatus string
expectedIngesterClientStatus string
expectedClusterValidationFailures int
}{
"when ingester client and server have the same cluster label no error is expected": {
distributorClusterLabel: "cluster",
ingesterClusterLabel: "cluster",
expectedResponseStatus: http.StatusOK,
expectedIngesterServerStatus: "OK",
expectedIngesterClientStatus: "OK",
expectedClusterValidationFailures: 0,
},
"when ingester client and server do not have the same cluster label and soft validation is enabled no error is expected": {
distributorClusterLabel: "distributor-cluster",
ingesterClusterLabel: "ingester-cluster",
softValidation: true,
expectedResponseStatus: http.StatusOK,
expectedIngesterServerStatus: "OK",
expectedIngesterClientStatus: "OK",
expectedClusterValidationFailures: 0,
},
"when ingester client and server do not have the same cluster label and soft validation is disabled an error is expected": {
distributorClusterLabel: "distributor-cluster",
ingesterClusterLabel: "ingester-cluster",
softValidation: false,
expectedResponseStatus: http.StatusInternalServerError,
expectedIngesterServerStatus: "FailedPrecondition",
expectedIngesterClientStatus: "Internal",
expectedClusterValidationFailures: 1,
},
}

series := []prompb.TimeSeries{
{
Labels: []prompb.Label{
{
Name: "__name__",
Value: "not_foobar",
},
},
Samples: []prompb.Sample{
{
Timestamp: time.Now().Round(time.Second).UnixMilli(),
Value: 100,
},
},
},
}

for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
s, err := e2e.NewScenario(networkName)
require.NoError(t, err)
defer s.Close()

baseFlags := map[string]string{
"-distributor.ingestion-tenant-shard-size": "0",
"-ingester.ring.heartbeat-period": "1s",
}

flags := mergeFlags(
BlocksStorageFlags(),
BlocksStorageS3Flags(),
baseFlags,
)

distributorFlags := mergeFlags(
flags,
map[string]string{
"-server.cluster-validation.label": testCase.distributorClusterLabel,
},
)

ingesterFlags := mergeFlags(
flags,
map[string]string{
"-server.cluster-validation.label": testCase.ingesterClusterLabel,
"-server.cluster-validation.grpc.enabled": "true",
"-server.cluster-validation.grpc.soft-validation": fmt.Sprintf("%v", testCase.softValidation),
},
)

// Start dependencies.
consul := e2edb.NewConsul()
minio := e2edb.NewMinio(9000, flags["-blocks-storage.s3.bucket-name"])
require.NoError(t, s.StartAndWaitReady(consul, minio))

// Start Mimir components.
distributor := e2emimir.NewDistributor("distributor", consul.NetworkHTTPEndpoint(), distributorFlags)
ingester := e2emimir.NewIngester("ingester", consul.NetworkHTTPEndpoint(), ingesterFlags)
require.NoError(t, s.StartAndWaitReady(distributor, ingester))

// Wait until distributor has updated the ring.
require.NoError(t, distributor.WaitSumMetricsWithOptions(e2e.Equals(1), []string{"cortex_ring_members"}, e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "name", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "state", "ACTIVE"))))

client, err := e2emimir.NewClient(distributor.HTTPEndpoint(), "", "", "", userID)
require.NoError(t, err)

res, err := client.Push(series)
require.NoError(t, err)
require.Equal(t, testCase.expectedResponseStatus, res.StatusCode)

// We track ingester server response code.
sums, err := ingester.SumMetrics(
[]string{"cortex_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "route", "/cortex.Ingester/Push"),
labels.MustNewMatcher(labels.MatchEqual, "status_code", testCase.expectedIngesterServerStatus),
),
e2e.SkipMissingMetrics,
e2e.WithMetricCount,
)
require.NoError(t, err)
require.Equal(t, 1.0, e2e.SumValues(sums))

// We track ingester client response code.
sums, err = distributor.SumMetrics([]string{"cortex_ingester_client_request_duration_seconds"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "operation", "/cortex.Ingester/Push"),
labels.MustNewMatcher(labels.MatchRegexp, "status_code", testCase.expectedIngesterClientStatus),
),
e2e.WithMetricCount,
e2e.SkipMissingMetrics,
)
require.NoError(t, err)
require.Equal(t, 1.0, e2e.SumValues(sums))

if testCase.expectedClusterValidationFailures > 0 {
// We expect that the cluster validation error is tracked by ingester client's metrics.
sums, err = distributor.SumMetrics([]string{"cortex_client_request_invalid_cluster_validation_labels_total"},
e2e.WithLabelMatchers(
labels.MustNewMatcher(labels.MatchEqual, "method", "/cortex.Ingester/Push"),
labels.MustNewMatcher(labels.MatchEqual, "client", "ingester"),
labels.MustNewMatcher(labels.MatchEqual, "protocol", "grpc"),
),
)
require.NoError(t, err)
require.Equal(t, float64(testCase.expectedClusterValidationFailures), e2e.SumValues(sums))
}
})
}
}
2 changes: 1 addition & 1 deletion pkg/distributor/distributor.go
Original file line number Diff line number Diff line change
Expand Up @@ -374,7 +374,7 @@ func New(cfg Config, clientConfig ingester_client.Config, limits *validation.Ove
clientMetrics := ingester_client.NewMetrics(reg)
if cfg.IngesterClientFactory == nil {
cfg.IngesterClientFactory = ring_client.PoolInstFunc(func(inst ring.InstanceDesc) (ring_client.PoolClient, error) {
return ingester_client.MakeIngesterClient(inst, clientConfig, clientMetrics)
return ingester_client.MakeIngesterClient(inst, clientConfig, clientMetrics, log)
})
}

Expand Down
9 changes: 7 additions & 2 deletions pkg/ingester/client/client.go
Original file line number Diff line number Diff line change
Expand Up @@ -8,6 +8,7 @@ package client
import (
"flag"

"github.com/go-kit/log"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
Expand All @@ -33,10 +34,13 @@ type closableHealthAndIngesterClient struct {
}

// MakeIngesterClient makes a new IngesterClient
func MakeIngesterClient(inst ring.InstanceDesc, cfg Config, metrics *Metrics) (HealthAndIngesterClient, error) {
func MakeIngesterClient(inst ring.InstanceDesc, cfg Config, metrics *Metrics, logger log.Logger) (HealthAndIngesterClient, error) {
reportGRPCStatusesOptions := []middleware.InstrumentationOption{middleware.ReportGRPCStatusOption}
unary, stream := grpcclient.Instrument(metrics.requestDuration, reportGRPCStatusesOptions...)
unary = append(unary, querierapi.ReadConsistencyClientUnaryInterceptor)
if cfg.ClusterValidationLabel != "" {
unary = append(unary, middleware.ClusterUnaryClientInterceptor(cfg.ClusterValidationLabel, metrics.invalidClusterVerificationLabels, logger))
}
stream = append(stream, querierapi.ReadConsistencyClientStreamInterceptor)

dialOpts, err := cfg.GRPCClientConfig.DialOption(unary, stream)
Expand Down Expand Up @@ -66,7 +70,8 @@ func (c *closableHealthAndIngesterClient) Close() error {

// Config is the configuration struct for the ingester client
type Config struct {
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with ingesters from distributors, queriers and rulers."`
GRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate with ingesters from distributors, queriers and rulers."`
ClusterValidationLabel string `yaml:"-"`
}

// RegisterFlags registers configuration settings used by the ingester client config.
Expand Down
7 changes: 6 additions & 1 deletion pkg/ingester/client/metrics.go
Original file line number Diff line number Diff line change
Expand Up @@ -5,10 +5,13 @@ package client
import (
"github.com/prometheus/client_golang/prometheus"
"github.com/prometheus/client_golang/prometheus/promauto"

"github.com/grafana/mimir/pkg/util"
)

type Metrics struct {
requestDuration *prometheus.HistogramVec
requestDuration *prometheus.HistogramVec
invalidClusterVerificationLabels *prometheus.CounterVec
}

func NewMetrics(reg prometheus.Registerer) *Metrics {
Expand All @@ -18,5 +21,7 @@ func NewMetrics(reg prometheus.Registerer) *Metrics {
Help: "Time spent doing Ingester requests.",
Buckets: prometheus.ExponentialBuckets(0.001, 4, 8),
}, []string{"operation", "status_code"}),

invalidClusterVerificationLabels: util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, "ingester", util.GRPCProtocol),
}
}
Loading

0 comments on commit 6b51b44

Please sign in to comment.