From 17741f87736143fff2dbc8daefc7fdf2d1436cf7 Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Thu, 6 Mar 2025 18:34:39 +0100 Subject: [PATCH] Configure gRPC clients for querier->store-gateway communications Signed-off-by: Yuri Nikolic --- pkg/mimir/mimir.go | 1 + pkg/mimir/mimir_config_test.go | 2 ++ pkg/querier/blocks_store_queryable_test.go | 2 +- pkg/querier/store_gateway_client.go | 26 ++++++++++++++++------ pkg/querier/store_gateway_client_test.go | 3 ++- 5 files changed, 25 insertions(+), 9 deletions(-) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index bda4cbaa22a..5cf60c00e10 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -230,6 +230,7 @@ func (c *Config) CommonConfigInheritance() CommonConfigInheritance { "frontend_worker_scheduler_client": &c.Worker.QuerySchedulerGRPCClientConfig.ClusterValidation, "block_builder_scheduler_client": &c.BlockBuilder.SchedulerConfig.GRPCClientConfig.ClusterValidation, "frontend_query_scheduler_client": &c.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation, + "querier_store_gateway_client": &c.Querier.StoreGatewayClient.ClusterValidation, }, } } diff --git a/pkg/mimir/mimir_config_test.go b/pkg/mimir/mimir_config_test.go index 913b0187195..7883390508e 100644 --- a/pkg/mimir/mimir_config_test.go +++ b/pkg/mimir/mimir_config_test.go @@ -40,6 +40,7 @@ func TestCommonConfigCanBeExtended(t *testing.T) { require.Equal(t, "client-cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.BlockBuilder.SchedulerConfig.GRPCClientConfig.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label) + require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label) }) t.Run("yaml inheritance", func(t *testing.T) { @@ -69,6 +70,7 @@ common: require.Equal(t, "client-cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.BlockBuilder.SchedulerConfig.GRPCClientConfig.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label) + require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label) }) } diff --git a/pkg/querier/blocks_store_queryable_test.go b/pkg/querier/blocks_store_queryable_test.go index d692cd0d7e4..bc7c1d46ee8 100644 --- a/pkg/querier/blocks_store_queryable_test.go +++ b/pkg/querier/blocks_store_queryable_test.go @@ -1779,7 +1779,7 @@ func TestBlocksStoreQuerier_ShouldReturnContextCanceledIfContextWasCanceledWhile clientCfg := grpcclient.Config{} flagext.DefaultValues(&clientCfg) - client, err := dialStoreGatewayClient(clientCfg, ring.InstanceDesc{Addr: listener.Addr().String()}, promauto.With(nil).NewHistogramVec(prometheus.HistogramOpts{}, []string{"route", "status_code"})) + client, err := dialStoreGatewayClient(clientCfg, ring.InstanceDesc{Addr: listener.Addr().String()}, promauto.With(nil).NewHistogramVec(prometheus.HistogramOpts{}, []string{"route", "status_code"}), util.NewRequestInvalidClusterValidationLabelsTotalCounter(nil, "store-gateway", util.GRPCProtocol), log.NewNopLogger()) require.NoError(t, err) t.Cleanup(func() { require.NoError(t, client.Close()) diff --git a/pkg/querier/store_gateway_client.go b/pkg/querier/store_gateway_client.go index 583d9c4067a..f1458d84cca 100644 --- a/pkg/querier/store_gateway_client.go +++ b/pkg/querier/store_gateway_client.go @@ -10,8 +10,10 @@ import ( "time" "github.com/go-kit/log" + "github.com/grafana/dskit/clusterutil" "github.com/grafana/dskit/crypto/tls" "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/ring/client" "github.com/pkg/errors" @@ -21,9 +23,10 @@ import ( "google.golang.org/grpc/health/grpc_health_v1" "github.com/grafana/mimir/pkg/storegateway/storegatewaypb" + "github.com/grafana/mimir/pkg/util" ) -func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { +func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer, logger log.Logger) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Namespace: "cortex", Name: "storegateway_client_request_duration_seconds", @@ -32,13 +35,19 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Re ConstLabels: prometheus.Labels{"client": "querier"}, }, []string{"operation", "status_code"}) + invalidClusterValidation := util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, "store-gateway", util.GRPCProtocol) + return client.PoolInstFunc(func(inst ring.InstanceDesc) (client.PoolClient, error) { - return dialStoreGatewayClient(clientCfg, inst, requestDuration) + return dialStoreGatewayClient(clientCfg, inst, requestDuration, invalidClusterValidation, logger) }) } -func dialStoreGatewayClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) { - opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration)) +func dialStoreGatewayClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec, invalidClusterValidation *prometheus.CounterVec, logger log.Logger) (*storeGatewayClient, error) { + unary, stream := grpcclient.Instrument(requestDuration) + if clientCfg.ClusterValidation.Label != "" { + unary = append(unary, middleware.ClusterUnaryClientInterceptor(clientCfg.ClusterValidation.Label, invalidClusterValidation, logger)) + } + opts, err := clientCfg.DialOption(unary, stream) if err != nil { return nil, err } @@ -85,6 +94,7 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf BackoffOnRatelimits: false, TLSEnabled: clientConfig.TLSEnabled, TLS: clientConfig.TLS, + ClusterValidation: clientConfig.ClusterValidation, } poolCfg := client.PoolConfig{ CheckInterval: 10 * time.Second, @@ -99,15 +109,17 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf ConstLabels: map[string]string{"client": "querier"}, }) - return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg), clientsCount, logger) + return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg, logger), clientsCount, logger) } type ClientConfig struct { - TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` - TLS tls.ClientConfig `yaml:",inline"` + TLSEnabled bool `yaml:"tls_enabled" category:"advanced"` + TLS tls.ClientConfig `yaml:",inline"` + ClusterValidation clusterutil.ClientClusterValidationConfig `yaml:"-"` } func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.") cfg.TLS.RegisterFlagsWithPrefix(prefix, f) + cfg.ClusterValidation.RegisterAndTrackFlagsWithPrefix(prefix, f) } diff --git a/pkg/querier/store_gateway_client_test.go b/pkg/querier/store_gateway_client_test.go index 3bcdfce2677..ed4b2ff60cc 100644 --- a/pkg/querier/store_gateway_client_test.go +++ b/pkg/querier/store_gateway_client_test.go @@ -10,6 +10,7 @@ import ( "net" "testing" + "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/ring" @@ -45,7 +46,7 @@ func Test_newStoreGatewayClientFactory(t *testing.T) { flagext.DefaultValues(&cfg) reg := prometheus.NewPedanticRegistry() - factory := newStoreGatewayClientFactory(cfg, reg) + factory := newStoreGatewayClientFactory(cfg, reg, log.NewNopLogger()) for i := 0; i < 2; i++ { inst := ring.InstanceDesc{Addr: listener.Addr().String()}