Skip to content

Commit

Permalink
Allow cluster validation in query-frontend and query-scheduler client…
Browse files Browse the repository at this point in the history
…s of querier worked

Signed-off-by: Yuri Nikolic <[email protected]>
  • Loading branch information
duricanikolic committed Mar 4, 2025
1 parent 75e98b6 commit eb90433
Show file tree
Hide file tree
Showing 8 changed files with 76 additions and 45 deletions.
4 changes: 2 additions & 2 deletions go.sum
Original file line number Diff line number Diff line change
Expand Up @@ -1272,8 +1272,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T
github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4=
github.com/grafana/alerting v0.0.0-20250303095629-6fd0c494dfa3 h1:otX6Lww40e6jEsc4AQBdYmgDsaryAZkBTdBvWUsnDxk=
github.com/grafana/alerting v0.0.0-20250303095629-6fd0c494dfa3/go.mod h1:p5P86ajOwL2XPJN6+xu6QJUtO4DptdFiqJ35r3Ac4HY=
github.com/grafana/dskit v0.0.0-20250303172748-fd4441b85237 h1:VZagYtPcmjgazfPAuWN7lER6mprG20r51+1eYPpATkw=
github.com/grafana/dskit v0.0.0-20250303172748-fd4441b85237/go.mod h1:cu2zIOHhAgRaIDuECsERftSp1l7KHq1aX1jgihQCu0c=
github.com/grafana/dskit v0.0.0-20250303214858-d23654211757 h1:nAd6h3RfteaAMeTO4cJLcPQGm1X7uYxv5oAhZICkBNw=
github.com/grafana/dskit v0.0.0-20250303214858-d23654211757/go.mod h1:cu2zIOHhAgRaIDuECsERftSp1l7KHq1aX1jgihQCu0c=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM=
github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI=
github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8=
Expand Down
4 changes: 3 additions & 1 deletion pkg/mimir/mimir.go
Original file line number Diff line number Diff line change
Expand Up @@ -224,7 +224,9 @@ func (c *Config) CommonConfigInheritance() CommonConfigInheritance {
"alertmanager_storage": &c.AlertmanagerStorage.StorageBackendConfig,
},
GRPCClient: map[string]*util.GRPCClientConfig{
"ingester_client": &c.IngesterClient.GRPCClientConfig,
"ingester_client": &c.IngesterClient.GRPCClientConfig,
"frontend_worker_frontend_client": &c.Worker.QueryFrontendGRPCClientConfig,
"frontend_worker_scheduler_client": &c.Worker.QuerySchedulerGRPCClientConfig,
},
}
}
Expand Down
33 changes: 20 additions & 13 deletions pkg/mimir/mimir_config_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -11,7 +11,6 @@ import (
"gopkg.in/yaml.v3"

ingester_client "github.com/grafana/mimir/pkg/ingester/client"

"github.com/grafana/mimir/pkg/mimir"
"github.com/grafana/mimir/pkg/storage/bucket"
"github.com/grafana/mimir/pkg/util"
Expand All @@ -34,13 +33,17 @@ func TestCommonConfigCanBeExtended(t *testing.T) {

// Values should be properly inherited.
require.Equal(t, "s3", cfg.CustomStorage.Backend)
require.Equal(t, 1000000, cfg.CustomIngesterClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.CustomIngesterClient.GRPCClientConfig.ClusterValidationLabel)
require.Equal(t, 1000000, cfg.CustomGRPCClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.CustomGRPCClient.GRPCClientConfig.ClusterValidationLabel)

// Mimir's inheritance should still work.
require.Equal(t, "s3", cfg.MimirConfig.BlocksStorage.Bucket.Backend)
require.Equal(t, 1000000, cfg.MimirConfig.IngesterClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.MimirConfig.IngesterClient.GRPCClientConfig.ClusterValidationLabel)
require.Equal(t, 1000000, cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.ClusterValidationLabel)
require.Equal(t, 1000000, cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidationLabel)
})

t.Run("yaml inheritance", func(t *testing.T) {
Expand All @@ -62,26 +65,30 @@ common:

// Values should be properly inherited.
require.Equal(t, "s3", cfg.CustomStorage.Backend)
require.Equal(t, 1000000, cfg.CustomIngesterClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.CustomIngesterClient.GRPCClientConfig.ClusterValidationLabel)
require.Equal(t, 1000000, cfg.CustomGRPCClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.CustomGRPCClient.GRPCClientConfig.ClusterValidationLabel)

// Mimir's inheritance should still work.
require.Equal(t, "s3", cfg.MimirConfig.BlocksStorage.Bucket.Backend)
require.Equal(t, 1000000, cfg.MimirConfig.IngesterClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.MimirConfig.IngesterClient.GRPCClientConfig.ClusterValidationLabel)
require.Equal(t, 1000000, cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.MimirConfig.Worker.QueryFrontendGRPCClientConfig.ClusterValidationLabel)
require.Equal(t, 1000000, cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.MimirConfig.Worker.QuerySchedulerGRPCClientConfig.ClusterValidationLabel)
})
}

