Skip to content

Commit

Permalink
Modify mergeQueryable to allow reuse (#10526)
Browse files Browse the repository at this point in the history
* Modify mergeQueryable to allow reuse

* Revise according to code review

* Try suggestion
  • Loading branch information
zenador authored Jan 31, 2025
1 parent b748116 commit f1ad1e2
Showing 1 changed file with 85 additions and 26 deletions.
111 changes: 85 additions & 26 deletions pkg/querier/tenantfederation/merge_queryable.go
Original file line number Diff line number Diff line change
Expand Up @@ -51,7 +51,7 @@ func NewQueryable(upstream storage.Queryable, bypassWithSingleID bool, maxConcur
}, nil
},
}
return NewMergeQueryable(defaultTenantLabel, callbacks, tenant.NewMultiResolver(), bypassWithSingleID, maxConcurrency, reg, logger)
return NewMergeQueryable(defaultTenantLabel, callbacks, tenant.NewMultiResolver(), bypassWithSingleID, maxConcurrency, reg, logger, nil)
}

// MergeQueryableCallbacks contains callbacks to NewMergeQueryable, for customizing its behaviour.
Expand Down Expand Up @@ -103,17 +103,15 @@ func (q *tenantQuerier) Close() error {
// If the label `idLabelName` already exists, its value is overwritten and
// the previous value is exposed through a new label prefixed with "original_".
// This behaviour is not implemented recursively.
func NewMergeQueryable(idLabelName string, callbacks MergeQueryableCallbacks, resolver tenant.Resolver, bypassWithSingleID bool, maxConcurrency int, reg prometheus.Registerer, logger log.Logger) storage.Queryable {
// Note that we allow tenant.Resolver to be injected instead of using the
// tenant.TenantIDs() method because GEM needs to inject different behavior
// here for the cluster federation feature.
return &mergeQueryable{
logger: logger,
idLabelName: idLabelName,
callbacks: callbacks,
resolver: resolver,
bypassWithSingleID: bypassWithSingleID,
maxConcurrency: maxConcurrency,
//
// Passing in multiTenantSelectFunc allows for customizing the behavior of the
// Select method that is used to query multiple tenants in parallel. When left
// nil, the default behavior is used.
func NewMergeQueryable(idLabelName string, callbacks MergeQueryableCallbacks, resolver tenant.Resolver, bypassWithSingleID bool, maxConcurrency int, reg prometheus.Registerer, logger log.Logger, multiTenantSelectFunc MultiTenantSelectFunc) storage.Queryable {
if multiTenantSelectFunc == nil {
multiTenantSelectFunc = defaultMultiTenantSelectFunc
}
metrics := mergeQueryableMetrics{
tenantsQueried: promauto.With(reg).NewHistogram(prometheus.HistogramOpts{
Name: "cortex_querier_federation_tenants_queried",
Help: "Number of tenants queried for a single standard query.",
Expand All @@ -129,15 +127,34 @@ func NewMergeQueryable(idLabelName string, callbacks MergeQueryableCallbacks, re
NativeHistogramMinResetDuration: 1 * time.Hour,
}),
}
// Note that we allow tenant.Resolver to be injected instead of using the
// tenant.TenantIDs() method because GEM needs to inject different behavior
// here for the cluster federation feature.
return &mergeQueryable{
logger: logger,
idLabelName: idLabelName,
callbacks: callbacks,
resolver: resolver,
bypassWithSingleID: bypassWithSingleID,
maxConcurrency: maxConcurrency,
multiTenantSelectFunc: multiTenantSelectFunc,
mergeQueryableMetrics: metrics,
}
}

type mergeQueryable struct {
logger log.Logger
idLabelName string
bypassWithSingleID bool
callbacks MergeQueryableCallbacks
resolver tenant.Resolver
maxConcurrency int
logger log.Logger
idLabelName string
bypassWithSingleID bool
callbacks MergeQueryableCallbacks
resolver tenant.Resolver
maxConcurrency int
multiTenantSelectFunc MultiTenantSelectFunc

mergeQueryableMetrics
}

type mergeQueryableMetrics struct {
tenantsQueried prometheus.Histogram
upstreamQueryWaitDuration prometheus.Histogram
}
Expand All @@ -159,6 +176,7 @@ func (m *mergeQueryable) Querier(mint int64, maxt int64) (storage.Querier, error
bypassWithSingleID: m.bypassWithSingleID,
tenantsQueried: m.tenantsQueried,
upstreamQueryWaitDuration: m.upstreamQueryWaitDuration,
multiTenantSelectFunc: m.multiTenantSelectFunc,
}, nil
}

Expand All @@ -177,6 +195,7 @@ type mergeQuerier struct {
bypassWithSingleID bool
tenantsQueried prometheus.Histogram
upstreamQueryWaitDuration prometheus.Histogram
multiTenantSelectFunc MultiTenantSelectFunc
}

// LabelValues returns all potential values for a label name given involved federation IDs.
Expand Down Expand Up @@ -352,41 +371,81 @@ func (m *mergeQuerier) Select(ctx context.Context, sortSeries bool, hints *stora
matchedIDs, filteredMatchers := FilterValuesByMatchers(m.idLabelName, ids, matchers...)

jobs := make([]string, 0, len(matchedIDs))
seriesSets := make([]storage.SeriesSet, len(matchedIDs))
for id := range matchedIDs {
jobs = append(jobs, id)
}

wrMergeQuerier := waitRecordingMergeQuerier{
start: start,
upstreamQueryWaitDuration: m.upstreamQueryWaitDuration,
upstream: m.upstream,
}

return m.multiTenantSelectFunc(ctx, jobs, wrMergeQuerier, m.idLabelName, m.maxConcurrency, sortSeries, hints, filteredMatchers...)
}

// MultiTenantSelectFunc is an overrideable function that queries multiple tenants in parallel.
// By default, jobs would be the list of tenant IDs, idLabelName is used to add a label to the series
// to identify the tenant it belongs to, maxConcurrency is the maximum number of concurrent queries allowed,
// and ctx, sortSeries, hints, and matchers are the same as the Select method.
type MultiTenantSelectFunc func(ctx context.Context, jobs []string, selMergeQuerier SelectMergeQuerier, idLabelName string, maxConcurrency int, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet

func defaultMultiTenantSelectFunc(ctx context.Context, jobs []string, selMergeQuerier SelectMergeQuerier, idLabelName string, maxConcurrency int, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
seriesSets := make([]storage.SeriesSet, len(jobs))

// We don't use the context passed to this function, since the context has to live longer
// than the call to ForEachJob (i.e. as long as seriesSets)
run := func(_ context.Context, idx int) error {
m.upstreamQueryWaitDuration.Observe(time.Since(start).Seconds())
id := jobs[idx]
seriesSets[idx] = &addLabelsSeriesSet{
upstream: m.upstream.Select(ctx, id, sortSeries, hints, filteredMatchers...),
labels: []labels.Label{
seriesSets[idx] = NewAddLabelsSeriesSet(
selMergeQuerier.Select(ctx, id, sortSeries, hints, matchers...),
[]labels.Label{
{
Name: m.idLabelName,
Name: idLabelName,
Value: id,
},
},
}
)
return nil
}

if err := concurrency.ForEachJob(ctx, len(jobs), m.maxConcurrency, run); err != nil {
if err := concurrency.ForEachJob(ctx, len(jobs), maxConcurrency, run); err != nil {
return storage.ErrSeriesSet(err)
}

return storage.NewMergeSeriesSet(seriesSets, 0, storage.ChainedSeriesMerge)
}

type SelectMergeQuerier interface {
Select(ctx context.Context, id string, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet
}

// waitRecordingMergeQuerier is a wrapper for MergeQuerierUpstream that records the time it took to start
// the upstream query.
type waitRecordingMergeQuerier struct {
start time.Time
upstreamQueryWaitDuration prometheus.Histogram
upstream MergeQuerierUpstream
}

func (q waitRecordingMergeQuerier) Select(ctx context.Context, id string, sortSeries bool, hints *storage.SelectHints, matchers ...*labels.Matcher) storage.SeriesSet {
q.upstreamQueryWaitDuration.Observe(time.Since(q.start).Seconds())
return q.upstream.Select(ctx, id, sortSeries, hints, matchers...)
}

type addLabelsSeriesSet struct {
upstream storage.SeriesSet
labels []labels.Label
currSeries storage.Series
}

func NewAddLabelsSeriesSet(upstream storage.SeriesSet, labels []labels.Label) storage.SeriesSet {
return &addLabelsSeriesSet{
upstream: upstream,
labels: labels,
}
}

func (m *addLabelsSeriesSet) Next() bool {
m.currSeries = nil
return m.upstream.Next()
Expand Down

0 comments on commit f1ad1e2

Please sign in to comment.