diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index 7f39ff1e28c..b67f02073f4 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -3061,8 +3061,6 @@ grpc_client_config: The `grpc_client` block configures the gRPC client used to communicate between two Mimir components. The supported CLI flags `<prefix>` used to reference this configuration block are: -- `querier.frontend-client` -- `querier.scheduler-client` - `query-frontend.grpc-client-config` - `query-scheduler.grpc-client-config` - `ruler.client` @@ -3240,13 +3238,283 @@ The `frontend_worker` block configures the worker running within the querier, pi # Configures the gRPC client used to communicate between the querier and the # query-frontend. -# The CLI flags prefix for this block configuration is: querier.frontend-client -[grpc_client_config: <grpc_client>] +grpc_client_config: + # (advanced) gRPC client max receive message size (bytes). + # CLI flag: -querier.frontend-client.grpc-max-recv-msg-size + [max_recv_msg_size: <int> | default = 104857600] + + # (advanced) gRPC client max send message size (bytes). + # CLI flag: -querier.frontend-client.grpc-max-send-msg-size + [max_send_msg_size: <int> | default = 104857600] + + # (advanced) Use compression when sending messages. Supported values are: + # 'gzip', 'snappy', 's2' and '' (disable compression) + # CLI flag: -querier.frontend-client.grpc-compression + [grpc_compression: <string> | default = ""] + + # (advanced) Rate limit for gRPC client; 0 means disabled. + # CLI flag: -querier.frontend-client.grpc-client-rate-limit + [rate_limit: <float> | default = 0] + + # (advanced) Rate limit burst for gRPC client. + # CLI flag: -querier.frontend-client.grpc-client-rate-limit-burst + [rate_limit_burst: <int> | default = 0] + + # (advanced) Enable backoff and retry when we hit rate limits. 
+ # CLI flag: -querier.frontend-client.backoff-on-ratelimits + [backoff_on_ratelimits: <boolean> | default = false] + + backoff_config: + # (advanced) Minimum delay when backing off. + # CLI flag: -querier.frontend-client.backoff-min-period + [min_period: <duration> | default = 100ms] + + # (advanced) Maximum delay when backing off. + # CLI flag: -querier.frontend-client.backoff-max-period + [max_period: <duration> | default = 10s] + + # (advanced) Number of times to backoff and retry before failing. + # CLI flag: -querier.frontend-client.backoff-retries + [max_retries: <int> | default = 10] + + # (experimental) Initial stream window size. Values less than the default are + # not supported and are ignored. Setting this to a value other than the + # default disables the BDP estimator. + # CLI flag: -querier.frontend-client.initial-stream-window-size + [initial_stream_window_size: <int> | default = 63KiB1023B] + + # (experimental) Initial connection window size. Values less than the default + # are not supported and are ignored. Setting this to a value other than the + # default disables the BDP estimator. + # CLI flag: -querier.frontend-client.initial-connection-window-size + [initial_connection_window_size: <int> | default = 63KiB1023B] + + # (advanced) Enable TLS in the gRPC client. This flag needs to be enabled when + # any other TLS flag is set. If set to false, insecure connection to gRPC + # server will be used. + # CLI flag: -querier.frontend-client.tls-enabled + [tls_enabled: <boolean> | default = false] + + # (advanced) Path to the client certificate, which will be used for + # authenticating with the server. Also requires the key path to be configured. + # CLI flag: -querier.frontend-client.tls-cert-path + [tls_cert_path: <string> | default = ""] + + # (advanced) Path to the key for the client certificate. Also requires the + # client certificate to be configured. 
+ # CLI flag: -querier.frontend-client.tls-key-path + [tls_key_path: <string> | default = ""] + + # (advanced) Path to the CA certificates to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -querier.frontend-client.tls-ca-path + [tls_ca_path: <string> | default = ""] + + # (advanced) Override the expected name on the server certificate. + # CLI flag: -querier.frontend-client.tls-server-name + [tls_server_name: <string> | default = ""] + + # (advanced) Skip validating server certificate. + # CLI flag: -querier.frontend-client.tls-insecure-skip-verify + [tls_insecure_skip_verify: <boolean> | default = false] + + # (advanced) Override the default cipher suite list (separated by commas). + # Allowed values: + # + # Secure Ciphers: + # - TLS_AES_128_GCM_SHA256 + # - TLS_AES_256_GCM_SHA384 + # - TLS_CHACHA20_POLY1305_SHA256 + # - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA + # - TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA + # - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA + # - TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA + # - TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 + # - TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 + # - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 + # - TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 + # - TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256 + # - TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256 + # + # Insecure Ciphers: + # - TLS_RSA_WITH_RC4_128_SHA + # - TLS_RSA_WITH_3DES_EDE_CBC_SHA + # - TLS_RSA_WITH_AES_128_CBC_SHA + # - TLS_RSA_WITH_AES_256_CBC_SHA + # - TLS_RSA_WITH_AES_128_CBC_SHA256 + # - TLS_RSA_WITH_AES_128_GCM_SHA256 + # - TLS_RSA_WITH_AES_256_GCM_SHA384 + # - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA + # - TLS_ECDHE_RSA_WITH_RC4_128_SHA + # - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA + # - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 + # - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256 + # CLI flag: -querier.frontend-client.tls-cipher-suites + [tls_cipher_suites: <string> | default = ""] + + # (advanced) Override the default minimum TLS version. 
Allowed values: + # VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13 + # CLI flag: -querier.frontend-client.tls-min-version + [tls_min_version: <string> | default = ""] + + # (advanced) The maximum amount of time to establish a connection. A value of + # 0 means default gRPC client connect timeout and backoff. + # CLI flag: -querier.frontend-client.connect-timeout + [connect_timeout: <duration> | default = 5s] + + # (advanced) Initial backoff delay after first connection failure. Only + # relevant if ConnectTimeout > 0. + # CLI flag: -querier.frontend-client.connect-backoff-base-delay + [connect_backoff_base_delay: <duration> | default = 1s] + + # (advanced) Maximum backoff delay when establishing a connection. Only + # relevant if ConnectTimeout > 0. + # CLI flag: -querier.frontend-client.connect-backoff-max-delay + [connect_backoff_max_delay: <duration> | default = 5s] + + # (experimental) Optionally define gRPC client's cluster validation label. + # CLI flag: -querier.frontend-client.cluster-validation-label + [cluster_validation_label: <string> | default = ""] # Configures the gRPC client used to communicate between the querier and the # query-scheduler. -# The CLI flags prefix for this block configuration is: querier.scheduler-client -[query_scheduler_grpc_client_config: <grpc_client>] +query_scheduler_grpc_client_config: + # (advanced) gRPC client max receive message size (bytes). + # CLI flag: -querier.scheduler-client.grpc-max-recv-msg-size + [max_recv_msg_size: <int> | default = 104857600] + + # (advanced) gRPC client max send message size (bytes). + # CLI flag: -querier.scheduler-client.grpc-max-send-msg-size + [max_send_msg_size: <int> | default = 104857600] + + # (advanced) Use compression when sending messages. Supported values are: + # 'gzip', 'snappy', 's2' and '' (disable compression) + # CLI flag: -querier.scheduler-client.grpc-compression + [grpc_compression: <string> | default = ""] + + # (advanced) Rate limit for gRPC client; 0 means disabled. 
+ # CLI flag: -querier.scheduler-client.grpc-client-rate-limit + [rate_limit: <float> | default = 0] + + # (advanced) Rate limit burst for gRPC client. + # CLI flag: -querier.scheduler-client.grpc-client-rate-limit-burst + [rate_limit_burst: <int> | default = 0] + + # (advanced) Enable backoff and retry when we hit rate limits. + # CLI flag: -querier.scheduler-client.backoff-on-ratelimits + [backoff_on_ratelimits: <boolean> | default = false] + + backoff_config: + # (advanced) Minimum delay when backing off. + # CLI flag: -querier.scheduler-client.backoff-min-period + [min_period: <duration> | default = 100ms] + + # (advanced) Maximum delay when backing off. + # CLI flag: -querier.scheduler-client.backoff-max-period + [max_period: <duration> | default = 10s] + + # (advanced) Number of times to backoff and retry before failing. + # CLI flag: -querier.scheduler-client.backoff-retries + [max_retries: <int> | default = 10] + + # (experimental) Initial stream window size. Values less than the default are + # not supported and are ignored. Setting this to a value other than the + # default disables the BDP estimator. + # CLI flag: -querier.scheduler-client.initial-stream-window-size + [initial_stream_window_size: <int> | default = 63KiB1023B] + + # (experimental) Initial connection window size. Values less than the default + # are not supported and are ignored. Setting this to a value other than the + # default disables the BDP estimator. + # CLI flag: -querier.scheduler-client.initial-connection-window-size + [initial_connection_window_size: <int> | default = 63KiB1023B] + + # (advanced) Enable TLS in the gRPC client. This flag needs to be enabled when + # any other TLS flag is set. If set to false, insecure connection to gRPC + # server will be used. + # CLI flag: -querier.scheduler-client.tls-enabled + [tls_enabled: <boolean> | default = false] + + # (advanced) Path to the client certificate, which will be used for + # authenticating with the server. Also requires the key path to be configured. 
+ # CLI flag: -querier.scheduler-client.tls-cert-path + [tls_cert_path: <string> | default = ""] + + # (advanced) Path to the key for the client certificate. Also requires the + # client certificate to be configured. + # CLI flag: -querier.scheduler-client.tls-key-path + [tls_key_path: <string> | default = ""] + + # (advanced) Path to the CA certificates to validate server certificate + # against. If not set, the host's root CA certificates are used. + # CLI flag: -querier.scheduler-client.tls-ca-path + [tls_ca_path: <string> | default = ""] + + # (advanced) Override the expected name on the server certificate. + # CLI flag: -querier.scheduler-client.tls-server-name + [tls_server_name: <string> | default = ""] + + # (advanced) Skip validating server certificate. + # CLI flag: -querier.scheduler-client.tls-insecure-skip-verify + [tls_insecure_skip_verify: <boolean> | default = false] + + # (advanced) Override the default cipher suite list (separated by commas). + # Allowed values: + # + # Secure Ciphers: + # - TLS_AES_128_GCM_SHA256 + # - TLS_AES_256_GCM_SHA384 + # - TLS_CHACHA20_POLY1305_SHA256 + # - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA + # - TLS_ECDHE_ECDSA_WITH_AES_256_CBC_SHA + # - TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA + # - TLS_ECDHE_RSA_WITH_AES_256_CBC_SHA + # - TLS_ECDHE_ECDSA_WITH_AES_128_GCM_SHA256 + # - TLS_ECDHE_ECDSA_WITH_AES_256_GCM_SHA384 + # - TLS_ECDHE_RSA_WITH_AES_128_GCM_SHA256 + # - TLS_ECDHE_RSA_WITH_AES_256_GCM_SHA384 + # - TLS_ECDHE_RSA_WITH_CHACHA20_POLY1305_SHA256 + # - TLS_ECDHE_ECDSA_WITH_CHACHA20_POLY1305_SHA256 + # + # Insecure Ciphers: + # - TLS_RSA_WITH_RC4_128_SHA + # - TLS_RSA_WITH_3DES_EDE_CBC_SHA + # - TLS_RSA_WITH_AES_128_CBC_SHA + # - TLS_RSA_WITH_AES_256_CBC_SHA + # - TLS_RSA_WITH_AES_128_CBC_SHA256 + # - TLS_RSA_WITH_AES_128_GCM_SHA256 + # - TLS_RSA_WITH_AES_256_GCM_SHA384 + # - TLS_ECDHE_ECDSA_WITH_RC4_128_SHA + # - TLS_ECDHE_RSA_WITH_RC4_128_SHA + # - TLS_ECDHE_RSA_WITH_3DES_EDE_CBC_SHA + # - TLS_ECDHE_ECDSA_WITH_AES_128_CBC_SHA256 + # - 
TLS_ECDHE_RSA_WITH_AES_128_CBC_SHA256 + # CLI flag: -querier.scheduler-client.tls-cipher-suites + [tls_cipher_suites: <string> | default = ""] + + # (advanced) Override the default minimum TLS version. Allowed values: + # VersionTLS10, VersionTLS11, VersionTLS12, VersionTLS13 + # CLI flag: -querier.scheduler-client.tls-min-version + [tls_min_version: <string> | default = ""] + + # (advanced) The maximum amount of time to establish a connection. A value of + # 0 means default gRPC client connect timeout and backoff. + # CLI flag: -querier.scheduler-client.connect-timeout + [connect_timeout: <duration> | default = 5s] + + # (advanced) Initial backoff delay after first connection failure. Only + # relevant if ConnectTimeout > 0. + # CLI flag: -querier.scheduler-client.connect-backoff-base-delay + [connect_backoff_base_delay: <duration> | default = 1s] + + # (advanced) Maximum backoff delay when establishing a connection. Only + # relevant if ConnectTimeout > 0. + # CLI flag: -querier.scheduler-client.connect-backoff-max-delay + [connect_backoff_max_delay: <duration> | default = 5s] + + # (experimental) Optionally define gRPC client's cluster validation label. 
+ # CLI flag: -querier.scheduler-client.cluster-validation-label + [cluster_validation_label: | default = ""] # (experimental) Enables streaming of responses from querier to query-frontend # for response types that support it (currently only `active_series` responses diff --git a/go.sum b/go.sum index 4c3fed840c9..8e58a3007a8 100644 --- a/go.sum +++ b/go.sum @@ -1272,8 +1272,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20250303095629-6fd0c494dfa3 h1:otX6Lww40e6jEsc4AQBdYmgDsaryAZkBTdBvWUsnDxk= github.com/grafana/alerting v0.0.0-20250303095629-6fd0c494dfa3/go.mod h1:p5P86ajOwL2XPJN6+xu6QJUtO4DptdFiqJ35r3Ac4HY= -github.com/grafana/dskit v0.0.0-20250303172748-fd4441b85237 h1:VZagYtPcmjgazfPAuWN7lER6mprG20r51+1eYPpATkw= -github.com/grafana/dskit v0.0.0-20250303172748-fd4441b85237/go.mod h1:cu2zIOHhAgRaIDuECsERftSp1l7KHq1aX1jgihQCu0c= +github.com/grafana/dskit v0.0.0-20250303214858-d23654211757 h1:nAd6h3RfteaAMeTO4cJLcPQGm1X7uYxv5oAhZICkBNw= +github.com/grafana/dskit v0.0.0-20250303214858-d23654211757/go.mod h1:cu2zIOHhAgRaIDuECsERftSp1l7KHq1aX1jgihQCu0c= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= diff --git a/pkg/mimir/mimir.go b/pkg/mimir/mimir.go index 927d2b00509..9ae5f21ac56 100644 --- a/pkg/mimir/mimir.go +++ b/pkg/mimir/mimir.go @@ -224,7 +224,9 @@ func (c *Config) CommonConfigInheritance() CommonConfigInheritance { "alertmanager_storage": &c.AlertmanagerStorage.StorageBackendConfig, }, GRPCClient: map[string]*util.GRPCClientConfig{ - "ingester_client": 
&c.IngesterClient.GRPCClientConfig, + "ingester_client": &c.IngesterClient.GRPCClientConfig, + "frontend_worker_frontend_client": &c.Worker.QueryFrontendGRPCClientConfig, + "frontend_worker_scheduler_client": &c.Worker.QuerySchedulerGRPCClientConfig, }, } } diff --git a/pkg/mimir/mimir_config_test.go b/pkg/mimir/mimir_config_test.go index 03d834236ec..8b147a8bf02 100644 --- a/pkg/mimir/mimir_config_test.go +++ b/pkg/mimir/mimir_config_test.go @@ -11,7 +11,6 @@ import ( "gopkg.in/yaml.v3" ingester_client "github.com/grafana/mimir/pkg/ingester/client" - "github.com/grafana/mimir/pkg/mimir" "github.com/grafana/mimir/pkg/storage/bucket" "github.com/grafana/mimir/pkg/util" @@ -34,13 +33,17 @@ func TestCommonConfigCanBeExtended(t *testing.T) { // Values should be properly inherited. require.Equal(t, "s3", cfg.CustomStorage.Backend) - require.Equal(t, 1000000, cfg.CustomIngesterClient.GRPCClientConfig.MaxRecvMsgSize) - require.Equal(t, "cluster", cfg.CustomIngesterClient.GRPCClientConfig.ClusterValidationLabel) + require.Equal(t, 1000000, cfg.CustomGRPCClient.GRPCClientConfig.MaxRecvMsgSize) + require.Equal(t, "cluster", cfg.CustomGRPCClient.GRPCClientConfig.ClusterValidationLabel) // Mimir's inheritance should still work. 
require.Equal(t, "s3", cfg.MimirConfig.BlocksStorage.Bucket.Backend) require.Equal(t, 1000000, cfg.MimirConfig.IngesterClient.GRPCClientConfig.MaxRecvMsgSize) require.Equal(t, "cluster", cfg.MimirConfig.IngesterClient.GRPCClientConfig.ClusterValidationLabel) + require.Equal(t, 1000000, cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.MaxRecvMsgSize) + require.Equal(t, "cluster", cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.ClusterValidationLabel) + require.Equal(t, 1000000, cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize) + require.Equal(t, "cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidationLabel) }) t.Run("yaml inheritance", func(t *testing.T) { @@ -62,26 +65,30 @@ common: // Values should be properly inherited. require.Equal(t, "s3", cfg.CustomStorage.Backend) - require.Equal(t, 1000000, cfg.CustomIngesterClient.GRPCClientConfig.MaxRecvMsgSize) - require.Equal(t, "cluster", cfg.CustomIngesterClient.GRPCClientConfig.ClusterValidationLabel) + require.Equal(t, 1000000, cfg.CustomGRPCClient.GRPCClientConfig.MaxRecvMsgSize) + require.Equal(t, "cluster", cfg.CustomGRPCClient.GRPCClientConfig.ClusterValidationLabel) // Mimir's inheritance should still work. 
require.Equal(t, "s3", cfg.MimirConfig.BlocksStorage.Bucket.Backend) require.Equal(t, 1000000, cfg.MimirConfig.IngesterClient.GRPCClientConfig.MaxRecvMsgSize) require.Equal(t, "cluster", cfg.MimirConfig.IngesterClient.GRPCClientConfig.ClusterValidationLabel) + require.Equal(t, 1000000, cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.MaxRecvMsgSize) + require.Equal(t, "cluster", cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.ClusterValidationLabel) + require.Equal(t, 1000000, cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize) + require.Equal(t, "cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidationLabel) }) } type customExtendedConfig struct { - MimirConfig mimir.Config `yaml:",inline"` - CustomStorage bucket.Config `yaml:"custom_storage"` - CustomIngesterClient ingester_client.Config `yaml:"custom_ingester_client"` + MimirConfig mimir.Config `yaml:",inline"` + CustomStorage bucket.Config `yaml:"custom_storage"` + CustomGRPCClient ingester_client.Config `yaml:"custom_grpc_client"` } func (c *customExtendedConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) { c.MimirConfig.RegisterFlags(f, logger) c.CustomStorage.RegisterFlagsWithPrefix("custom-storage", f) - c.CustomIngesterClient.RegisterFlagsWithPrefix("custom-ingester-client", f) + c.CustomGRPCClient.RegisterFlagsWithPrefix("custom-ingester-client", f) } func (c *customExtendedConfig) CommonConfigInheritance() mimir.CommonConfigInheritance { @@ -90,7 +97,7 @@ func (c *customExtendedConfig) CommonConfigInheritance() mimir.CommonConfigInher "custom": &c.CustomStorage.StorageBackendConfig, }, GRPCClient: map[string]*util.GRPCClientConfig{ - "custom_grpc_client": &c.CustomIngesterClient.GRPCClientConfig, + "custom_grpc_client": &c.CustomGRPCClient.GRPCClientConfig, }, } } @@ -108,7 +115,7 @@ func TestMimirConfigCanBeInlined(t *testing.T) { const commonYAMLConfig = ` custom_storage: backend: s3 -custom_ingester_client: +custom_grpc_client: 
grpc_client_config: max_recv_msg_size: 1000000 cluster_validation_label: cluster @@ -123,6 +130,6 @@ custom_ingester_client: // Value should be properly set. require.Equal(t, "s3", cfg.CustomStorage.Backend) - require.Equal(t, 1000000, cfg.CustomIngesterClient.GRPCClientConfig.MaxRecvMsgSize) - require.Equal(t, "cluster", cfg.CustomIngesterClient.GRPCClientConfig.ClusterValidationLabel) + require.Equal(t, 1000000, cfg.CustomGRPCClient.GRPCClientConfig.MaxRecvMsgSize) + require.Equal(t, "cluster", cfg.CustomGRPCClient.GRPCClientConfig.ClusterValidationLabel) } diff --git a/pkg/querier/worker/frontend_processor_test.go b/pkg/querier/worker/frontend_processor_test.go index 60f729e64ee..101a330352e 100644 --- a/pkg/querier/worker/frontend_processor_test.go +++ b/pkg/querier/worker/frontend_processor_test.go @@ -29,6 +29,7 @@ import ( "github.com/grafana/mimir/pkg/frontend/v1/frontendv1pb" "github.com/grafana/mimir/pkg/querier/stats" + "github.com/grafana/mimir/pkg/util" ) func TestFrontendProcessor_processQueriesOnSingleStream(t *testing.T) { @@ -231,7 +232,13 @@ func prepareFrontendProcessor() (*frontendProcessor, *frontendProcessClientMock, requestHandler := &requestHandlerMock{} - fp := newFrontendProcessor(Config{QuerierID: "test-querier-id", QueryFrontendGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 1}}, requestHandler, log.NewNopLogger()) + fp := newFrontendProcessor( + Config{ + QuerierID: "test-querier-id", + QueryFrontendGRPCClientConfig: util.GRPCClientConfig{Config: grpcclient.Config{MaxSendMsgSize: 1}}, + }, + requestHandler, log.NewNopLogger(), + ) fp.frontendClientFactory = func(_ *grpc.ClientConn) frontendv1pb.FrontendClient { return frontendClient } diff --git a/pkg/querier/worker/scheduler_processor.go b/pkg/querier/worker/scheduler_processor.go index d1ec2da6990..d8c31d0056f 100644 --- a/pkg/querier/worker/scheduler_processor.go +++ b/pkg/querier/worker/scheduler_processor.go @@ -57,7 +57,7 @@ func newSchedulerProcessor(cfg Config, handler 
RequestHandler, log log.Logger, r streamResponse: streamResponse, maxMessageSize: cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize, querierID: cfg.QuerierID, - grpcConfig: cfg.QueryFrontendGRPCClientConfig, + grpcConfig: cfg.QueryFrontendGRPCClientConfig.Config, streamingEnabled: cfg.ResponseStreamingEnabled, schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient { diff --git a/pkg/querier/worker/scheduler_processor_test.go b/pkg/querier/worker/scheduler_processor_test.go index 26d45ccdcd0..025d42c8bd0 100644 --- a/pkg/querier/worker/scheduler_processor_test.go +++ b/pkg/querier/worker/scheduler_processor_test.go @@ -32,6 +32,7 @@ import ( "github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb" querier_stats "github.com/grafana/mimir/pkg/querier/stats" "github.com/grafana/mimir/pkg/scheduler/schedulerpb" + "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/test" ) @@ -329,7 +330,7 @@ func TestSchedulerProcessor_QueryTime(t *testing.T) { } func TestCreateSchedulerProcessor(t *testing.T) { - conf := grpcclient.Config{} + conf := util.GRPCClientConfig{Config: grpcclient.Config{}} flagext.DefaultValues(&conf) conf.MaxSendMsgSize = 1 * 1024 * 1024 @@ -337,12 +338,12 @@ func TestCreateSchedulerProcessor(t *testing.T) { SchedulerAddress: "sched:12345", QuerierID: "test", QueryFrontendGRPCClientConfig: conf, - QuerySchedulerGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 5 * 1024}, // schedulerProcessor should ignore this. + QuerySchedulerGRPCClientConfig: util.GRPCClientConfig{Config: grpcclient.Config{MaxSendMsgSize: 5 * 1024}}, // schedulerProcessor should ignore this. 
MaxConcurrentRequests: 5, }, nil, nil, nil) assert.Equal(t, 1*1024*1024, sp.maxMessageSize) - assert.Equal(t, conf, sp.grpcConfig) + assert.Equal(t, conf.Config, sp.grpcConfig) } func TestSchedulerProcessor_ResponseStream(t *testing.T) { diff --git a/pkg/querier/worker/worker.go b/pkg/querier/worker/worker.go index 2b9459a116e..b4cedd8c77a 100644 --- a/pkg/querier/worker/worker.go +++ b/pkg/querier/worker/worker.go @@ -15,8 +15,8 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/grafana/dskit/grpcclient" "github.com/grafana/dskit/httpgrpc" + "github.com/grafana/dskit/middleware" "github.com/grafana/dskit/servicediscovery" "github.com/grafana/dskit/services" "github.com/pkg/errors" @@ -24,17 +24,18 @@ import ( "google.golang.org/grpc" "github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery" + "github.com/grafana/mimir/pkg/util" "github.com/grafana/mimir/pkg/util/grpcencoding/s2" ) type Config struct { - FrontendAddress string `yaml:"frontend_address"` - SchedulerAddress string `yaml:"scheduler_address"` - DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration" category:"advanced"` - QuerierID string `yaml:"id" category:"advanced"` - QueryFrontendGRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-frontend."` - QuerySchedulerGRPCClientConfig grpcclient.Config `yaml:"query_scheduler_grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-scheduler."` - ResponseStreamingEnabled bool `yaml:"response_streaming_enabled" category:"experimental"` + FrontendAddress string `yaml:"frontend_address"` + SchedulerAddress string `yaml:"scheduler_address"` + DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration" category:"advanced"` + QuerierID string `yaml:"id" category:"advanced"` + QueryFrontendGRPCClientConfig util.GRPCClientConfig `yaml:"grpc_client_config" 
doc:"description=Configures the gRPC client used to communicate between the querier and the query-frontend."` + QuerySchedulerGRPCClientConfig util.GRPCClientConfig `yaml:"query_scheduler_grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-scheduler."` + ResponseStreamingEnabled bool `yaml:"response_streaming_enabled" category:"experimental"` // This configuration is injected internally. MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to PromQL Engine. @@ -105,7 +106,7 @@ type querierWorker struct { *services.BasicService maxConcurrentRequests int - grpcClientConfig grpcclient.Config + grpcClientConfig util.GRPCClientConfig log log.Logger processor processor @@ -117,6 +118,8 @@ type querierWorker struct { mu sync.Mutex managers map[string]*processorManager instances map[string]servicediscovery.Instance + + invalidClusterValidation *prometheus.CounterVec } func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (services.Service, error) { @@ -128,10 +131,13 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr cfg.QuerierID = hostname } - var processor processor - var grpcCfg grpcclient.Config - var servs []services.Service - var factory serviceDiscoveryFactory + var ( + processor processor + grpcCfg util.GRPCClientConfig + workerClient string + servs []services.Service + factory serviceDiscoveryFactory + ) switch { case cfg.SchedulerAddress != "" || cfg.QuerySchedulerDiscovery.Mode == schedulerdiscovery.ModeRing: @@ -142,6 +148,7 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr } grpcCfg = cfg.QuerySchedulerGRPCClientConfig + workerClient = "query-scheduler-worker" processor, servs = newSchedulerProcessor(cfg, handler, log, reg) case cfg.FrontendAddress != "": @@ -152,23 +159,26 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr } grpcCfg = 
cfg.QueryFrontendGRPCClientConfig + workerClient = "query-frontend-worker" processor = newFrontendProcessor(cfg, handler, log) default: return nil, errors.New("no query-scheduler or query-frontend address") } - return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrentRequests, log, processor, factory, servs) + invalidClusterValidation := util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, workerClient, util.GRPCProtocol) + return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrentRequests, log, processor, factory, servs, invalidClusterValidation) } -func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, log log.Logger, processor processor, newServiceDiscovery serviceDiscoveryFactory, servs []services.Service) (*querierWorker, error) { +func newQuerierWorkerWithProcessor(grpcCfg util.GRPCClientConfig, maxConcReq int, log log.Logger, processor processor, newServiceDiscovery serviceDiscoveryFactory, servs []services.Service, invalidClusterValidation *prometheus.CounterVec) (*querierWorker, error) { f := &querierWorker{ - grpcClientConfig: grpcCfg, - maxConcurrentRequests: maxConcReq, - log: log, - managers: map[string]*processorManager{}, - instances: map[string]servicediscovery.Instance{}, - processor: processor, + grpcClientConfig: grpcCfg, + maxConcurrentRequests: maxConcReq, + log: log, + managers: map[string]*processorManager{}, + instances: map[string]servicediscovery.Instance{}, + processor: processor, + invalidClusterValidation: invalidClusterValidation, } // There's no service discovery in some tests. @@ -389,7 +399,11 @@ func (w *querierWorker) getDesiredConcurrency() map[string]int { func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) { // Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics. 
- opts, err := w.grpcClientConfig.DialOption(nil, nil) + var unary []grpc.UnaryClientInterceptor + if w.grpcClientConfig.ClusterValidationLabel != "" { + unary = []grpc.UnaryClientInterceptor{middleware.ClusterUnaryClientInterceptor(w.grpcClientConfig.ClusterValidationLabel, w.invalidClusterValidation, w.log)} + } + opts, err := w.grpcClientConfig.DialOption(unary, nil) if err != nil { return nil, err diff --git a/pkg/querier/worker/worker_test.go b/pkg/querier/worker/worker_test.go index 7a6c937a99d..8c31c330620 100644 --- a/pkg/querier/worker/worker_test.go +++ b/pkg/querier/worker/worker_test.go @@ -174,7 +174,7 @@ func TestResetConcurrency(t *testing.T) { MaxConcurrentRequests: tt.maxConcurrent, } - w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil) + w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil, nil) require.NoError(t, err) require.NoError(t, services.StartAndAwaitRunning(context.Background(), w)) @@ -261,7 +261,7 @@ func TestQuerierWorker_getDesiredConcurrency(t *testing.T) { MaxConcurrentRequests: testData.maxConcurrent, } - w, err := newQuerierWorkerWithProcessor(cfg.QueryFrontendGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil) + w, err := newQuerierWorkerWithProcessor(cfg.QueryFrontendGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil, nil) require.NoError(t, err) for _, instance := range testData.instances {