type customExtendedConfig struct {
MimirConfig mimir.Config `yaml:",inline"`
CustomStorage bucket.Config `yaml:"custom_storage"`
CustomIngesterClient ingester_client.Config `yaml:"custom_ingester_client"`
MimirConfig mimir.Config `yaml:",inline"`
CustomStorage bucket.Config `yaml:"custom_storage"`
CustomGRPCClient ingester_client.Config `yaml:"custom_grpc_client"`
}

func (c *customExtendedConfig) RegisterFlags(f *flag.FlagSet, logger log.Logger) {
c.MimirConfig.RegisterFlags(f, logger)
c.CustomStorage.RegisterFlagsWithPrefix("custom-storage", f)
c.CustomIngesterClient.RegisterFlagsWithPrefix("custom-ingester-client", f)
c.CustomGRPCClient.RegisterFlagsWithPrefix("custom-ingester-client", f)
}

func (c *customExtendedConfig) CommonConfigInheritance() mimir.CommonConfigInheritance {
Expand All @@ -90,7 +97,7 @@ func (c *customExtendedConfig) CommonConfigInheritance() mimir.CommonConfigInher
"custom": &c.CustomStorage.StorageBackendConfig,
},
GRPCClient: map[string]*util.GRPCClientConfig{
"custom_grpc_client": &c.CustomIngesterClient.GRPCClientConfig,
"custom_grpc_client": &c.CustomGRPCClient.GRPCClientConfig,
},
}
}
Expand All @@ -108,7 +115,7 @@ func TestMimirConfigCanBeInlined(t *testing.T) {
const commonYAMLConfig = `
custom_storage:
backend: s3
custom_ingester_client:
custom_grpc_client:
grpc_client_config:
max_recv_msg_size: 1000000
cluster_validation_label: cluster
Expand All @@ -123,6 +130,6 @@ custom_ingester_client:

// Value should be properly set.
require.Equal(t, "s3", cfg.CustomStorage.Backend)
require.Equal(t, 1000000, cfg.CustomIngesterClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.CustomIngesterClient.GRPCClientConfig.ClusterValidationLabel)
require.Equal(t, 1000000, cfg.CustomGRPCClient.GRPCClientConfig.MaxRecvMsgSize)
require.Equal(t, "cluster", cfg.CustomGRPCClient.GRPCClientConfig.ClusterValidationLabel)
}
9 changes: 8 additions & 1 deletion pkg/querier/worker/frontend_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -29,6 +29,7 @@ import (

"github.com/grafana/mimir/pkg/frontend/v1/frontendv1pb"
"github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/util"
)

func TestFrontendProcessor_processQueriesOnSingleStream(t *testing.T) {
Expand Down Expand Up @@ -231,7 +232,13 @@ func prepareFrontendProcessor() (*frontendProcessor, *frontendProcessClientMock,

requestHandler := &requestHandlerMock{}

fp := newFrontendProcessor(Config{QuerierID: "test-querier-id", QueryFrontendGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 1}}, requestHandler, log.NewNopLogger())
fp := newFrontendProcessor(
Config{
QuerierID: "test-querier-id",
QueryFrontendGRPCClientConfig: util.GRPCClientConfig{Config: grpcclient.Config{MaxSendMsgSize: 1}},
},
requestHandler, log.NewNopLogger(),
)
fp.frontendClientFactory = func(_ *grpc.ClientConn) frontendv1pb.FrontendClient {
return frontendClient
}
Expand Down
2 changes: 1 addition & 1 deletion pkg/querier/worker/scheduler_processor.go
Original file line number Diff line number Diff line change
Expand Up @@ -57,7 +57,7 @@ func newSchedulerProcessor(cfg Config, handler RequestHandler, log log.Logger, r
streamResponse: streamResponse,
maxMessageSize: cfg.QueryFrontendGRPCClientConfig.MaxSendMsgSize,
querierID: cfg.QuerierID,
grpcConfig: cfg.QueryFrontendGRPCClientConfig,
grpcConfig: cfg.QueryFrontendGRPCClientConfig.Config,
streamingEnabled: cfg.ResponseStreamingEnabled,

schedulerClientFactory: func(conn *grpc.ClientConn) schedulerpb.SchedulerForQuerierClient {
Expand Down
7 changes: 4 additions & 3 deletions pkg/querier/worker/scheduler_processor_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,7 @@ import (
"github.com/grafana/mimir/pkg/frontend/v2/frontendv2pb"
querier_stats "github.com/grafana/mimir/pkg/querier/stats"
"github.com/grafana/mimir/pkg/scheduler/schedulerpb"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/test"
)

Expand Down Expand Up @@ -329,20 +330,20 @@ func TestSchedulerProcessor_QueryTime(t *testing.T) {
}

func TestCreateSchedulerProcessor(t *testing.T) {
conf := grpcclient.Config{}
conf := util.GRPCClientConfig{Config: grpcclient.Config{}}
flagext.DefaultValues(&conf)
conf.MaxSendMsgSize = 1 * 1024 * 1024

sp, _ := newSchedulerProcessor(Config{
SchedulerAddress: "sched:12345",
QuerierID: "test",
QueryFrontendGRPCClientConfig: conf,
QuerySchedulerGRPCClientConfig: grpcclient.Config{MaxSendMsgSize: 5 * 1024}, // schedulerProcessor should ignore this.
QuerySchedulerGRPCClientConfig: util.GRPCClientConfig{Config: grpcclient.Config{MaxSendMsgSize: 5 * 1024}}, // schedulerProcessor should ignore this.
MaxConcurrentRequests: 5,
}, nil, nil, nil)

assert.Equal(t, 1*1024*1024, sp.maxMessageSize)
assert.Equal(t, conf, sp.grpcConfig)
assert.Equal(t, conf.Config, sp.grpcConfig)
}

func TestSchedulerProcessor_ResponseStream(t *testing.T) {
Expand Down
58 changes: 36 additions & 22 deletions pkg/querier/worker/worker.go
Original file line number Diff line number Diff line change
Expand Up @@ -15,26 +15,27 @@ import (

"github.com/go-kit/log"
"github.com/go-kit/log/level"
"github.com/grafana/dskit/grpcclient"
"github.com/grafana/dskit/httpgrpc"
"github.com/grafana/dskit/middleware"
"github.com/grafana/dskit/servicediscovery"
"github.com/grafana/dskit/services"
"github.com/pkg/errors"
"github.com/prometheus/client_golang/prometheus"
"google.golang.org/grpc"

"github.com/grafana/mimir/pkg/scheduler/schedulerdiscovery"
"github.com/grafana/mimir/pkg/util"
"github.com/grafana/mimir/pkg/util/grpcencoding/s2"
)

type Config struct {
FrontendAddress string `yaml:"frontend_address"`
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration" category:"advanced"`
QuerierID string `yaml:"id" category:"advanced"`
QueryFrontendGRPCClientConfig grpcclient.Config `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-frontend."`
QuerySchedulerGRPCClientConfig grpcclient.Config `yaml:"query_scheduler_grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-scheduler."`
ResponseStreamingEnabled bool `yaml:"response_streaming_enabled" category:"experimental"`
FrontendAddress string `yaml:"frontend_address"`
SchedulerAddress string `yaml:"scheduler_address"`
DNSLookupPeriod time.Duration `yaml:"dns_lookup_duration" category:"advanced"`
QuerierID string `yaml:"id" category:"advanced"`
QueryFrontendGRPCClientConfig util.GRPCClientConfig `yaml:"grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-frontend."`
QuerySchedulerGRPCClientConfig util.GRPCClientConfig `yaml:"query_scheduler_grpc_client_config" doc:"description=Configures the gRPC client used to communicate between the querier and the query-scheduler."`
ResponseStreamingEnabled bool `yaml:"response_streaming_enabled" category:"experimental"`

// This configuration is injected internally.
MaxConcurrentRequests int `yaml:"-"` // Must be same as passed to PromQL Engine.
Expand Down Expand Up @@ -105,7 +106,7 @@ type querierWorker struct {
*services.BasicService

maxConcurrentRequests int
grpcClientConfig grpcclient.Config
grpcClientConfig util.GRPCClientConfig
log log.Logger

processor processor
Expand All @@ -117,6 +118,8 @@ type querierWorker struct {
mu sync.Mutex
managers map[string]*processorManager
instances map[string]servicediscovery.Instance

invalidClusterValidation *prometheus.CounterVec
}

func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg prometheus.Registerer) (services.Service, error) {
Expand All @@ -128,10 +131,13 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr
cfg.QuerierID = hostname
}

var processor processor
var grpcCfg grpcclient.Config
var servs []services.Service
var factory serviceDiscoveryFactory
var (
processor processor
grpcCfg util.GRPCClientConfig
workerClient string
servs []services.Service
factory serviceDiscoveryFactory
)

switch {
case cfg.SchedulerAddress != "" || cfg.QuerySchedulerDiscovery.Mode == schedulerdiscovery.ModeRing:
Expand All @@ -142,6 +148,7 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr
}

grpcCfg = cfg.QuerySchedulerGRPCClientConfig
workerClient = "query-scheduler-worker"
processor, servs = newSchedulerProcessor(cfg, handler, log, reg)

case cfg.FrontendAddress != "":
Expand All @@ -152,23 +159,26 @@ func NewQuerierWorker(cfg Config, handler RequestHandler, log log.Logger, reg pr
}

grpcCfg = cfg.QueryFrontendGRPCClientConfig
workerClient = "query-frontend-worker"
processor = newFrontendProcessor(cfg, handler, log)

default:
return nil, errors.New("no query-scheduler or query-frontend address")
}

