Skip to content

Commit

Permalink
Configure gRPC clients for querier->store-gateway communications
Browse files Browse the repository at this point in the history
Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Mar 6, 2025
1 parent ed4205b commit 80900b8
Show file tree
Hide file tree
Showing 8 changed files with 54 additions and 9 deletions.
21 changes: 21 additions & 0 deletions cmd/mimir/config-descriptor.json
Original file line number Diff line number Diff line change
Expand Up @@ -1868,6 +1868,27 @@
"fieldFlag": "querier.store-gateway-client.tls-min-version",
"fieldType": "string",
"fieldCategory": "advanced"
},
{
"kind": "block",
"name": "cluster_validation",
"required": false,
"desc": "",
"blockEntries": [
{
"kind": "field",
"name": "label",
"required": false,
"desc": "Optionally define the cluster validation label to be sent together with the requests by the clients.",
"fieldValue": null,
"fieldDefaultValue": "",
"fieldFlag": "querier.store-gateway-client.cluster-validation.label",
"fieldType": "string",
"fieldCategory": "experimental"
}
],
"fieldValue": null,
"fieldDefaultValue": null
}
],
"fieldValue": null,
Expand Down
2 changes: 2 additions & 0 deletions cmd/mimir/help-all.txt.tmpl
Original file line number Diff line number Diff line change
Expand Up @@ -2271,6 +2271,8 @@ Usage of ./cmd/mimir/mimir:
Override the expected name on the server certificate.
-querier.shuffle-sharding-ingesters-enabled
Fetch in-memory series from the minimum set of required ingesters, selecting only ingesters which may have received series since -querier.query-ingesters-within. If this setting is false or -querier.query-ingesters-within is '0', queriers always query all ingesters (ingesters shuffle sharding on read path is disabled). (default true)
-querier.store-gateway-client.cluster-validation.label string
[experimental] Optionally define the cluster validation label to be sent together with the requests by the clients.
-querier.store-gateway-client.tls-ca-path string
Path to the CA certificates to validate server certificate against. If not set, the host's root CA certificates are used.
-querier.store-gateway-client.tls-cert-path string
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -1591,6 +1591,12 @@ store_gateway_client:
# CLI flag: -querier.store-gateway-client.tls-min-version
[tls_min_version: <string> | default = ""]
cluster_validation:
# (experimental) Optionally define the cluster validation label to be sent
# together with the requests by the clients.
# CLI flag: -querier.store-gateway-client.cluster-validation.label
[label: <string> | default = ""]
# (advanced) Fetch in-memory series from the minimum set of required ingesters,
# selecting only ingesters which may have received series since
# -querier.query-ingesters-within. If this setting is false or
Expand Down
1 change: 1 addition & 0 deletions pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -230,6 +230,7 @@ func (c *Config) CommonConfigInheritance() CommonConfigInheritance {
"frontend_worker_scheduler_client": &c.Worker.QuerySchedulerGRPCClientConfig.ClusterValidation,
"block_builder_scheduler_client": &c.BlockBuilder.SchedulerConfig.GRPCClientConfig.ClusterValidation,
"frontend_query_scheduler_client": &c.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation,
"querier_store_gateway_client": &c.Querier.StoreGatewayClient.ClusterValidation,
},
}
}
Expand Down
2 changes: 2 additions & 0 deletions pkg/mimir/mimir_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -40,6 +40,7 @@ func TestCommonConfigCanBeExtended(t *testing.T) {
require.Equal(t, "client-cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.BlockBuilder.SchedulerConfig.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label)
})

t.Run("yaml inheritance", func(t *testing.T) {
Expand Down Expand Up @@ -69,6 +70,7 @@ common:
require.Equal(t, "client-cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.BlockBuilder.SchedulerConfig.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Frontend.FrontendV2.GRPCClientConfig.ClusterValidation.Label)
require.Equal(t, "client-cluster", cfg.MimirConfig.Querier.StoreGatewayClient.ClusterValidation.Label)
})
}

Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/blocks_store_queryable_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -1779,7 +1779,7 @@ func TestBlocksStoreQuerier_ShouldReturnContextCanceledIfContextWasCanceledWhile
clientCfg := grpcclient.Config{}
flagext.DefaultValues(&clientCfg)

