From 145b51def287d3df7f41c30e2ba78d505c20192f Mon Sep 17 00:00:00 2001 From: Yuri Nikolic Date: Fri, 7 Mar 2025 15:33:56 +0100 Subject: [PATCH] Configure gRPC clients for ruler->ruler communications Signed-off-by: Yuri Nikolic --- pkg/mimir/mimir.go | 1 + pkg/mimir/mimir_config_test.go | 2 ++ pkg/ruler/client_pool.go | 18 +++++++++++++----- pkg/ruler/client_pool_test.go | 3 ++- 4 files changed, 18 insertions(+), 6 deletions(-) diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index bf42431000d..4e53343dd66 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -232,6 +232,7 @@ func (c *Config) CommonConfigInheritance() CommonConfigInheritance { "frontend_query_scheduler_client": &c.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation, "querier_store_gateway_client": &c.Querier.StoreGatewayClient.ClusterValidation, "scheduler_query_frontend_client": &c.QueryScheduler.GRPCClientConfig.ClusterValidation, + "ruler_client": &c.Ruler.ClientTLSConfig.ClusterValidation, }, } } diff --git a/pkg/mimir/mimir_config_test.go b/pkg/mimir/mimir_config_test.go index ea2a60ce1f5..84003797281 100644 --- a/pkg/mimir/mimir_config_test.go +++ b/pkg/mimir/mimir_config_test.go @@ -42,6 +42,7 @@ func TestCommonConfigCanBeExtended(t *testing.T) { require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.QueryScheduler.GRPCClientConfig.ClusterValidation.Label) + require.Equal(t, "client-cluster", cfg.MimirConfig.Ruler.ClientTLSConfig.ClusterValidation.Label) }) t.Run("yaml inheritance", func(t *testing.T) { @@ -73,6 +74,7 @@ common: require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label) require.Equal(t, "client-cluster", cfg.MimirConfig.QueryScheduler.GRPCClientConfig.ClusterValidation.Label) + require.Equal(t, "client-cluster", cfg.MimirConfig.Ruler.ClientTLSConfig.ClusterValidation.Label) }) } diff --git a/pkg/ruler/client_pool.go b/pkg/ruler/client_pool.go index 78927875f6c..ddde38a4307 100644 --- a/pkg/ruler/client_pool.go +++ b/pkg/ruler/client_pool.go @@ -10,6 +10,7 @@ import ( "github.com/go-kit/log" "github.com/grafana/dskit/grpcclient" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/ring" "github.com/grafana/dskit/ring/client" "github.com/grafana/dskit/services" @@ -18,6 +19,8 @@ import ( "github.com/prometheus/client_golang/prometheus/promauto" "google.golang.org/grpc" "google.golang.org/grpc/health/grpc_health_v1" + + "github.com/grafana/mimir/pkg/util" ) // ClientsPool is the interface used to get the client from the pool for a specified address. @@ -53,24 +56,29 @@ func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prom }) return &rulerClientsPool{ - client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger), + client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg, logger), clientsCount, logger), } } -func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory { +func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer, logger log.Logger) client.PoolFactory { requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{ Name: "cortex_ruler_client_request_duration_seconds", Help: "Time spent executing requests to the ruler.", Buckets: prometheus.ExponentialBuckets(0.008, 4, 7), }, []string{"operation", "status_code"}) + invalidClusterValidation := util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, "ruler", util.GRPCProtocol) return client.PoolInstFunc(func(inst ring.InstanceDesc) (client.PoolClient, error) { - return dialRulerClient(clientCfg, inst, requestDuration) + return dialRulerClient(clientCfg, inst, requestDuration, invalidClusterValidation, logger) }) } -func dialRulerClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) { - opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration)) +func dialRulerClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec, invalidClusterValidation *prometheus.CounterVec, logger log.Logger) (*rulerExtendedClient, error) { + unary, stream := grpcclient.Instrument(requestDuration) + if clientCfg.ClusterValidation.Label != "" { + unary = append(unary, middleware.ClusterUnaryClientInterceptor(clientCfg.ClusterValidation.Label, invalidClusterValidation, logger)) + } + opts, err := clientCfg.DialOption(unary, stream) if err != nil { return nil, err } diff --git a/pkg/ruler/client_pool_test.go b/pkg/ruler/client_pool_test.go index fd1e8a40d56..8c6bbb7c752 100644 --- a/pkg/ruler/client_pool_test.go +++ b/pkg/ruler/client_pool_test.go @@ -10,6 +10,7 @@ import ( "net" "testing" + "github.com/go-kit/log" "github.com/grafana/dskit/flagext" "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/ring" @@ -42,7 +43,7 @@ func Test_newRulerClientFactory(t *testing.T) { flagext.DefaultValues(&cfg) reg := prometheus.NewPedanticRegistry() - factory := newRulerClientFactory(cfg, reg) + factory := newRulerClientFactory(cfg, reg, log.NewNopLogger()) for i := 0; i < 2; i++ { inst := ring.InstanceDesc{Addr: listener.Addr().String()}