Skip to content

Commit

Permalink
Configure gRPC clients for ruler->ruler communications
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Mar 7, 2025
1 parent 7a47cd8 commit a1f6777
Show file tree
Hide file tree
Showing 4 changed files with 148 additions and 7 deletions.
1 change: 1 addition & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -232,6 +232,7 @@ func (c *Config) CommonConfigInheritance() CommonConfigInheritance {
"frontend_query_scheduler_client": &c.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation,
"querier_store_gateway_client": &c.Querier.StoreGatewayClient.ClusterValidation,
"scheduler_query_frontend_client": &c.QueryScheduler.GRPCClientConfig.ClusterValidation,
"ruler_client": &c.Ruler.ClientTLSConfig.ClusterValidation,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/mimir/mimir_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -42,6 +42,7 @@ func TestCommonConfigCanBeExtended(t *testing.T) {
require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.QueryScheduler.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Ruler.ClientTLSConfig.ClusterValidation.Label)
})

t.Run("yaml inheritance", func(t *testing.T) {
Expand Down Expand Up @@ -73,6 +74,7 @@ common:
require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.QueryScheduler.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Ruler.ClientTLSConfig.ClusterValidation.Label)
})
}

Expand Down
18 changes: 13 additions & 5 deletions pkg/ruler/client_pool.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (

"github.com/go-kit/log"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/ring/client"
"github.com/grafana/dskit/services"
Expand All @@ -18,6 +19,8 @@ import (
"github.com/prometheus/client_golang/prometheus/promauto"
"google.golang.org/grpc"
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/mimir/pkg/util"
)

// ClientsPool is the interface used to get the client from the pool for a specified address.
Expand Down Expand Up @@ -53,24 +56,29 @@ func newRulerClientPool(clientCfg grpcclient.Config, logger log.Logger, reg prom
})

return &rulerClientsPool{
client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg), clientsCount, logger),
client.NewPool("ruler", poolCfg, nil, newRulerClientFactory(clientCfg, reg, logger), clientsCount, logger),
}
}

func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
func newRulerClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer, logger log.Logger) client.PoolFactory {
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Name: "cortex_ruler_client_request_duration_seconds",
Help: "Time spent executing requests to the ruler.",
Buckets: prometheus.ExponentialBuckets(0.008, 4, 7),
}, []string{"operation", "status_code"})
invalidClusterValidation := util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, "ruler", util.GRPCProtocol)

return client.PoolInstFunc(func(inst ring.InstanceDesc) (client.PoolClient, error) {
return dialRulerClient(clientCfg, inst, requestDuration)
return dialRulerClient(clientCfg, inst, requestDuration, invalidClusterValidation, logger)
})
}

func dialRulerClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec) (*rulerExtendedClient, error) {
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
func dialRulerClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec, invalidClusterValidation *prometheus.CounterVec, logger log.Logger) (*rulerExtendedClient, error) {
unary, stream := grpcclient.Instrument(requestDuration)
if clientCfg.ClusterValidation.Label != "" {
unary = append(unary, middleware.ClusterUnaryClientInterceptor(clientCfg.ClusterValidation.Label, invalidClusterValidation, logger))
}
opts, err := clientCfg.DialOption(unary, stream)
if err != nil {
return nil, err
}
Expand Down
134 changes: 132 additions & 2 deletions pkg/ruler/client_pool_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,15 +10,21 @@ import (
"net"
"testing"

"github.com/go-kit/log"
"github.com/gogo/status"
"github.com/grafana/dskit/clusterutil"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/grpcutil"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/user"
"github.com/prometheus/client_golang/prometheus"
dto "github.com/prometheus/client_model/go"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"google.golang.org/grpc"
"google.golang.org/grpc/codes"
)

func Test_newRulerClientFactory(t *testing.T) {
Expand All @@ -42,7 +48,7 @@ func Test_newRulerClientFactory(t *testing.T) {
flagext.DefaultValues(&cfg)

reg := prometheus.NewPedanticRegistry()
factory := newRulerClientFactory(cfg, reg)
factory := newRulerClientFactory(cfg, reg, log.NewNopLogger())

for i := 0; i < 2; i++ {
inst := ring.InstanceDesc{Addr: listener.Addr().String()}
Expand All @@ -67,9 +73,133 @@ func Test_newRulerClientFactory(t *testing.T) {
assert.Equal(t, uint64(2), metrics[0].GetMetric()[0].GetHistogram().GetSampleCount())
}

func TestClientPool_ClusterValidation(t *testing.T) {
testCases := map[string]struct {
serverClusterValidation clusterutil.ClusterValidationConfig
clientClusterValidation clusterutil.ClientClusterValidationConfig
expectedError *status.Status
}{
"if server and client have equal cluster validation labels and cluster validation is disabled no error is returned": {
serverClusterValidation: clusterutil.ClusterValidationConfig{
Label: "cluster",
GRPC: clusterutil.ClusterValidationProtocolConfig{Enabled: false},
},
clientClusterValidation: clusterutil.ClientClusterValidationConfig{Label: "cluster"},
expectedError: nil,
},
"if server and client have different cluster validation labels and cluster validation is disabled no error is returned": {
serverClusterValidation: clusterutil.ClusterValidationConfig{
Label: "server-cluster",
GRPC: clusterutil.ClusterValidationProtocolConfig{Enabled: false},
},
clientClusterValidation: clusterutil.ClientClusterValidationConfig{Label: "client-cluster"},
expectedError: nil,
},
"if server and client have equal cluster validation labels and cluster validation is enabled no error is returned": {
serverClusterValidation: clusterutil.ClusterValidationConfig{
Label: "cluster",
GRPC: clusterutil.ClusterValidationProtocolConfig{Enabled: true},
},
clientClusterValidation: clusterutil.ClientClusterValidationConfig{Label: "cluster"},
expectedError: nil,
},
"if server and client have different cluster validation labels and soft cluster validation is enabled no error is returned": {
serverClusterValidation: clusterutil.ClusterValidationConfig{
Label: "server-cluster",
GRPC: clusterutil.ClusterValidationProtocolConfig{
Enabled: true,
SoftValidation: true,
},
},
clientClusterValidation: clusterutil.ClientClusterValidationConfig{Label: "client-cluster"},
expectedError: nil,
},
"if server and client have different cluster validation labels and cluster validation is enabled an error is returned": {
serverClusterValidation: clusterutil.ClusterValidationConfig{
Label: "server-cluster",
GRPC: clusterutil.ClusterValidationProtocolConfig{
Enabled: true,
SoftValidation: false,
},
},
clientClusterValidation: clusterutil.ClientClusterValidationConfig{Label: "client-cluster"},
expectedError: grpcutil.Status(codes.Internal, `request rejected by the server: rejected request with wrong cluster validation label "client-cluster" - it should be "server-cluster"`),
},
"if client has no cluster validation label and soft cluster validation is enabled no error is returned": {
serverClusterValidation: clusterutil.ClusterValidationConfig{
Label: "server-cluster",
GRPC: clusterutil.ClusterValidationProtocolConfig{
Enabled: true,
SoftValidation: true,
},
},
clientClusterValidation: clusterutil.ClientClusterValidationConfig{},
expectedError: nil,
},
"if client has no cluster validation label and cluster validation is enabled an error is returned": {
serverClusterValidation: clusterutil.ClusterValidationConfig{
Label: "server-cluster",
GRPC: clusterutil.ClusterValidationProtocolConfig{
Enabled: true,
SoftValidation: false,
},
},
clientClusterValidation: clusterutil.ClientClusterValidationConfig{},
expectedError: grpcutil.Status(codes.FailedPrecondition, `rejected request with empty cluster validation label - it should be "server-cluster"`),
},
}
for testName, testCase := range testCases {
t.Run(testName, func(t *testing.T) {
var grpcOptions []grpc.ServerOption
if testCase.serverClusterValidation.GRPC.Enabled {
grpcOptions = []grpc.ServerOption{
grpc.ChainUnaryInterceptor(middleware.ClusterUnaryServerInterceptor(testCase.serverClusterValidation.Label, testCase.serverClusterValidation.GRPC.SoftValidation, log.NewNopLogger())),
}
}

grpcServer := grpc.NewServer(grpcOptions...)
defer grpcServer.GracefulStop()

srv := &mockRulerServer{}
RegisterRulerServer(grpcServer, srv)

listener, err := net.Listen("tcp", "localhost:0")
require.NoError(t, err)

go func() {
require.NoError(t, grpcServer.Serve(listener))
}()

// Create a client factory and query back the mocked service
// with different clients.
cfg := grpcclient.Config{}
flagext.DefaultValues(&cfg)
cfg.ClusterValidation = testCase.clientClusterValidation

reg := prometheus.NewPedanticRegistry()
factory := newRulerClientFactory(cfg, reg, log.NewNopLogger())
inst := ring.InstanceDesc{Addr: listener.Addr().String()}
client, err := factory.FromInstance(inst)
require.NoError(t, err)
defer client.Close() //nolint:errcheck

ctx := user.InjectOrgID(context.Background(), "test")
_, err = client.(*rulerExtendedClient).Rules(ctx, &RulesRequest{})
if testCase.expectedError == nil {
require.NoError(t, err)
} else {
stat, ok := grpcutil.ErrorToStatus(err)
require.True(t, ok)
require.Equal(t, testCase.expectedError.Code(), stat.Code())
require.Equal(t, testCase.expectedError.Message(), stat.Message())
}
})
}
}

type mockRulerServer struct{}

func (m *mockRulerServer) Rules(context.Context, *RulesRequest) (*RulesResponse, error) {
func (m *mockRulerServer) Rules(ctx context.Context, req *RulesRequest) (*RulesResponse, error) {
return &RulesResponse{}, nil
}

Expand Down

0 comments on commit a1f6777

Please sign in to comment.