client, err := dialStoreGatewayClient(clientCfg, ring.InstanceDesc{Addr: listener.Addr().String()}, promauto.With(nil).NewHistogramVec(prometheus.HistogramOpts{}, []string{"route", "status_code"}))
client, err := dialStoreGatewayClient(clientCfg, ring.InstanceDesc{Addr: listener.Addr().String()}, promauto.With(nil).NewHistogramVec(prometheus.HistogramOpts{}, []string{"route", "status_code"}), util.NewRequestInvalidClusterValidationLabelsTotalCounter(nil, "store-gateway", util.GRPCProtocol), log.NewNopLogger())
require.NoError(t, err)
t.Cleanup(func() {
require.NoError(t, client.Close())
Expand Down
26 changes: 19 additions & 7 deletions pkg/querier/store_gateway_client.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,8 +10,10 @@ import (
"time"

"github.com/go-kit/log"
"github.com/grafana/dskit/clusterutil"
"github.com/grafana/dskit/crypto/tls"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/ring"
"github.com/grafana/dskit/ring/client"
"github.com/pkg/errors"
Expand All @@ -21,9 +23,10 @@ import (
"google.golang.org/grpc/health/grpc_health_v1"

"github.com/grafana/mimir/pkg/storegateway/storegatewaypb"
"github.com/grafana/mimir/pkg/util"
)

func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer) client.PoolFactory {
func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Registerer, logger log.Logger) client.PoolFactory {
requestDuration := promauto.With(reg).NewHistogramVec(prometheus.HistogramOpts{
Namespace: "cortex",
Name: "storegateway_client_request_duration_seconds",
Expand All @@ -32,13 +35,19 @@ func newStoreGatewayClientFactory(clientCfg grpcclient.Config, reg prometheus.Re
ConstLabels: prometheus.Labels{"client": "querier"},
}, []string{"operation", "status_code"})

invalidClusterValidation := util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, "store-gateway", util.GRPCProtocol)

return client.PoolInstFunc(func(inst ring.InstanceDesc) (client.PoolClient, error) {
return dialStoreGatewayClient(clientCfg, inst, requestDuration)
return dialStoreGatewayClient(clientCfg, inst, requestDuration, invalidClusterValidation, logger)
})
}

func dialStoreGatewayClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec) (*storeGatewayClient, error) {
opts, err := clientCfg.DialOption(grpcclient.Instrument(requestDuration))
func dialStoreGatewayClient(clientCfg grpcclient.Config, inst ring.InstanceDesc, requestDuration *prometheus.HistogramVec, invalidClusterValidation *prometheus.CounterVec, logger log.Logger) (*storeGatewayClient, error) {
unary, stream := grpcclient.Instrument(requestDuration)
if clientCfg.ClusterValidation.Label != "" {
unary = append(unary, middleware.ClusterUnaryClientInterceptor(clientCfg.ClusterValidation.Label, invalidClusterValidation, logger))
}
opts, err := clientCfg.DialOption(unary, stream)
if err != nil {
return nil, err
}
Expand Down Expand Up @@ -85,6 +94,7 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf
BackoffOnRatelimits: false,
TLSEnabled: clientConfig.TLSEnabled,
TLS: clientConfig.TLS,
ClusterValidation: clientConfig.ClusterValidation,
}
poolCfg := client.PoolConfig{
CheckInterval: 10 * time.Second,
Expand All @@ -99,15 +109,17 @@ func newStoreGatewayClientPool(discovery client.PoolServiceDiscovery, clientConf
ConstLabels: map[string]string{"client": "querier"},
})

return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg), clientsCount, logger)
return client.NewPool("store-gateway", poolCfg, discovery, newStoreGatewayClientFactory(clientCfg, reg, logger), clientsCount, logger)
}

// ClientConfig holds the querier-side settings for the gRPC client that
// connects to store-gateways: whether TLS is enabled, the TLS client options,
// and the cluster validation settings.
type ClientConfig struct {
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
TLSEnabled bool `yaml:"tls_enabled" category:"advanced"`
TLS tls.ClientConfig `yaml:",inline"`
// ClusterValidation is exposed in YAML as the nested "cluster_validation"
// block, unlike TLS whose fields are inlined into the parent block.
ClusterValidation clusterutil.ClientClusterValidationConfig `yaml:"cluster_validation"`
}

// RegisterFlagsWithPrefix registers the store-gateway client flags on f, with
// every flag name starting with prefix.
//
// It registers prefix+".tls-enabled" directly, delegates the remaining TLS
// flags to cfg.TLS using the same prefix, and delegates the cluster validation
// flags to cfg.ClusterValidation using prefix+".cluster-validation.".
func (cfg *ClientConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) {
f.BoolVar(&cfg.TLSEnabled, prefix+".tls-enabled", cfg.TLSEnabled, "Enable TLS for gRPC client connecting to store-gateway.")
cfg.TLS.RegisterFlagsWithPrefix(prefix, f)
// Unlike the TLS call above, the prefix passed here ends with a dot.
// NOTE(review): presumably the callee appends the bare flag name (e.g. "label") to it — confirm against dskit.
cfg.ClusterValidation.RegisterAndTrackFlagsWithPrefix(prefix+".cluster-validation.", f)
}
3 changes: 2 additions & 1 deletion pkg/querier/store_gateway_client_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -10,6 +10,7 @@ import (
"net"
"testing"

"github.com/go-kit/log"
"github.com/grafana/dskit/flagext"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/ring"
Expand Down Expand Up @@ -45,7 +46,7 @@ func Test_newStoreGatewayClientFactory(t *testing.T) {
flagext.DefaultValues(&cfg)

reg := prometheus.NewPedanticRegistry()
factory := newStoreGatewayClientFactory(cfg, reg)
factory := newStoreGatewayClientFactory(cfg, reg, log.NewNopLogger())

for i := 0; i < 2; i++ {
inst := ring.InstanceDesc{Addr: listener.Addr().String()}
Expand Down

0 comments on commit 80900b8

Please sign in to comment.