From 23e0da4d315c97308ab5e8f698ad3f6b99aa1a37 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Thu, 27 Feb 2025 17:37:44 -0600 Subject: [PATCH 01/10] get tidy vendor dskit --- go.mod | 2 +- go.sum | 4 +- .../clusterutil/cluster_validation_config.go | 47 ++ .../grafana/dskit/clusterutil/clusterutil.go | 44 ++ .../github.com/grafana/dskit/flagext/map.go | 108 +++++ .../dskit/grpcutil/error_details.pb.go | 422 ++++++++++++++++++ .../dskit/grpcutil/error_details.proto | 14 + .../grafana/dskit/grpcutil/status.go | 17 + .../grafana/dskit/middleware/grpc_cluster.go | 125 ++++-- .../github.com/grafana/dskit/server/server.go | 11 + vendor/modules.txt | 3 +- 11 files changed, 765 insertions(+), 32 deletions(-) create mode 100644 vendor/github.com/grafana/dskit/clusterutil/cluster_validation_config.go create mode 100644 vendor/github.com/grafana/dskit/clusterutil/clusterutil.go create mode 100644 vendor/github.com/grafana/dskit/flagext/map.go create mode 100644 vendor/github.com/grafana/dskit/grpcutil/error_details.pb.go create mode 100644 vendor/github.com/grafana/dskit/grpcutil/error_details.proto diff --git a/go.mod b/go.mod index 5ad8b2bccd7..b27bec2d521 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20250213152722-e83d24ebed15 + github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/influxdata/influxdb/v2 v2.7.11 diff --git a/go.sum b/go.sum index 2c08edfa1f7..bab9d9566e1 100644 --- a/go.sum +++ b/go.sum @@ -1272,8 +1272,8 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20250225150117-15e285d78df2 h1:kESrzm0FcRVLmGIQCgl1MCwDGLH4sLzWphr7mcFdbfI= github.com/grafana/alerting v0.0.0-20250225150117-15e285d78df2/go.mod h1:hdGB3dSl8Ma9Rjo2YiAEAjMkZ5HiNJbNDqRKDefRZrM= -github.com/grafana/dskit v0.0.0-20250213152722-e83d24ebed15 h1:5Mdedurc8b1Cc3RW9qRHNtxekGN1PEOYTymq18A0Ddc= -github.com/grafana/dskit v0.0.0-20250213152722-e83d24ebed15/go.mod h1:SPLNCARd4xdjCkue0O6hvuoveuS1dGJjDnfxYe405YQ= +github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed h1:fyMK3XSnKgheQVXuL84QM/IWu+bruBC8Wez5en7S+YI= +github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed/go.mod h1:TOlLjBa9TGnuo3Ow7X0zfFFSOye7FFhsN2wtaGXknJI= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= diff --git a/vendor/github.com/grafana/dskit/clusterutil/cluster_validation_config.go b/vendor/github.com/grafana/dskit/clusterutil/cluster_validation_config.go new file mode 100644 index 00000000000..5f0ba687102 --- /dev/null +++ b/vendor/github.com/grafana/dskit/clusterutil/cluster_validation_config.go @@ -0,0 +1,47 @@ +package clusterutil + +import ( + "flag" + "fmt" +) + +type ClusterValidationConfig struct { + Label string + GRPC ClusterValidationProtocolConfig +} + +type ClusterValidationProtocolConfig struct { + Enabled bool + SoftValidation bool +} + +func (cfg *ClusterValidationConfig) Validate() error { + return cfg.GRPC.Validate("grpc", cfg.Label) +} + +func (cfg *ClusterValidationConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + clusterValidationPrefix := prefix + ".cluster-validation" + f.StringVar(&cfg.Label, clusterValidationPrefix+".label", "", "Optionally define server's cluster validation label.") + cfg.GRPC.RegisterFlagsWithPrefix(clusterValidationPrefix+".grpc", f) +} + +func (cfg *ClusterValidationProtocolConfig) Validate(prefix string, label string) error { + if label == "" { + if cfg.Enabled || cfg.SoftValidation { + return fmt.Errorf("%s: validation cannot be enabled if cluster validation label is not configured", prefix) + } + return nil + } + + if !cfg.Enabled && cfg.SoftValidation { + return fmt.Errorf("%s: soft validation can be enabled only if cluster validation is enabled", prefix) + } + return nil +} + +func (cfg *ClusterValidationProtocolConfig) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { + softValidationFlag := prefix + ".soft-validation" + enabledFlag := prefix + ".enabled" + f.BoolVar(&cfg.SoftValidation, softValidationFlag, false, fmt.Sprintf("When enabled, soft cluster label validation will be executed. Can be enabled only together with %s", enabledFlag)) + f.BoolVar(&cfg.Enabled, enabledFlag, false, "When enabled, cluster label validation will be executed.") +} diff --git a/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go b/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go new file mode 100644 index 00000000000..e6614ba04f7 --- /dev/null +++ b/vendor/github.com/grafana/dskit/clusterutil/clusterutil.go @@ -0,0 +1,44 @@ +package clusterutil + +import ( + "context" + "fmt" + + "google.golang.org/grpc/metadata" +) + +const ( + // MetadataClusterVerificationLabelKey is the key of the cluster verification label gRPC metadata. + MetadataClusterVerificationLabelKey = "x-cluster" +) + +var ( + ErrNoClusterVerificationLabel = fmt.Errorf("no cluster verification label in context") + errDifferentClusterVerificationLabels = func(clusterIDs []string) error { + return fmt.Errorf("gRPC metadata should contain exactly 1 value for key %q, but it contains %v", MetadataClusterVerificationLabelKey, clusterIDs) + } +) + +// PutClusterIntoOutgoingContext returns a new context with the provided value for +// MetadataClusterVerificationLabelKey, merged with any existing metadata in the context. +// Empty values are ignored. +func PutClusterIntoOutgoingContext(ctx context.Context, cluster string) context.Context { + if cluster == "" { + return ctx + } + return metadata.AppendToOutgoingContext(ctx, MetadataClusterVerificationLabelKey, cluster) +} + +// GetClusterFromIncomingContext returns a single metadata value corresponding to the +// MetadataClusterVerificationLabelKey key from the incoming context, if it exists. +// In all other cases an error is returned. +func GetClusterFromIncomingContext(ctx context.Context) (string, error) { + clusterIDs := metadata.ValueFromIncomingContext(ctx, MetadataClusterVerificationLabelKey) + if len(clusterIDs) > 1 { + return "", errDifferentClusterVerificationLabels(clusterIDs) + } + if len(clusterIDs) == 0 || clusterIDs[0] == "" { + return "", ErrNoClusterVerificationLabel + } + return clusterIDs[0], nil +} diff --git a/vendor/github.com/grafana/dskit/flagext/map.go b/vendor/github.com/grafana/dskit/flagext/map.go new file mode 100644 index 00000000000..78f95fb4d3d --- /dev/null +++ b/vendor/github.com/grafana/dskit/flagext/map.go @@ -0,0 +1,108 @@ +package flagext + +import ( + "encoding/json" + "fmt" + + "gopkg.in/yaml.v3" +) + +// LimitsMap is a flag.Value implementation that looks like a generic map, holding float64s, ints, or strings as values. +type LimitsMap[T float64 | int | string] struct { + data map[string]T + validator func(k string, v T) error +} + +func NewLimitsMap[T float64 | int | string](validator func(k string, v T) error) LimitsMap[T] { + return NewLimitsMapWithData(make(map[string]T), validator) +} + +func NewLimitsMapWithData[T float64 | int | string](data map[string]T, validator func(k string, v T) error) LimitsMap[T] { + return LimitsMap[T]{ + data: data, + validator: validator, + } +} + +// IsInitialized returns true if the map is initialized. +func (m LimitsMap[T]) IsInitialized() bool { + return m.data != nil +} + +// String implements flag.Value +func (m LimitsMap[T]) String() string { + out, err := json.Marshal(m.data) + if err != nil { + return fmt.Sprintf("failed to marshal: %v", err) + } + return string(out) +} + +// Set implements flag.Value +func (m LimitsMap[T]) Set(s string) error { + newMap := make(map[string]T) + if err := json.Unmarshal([]byte(s), &newMap); err != nil { + return err + } + return m.updateMap(newMap) +} + +func (m LimitsMap[T]) Read() map[string]T { + return m.data +} + +// Clone returns a copy of the LimitsMap. +func (m LimitsMap[T]) Clone() LimitsMap[T] { + newMap := make(map[string]T, len(m.data)) + for k, v := range m.data { + newMap[k] = v + } + return LimitsMap[T]{data: newMap, validator: m.validator} +} + +// UnmarshalYAML implements yaml.Unmarshaler. +func (m LimitsMap[T]) UnmarshalYAML(value *yaml.Node) error { + newMap := make(map[string]T) + if err := value.Decode(newMap); err != nil { + return err + } + return m.updateMap(newMap) +} + +func (m LimitsMap[T]) updateMap(newMap map[string]T) error { + // Validate first, as we don't want to allow partial updates. + if m.validator != nil { + for k, v := range newMap { + if err := m.validator(k, v); err != nil { + return err + } + } + } + + clear(m.data) + for k, v := range newMap { + m.data[k] = v + } + + return nil +} + +// MarshalYAML implements yaml.Marshaler. +func (m LimitsMap[T]) MarshalYAML() (interface{}, error) { + return m.data, nil +} + +// Equal compares two LimitsMap. This is needed to allow cmp.Equal to compare two LimitsMap. +func (m LimitsMap[T]) Equal(other LimitsMap[T]) bool { + if len(m.data) != len(other.data) { + return false + } + + for k, v := range m.data { + if other.data[k] != v { + return false + } + } + + return true +} diff --git a/vendor/github.com/grafana/dskit/grpcutil/error_details.pb.go b/vendor/github.com/grafana/dskit/grpcutil/error_details.pb.go new file mode 100644 index 00000000000..c05ecfbcc1d --- /dev/null +++ b/vendor/github.com/grafana/dskit/grpcutil/error_details.pb.go @@ -0,0 +1,422 @@ +// Code generated by protoc-gen-gogo. DO NOT EDIT. +// source: error_details.proto + +package grpcutil + +import ( + fmt "fmt" + proto "github.com/gogo/protobuf/proto" + io "io" + math "math" + math_bits "math/bits" + reflect "reflect" + strconv "strconv" + strings "strings" +) + +// Reference imports to suppress errors if they are not otherwise used. +var _ = proto.Marshal +var _ = fmt.Errorf +var _ = math.Inf + +// This is a compile-time assertion to ensure that this generated file +// is compatible with the proto package it is being compiled against. +// A compilation error at this line likely means your copy of the +// proto package needs to be updated. +const _ = proto.GoGoProtoPackageIsVersion3 // please upgrade the proto package + +type ErrorCause int32 + +const ( + UNKNOWN_CAUSE ErrorCause = 0 + WRONG_CLUSTER_VERIFICATION_LABEL ErrorCause = 1 +) + +var ErrorCause_name = map[int32]string{ + 0: "UNKNOWN_CAUSE", + 1: "WRONG_CLUSTER_VERIFICATION_LABEL", +} + +var ErrorCause_value = map[string]int32{ + "UNKNOWN_CAUSE": 0, + "WRONG_CLUSTER_VERIFICATION_LABEL": 1, +} + +func (ErrorCause) EnumDescriptor() ([]byte, []int) { + return fileDescriptor_bbac13548d6353a4, []int{0} +} + +type ErrorDetails struct { + Cause ErrorCause `protobuf:"varint,1,opt,name=Cause,proto3,enum=grpcutil.ErrorCause" json:"Cause,omitempty"` +} + +func (m *ErrorDetails) Reset() { *m = ErrorDetails{} } +func (*ErrorDetails) ProtoMessage() {} +func (*ErrorDetails) Descriptor() ([]byte, []int) { + return fileDescriptor_bbac13548d6353a4, []int{0} +} +func (m *ErrorDetails) XXX_Unmarshal(b []byte) error { + return m.Unmarshal(b) +} +func (m *ErrorDetails) XXX_Marshal(b []byte, deterministic bool) ([]byte, error) { + if deterministic { + return xxx_messageInfo_ErrorDetails.Marshal(b, m, deterministic) + } else { + b = b[:cap(b)] + n, err := m.MarshalToSizedBuffer(b) + if err != nil { + return nil, err + } + return b[:n], nil + } +} +func (m *ErrorDetails) XXX_Merge(src proto.Message) { + xxx_messageInfo_ErrorDetails.Merge(m, src) +} +func (m *ErrorDetails) XXX_Size() int { + return m.Size() +} +func (m *ErrorDetails) XXX_DiscardUnknown() { + xxx_messageInfo_ErrorDetails.DiscardUnknown(m) +} + +var xxx_messageInfo_ErrorDetails proto.InternalMessageInfo + +func (m *ErrorDetails) GetCause() ErrorCause { + if m != nil { + return m.Cause + } + return UNKNOWN_CAUSE +} + +func init() { + proto.RegisterEnum("grpcutil.ErrorCause", ErrorCause_name, ErrorCause_value) + proto.RegisterType((*ErrorDetails)(nil), "grpcutil.ErrorDetails") +} + +func init() { proto.RegisterFile("error_details.proto", fileDescriptor_bbac13548d6353a4) } + +var fileDescriptor_bbac13548d6353a4 = []byte{ + // 221 bytes of a gzipped FileDescriptorProto + 0x1f, 0x8b, 0x08, 0x00, 0x00, 0x00, 0x00, 0x00, 0x02, 0xff, 0xe2, 0x12, 0x4e, 0x2d, 0x2a, 0xca, + 0x2f, 0x8a, 0x4f, 0x49, 0x2d, 0x49, 0xcc, 0xcc, 0x29, 0xd6, 0x2b, 0x28, 0xca, 0x2f, 0xc9, 0x17, + 0xe2, 0x48, 0x2f, 0x2a, 0x48, 0x2e, 0x2d, 0xc9, 0xcc, 0x51, 0xb2, 0xe2, 0xe2, 0x71, 0x05, 0x29, + 0x70, 0x81, 0xc8, 0x0b, 0x69, 0x71, 0xb1, 0x3a, 0x27, 0x96, 0x16, 0xa7, 0x4a, 0x30, 0x2a, 0x30, + 0x6a, 0xf0, 0x19, 0x89, 0xe8, 0xc1, 0x54, 0xea, 0x81, 0x95, 0x81, 0xe5, 0x82, 0x20, 0x4a, 0xb4, + 0x5c, 0xb9, 0xb8, 0x10, 0x82, 0x42, 0x82, 0x5c, 0xbc, 0xa1, 0x7e, 0xde, 0x7e, 0xfe, 0xe1, 0x7e, + 0xf1, 0xce, 0x8e, 0xa1, 0xc1, 0xae, 0x02, 0x0c, 0x42, 0x2a, 0x5c, 0x0a, 0xe1, 0x41, 0xfe, 0x7e, + 0xee, 0xf1, 0xce, 0x3e, 0xa1, 0xc1, 0x21, 0xae, 0x41, 0xf1, 0x61, 0xae, 0x41, 0x9e, 0x6e, 0x9e, + 0xce, 0x8e, 0x21, 0x9e, 0xfe, 0x7e, 0xf1, 0x3e, 0x8e, 0x4e, 0xae, 0x3e, 0x02, 0x8c, 0x4e, 0x76, + 0x17, 0x1e, 0xca, 0x31, 0xdc, 0x78, 0x28, 0xc7, 0xf0, 0xe1, 0xa1, 0x1c, 0x63, 0xc3, 0x23, 0x39, + 0xc6, 0x15, 0x8f, 0xe4, 0x18, 0x4f, 0x3c, 0x92, 0x63, 0xbc, 0xf0, 0x48, 0x8e, 0xf1, 0xc1, 0x23, + 0x39, 0xc6, 0x17, 0x8f, 0xe4, 0x18, 0x3e, 0x3c, 0x92, 0x63, 0x9c, 0xf0, 0x58, 0x8e, 0xe1, 0xc2, + 0x63, 0x39, 0x86, 0x1b, 0x8f, 0xe5, 0x18, 0xa2, 0xe0, 0x5e, 0x48, 0x62, 0x03, 0xfb, 0xc9, 0x18, + 0x10, 0x00, 0x00, 0xff, 0xff, 0x67, 0x52, 0x05, 0x4d, 0xea, 0x00, 0x00, 0x00, +} + +func (x ErrorCause) String() string { + s, ok := ErrorCause_name[int32(x)] + if ok { + return s + } + return strconv.Itoa(int(x)) +} +func (this *ErrorDetails) Equal(that interface{}) bool { + if that == nil { + return this == nil + } + + that1, ok := that.(*ErrorDetails) + if !ok { + that2, ok := that.(ErrorDetails) + if ok { + that1 = &that2 + } else { + return false + } + } + if that1 == nil { + return this == nil + } else if this == nil { + return false + } + if this.Cause != that1.Cause { + return false + } + return true +} +func (this *ErrorDetails) GoString() string { + if this == nil { + return "nil" + } + s := make([]string, 0, 5) + s = append(s, "&grpcutil.ErrorDetails{") + s = append(s, "Cause: "+fmt.Sprintf("%#v", this.Cause)+",\n") + s = append(s, "}") + return strings.Join(s, "") +} +func valueToGoStringErrorDetails(v interface{}, typ string) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("func(v %v) *%v { return &v } ( %#v )", typ, typ, pv) +} +func (m *ErrorDetails) Marshal() (dAtA []byte, err error) { + size := m.Size() + dAtA = make([]byte, size) + n, err := m.MarshalToSizedBuffer(dAtA[:size]) + if err != nil { + return nil, err + } + return dAtA[:n], nil +} + +func (m *ErrorDetails) MarshalTo(dAtA []byte) (int, error) { + size := m.Size() + return m.MarshalToSizedBuffer(dAtA[:size]) +} + +func (m *ErrorDetails) MarshalToSizedBuffer(dAtA []byte) (int, error) { + i := len(dAtA) + _ = i + var l int + _ = l + if m.Cause != 0 { + i = encodeVarintErrorDetails(dAtA, i, uint64(m.Cause)) + i-- + dAtA[i] = 0x8 + } + return len(dAtA) - i, nil +} + +func encodeVarintErrorDetails(dAtA []byte, offset int, v uint64) int { + offset -= sovErrorDetails(v) + base := offset + for v >= 1<<7 { + dAtA[offset] = uint8(v&0x7f | 0x80) + v >>= 7 + offset++ + } + dAtA[offset] = uint8(v) + return base +} +func (m *ErrorDetails) Size() (n int) { + if m == nil { + return 0 + } + var l int + _ = l + if m.Cause != 0 { + n += 1 + sovErrorDetails(uint64(m.Cause)) + } + return n +} + +func sovErrorDetails(x uint64) (n int) { + return (math_bits.Len64(x|1) + 6) / 7 +} +func sozErrorDetails(x uint64) (n int) { + return sovErrorDetails(uint64((x << 1) ^ uint64((int64(x) >> 63)))) +} +func (this *ErrorDetails) String() string { + if this == nil { + return "nil" + } + s := strings.Join([]string{`&ErrorDetails{`, + `Cause:` + fmt.Sprintf("%v", this.Cause) + `,`, + `}`, + }, "") + return s +} +func valueToStringErrorDetails(v interface{}) string { + rv := reflect.ValueOf(v) + if rv.IsNil() { + return "nil" + } + pv := reflect.Indirect(rv).Interface() + return fmt.Sprintf("*%v", pv) +} +func (m *ErrorDetails) Unmarshal(dAtA []byte) error { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + preIndex := iNdEx + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= uint64(b&0x7F) << shift + if b < 0x80 { + break + } + } + fieldNum := int32(wire >> 3) + wireType := int(wire & 0x7) + if wireType == 4 { + return fmt.Errorf("proto: ErrorDetails: wiretype end group for non-group") + } + if fieldNum <= 0 { + return fmt.Errorf("proto: ErrorDetails: illegal tag %d (wire type %d)", fieldNum, wire) + } + switch fieldNum { + case 1: + if wireType != 0 { + return fmt.Errorf("proto: wrong wireType = %d for field Cause", wireType) + } + m.Cause = 0 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + m.Cause |= ErrorCause(b&0x7F) << shift + if b < 0x80 { + break + } + } + default: + iNdEx = preIndex + skippy, err := skipErrorDetails(dAtA[iNdEx:]) + if err != nil { + return err + } + if skippy < 0 { + return ErrInvalidLengthErrorDetails + } + if (iNdEx + skippy) < 0 { + return ErrInvalidLengthErrorDetails + } + if (iNdEx + skippy) > l { + return io.ErrUnexpectedEOF + } + iNdEx += skippy + } + } + + if iNdEx > l { + return io.ErrUnexpectedEOF + } + return nil +} +func skipErrorDetails(dAtA []byte) (n int, err error) { + l := len(dAtA) + iNdEx := 0 + for iNdEx < l { + var wire uint64 + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + wire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + wireType := int(wire & 0x7) + switch wireType { + case 0: + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + iNdEx++ + if dAtA[iNdEx-1] < 0x80 { + break + } + } + return iNdEx, nil + case 1: + iNdEx += 8 + return iNdEx, nil + case 2: + var length int + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + length |= (int(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + if length < 0 { + return 0, ErrInvalidLengthErrorDetails + } + iNdEx += length + if iNdEx < 0 { + return 0, ErrInvalidLengthErrorDetails + } + return iNdEx, nil + case 3: + for { + var innerWire uint64 + var start int = iNdEx + for shift := uint(0); ; shift += 7 { + if shift >= 64 { + return 0, ErrIntOverflowErrorDetails + } + if iNdEx >= l { + return 0, io.ErrUnexpectedEOF + } + b := dAtA[iNdEx] + iNdEx++ + innerWire |= (uint64(b) & 0x7F) << shift + if b < 0x80 { + break + } + } + innerWireType := int(innerWire & 0x7) + if innerWireType == 4 { + break + } + next, err := skipErrorDetails(dAtA[start:]) + if err != nil { + return 0, err + } + iNdEx = start + next + if iNdEx < 0 { + return 0, ErrInvalidLengthErrorDetails + } + } + return iNdEx, nil + case 4: + return iNdEx, nil + case 5: + iNdEx += 4 + return iNdEx, nil + default: + return 0, fmt.Errorf("proto: illegal wireType %d", wireType) + } + } + panic("unreachable") +} + +var ( + ErrInvalidLengthErrorDetails = fmt.Errorf("proto: negative length found during unmarshaling") + ErrIntOverflowErrorDetails = fmt.Errorf("proto: integer overflow") +) diff --git a/vendor/github.com/grafana/dskit/grpcutil/error_details.proto b/vendor/github.com/grafana/dskit/grpcutil/error_details.proto new file mode 100644 index 00000000000..8e3d66e4d5b --- /dev/null +++ b/vendor/github.com/grafana/dskit/grpcutil/error_details.proto @@ -0,0 +1,14 @@ +syntax = "proto3"; + +package grpcutil; + +option go_package = "grpcutil"; + +enum ErrorCause { + UNKNOWN_CAUSE = 0; + WRONG_CLUSTER_VERIFICATION_LABEL = 1; +} + +message ErrorDetails { + ErrorCause Cause = 1; +} diff --git a/vendor/github.com/grafana/dskit/grpcutil/status.go b/vendor/github.com/grafana/dskit/grpcutil/status.go index a9e9aab249a..065099e4297 100644 --- a/vendor/github.com/grafana/dskit/grpcutil/status.go +++ b/vendor/github.com/grafana/dskit/grpcutil/status.go @@ -3,7 +3,9 @@ package grpcutil import ( "context" "errors" + "fmt" + "github.com/gogo/protobuf/proto" "github.com/gogo/status" "google.golang.org/grpc/codes" grpcstatus "google.golang.org/grpc/status" @@ -60,6 +62,21 @@ func ErrorToStatusCode(err error) codes.Code { return codes.Unknown } +// Status creates a new a *github.com/gogo/status.Status with the +// given error code, error message and error details. +func Status(errCode codes.Code, errMessage string, details ...proto.Message) *status.Status { + stat := status.New(errCode, errMessage) + if len(details) > 0 { + statWithDetails, err := stat.WithDetails(details...) + if err == nil { + return statWithDetails + } + statusErr := fmt.Errorf("error while creating details for a Status with code %s and error message %q: %w", errCode, errMessage, err) + return status.New(codes.InvalidArgument, statusErr.Error()) + } + return stat +} + // IsCanceled checks whether an error comes from an operation being canceled. func IsCanceled(err error) bool { if errors.Is(err, context.Canceled) { diff --git a/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go b/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go index a705e75501f..c046b347e35 100644 --- a/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go +++ b/vendor/github.com/grafana/dskit/middleware/grpc_cluster.go @@ -6,48 +6,117 @@ import ( "github.com/go-kit/log" "github.com/go-kit/log/level" - "github.com/gogo/status" + + "github.com/grafana/dskit/clusterutil" + "github.com/grafana/dskit/grpcutil" + + "github.com/pkg/errors" + "github.com/prometheus/client_golang/prometheus" "google.golang.org/grpc" "google.golang.org/grpc/codes" - "google.golang.org/grpc/metadata" -) - -const ( - MetadataClusterKey = "x-cluster" + healthpb "google.golang.org/grpc/health/grpc_health_v1" ) -// ClusterUnaryClientInterceptor propagates the given cluster info to gRPC metadata. -func ClusterUnaryClientInterceptor(cluster string) grpc.UnaryClientInterceptor { +// ClusterUnaryClientInterceptor propagates the given cluster label to gRPC metadata, before calling the next invoker. +// If an empty cluster label, nil invalidCounter or nil logger are provided, ClusterUnaryClientInterceptor panics. +// In case of an error related to the cluster label validation, the error is returned, and invalidCounter is incremented. +func ClusterUnaryClientInterceptor(cluster string, invalidCluster *prometheus.CounterVec, logger log.Logger) grpc.UnaryClientInterceptor { + validateClusterClientInterceptorInputParameters(cluster, invalidCluster, logger) return func(ctx context.Context, method string, req, reply interface{}, cc *grpc.ClientConn, invoker grpc.UnaryInvoker, opts ...grpc.CallOption) error { - if cluster != "" { - ctx = metadata.AppendToOutgoingContext(ctx, MetadataClusterKey, cluster) - } + ctx = clusterutil.PutClusterIntoOutgoingContext(ctx, cluster) + return handleClusterValidationError(invoker(ctx, method, req, reply, cc, opts...), cluster, method, invalidCluster, logger) + } +} - return invoker(ctx, method, req, reply, cc, opts...) +func validateClusterClientInterceptorInputParameters(cluster string, invalidCluster *prometheus.CounterVec, logger log.Logger) { + if cluster == "" { + panic("no cluster label provided") + } + if invalidCluster == nil { + panic("no invalid cluster counter provided") + } + if logger == nil { + panic("no logger provided") + } +} + +func handleClusterValidationError(err error, cluster string, method string, invalidCluster *prometheus.CounterVec, logger log.Logger) error { + if err == nil { + return nil + } + if stat, ok := grpcutil.ErrorToStatus(err); ok { + details := stat.Details() + if len(details) == 1 { + if errDetails, ok := details[0].(*grpcutil.ErrorDetails); ok { + if errDetails.GetCause() == grpcutil.WRONG_CLUSTER_VERIFICATION_LABEL { + msg := fmt.Sprintf("request rejected by the server: %s", stat.Message()) + level.Warn(logger).Log("msg", msg, "method", method, "clusterVerificationLabel", cluster) + invalidCluster.WithLabelValues(method).Inc() + return grpcutil.Status(codes.Internal, msg).Err() + } + } + } } + return err } -// ClusterUnaryServerInterceptor checks if the incoming gRPC metadata contains any cluster information and if so, -// checks if the latter corresponds to the given cluster. If it is the case, the request is further propagated. +// ClusterUnaryServerInterceptor checks if the incoming gRPC metadata contains any cluster label and if so, checks if +// the latter corresponds to the given cluster label. If it is the case, the request is further propagated. +// If an empty cluster label or nil logger are provided, ClusterUnaryServerInterceptor panics. +// If the softValidation parameter is true, errors related to the cluster label validation are logged, but not returned. // Otherwise, an error is returned. -func ClusterUnaryServerInterceptor(cluster string, logger log.Logger) grpc.UnaryServerInterceptor { - return func(ctx context.Context, req interface{}, _ *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { - reqCluster := getClusterFromIncomingContext(ctx, logger) - if cluster != reqCluster { - msg := fmt.Sprintf("request intended for cluster %q - this is cluster %q", reqCluster, cluster) - level.Warn(logger).Log("msg", msg) - return nil, status.Error(codes.FailedPrecondition, msg) +func ClusterUnaryServerInterceptor(cluster string, softValidation bool, logger log.Logger) grpc.UnaryServerInterceptor { + validateClusterServerInterceptorInputParameters(cluster, logger) + return func(ctx context.Context, req interface{}, info *grpc.UnaryServerInfo, handler grpc.UnaryHandler) (interface{}, error) { + // We skip the gRPC health check. + if _, ok := info.Server.(healthpb.HealthServer); ok { + return handler(ctx, req) + } + + msgs, err := checkClusterFromIncomingContext(ctx, info.FullMethod, cluster, softValidation) + if len(msgs) > 0 { + level.Warn(logger).Log(msgs...) + } + if err != nil { + stat := grpcutil.Status(codes.FailedPrecondition, err.Error(), &grpcutil.ErrorDetails{Cause: grpcutil.WRONG_CLUSTER_VERIFICATION_LABEL}) + return nil, stat.Err() } return handler(ctx, req) } } -func getClusterFromIncomingContext(ctx context.Context, logger log.Logger) string { - clusterIDs := metadata.ValueFromIncomingContext(ctx, MetadataClusterKey) - if len(clusterIDs) != 1 { - msg := fmt.Sprintf("gRPC metadata should contain exactly 1 value for key \"%s\", but the current set of values is %v. Returning an empty string.", MetadataClusterKey, clusterIDs) - level.Warn(logger).Log("msg", msg) - return "" +func validateClusterServerInterceptorInputParameters(cluster string, logger log.Logger) { + if cluster == "" { + panic("no cluster label provided") + } + if logger == nil { + panic("no logger provided") + } +} + +func checkClusterFromIncomingContext(ctx context.Context, method string, expectedCluster string, softValidationEnabled bool) ([]any, error) { + reqCluster, err := clusterutil.GetClusterFromIncomingContext(ctx) + if err == nil { + if reqCluster == expectedCluster { + return nil, nil + } + var wrongClusterErr error + if !softValidationEnabled { + wrongClusterErr = fmt.Errorf("rejected request with wrong cluster verification label %q - it should be %q", reqCluster, expectedCluster) + } + return []any{"msg", "request with wrong cluster verification label", "method", method, "clusterVerificationLabel", expectedCluster, "requestClusterVerificationLabel", reqCluster, "softValidation", softValidationEnabled}, wrongClusterErr + } + + if errors.Is(err, clusterutil.ErrNoClusterVerificationLabel) { + var emptyClusterErr error + if !softValidationEnabled { + emptyClusterErr = fmt.Errorf("rejected request with empty cluster verification label - it should be %q", expectedCluster) + } + return []any{"msg", "request with no cluster verification label", "method", method, "clusterVerificationLabel", expectedCluster, "softValidation", softValidationEnabled}, emptyClusterErr + } + var rejectedRequestErr error + if !softValidationEnabled { + rejectedRequestErr = fmt.Errorf("rejected request: %w", err) } - return clusterIDs[0] + return []any{"msg", "detected error during cluster verification label extraction", "method", method, "clusterVerificationLabel", expectedCluster, "softValidation", softValidationEnabled, "err", err}, rejectedRequestErr } diff --git a/vendor/github.com/grafana/dskit/server/server.go b/vendor/github.com/grafana/dskit/server/server.go index 4ad92379304..246eb4dac90 100644 --- a/vendor/github.com/grafana/dskit/server/server.go +++ b/vendor/github.com/grafana/dskit/server/server.go @@ -33,6 +33,7 @@ import ( "google.golang.org/grpc/credentials" "google.golang.org/grpc/keepalive" + "github.com/grafana/dskit/clusterutil" "github.com/grafana/dskit/httpgrpc" httpgrpc_server "github.com/grafana/dskit/httpgrpc/server" "github.com/grafana/dskit/log" @@ -155,6 +156,8 @@ type Config struct { GrpcMethodLimiter GrpcInflightMethodLimiter `yaml:"-"` Throughput Throughput `yaml:"-"` + + ClusterValidation clusterutil.ClusterValidationConfig `yaml:"cluster_validation"` } type Throughput struct { @@ -218,6 +221,11 @@ func (cfg *Config) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&cfg.ProxyProtocolEnabled, "server.proxy-protocol-enabled", false, "Enables PROXY protocol.") f.DurationVar(&cfg.Throughput.LatencyCutoff, "server.throughput.latency-cutoff", 0, "Requests taking over the cutoff are be observed to measure throughput. Server-Timing header is used with specified unit as the indicator, for example 'Server-Timing: unit;val=8.2'. If set to 0, the throughput is not calculated.") f.StringVar(&cfg.Throughput.Unit, "server.throughput.unit", "samples_processed", "Unit of the server throughput metric, for example 'processed_bytes' or 'samples_processed'. Observed values are gathered from the 'Server-Timing' header with the 'val' key. If set, it is appended to the request_server_throughput metric name.") + cfg.ClusterValidation.RegisterFlagsWithPrefix("server", f) +} + +func (cfg *Config) Validate() error { + return cfg.ClusterValidation.Validate() } func (cfg *Config) registererOrDefault() prometheus.Registerer { @@ -400,6 +408,9 @@ func newServer(cfg Config, metrics *Metrics) (*Server, error) { middleware.UnaryServerInstrumentInterceptor(metrics.RequestDuration, grpcInstrumentationOptions...), } grpcMiddleware = append(grpcMiddleware, cfg.GRPCMiddleware...) + if cfg.ClusterValidation.GRPC.Enabled { + grpcMiddleware = append(grpcMiddleware, middleware.ClusterUnaryServerInterceptor(cfg.ClusterValidation.Label, cfg.ClusterValidation.GRPC.SoftValidation, logger)) + } grpcStreamMiddleware := []grpc.StreamServerInterceptor{ serverLog.StreamServerInterceptor, diff --git a/vendor/modules.txt b/vendor/modules.txt index e6e00cbac00..48ca03ae23c 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -651,12 +651,13 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20250213152722-e83d24ebed15 +# github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast github.com/grafana/dskit/cache github.com/grafana/dskit/cancellation +github.com/grafana/dskit/clusterutil github.com/grafana/dskit/concurrency github.com/grafana/dskit/crypto/tls github.com/grafana/dskit/dns From 3ba08465b3e73aa2c5c69c389edbbe1843004661 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Thu, 27 Feb 2025 17:46:43 -0600 Subject: [PATCH 02/10] Switch to flagext limits map --- pkg/ruler/notifier.go | 13 +- pkg/ruler/notifier_config.go | 28 ++ pkg/ruler/notifier_test.go | 6 +- pkg/util/validation/limits.go | 12 +- pkg/util/validation/limits_map.go | 110 ----- pkg/util/validation/limits_map_test.go | 433 ------------------ .../validation/notifications_limit_flag.go | 5 +- .../notifications_limit_flag_test.go | 20 +- tools/doc-generator/parse/parser.go | 24 +- 9 files changed, 71 insertions(+), 580 deletions(-) create mode 100644 pkg/ruler/notifier_config.go delete mode 100644 pkg/util/validation/limits_map.go delete mode 100644 pkg/util/validation/limits_map_test.go diff --git a/pkg/ruler/notifier.go b/pkg/ruler/notifier.go index 621133611c9..360df16b958 100644 --- a/pkg/ruler/notifier.go +++ b/pkg/ruler/notifier.go @@ -27,7 +27,6 @@ import ( "github.com/grafana/mimir/pkg/util" util_log "github.com/grafana/mimir/pkg/util/log" - "github.com/grafana/mimir/pkg/util/validation" ) var ( @@ -52,11 +51,11 @@ func (cfg *NotifierConfig) RegisterFlags(f *flag.FlagSet) { } type OAuth2Config struct { - ClientID string `yaml:"client_id"` - ClientSecret flagext.Secret `yaml:"client_secret"` - TokenURL string `yaml:"token_url"` - Scopes flagext.StringSliceCSV `yaml:"scopes,omitempty"` - EndpointParams validation.LimitsMap[string] `yaml:"endpoint_params" category:"advanced"` + ClientID string `yaml:"client_id"` + ClientSecret flagext.Secret `yaml:"client_secret"` + TokenURL string `yaml:"token_url"` + Scopes flagext.StringSliceCSV `yaml:"scopes,omitempty"` + EndpointParams flagext.LimitsMap[string] `yaml:"endpoint_params" category:"advanced"` } func (cfg *OAuth2Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) { @@ -65,7 +64,7 @@ func (cfg *OAuth2Config) RegisterFlagsWithPrefix(prefix string, f *flag.FlagSet) f.StringVar(&cfg.TokenURL, prefix+"token_url", "", "Endpoint used to fetch access token.") f.Var(&cfg.Scopes, prefix+"scopes", "Optional scopes to include with the token request.") if !cfg.EndpointParams.IsInitialized() { - cfg.EndpointParams = validation.NewLimitsMap[string](nil) + cfg.EndpointParams = flagext.NewLimitsMap[string](nil) } f.Var(&cfg.EndpointParams, prefix+"endpoint-params", "Optional additional URL parameters to send to the token URL.") } diff --git a/pkg/ruler/notifier_config.go b/pkg/ruler/notifier_config.go new file mode 100644 index 00000000000..cb822db837e --- /dev/null +++ b/pkg/ruler/notifier_config.go @@ -0,0 +1,28 @@ +package ruler + +import ( + "encoding/json" + "fmt" +) + +type AlertmanagerClientConfig struct { + AlertmanagerURL string `yaml:"alertmanager_url"` + NotifierConfig NotifierConfig `yaml:",inline" json:",inline"` +} + +func (acc *AlertmanagerClientConfig) String() string { + out, err := json.Marshal(acc) + if err != nil { + return fmt.Sprintf("failed to marshal: %v", err) + } + return string(out) +} + +func (acc *AlertmanagerClientConfig) Set(s string) error { + new := AlertmanagerClientConfig{} + if err := json.Unmarshal([]byte(s), &new); err != nil { + return err + } + *acc = new + return nil +} diff --git a/pkg/ruler/notifier_test.go b/pkg/ruler/notifier_test.go index 146de3515d5..3d93c6b3d97 100644 --- a/pkg/ruler/notifier_test.go +++ b/pkg/ruler/notifier_test.go @@ -551,12 +551,12 @@ func TestBuildNotifierConfig(t *testing.T) { func TestOAuth2Config_ValidateEndpointParams(t *testing.T) { for name, tc := range map[string]struct { args []string - expected validation.LimitsMap[string] + expected flagext.LimitsMap[string] error string }{ "basic test": { args: []string{"-map-flag", "{\"param1\": \"value1\" }"}, - expected: validation.NewLimitsMapWithData(map[string]string{ + expected: flagext.NewLimitsMapWithData(map[string]string{ "param1": "value1", }, nil), }, @@ -566,7 +566,7 @@ func TestOAuth2Config_ValidateEndpointParams(t *testing.T) { }, } { t.Run(name, func(t *testing.T) { - v := validation.NewLimitsMap[string](nil) + v := flagext.NewLimitsMap[string](nil) fs := flag.NewFlagSet("test", flag.ContinueOnError) fs.SetOutput(&bytes.Buffer{}) // otherwise errors would go to stderr. diff --git a/pkg/util/validation/limits.go b/pkg/util/validation/limits.go index 8de7e601a7c..b27cb0e6473 100644 --- a/pkg/util/validation/limits.go +++ b/pkg/util/validation/limits.go @@ -216,8 +216,8 @@ type Limits struct { RulerRecordingRulesEvaluationEnabled bool `yaml:"ruler_recording_rules_evaluation_enabled" json:"ruler_recording_rules_evaluation_enabled"` RulerAlertingRulesEvaluationEnabled bool `yaml:"ruler_alerting_rules_evaluation_enabled" json:"ruler_alerting_rules_evaluation_enabled"` RulerSyncRulesOnChangesEnabled bool `yaml:"ruler_sync_rules_on_changes_enabled" json:"ruler_sync_rules_on_changes_enabled" category:"advanced"` - RulerMaxRulesPerRuleGroupByNamespace LimitsMap[int] `yaml:"ruler_max_rules_per_rule_group_by_namespace" json:"ruler_max_rules_per_rule_group_by_namespace" category:"experimental"` - RulerMaxRuleGroupsPerTenantByNamespace LimitsMap[int] `yaml:"ruler_max_rule_groups_per_tenant_by_namespace" json:"ruler_max_rule_groups_per_tenant_by_namespace" category:"experimental"` + RulerMaxRulesPerRuleGroupByNamespace flagext.LimitsMap[int] `yaml:"ruler_max_rules_per_rule_group_by_namespace" json:"ruler_max_rules_per_rule_group_by_namespace" category:"experimental"` + RulerMaxRuleGroupsPerTenantByNamespace flagext.LimitsMap[int] `yaml:"ruler_max_rule_groups_per_tenant_by_namespace" json:"ruler_max_rule_groups_per_tenant_by_namespace" category:"experimental"` RulerProtectedNamespaces flagext.StringSliceCSV `yaml:"ruler_protected_namespaces" json:"ruler_protected_namespaces" category:"experimental"` RulerMaxIndependentRuleEvaluationConcurrencyPerTenant int64 `yaml:"ruler_max_independent_rule_evaluation_concurrency_per_tenant" json:"ruler_max_independent_rule_evaluation_concurrency_per_tenant" category:"experimental"` @@ -246,8 +246,8 @@ type Limits struct { AlertmanagerReceiversBlockCIDRNetworks flagext.CIDRSliceCSV `yaml:"alertmanager_receivers_firewall_block_cidr_networks" json:"alertmanager_receivers_firewall_block_cidr_networks"` AlertmanagerReceiversBlockPrivateAddresses bool `yaml:"alertmanager_receivers_firewall_block_private_addresses" json:"alertmanager_receivers_firewall_block_private_addresses"` - NotificationRateLimit float64 `yaml:"alertmanager_notification_rate_limit" json:"alertmanager_notification_rate_limit"` - NotificationRateLimitPerIntegration LimitsMap[float64] `yaml:"alertmanager_notification_rate_limit_per_integration" json:"alertmanager_notification_rate_limit_per_integration"` + NotificationRateLimit float64 `yaml:"alertmanager_notification_rate_limit" json:"alertmanager_notification_rate_limit"` + NotificationRateLimitPerIntegration flagext.LimitsMap[float64] `yaml:"alertmanager_notification_rate_limit_per_integration" json:"alertmanager_notification_rate_limit_per_integration"` AlertmanagerMaxGrafanaConfigSizeBytes flagext.Bytes `yaml:"alertmanager_max_grafana_config_size_bytes" json:"alertmanager_max_grafana_config_size_bytes"` AlertmanagerMaxConfigSizeBytes int `yaml:"alertmanager_max_config_size_bytes" json:"alertmanager_max_config_size_bytes"` @@ -362,12 +362,12 @@ func (l *Limits) RegisterFlags(f *flag.FlagSet) { f.BoolVar(&l.RulerSyncRulesOnChangesEnabled, "ruler.sync-rules-on-changes-enabled", true, "True to enable a re-sync of the configured rule groups as soon as they're changed via ruler's config API. This re-sync is in addition of the periodic syncing. When enabled, it may take up to few tens of seconds before a configuration change triggers the re-sync.") // Needs to be initialised to a value so that the documentation can pick up the default value of `{}` because this is set as JSON from the command-line. if !l.RulerMaxRulesPerRuleGroupByNamespace.IsInitialized() { - l.RulerMaxRulesPerRuleGroupByNamespace = NewLimitsMap[int](nil) + l.RulerMaxRulesPerRuleGroupByNamespace = flagext.NewLimitsMap[int](nil) } f.Var(&l.RulerMaxRulesPerRuleGroupByNamespace, "ruler.max-rules-per-rule-group-by-namespace", "Maximum number of rules per rule group by namespace. Value is a map, where each key is the namespace and value is the number of rules allowed in the namespace (int). On the command line, this map is given in a JSON format. The number of rules specified has the same meaning as -ruler.max-rules-per-rule-group, but only applies for the specific namespace. If specified, it supersedes -ruler.max-rules-per-rule-group.") if !l.RulerMaxRuleGroupsPerTenantByNamespace.IsInitialized() { - l.RulerMaxRuleGroupsPerTenantByNamespace = NewLimitsMap[int](nil) + l.RulerMaxRuleGroupsPerTenantByNamespace = flagext.NewLimitsMap[int](nil) } f.Var(&l.RulerMaxRuleGroupsPerTenantByNamespace, "ruler.max-rule-groups-per-tenant-by-namespace", "Maximum number of rule groups per tenant by namespace. Value is a map, where each key is the namespace and value is the number of rule groups allowed in the namespace (int). On the command line, this map is given in a JSON format. The number of rule groups specified has the same meaning as -ruler.max-rule-groups-per-tenant, but only applies for the specific namespace. If specified, it supersedes -ruler.max-rule-groups-per-tenant.") f.Var(&l.RulerProtectedNamespaces, "ruler.protected-namespaces", "List of namespaces that are protected from modification unless a special HTTP header is used. If a namespace is protected, it can only be read, not modified via the ruler's configuration API. The value is a list of strings, where each string is a namespace name. On the command line, this list is given as a comma-separated list.") diff --git a/pkg/util/validation/limits_map.go b/pkg/util/validation/limits_map.go deleted file mode 100644 index 324d6ab2592..00000000000 --- a/pkg/util/validation/limits_map.go +++ /dev/null @@ -1,110 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package validation - -import ( - "encoding/json" - "fmt" - - "gopkg.in/yaml.v3" -) - -// LimitsMap is a flag.Value implementation that looks like a generic map, holding float64s, ints, or strings as values. -type LimitsMap[T float64 | int | string] struct { - data map[string]T - validator func(k string, v T) error -} - -func NewLimitsMap[T float64 | int | string](validator func(k string, v T) error) LimitsMap[T] { - return NewLimitsMapWithData(make(map[string]T), validator) -} - -func NewLimitsMapWithData[T float64 | int | string](data map[string]T, validator func(k string, v T) error) LimitsMap[T] { - return LimitsMap[T]{ - data: data, - validator: validator, - } -} - -// IsInitialized returns true if the map is initialized. -func (m LimitsMap[T]) IsInitialized() bool { - return m.data != nil -} - -// String implements flag.Value -func (m LimitsMap[T]) String() string { - out, err := json.Marshal(m.data) - if err != nil { - return fmt.Sprintf("failed to marshal: %v", err) - } - return string(out) -} - -// Set implements flag.Value -func (m LimitsMap[T]) Set(s string) error { - newMap := make(map[string]T) - if err := json.Unmarshal([]byte(s), &newMap); err != nil { - return err - } - return m.updateMap(newMap) -} - -func (m LimitsMap[T]) Read() map[string]T { - return m.data -} - -// Clone returns a copy of the LimitsMap. -func (m LimitsMap[T]) Clone() LimitsMap[T] { - newMap := make(map[string]T, len(m.data)) - for k, v := range m.data { - newMap[k] = v - } - return LimitsMap[T]{data: newMap, validator: m.validator} -} - -// UnmarshalYAML implements yaml.Unmarshaler. -func (m LimitsMap[T]) UnmarshalYAML(value *yaml.Node) error { - newMap := make(map[string]T) - if err := value.DecodeWithOptions(newMap, yaml.DecodeOptions{KnownFields: true}); err != nil { - return err - } - return m.updateMap(newMap) -} - -func (m LimitsMap[T]) updateMap(newMap map[string]T) error { - // Validate first, as we don't want to allow partial updates. - if m.validator != nil { - for k, v := range newMap { - if err := m.validator(k, v); err != nil { - return err - } - } - } - - clear(m.data) - for k, v := range newMap { - m.data[k] = v - } - - return nil -} - -// MarshalYAML implements yaml.Marshaler. -func (m LimitsMap[T]) MarshalYAML() (interface{}, error) { - return m.data, nil -} - -// Equal compares two LimitsMap. This is needed to allow cmp.Equal to compare two LimitsMap. -func (m LimitsMap[T]) Equal(other LimitsMap[T]) bool { - if len(m.data) != len(other.data) { - return false - } - - for k, v := range m.data { - if other.data[k] != v { - return false - } - } - - return true -} diff --git a/pkg/util/validation/limits_map_test.go b/pkg/util/validation/limits_map_test.go deleted file mode 100644 index d9bc2944a04..00000000000 --- a/pkg/util/validation/limits_map_test.go +++ /dev/null @@ -1,433 +0,0 @@ -// SPDX-License-Identifier: AGPL-3.0-only - -package validation - -import ( - "errors" - "testing" - - "github.com/google/go-cmp/cmp" - "github.com/stretchr/testify/require" - "gopkg.in/yaml.v3" -) - -var fakeFloat64Validator = func(_ string, v float64) error { - if v < 0 { - return errors.New("value cannot be negative") - } - return nil -} - -var fakeIntValidator = func(_ string, v int) error { - if v < 0 { - return errors.New("value cannot be negative") - } - return nil -} - -var fakeStringValidator = func(_ string, v string) error { - if len(v) == 0 { - return errors.New("value cannot be empty") - } - return nil -} - -func TestNewLimitsMap(t *testing.T) { - t.Run("float64", func(t *testing.T) { - lm := NewLimitsMap(fakeFloat64Validator) - lm.data["key1"] = 10.6 - require.Len(t, lm.Read(), 1) - }) - - t.Run("int", func(t *testing.T) { - lm := NewLimitsMap(fakeIntValidator) - lm.data["key1"] = 10 - require.Len(t, lm.Read(), 1) - }) - - t.Run("string", func(t *testing.T) { - lm := NewLimitsMap(fakeStringValidator) - lm.data["key1"] = "test" - require.Len(t, lm.Read(), 1) - }) -} - -func TestLimitsMap_IsNil(t *testing.T) { - tc := map[string]struct { - input LimitsMap[float64] - expected bool - }{ - - "when the map is initialised": { - input: LimitsMap[float64]{data: map[string]float64{"key1": 10}}, - expected: true, - }, - "when the map is not initialised": { - input: LimitsMap[float64]{data: nil}, - expected: false, - }, - } - - for name, tt := range tc { - t.Run(name, func(t *testing.T) { - require.Equal(t, tt.input.IsInitialized(), tt.expected) - }) - } -} - -func TestLimitsMap_SetAndString(t *testing.T) { - t.Run("numeric", func(t *testing.T) { - tc := map[string]struct { - input string - expected map[string]float64 - error string - }{ - - "set without error": { - input: `{"key1":10,"key2":20}`, - expected: map[string]float64{"key1": 10, "key2": 20}, - }, - "set with parsing error": { - input: `{"key1": 10, "key2": 20`, - error: "unexpected end of JSON input", - }, - "set with validation error": { - input: `{"key1": -10, "key2": 20}`, - error: "value cannot be negative", - }, - "set with incompatible value type": { - input: `{"key1": "abc", "key2": "def"}`, - error: "json: cannot unmarshal string into Go value of type float64", - }, - } - - for name, tt := range tc { - t.Run("numeric/"+name, func(t *testing.T) { - lm := NewLimitsMap(fakeFloat64Validator) - err := lm.Set(tt.input) - if tt.error != "" { - require.Error(t, err) - require.Equal(t, tt.error, err.Error()) - } else { - require.NoError(t, err) - require.Equal(t, tt.expected, lm.Read()) - require.Equal(t, tt.input, lm.String()) - } - }) - } - }) - - t.Run("string", func(t *testing.T) { - tc := map[string]struct { - input string - expected map[string]string - error string - }{ - - "set without error": { - input: `{"key1":"abc","key2":"def"}`, - expected: map[string]string{"key1": "abc", "key2": "def"}, - }, - "set with parsing error": { - input: `{"key1": "abc", "key2": "def`, - error: "unexpected end of JSON input", - }, - "set with validation error": { - input: `{"key1": "", "key2": "def"}`, - error: "value cannot be empty", - }, - "set with incompatible value type": { - input: `{"key1": 10, "key2": 20}`, - error: "json: cannot unmarshal number into Go value of type string", - }, - } - - for name, tt := range tc { - t.Run("string/"+name, func(t *testing.T) { - lm := NewLimitsMap(fakeStringValidator) - err := lm.Set(tt.input) - if tt.error != "" { - require.Error(t, err) - require.Equal(t, tt.error, err.Error()) - } else { - require.NoError(t, err) - require.Equal(t, tt.expected, lm.Read()) - require.Equal(t, tt.input, lm.String()) - } - }) - } - }) -} - -func TestLimitsMap_UnmarshalYAML(t *testing.T) { - t.Run("numeric", func(t *testing.T) { - tc := []struct { - name string - input string - expected map[string]float64 - error string - }{ - { - name: "unmarshal without error", - input: ` -key1: 10 -key2: 20 -`, - expected: map[string]float64{"key1": 10, "key2": 20}, - }, - { - name: "unmarshal with validation error", - input: ` -key1: -10 -key2: 20 -`, - error: "value cannot be negative", - }, - { - name: "unmarshal with parsing error", - input: ` -key1: 10 -key2: 20 - key3: 30 -`, - error: "yaml: line 3: found a tab character that violates indentation", - }, - } - - for _, tt := range tc { - t.Run(tt.name, func(t *testing.T) { - lm := NewLimitsMap(fakeFloat64Validator) - err := yaml.Unmarshal([]byte(tt.input), &lm) - if tt.error != "" { - require.Error(t, err) - require.Equal(t, tt.error, err.Error()) - } else { - require.NoError(t, err) - require.Equal(t, tt.expected, lm.data) - } - }) - } - }) - - t.Run("string", func(t *testing.T) { - tc := []struct { - name string - input string - expected map[string]string - error string - }{ - { - name: "unmarshal without error", - input: ` -key1: abc -key2: def -`, - expected: map[string]string{"key1": "abc", "key2": "def"}, - }, - { - name: "unmarshal with validation error", - input: ` -key1: abc -key2: "" -`, - error: "value cannot be empty", - }, - { - name: "unmarshal with parsing error", - input: ` -key1: abc -key2: def - key3: ghi -`, - error: "yaml: line 3: found a tab character that violates indentation", - }, - } - - for _, tt := range tc { - t.Run(tt.name, func(t *testing.T) { - lm := NewLimitsMap(fakeStringValidator) - err := yaml.Unmarshal([]byte(tt.input), &lm) - if tt.error != "" { - require.Error(t, err) - require.Equal(t, tt.error, err.Error()) - } else { - require.NoError(t, err) - require.Equal(t, tt.expected, lm.data) - } - }) - } - }) - -} - -func TestLimitsMap_MarshalYAML(t *testing.T) { - t.Run("numeric", func(t *testing.T) { - lm := NewLimitsMap(fakeFloat64Validator) - lm.data["key1"] = 10 - lm.data["key2"] = 20 - - out, err := yaml.Marshal(&lm) - require.NoError(t, err) - require.Equal(t, "key1: 10\nkey2: 20\n", string(out)) - }) - - t.Run("string", func(t *testing.T) { - lm := NewLimitsMap(fakeStringValidator) - lm.data["key1"] = "abc" - lm.data["key2"] = "def" - - out, err := yaml.Marshal(&lm) - require.NoError(t, err) - require.Equal(t, "key1: abc\nkey2: def\n", string(out)) - }) -} - -func TestLimitsMap_Equal(t *testing.T) { - t.Run("numeric", func(t *testing.T) { - tc := map[string]struct { - map1 LimitsMap[float64] - map2 LimitsMap[float64] - expected bool - }{ - "Equal maps with same key-value pairs": { - map1: LimitsMap[float64]{data: map[string]float64{"key1": 1.1, "key2": 2.2}}, - map2: LimitsMap[float64]{data: map[string]float64{"key1": 1.1, "key2": 2.2}}, - expected: true, - }, - "Different maps with different lengths": { - map1: LimitsMap[float64]{data: map[string]float64{"key1": 1.1}}, - map2: LimitsMap[float64]{data: map[string]float64{"key1": 1.1, "key2": 2.2}}, - expected: false, - }, - "Different maps with same keys but different values": { - map1: LimitsMap[float64]{data: map[string]float64{"key1": 1.1}}, - map2: LimitsMap[float64]{data: map[string]float64{"key1": 1.2}}, - expected: false, - }, - "Equal empty maps": { - map1: LimitsMap[float64]{data: map[string]float64{}}, - map2: LimitsMap[float64]{data: map[string]float64{}}, - expected: true, - }, - } - - for name, tt := range tc { - t.Run(name, func(t *testing.T) { - require.Equal(t, tt.expected, tt.map1.Equal(LimitsMap[float64]{data: tt.map2.data})) - require.Equal(t, tt.expected, cmp.Equal(tt.map1, tt.map2)) - }) - } - }) - - t.Run("string", func(t *testing.T) { - tc := map[string]struct { - map1 LimitsMap[string] - map2 LimitsMap[string] - expected bool - }{ - "Equal maps with same key-value pairs": { - map1: LimitsMap[string]{data: map[string]string{"key1": "abc", "key2": "def"}}, - map2: LimitsMap[string]{data: map[string]string{"key1": "abc", "key2": "def"}}, - expected: true, - }, - "Different maps with different lengths": { - map1: LimitsMap[string]{data: map[string]string{"key1": "abc"}}, - map2: LimitsMap[string]{data: map[string]string{"key1": "abc", "key2": "def"}}, - expected: false, - }, - "Different maps with same keys but different values": { - map1: LimitsMap[string]{data: map[string]string{"key1": "abc"}}, - map2: LimitsMap[string]{data: map[string]string{"key1": "def"}}, - expected: false, - }, - "Equal empty maps": { - map1: LimitsMap[string]{data: map[string]string{}}, - map2: LimitsMap[string]{data: map[string]string{}}, - expected: true, - }, - } - - for name, tt := range tc { - t.Run(name, func(t *testing.T) { - require.Equal(t, tt.expected, tt.map1.Equal(LimitsMap[string]{data: tt.map2.data})) - require.Equal(t, tt.expected, cmp.Equal(tt.map1, tt.map2)) - }) - } - }) - -} - -func TestLimitsMap_Clone(t *testing.T) { - t.Run("numeric", func(t *testing.T) { - // Create an initial LimitsMap with some data. - original := NewLimitsMap[float64](nil) - original.data["limit1"] = 1.0 - original.data["limit2"] = 2.0 - - // Clone the original LimitsMap. - cloned := original.Clone() - - // Check that the cloned LimitsMap is equal to the original. - require.True(t, original.Equal(cloned), "expected cloned LimitsMap to be different from original") - - // Modify the original LimitsMap and ensure the cloned map is not affected. - original.data["limit1"] = 10.0 - require.False(t, cloned.data["limit1"] == 10.0, "expected cloned LimitsMap to be unaffected by changes to original") - - // Modify the cloned LimitsMap and ensure the original map is not affected. - cloned.data["limit3"] = 3.0 - _, exists := original.data["limit3"] - require.False(t, exists, "expected original LimitsMap to be unaffected by changes to cloned") - }) - - t.Run("string", func(t *testing.T) { - // Create an initial LimitsMap with some data. - original := NewLimitsMap[string](nil) - original.data["limit1"] = "abc" - original.data["limit2"] = "def" - - // Clone the original LimitsMap. - cloned := original.Clone() - - // Check that the cloned LimitsMap is equal to the original. - require.True(t, original.Equal(cloned), "expected cloned LimitsMap to be different from original") - - // Modify the original LimitsMap and ensure the cloned map is not affected. - original.data["limit1"] = "zxcv" - require.False(t, cloned.data["limit1"] == "zxcv", "expected cloned LimitsMap to be unaffected by changes to original") - - // Modify the cloned LimitsMap and ensure the original map is not affected. - cloned.data["limit3"] = "test" - _, exists := original.data["limit3"] - require.False(t, exists, "expected original LimitsMap to be unaffected by changes to cloned") - }) -} - -func TestLimitsMap_updateMap(t *testing.T) { - t.Run("does not apply partial updates", func(t *testing.T) { - initialData := map[string]float64{"a": 1.0, "b": 2.0} - updateData := map[string]float64{"a": 3.0, "b": -3.0, "c": 5.0} - - limitsMap := LimitsMap[float64]{data: initialData, validator: fakeFloat64Validator} - - err := limitsMap.updateMap(updateData) - require.Error(t, err) - - // Verify that no partial updates were applied. - // Because maps in Go are accessed in random order, there's a chance that the validation will fail on the first invalid element of the map thus not asserting partial updates. - expectedData := map[string]float64{"a": 1.0, "b": 2.0} - require.Equal(t, expectedData, limitsMap.Read()) - }) - - t.Run("updates totally replace all values", func(t *testing.T) { - initialData := map[string]float64{"a": 1.0, "b": 2.0} - updateData := map[string]float64{"b": 5.0, "c": 6.0} - limitsMap := LimitsMap[float64]{data: initialData, validator: fakeFloat64Validator} - - err := limitsMap.updateMap(updateData) - require.NoError(t, err) - - expectedData := updateData - require.Equal(t, expectedData, limitsMap.Read()) - }) -} diff --git a/pkg/util/validation/notifications_limit_flag.go b/pkg/util/validation/notifications_limit_flag.go index 5b31ae1e829..f119ece2e62 100644 --- a/pkg/util/validation/notifications_limit_flag.go +++ b/pkg/util/validation/notifications_limit_flag.go @@ -8,6 +8,7 @@ package validation import ( "github.com/pkg/errors" + "github.com/grafana/dskit/flagext" "github.com/grafana/mimir/pkg/util" ) @@ -24,6 +25,6 @@ var allowedIntegrationNames = []string{ } // NotificationRateLimitMap returns a map that can be used as a flag for setting notification rate limits. -func NotificationRateLimitMap() LimitsMap[float64] { - return NewLimitsMap[float64](validateIntegrationLimit) +func NotificationRateLimitMap() flagext.LimitsMap[float64] { + return flagext.NewLimitsMap[float64](validateIntegrationLimit) } diff --git a/pkg/util/validation/notifications_limit_flag_test.go b/pkg/util/validation/notifications_limit_flag_test.go index 689a3d52067..f5232733e53 100644 --- a/pkg/util/validation/notifications_limit_flag_test.go +++ b/pkg/util/validation/notifications_limit_flag_test.go @@ -8,9 +8,9 @@ package validation import ( "bytes" "flag" - "maps" "testing" + "github.com/grafana/dskit/flagext" "github.com/stretchr/testify/assert" "github.com/stretchr/testify/require" "gopkg.in/yaml.v3" @@ -19,17 +19,23 @@ import ( func TestNotificationLimitsMap(t *testing.T) { for name, tc := range map[string]struct { args []string - expected LimitsMap[float64] + expected flagext.LimitsMap[float64] error string }{ "basic test": { args: []string{"-map-flag", "{\"email\": 100 }"}, - expected: LimitsMap[float64]{ + expected: flagext.NewLimitsMapWithData[float64]( + map[string]float64{ + "email": 100, + }, + validateIntegrationLimit, + ), + /*expected: flagext.LimitsMap[float64]{ validator: validateIntegrationLimit, data: map[string]float64{ "email": 100, }, - }, + },*/ }, "unknown integration": { @@ -55,14 +61,14 @@ func TestNotificationLimitsMap(t *testing.T) { assert.Equal(t, tc.error, err.Error()) } else { assert.NoError(t, err) - assert.True(t, maps.Equal(tc.expected.data, v.data)) + assert.True(t, tc.expected.Equal(v)) } }) } } type TestStruct struct { - Flag LimitsMap[float64] `yaml:"flag"` + Flag flagext.LimitsMap[float64] `yaml:"flag"` } func TestNotificationsLimitMapYaml(t *testing.T) { @@ -84,7 +90,7 @@ func TestNotificationsLimitMapYaml(t *testing.T) { err = yaml.Unmarshal(expected, &actualStruct) require.NoError(t, err) - assert.Equal(t, testStruct.Flag.data, actualStruct.Flag.data) + assert.True(t, testStruct.Flag.Equal(actualStruct.Flag)) } func TestUnknownIntegrationWhenLoadingYaml(t *testing.T) { diff --git a/tools/doc-generator/parse/parser.go b/tools/doc-generator/parse/parser.go index 9892879459c..0b360d2bf73 100644 --- a/tools/doc-generator/parse/parser.go +++ b/tools/doc-generator/parse/parser.go @@ -349,11 +349,11 @@ func getFieldName(field reflect.StructField) string { func getFieldCustomType(t reflect.Type) (string, bool) { // Handle custom data types used in the config switch t.String() { - case reflect.TypeOf(validation.LimitsMap[float64]{}).String(): + case reflect.TypeOf(flagext.LimitsMap[float64]{}).String(): return "map of string to float64", true - case reflect.TypeOf(validation.LimitsMap[int]{}).String(): + case reflect.TypeOf(flagext.LimitsMap[int]{}).String(): return "map of string to int", true - case reflect.TypeOf(validation.LimitsMap[string]{}).String(): + case reflect.TypeOf(flagext.LimitsMap[string]{}).String(): return "map of string to string", true case reflect.TypeOf(&url.URL{}).String(): return "url", true @@ -365,9 +365,9 @@ func getFieldCustomType(t reflect.Type) (string, bool) { return "string", true case reflect.TypeOf([]*relabel.Config{}).String(): return "relabel_config...", true - case reflect.TypeOf([]*validation.BlockedQuery{}).String(): + case reflect.TypeOf([]*flagext.BlockedQuery{}).String(): return "blocked_queries_config...", true - case reflect.TypeOf([]*validation.BlockedRequest{}).String(): + case reflect.TypeOf([]*flagext.BlockedRequest{}).String(): return "blocked_requests_config...", true case reflect.TypeOf(asmodel.CustomTrackersConfig{}).String(): return "map of tracker name (string) to matcher (string)", true @@ -439,11 +439,11 @@ func getFieldType(t reflect.Type) (string, error) { func getCustomFieldType(t reflect.Type) (string, bool) { // Handle custom data types used in the config switch t.String() { - case reflect.TypeOf(validation.LimitsMap[float64]{}).String(): + case reflect.TypeOf(flagext.LimitsMap[float64]{}).String(): return "map of string to float64", true - case reflect.TypeOf(validation.LimitsMap[int]{}).String(): + case reflect.TypeOf(flagext.LimitsMap[int]{}).String(): return "map of string to int", true - case reflect.TypeOf(validation.LimitsMap[string]{}).String(): + case reflect.TypeOf(flagext.LimitsMap[string]{}).String(): return "map of string to string", true case reflect.TypeOf(&url.URL{}).String(): return "url", true @@ -455,9 +455,9 @@ func getCustomFieldType(t reflect.Type) (string, bool) { return "string", true case reflect.TypeOf([]*relabel.Config{}).String(): return "relabel_config...", true - case reflect.TypeOf([]*validation.BlockedQuery{}).String(): + case reflect.TypeOf([]*flagext.BlockedQuery{}).String(): return "blocked_queries_config...", true - case reflect.TypeOf([]*validation.BlockedRequest{}).String(): + case reflect.TypeOf([]*flagext.BlockedRequest{}).String(): return "blocked_requests_config...", true case reflect.TypeOf(asmodel.CustomTrackersConfig{}).String(): return "map of tracker name (string) to matcher (string)", true @@ -495,9 +495,9 @@ func ReflectType(typ string) reflect.Type { case "blocked_requests_config...": return reflect.TypeOf([]*validation.BlockedRequest{}) case "map of string to float64": - return reflect.TypeOf(validation.LimitsMap[float64]{}) + return reflect.TypeOf(flagext.LimitsMap[float64]{}) case "map of string to int": - return reflect.TypeOf(validation.LimitsMap[int]{}) + return reflect.TypeOf(flagext.LimitsMap[int]{}) case "list of durations": return reflect.TypeOf(tsdb.DurationList{}) default: From a5693ce1b20bb0f5143fef01cc615c09601e1862 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 09:54:31 -0600 Subject: [PATCH 03/10] Remove irrelevant untracked file --- pkg/ruler/notifier_config.go | 28 ---------------------------- 1 file changed, 28 deletions(-) delete mode 100644 pkg/ruler/notifier_config.go diff --git a/pkg/ruler/notifier_config.go b/pkg/ruler/notifier_config.go deleted file mode 100644 index cb822db837e..00000000000 --- a/pkg/ruler/notifier_config.go +++ /dev/null @@ -1,28 +0,0 @@ -package ruler - -import ( - "encoding/json" - "fmt" -) - -type AlertmanagerClientConfig struct { - AlertmanagerURL string `yaml:"alertmanager_url"` - NotifierConfig NotifierConfig `yaml:",inline" json:",inline"` -} - -func (acc *AlertmanagerClientConfig) String() string { - out, err := json.Marshal(acc) - if err != nil { - return fmt.Sprintf("failed to marshal: %v", err) - } - return string(out) -} - -func (acc *AlertmanagerClientConfig) Set(s string) error { - new := AlertmanagerClientConfig{} - if err := json.Unmarshal([]byte(s), &new); err != nil { - return err - } - *acc = new - return nil -} From 17bb9b43bb0b1c7e98564ed4f06d2f4ebecd3d72 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 10:44:25 -0600 Subject: [PATCH 04/10] Overzealous copy paste --- tools/doc-generator/parse/parser.go | 8 ++++---- 1 file changed, 4 insertions(+), 4 deletions(-) diff --git a/tools/doc-generator/parse/parser.go b/tools/doc-generator/parse/parser.go index 0b360d2bf73..de945619680 100644 --- a/tools/doc-generator/parse/parser.go +++ b/tools/doc-generator/parse/parser.go @@ -365,9 +365,9 @@ func getFieldCustomType(t reflect.Type) (string, bool) { return "string", true case reflect.TypeOf([]*relabel.Config{}).String(): return "relabel_config...", true - case reflect.TypeOf([]*flagext.BlockedQuery{}).String(): + case reflect.TypeOf([]*validation.BlockedQuery{}).String(): return "blocked_queries_config...", true - case reflect.TypeOf([]*flagext.BlockedRequest{}).String(): + case reflect.TypeOf([]*validation.BlockedRequest{}).String(): return "blocked_requests_config...", true case reflect.TypeOf(asmodel.CustomTrackersConfig{}).String(): return "map of tracker name (string) to matcher (string)", true @@ -455,9 +455,9 @@ func getCustomFieldType(t reflect.Type) (string, bool) { return "string", true case reflect.TypeOf([]*relabel.Config{}).String(): return "relabel_config...", true - case reflect.TypeOf([]*flagext.BlockedQuery{}).String(): + case reflect.TypeOf([]*validation.BlockedQuery{}).String(): return "blocked_queries_config...", true - case reflect.TypeOf([]*flagext.BlockedRequest{}).String(): + case reflect.TypeOf([]*validation.BlockedRequest{}).String(): return "blocked_requests_config...", true case reflect.TypeOf(asmodel.CustomTrackersConfig{}).String(): return "map of tracker name (string) to matcher (string)", true From 3d8e9e38b7d2d2e5d1a5d12aa87a9a27ac1f3aa2 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 10:58:36 -0600 Subject: [PATCH 05/10] another reference in tests --- pkg/ruler/notifier_test.go | 3 +-- 1 file changed, 1 insertion(+), 2 deletions(-) diff --git a/pkg/ruler/notifier_test.go b/pkg/ruler/notifier_test.go index 3d93c6b3d97..c47977ec32b 100644 --- a/pkg/ruler/notifier_test.go +++ b/pkg/ruler/notifier_test.go @@ -24,7 +24,6 @@ import ( "github.com/stretchr/testify/require" "github.com/grafana/mimir/pkg/util" - "github.com/grafana/mimir/pkg/util/validation" ) func TestBuildNotifierConfig(t *testing.T) { @@ -387,7 +386,7 @@ func TestBuildNotifierConfig(t *testing.T) { ClientID: "oauth2-client-id", ClientSecret: flagext.SecretWithValue("test"), TokenURL: "https://oauth2-token-endpoint.local/token", - EndpointParams: validation.NewLimitsMapWithData[string]( + EndpointParams: flagext.NewLimitsMapWithData[string]( map[string]string{ "param1": "value1", "param2": "value2", From 1beb63da802b8ad26d6eb2ee37d6f86f23e5cc18 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 11:14:10 -0600 Subject: [PATCH 06/10] lint --- pkg/util/validation/notifications_limit_flag.go | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/pkg/util/validation/notifications_limit_flag.go b/pkg/util/validation/notifications_limit_flag.go index f119ece2e62..7465671da59 100644 --- a/pkg/util/validation/notifications_limit_flag.go +++ b/pkg/util/validation/notifications_limit_flag.go @@ -6,9 +6,9 @@ package validation import ( + "github.com/grafana/dskit/flagext" "github.com/pkg/errors" - "github.com/grafana/dskit/flagext" "github.com/grafana/mimir/pkg/util" ) From 3112a996bbbb38cf98b7db1d7dcb07d14bbeaa09 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 11:25:34 -0600 Subject: [PATCH 07/10] make docs --- .../configure/configuration-parameters/index.md | 15 +++++++++++++++ 1 file changed, 15 insertions(+) diff --git a/docs/sources/mimir/configure/configuration-parameters/index.md b/docs/sources/mimir/configure/configuration-parameters/index.md index bb524bc6566..6d41b97058d 100644 --- a/docs/sources/mimir/configure/configuration-parameters/index.md +++ b/docs/sources/mimir/configure/configuration-parameters/index.md @@ -755,6 +755,21 @@ grpc_tls_config: # (advanced) Base path to serve all API routes from (e.g. /v1/) # CLI flag: -server.path-prefix [http_path_prefix: | default = ""] + +cluster_validation: + # Optionally define server's cluster validation label. + # CLI flag: -server.cluster-validation.label + [label: | default = ""] + + grpc: + # When enabled, cluster label validation will be executed. + # CLI flag: -server.cluster-validation.grpc.enabled + [enabled: | default = false] + + # When enabled, soft cluster label validation will be executed. Can be + # enabled only together with server.cluster-validation.grpc.enabled + # CLI flag: -server.cluster-validation.grpc.soft-validation + [softvalidation: | default = false] ``` ### distributor From e8d1e5b4090ca8f3dd6edb5892d220b904986c98 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 11:27:51 -0600 Subject: [PATCH 08/10] make reference-help --- cmd/mimir/config-descriptor.json | 50 ++++++++++++++++++++++++++++++++ cmd/mimir/help-all.txt.tmpl | 6 ++++ cmd/mimir/help.txt.tmpl | 6 ++++ 3 files changed, 62 insertions(+) diff --git a/cmd/mimir/config-descriptor.json b/cmd/mimir/config-descriptor.json index af0437812f9..197cc4f072d 100644 --- a/cmd/mimir/config-descriptor.json +++ b/cmd/mimir/config-descriptor.json @@ -715,6 +715,56 @@ "fieldFlag": "server.path-prefix", "fieldType": "string", "fieldCategory": "advanced" + }, + { + "kind": "block", + "name": "cluster_validation", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "label", + "required": false, + "desc": "Optionally define server's cluster validation label.", + "fieldValue": null, + "fieldDefaultValue": "", + "fieldFlag": "server.cluster-validation.label", + "fieldType": "string" + }, + { + "kind": "block", + "name": "grpc", + "required": false, + "desc": "", + "blockEntries": [ + { + "kind": "field", + "name": "enabled", + "required": false, + "desc": "When enabled, cluster label validation will be executed.", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "server.cluster-validation.grpc.enabled", + "fieldType": "boolean" + }, + { + "kind": "field", + "name": "softvalidation", + "required": false, + "desc": "When enabled, soft cluster label validation will be executed. Can be enabled only together with server.cluster-validation.grpc.enabled", + "fieldValue": null, + "fieldDefaultValue": false, + "fieldFlag": "server.cluster-validation.grpc.soft-validation", + "fieldType": "boolean" + } + ], + "fieldValue": null, + "fieldDefaultValue": null + } + ], + "fieldValue": null, + "fieldDefaultValue": null } ], "fieldValue": null, diff --git a/cmd/mimir/help-all.txt.tmpl b/cmd/mimir/help-all.txt.tmpl index b959b67e048..bb4253487fd 100644 --- a/cmd/mimir/help-all.txt.tmpl +++ b/cmd/mimir/help-all.txt.tmpl @@ -3161,6 +3161,12 @@ Usage of ./cmd/mimir/mimir: Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right. -runtime-config.reload-period duration How often to check runtime config files. (default 10s) + -server.cluster-validation.grpc.enabled + When enabled, cluster label validation will be executed. + -server.cluster-validation.grpc.soft-validation + When enabled, soft cluster label validation will be executed. Can be enabled only together with server.cluster-validation.grpc.enabled + -server.cluster-validation.label string + Optionally define server's cluster validation label. -server.graceful-shutdown-timeout duration Timeout for graceful shutdowns (default 30s) -server.grpc-conn-limit int diff --git a/cmd/mimir/help.txt.tmpl b/cmd/mimir/help.txt.tmpl index 9b3daf16e36..0947df939f0 100644 --- a/cmd/mimir/help.txt.tmpl +++ b/cmd/mimir/help.txt.tmpl @@ -771,6 +771,12 @@ Usage of ./cmd/mimir/mimir: The tenant's shard size when sharding is used by ruler. Value of 0 disables shuffle sharding for the tenant, and tenant rules will be sharded across all ruler replicas. -runtime-config.file comma-separated-list-of-strings Comma separated list of yaml files with the configuration that can be updated at runtime. Runtime config files will be merged from left to right. + -server.cluster-validation.grpc.enabled + When enabled, cluster label validation will be executed. + -server.cluster-validation.grpc.soft-validation + When enabled, soft cluster label validation will be executed. Can be enabled only together with server.cluster-validation.grpc.enabled + -server.cluster-validation.label string + Optionally define server's cluster validation label. -server.grpc-listen-address string gRPC server listen address. -server.grpc-listen-port int From 94ae1581dc489b02c6eda7ad74cacb48f139577c Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 11:29:06 -0600 Subject: [PATCH 09/10] drop comment --- pkg/util/validation/notifications_limit_flag_test.go | 6 ------ 1 file changed, 6 deletions(-) diff --git a/pkg/util/validation/notifications_limit_flag_test.go b/pkg/util/validation/notifications_limit_flag_test.go index f5232733e53..6a2051f5f59 100644 --- a/pkg/util/validation/notifications_limit_flag_test.go +++ b/pkg/util/validation/notifications_limit_flag_test.go @@ -30,12 +30,6 @@ func TestNotificationLimitsMap(t *testing.T) { }, validateIntegrationLimit, ), - /*expected: flagext.LimitsMap[float64]{ - validator: validateIntegrationLimit, - data: map[string]float64{ - "email": 100, - }, - },*/ }, "unknown integration": { From 460428e9d1965185315d9db2c51159074ab14367 Mon Sep 17 00:00:00 2001 From: Alex Weaver Date: Fri, 28 Feb 2025 11:48:33 -0600 Subject: [PATCH 10/10] Get tidy vendor again to pull in dskit 654 --- go.mod | 4 +- go.sum | 8 +-- .../dskit/cache/memcached_server_selector.go | 68 ++++++++----------- .../grafana/gomemcache/memcache/selector.go | 41 +++++++---- vendor/modules.txt | 6 +- 5 files changed, 62 insertions(+), 65 deletions(-) diff --git a/go.mod b/go.mod index b27bec2d521..de16b73dfc0 100644 --- a/go.mod +++ b/go.mod @@ -22,7 +22,7 @@ require ( github.com/golang/snappy v0.0.4 github.com/google/gopacket v1.1.19 github.com/gorilla/mux v1.8.1 - github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed + github.com/grafana/dskit v0.0.0-20250228172433-c35b68b89416 github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc github.com/hashicorp/golang-lru v1.0.2 // indirect github.com/influxdata/influxdb/v2 v2.7.11 @@ -221,7 +221,7 @@ require ( github.com/googleapis/enterprise-certificate-proxy v0.3.4 // indirect github.com/googleapis/gax-go/v2 v2.14.1 // indirect github.com/gosimple/slug v1.1.1 // indirect - github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 + github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1 github.com/hashicorp/consul/api v1.31.0 // indirect github.com/hashicorp/errwrap v1.1.0 // indirect github.com/hashicorp/go-cleanhttp v0.5.2 // indirect diff --git a/go.sum b/go.sum index bab9d9566e1..faa25a5347f 100644 --- a/go.sum +++ b/go.sum @@ -1272,16 +1272,16 @@ github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc h1:PXZQA2WCxe85T github.com/grafana-tools/sdk v0.0.0-20220919052116-6562121319fc/go.mod h1:AHHlOEv1+GGQ3ktHMlhuTUwo3zljV3QJbC0+8o2kn+4= github.com/grafana/alerting v0.0.0-20250225150117-15e285d78df2 h1:kESrzm0FcRVLmGIQCgl1MCwDGLH4sLzWphr7mcFdbfI= github.com/grafana/alerting v0.0.0-20250225150117-15e285d78df2/go.mod h1:hdGB3dSl8Ma9Rjo2YiAEAjMkZ5HiNJbNDqRKDefRZrM= -github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed h1:fyMK3XSnKgheQVXuL84QM/IWu+bruBC8Wez5en7S+YI= -github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed/go.mod h1:TOlLjBa9TGnuo3Ow7X0zfFFSOye7FFhsN2wtaGXknJI= +github.com/grafana/dskit v0.0.0-20250228172433-c35b68b89416 h1:Of6D3MxfXQ5tCaAYT/BtcKU5oGsp4YHDGeNF4Kc/wzI= +github.com/grafana/dskit v0.0.0-20250228172433-c35b68b89416/go.mod h1:bs/kOQFsasdmaU2gT1xcRJN4Em5zg0BX5G8O9Pt3XJY= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc h1:BW+LjKJDz0So5LI8UZfW5neWeKpSkWqhmGjQFzcFfLM= github.com/grafana/e2e v0.1.2-0.20240118170847-db90b84177fc/go.mod h1:JVmqPBe8A/pZWwRoJW5ZjyALeY5OXMzPl7LrVXOdZAI= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937 h1:fwwnG/NcygoS6XbAaEyK2QzMXI/BZIEJvQ3CD+7XZm8= github.com/grafana/franz-go v0.0.0-20241009100846-782ba1442937/go.mod h1:NreRdJ2F7dziDY/m6VyspWd6sNxHKXdMZI42UfQ3GXM= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce h1:WI1olbgS+sEl77qxEYbmt9TgRUz7iLqmjh8lYPpGlKQ= github.com/grafana/goautoneg v0.0.0-20240607115440-f335c04c58ce/go.mod h1:GFAN9Jn9t1cX7sNfc6ZoFyc4f7i8jtm3SajrWdZM2EE= -github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 h1:1TeKhyS+pvzOeyLV1XPZsiqebnKky/AKS3pJNNbHVPo= -github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40/go.mod h1:IGRj8oOoxwJbHBYl1+OhS9UjQR0dv6SQOep7HqmtyFU= +github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1 h1:vR5nELq+KtGO+IiGW+AclWeQ7uhLHCEz/zyQwbQVNnQ= +github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1/go.mod h1:j/s0jkda4UXTemDs7Pgw/vMT06alWc42CHisvYac0qw= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe h1:yIXAAbLswn7VNWBIvM71O2QsgfgW9fRXZNR0DXe6pDU= github.com/grafana/memberlist v0.3.1-0.20220714140823-09ffed8adbbe/go.mod h1:MS2lj3INKhZjWNqd3N0m3J+Jxf3DAOnAH9VT3Sh9MUE= github.com/grafana/mimir-prometheus v0.0.0-20250227233049-0c8e28d13db8 h1:HMH5igbkbJvH01RV8bnjAsFUBLLyoEK8ab6Rb+/p+js= diff --git a/vendor/github.com/grafana/dskit/cache/memcached_server_selector.go b/vendor/github.com/grafana/dskit/cache/memcached_server_selector.go index 2ef53169919..0cc1f874b29 100644 --- a/vendor/github.com/grafana/dskit/cache/memcached_server_selector.go +++ b/vendor/github.com/grafana/dskit/cache/memcached_server_selector.go @@ -12,15 +12,6 @@ import ( "github.com/grafana/gomemcache/memcache" ) -var ( - addrsPool = sync.Pool{ - New: func() interface{} { - addrs := make([]net.Addr, 0, 64) - return &addrs - }, - } -) - // MemcachedJumpHashSelector implements the memcache.ServerSelector // interface, utilizing a jump hash to distribute keys to servers. // @@ -30,9 +21,8 @@ var ( // with consistent DNS names where the naturally sorted order // is predictable (ie. Kubernetes statefulsets). type MemcachedJumpHashSelector struct { - // To avoid copy and pasting all memcache server list logic, - // we embed it and implement our features on top of it. - servers memcache.ServerList + mu sync.RWMutex + addrs []net.Addr } // SetServers changes a MemcachedJumpHashSelector's set of servers at @@ -53,46 +43,35 @@ func (s *MemcachedJumpHashSelector) SetServers(servers ...string) error { copy(sortedServers, servers) natsort.Sort(sortedServers) - return s.servers.SetServers(sortedServers...) + naddr, err := memcache.ResolveServers(sortedServers) + if err != nil { + return err + } + + s.mu.Lock() + defer s.mu.Unlock() + s.addrs = naddr + return nil } // PickServer returns the server address that a given item -// should be shared onto. +// should be sharded onto. func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { - // Unfortunately we can't read the list of server addresses from - // the original implementation, so we use Each() to fetch all of them. - addrs := *(addrsPool.Get().(*[]net.Addr)) - err := s.servers.Each(func(addr net.Addr) error { - addrs = append(addrs, addr) - return nil - }) - if err != nil { - return nil, err - } + s.mu.RLock() + defer s.mu.RUnlock() // No need of a jump hash in case of 0 or 1 servers. - if len(addrs) == 0 { - addrs = (addrs)[:0] - addrsPool.Put(&addrs) + if len(s.addrs) == 0 { return nil, memcache.ErrNoServers } - if len(addrs) == 1 { - picked := addrs[0] - - addrs = (addrs)[:0] - addrsPool.Put(&addrs) - - return picked, nil + if len(s.addrs) == 1 { + return s.addrs[0], nil } // Pick a server using the jump hash. cs := xxhash.Sum64String(key) - idx := jumpHash(cs, len(addrs)) - picked := (addrs)[idx] - - addrs = (addrs)[:0] - addrsPool.Put(&addrs) - + idx := jumpHash(cs, len(s.addrs)) + picked := s.addrs[idx] return picked, nil } @@ -100,5 +79,12 @@ func (s *MemcachedJumpHashSelector) PickServer(key string) (net.Addr, error) { // If f returns a non-nil error, iteration will stop and that // error will be returned. func (s *MemcachedJumpHashSelector) Each(f func(net.Addr) error) error { - return s.servers.Each(f) + s.mu.RLock() + defer s.mu.RUnlock() + for _, a := range s.addrs { + if err := f(a); nil != err { + return err + } + } + return nil } diff --git a/vendor/github.com/grafana/gomemcache/memcache/selector.go b/vendor/github.com/grafana/gomemcache/memcache/selector.go index 89ad81e0d83..3daba1a9e05 100644 --- a/vendor/github.com/grafana/gomemcache/memcache/selector.go +++ b/vendor/github.com/grafana/gomemcache/memcache/selector.go @@ -23,6 +23,29 @@ import ( "sync" ) +// ResolveServers resolves each given server name to a UNIX or TCP address. +// An error is returned if any of the server names fail to resolve. +func ResolveServers(servers []string) ([]net.Addr, error) { + naddr := make([]net.Addr, len(servers)) + for i, server := range servers { + if strings.Contains(server, "/") { + addr, err := net.ResolveUnixAddr("unix", server) + if err != nil { + return nil, err + } + naddr[i] = newStaticAddr(addr) + } else { + tcpaddr, err := net.ResolveTCPAddr("tcp", server) + if err != nil { + return nil, err + } + naddr[i] = newStaticAddr(tcpaddr) + } + } + + return naddr, nil +} + // ServerSelector is the interface that selects a memcache server // as a function of the item's key. // @@ -66,21 +89,9 @@ func (s *staticAddr) String() string { return s.str } // resolve. No attempt is made to connect to the server. If any error // is returned, no changes are made to the ServerList. func (ss *ServerList) SetServers(servers ...string) error { - naddr := make([]net.Addr, len(servers)) - for i, server := range servers { - if strings.Contains(server, "/") { - addr, err := net.ResolveUnixAddr("unix", server) - if err != nil { - return err - } - naddr[i] = newStaticAddr(addr) - } else { - tcpaddr, err := net.ResolveTCPAddr("tcp", server) - if err != nil { - return err - } - naddr[i] = newStaticAddr(tcpaddr) - } + naddr, err := ResolveServers(servers) + if err != nil { + return err } ss.mu.Lock() diff --git a/vendor/modules.txt b/vendor/modules.txt index 48ca03ae23c..ddff5b4d305 100644 --- a/vendor/modules.txt +++ b/vendor/modules.txt @@ -651,7 +651,7 @@ github.com/grafana/alerting/receivers/webex github.com/grafana/alerting/receivers/webhook github.com/grafana/alerting/receivers/wecom github.com/grafana/alerting/templates -# github.com/grafana/dskit v0.0.0-20250227233310-511f086c60ed +# github.com/grafana/dskit v0.0.0-20250228172433-c35b68b89416 ## explicit; go 1.21 github.com/grafana/dskit/backoff github.com/grafana/dskit/ballast @@ -707,8 +707,8 @@ github.com/grafana/e2e github.com/grafana/e2e/cache github.com/grafana/e2e/db github.com/grafana/e2e/images -# github.com/grafana/gomemcache v0.0.0-20241016125027-0a5bcc5aef40 -## explicit; go 1.22 +# github.com/grafana/gomemcache v0.0.0-20250228145437-da7b95fd2ac1 +## explicit; go 1.21 github.com/grafana/gomemcache/memcache # github.com/grafana/pyroscope-go/godeltaprof v0.1.8 ## explicit; go 1.18