Skip to content

Commit

Permalink
Configure gRPC clients for ruler->ruler communications
Browse files (browse the repository at this point in the history)
Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Mar 7, 2025
1 parent 7a47cd8 commit 145b51d
Show file tree
Hide file tree
Showing 4 changed files with 18 additions and 6 deletions.
1 change: 1 addition & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (c *Config) CommonConfigInheritance() CommonConfigInheritance {
"frontend_query_scheduler_client": &c.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation,
"querier_store_gateway_client": &c.Querier.StoreGatewayClient.ClusterValidation,
"scheduler_query_frontend_client": &c.QueryScheduler.GRPCClientConfig.ClusterValidation,
"ruler_client": &c.Ruler.ClientTLSConfig.ClusterValidation,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/mimir/mimir_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestCommonConfigCanBeExtended(t *testing.T) {
require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.QueryScheduler.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Ruler.ClientTLSConfig.ClusterValidation.Label)
})

t.Run("yaml inheritance", func(t *testing.T) {
Expand Down
Expand Up @@ -73,6 +74,7 @@ common:
require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.QueryScheduler.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Ruler.ClientTLSConfig.ClusterValidation.Label)
})
}

Expand Down
18 changes: 13 additions & 5 deletions pkg/ruler/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
Expand All @@ -18,6 +19,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/mimir/pkg/util"
)

// ClientsPool is the interface used to get the client from the pool for a specified address.
Expand Down
Expand Up @@ -53,24 +56,29 @@ func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prom
})

return &rulerClientsPool{
client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger),
client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg, logger), clientsCount, logger),
}
}

func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer, logger log.Logger) client.PoolFactory {
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_ruler_client_request_duration_seconds",
Help: "Time spent executing requests to the ruler.",
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
}, []string{"operation", "status_code"})
invalidClusterValidation := util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, "ruler", util.GRPCProtocol)

return client.PoolInstFunc(func(inst ring.InstanceDesc) (client.PoolClient, error) {
return dialRulerClient(clientCfg, inst, requestDuration)
return dialRulerClient(clientCfg, inst, requestDuration, invalidClusterValidation, logger)
})
}

func dialRulerClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) {
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
func dialRulerClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec, invalidClusterValidation *prometheus.CounterVec, logger log.Logger) (*rulerExtendedClient, error) {
unary, stream := grpcclient.Instrument(requestDuration)
if clientCfg.ClusterValidation.Label != "" {
unary = append(unary, middleware.ClusterUnaryClientInterceptor(clientCfg.ClusterValidation.Label, invalidClusterValidation, logger))
}
opts, err := clientCfg.DialOption(unary, stream)
if err != nil {
return nil, err
}
Expand Down
3 changes: 2 additions & 1 deletion pkg/ruler/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/ring"
Expand Down
Expand Up @@ -42,7 +43,7 @@ func Test_newRulerClientFactory(t *testing.T) {
flagext.DefaultValues(&cfg)

reg := prometheus.NewPedanticRegistry()
factory := newRulerClientFactory(cfg, reg)
factory := newRulerClientFactory(cfg, reg, log.NewNopLogger())

for i := 0; i < 2; i++ {
inst := ring.InstanceDesc{Addr: listener.Addr().String()}
Expand Down

0 comments on commit 145b51d

Please sign in to comment.