return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrentRequests, log, processor, factory, servs)
invalidClusterValidation := util.NewRequestInvalidClusterValidationLabelsTotalCounter(reg, workerClient, util.GRPCProtocol)
return newQuerierWorkerWithProcessor(grpcCfg, cfg.MaxConcurrentRequests, log, processor, factory, servs, invalidClusterValidation)
}

func newQuerierWorkerWithProcessor(grpcCfg grpcclient.Config, maxConcReq int, log log.Logger, processor processor, newServiceDiscovery serviceDiscoveryFactory, servs []services.Service) (*querierWorker, error) {
func newQuerierWorkerWithProcessor(grpcCfg util.GRPCClientConfig, maxConcReq int, log log.Logger, processor processor, newServiceDiscovery serviceDiscoveryFactory, servs []services.Service, invalidClusterValidation *prometheus.CounterVec) (*querierWorker, error) {
f := &querierWorker{
grpcClientConfig: grpcCfg,
maxConcurrentRequests: maxConcReq,
log: log,
managers: map[string]*processorManager{},
instances: map[string]servicediscovery.Instance{},
processor: processor,
grpcClientConfig: grpcCfg,
maxConcurrentRequests: maxConcReq,
log: log,
managers: map[string]*processorManager{},
instances: map[string]servicediscovery.Instance{},
processor: processor,
invalidClusterValidation: invalidClusterValidation,
}

// There's no service discovery in some tests.
Expand Down Expand Up @@ -389,7 +399,11 @@ func (w *querierWorker) getDesiredConcurrency() map[string]int {

func (w *querierWorker) connect(ctx context.Context, address string) (*grpc.ClientConn, error) {
// Because we only use single long-running method, it doesn't make sense to inject user ID, send over tracing or add metrics.
opts, err := w.grpcClientConfig.DialOption(nil, nil)
var unary []grpc.UnaryClientInterceptor
if w.grpcClientConfig.ClusterValidationLabel != "" {
unary = []grpc.UnaryClientInterceptor{middleware.ClusterUnaryClientInterceptor(w.grpcClientConfig.ClusterValidationLabel, w.invalidClusterValidation, w.log)}
}
opts, err := w.grpcClientConfig.DialOption(unary, nil)

if err != nil {
return nil, err
Expand Down
4 changes: 2 additions & 2 deletions pkg/querier/worker/worker_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -174,7 +174,7 @@ func TestResetConcurrency(t *testing.T) {
MaxConcurrentRequests: tt.maxConcurrent,
}

w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil)
w, err := newQuerierWorkerWithProcessor(cfg.QuerySchedulerGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil, nil)
require.NoError(t, err)
require.NoError(t, services.StartAndAwaitRunning(context.Background(), w))

Expand Down Expand Up @@ -261,7 +261,7 @@ func TestQuerierWorker_getDesiredConcurrency(t *testing.T) {
MaxConcurrentRequests: testData.maxConcurrent,
}

w, err := newQuerierWorkerWithProcessor(cfg.QueryFrontendGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil)
w, err := newQuerierWorkerWithProcessor(cfg.QueryFrontendGRPCClientConfig, cfg.MaxConcurrentRequests, log.NewNopLogger(), &mockProcessor{}, nil, nil, nil)
require.NoError(t, err)

for _, instance := range testData.instances {
Expand Down

0 comments on commit eb90433

Please sign in to comment.