From 6be7eddd0f3df985f48254e96cb80dd0c30e6ed2 Mon Sep 17 00:00:00 2001 From: Arve Knudsen Date: Fri, 13 Dec 2024 10:08:39 +0100 Subject: [PATCH] Fix buffer freeing in blocksStoreQuerier Signed-off-by: Arve Knudsen --- pkg/querier/block_streaming.go | 2 +- pkg/querier/blocks_store_queryable.go | 177 +++++++++++++++----------- 2 files changed, 102 insertions(+), 77 deletions(-) diff --git a/pkg/querier/block_streaming.go b/pkg/querier/block_streaming.go index a66cef6e7b6..0c412819b5f 100644 --- a/pkg/querier/block_streaming.go +++ b/pkg/querier/block_streaming.go @@ -242,9 +242,9 @@ func (s *storeGatewayStreamReader) readStream(log *spanlogger.SpanLogger) error if err != nil { return translateReceivedError(err) } - defer msg.FreeBuffer() estimate := msg.GetStreamingChunksEstimate() + msg.FreeBuffer() if estimate == nil { return fmt.Errorf("expected to receive chunks estimate, but got message of type %T", msg.Result) } diff --git a/pkg/querier/blocks_store_queryable.go b/pkg/querier/blocks_store_queryable.go index c65fce9f755..64af7162aa8 100644 --- a/pkg/querier/blocks_store_queryable.go +++ b/pkg/querier/blocks_store_queryable.go @@ -793,93 +793,32 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor return gCtx.Err() } - resp, err := stream.Recv() + var err error + var isEOS bool + var shouldRetry bool + mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, isEOS, shouldRetry, err = q.receiveMessage( + c, stream, queryLimiter, mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, + ) if errors.Is(err, io.EOF) { util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck break } if err != nil { - if shouldRetry(err) { - level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err) - return nil - } - return err } - defer resp.FreeBuffer() - - // Response may either contain series, streaming series, warning or hints. - if s := resp.GetSeries(); s != nil { - // Take a safe copy of every label. - for i, l := range s.Labels { - s.Labels[i].Name = strings.Clone(l.Name) - s.Labels[i].Value = strings.Clone(l.Value) - } - mySeries = append(mySeries, s) - - // Add series fingerprint to query limiter; will return error if we are over the limit - if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil { - return err - } - - chunksCount, chunksSize := countChunksAndBytes(s) - q.metrics.chunksTotal.Add(float64(chunksCount)) - if err := queryLimiter.AddChunkBytes(chunksSize); err != nil { - return err - } - if err := queryLimiter.AddChunks(chunksCount); err != nil { - return err - } - if err := queryLimiter.AddEstimatedChunks(chunksCount); err != nil { - return err - } - } - - if w := resp.GetWarning(); w != "" { - myWarnings.Add(errors.New(w)) - } - - if h := resp.GetHints(); h != nil { - hints := hintspb.SeriesResponseHints{} - if err := types.UnmarshalAny(h, &hints); err != nil { - return errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress()) - } - - ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) - if err != nil { - return errors.Wrapf(err, "failed to parse queried block IDs from received hints") - } - - myQueriedBlocks = append(myQueriedBlocks, ids...) - } - - if s := resp.GetStats(); s != nil { - indexBytesFetched += s.FetchedIndexBytes + if shouldRetry { + level.Warn(log).Log("msg", "failed to receive series", "remote", c.RemoteAddress(), "err", err) + return nil } - if ss := resp.GetStreamingSeries(); ss != nil { - myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series)) - - for _, s := range ss.Series { - // Add series fingerprint to query limiter; will return error if we are over the limit - l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels) - - if limitErr := queryLimiter.AddSeries(l); limitErr != nil { - return limitErr - } - - myStreamingSeriesLabels = append(myStreamingSeriesLabels, l) + if isEOS { + // If we aren't expecting any series from this stream, close it now. + if len(myStreamingSeriesLabels) == 0 { + util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck } - if ss.IsEndOfSeriesStream { - // If we aren't expecting any series from this stream, close it now. - if len(myStreamingSeriesLabels) == 0 { - util.CloseAndExhaust[*storepb.SeriesResponse](stream) //nolint:errcheck - } - - // We expect "end of stream" to be sent after the hints and the stats have been sent, so we can break out of the loop now. - break - } + // We expect "end of stream" to be sent after the hints and the stats have been sent, so we can break out of the loop now. + break } } @@ -987,6 +926,92 @@ func (q *blocksStoreQuerier) fetchSeriesFromStores(ctx context.Context, sp *stor return seriesSets, queriedBlocks, warnings, startStreamingChunks, estimateChunks, nil //nolint:govet // It's OK to return without cancelling reqCtx, see comment above. } +func (q *blocksStoreQuerier) receiveMessage(c BlocksStoreClient, stream storegatewaypb.StoreGateway_SeriesClient, queryLimiter *limiter.QueryLimiter, mySeries []*storepb.Series, myWarnings annotations.Annotations, myQueriedBlocks []ulid.ULID, myStreamingSeriesLabels []labels.Labels, indexBytesFetched uint64) ([]*storepb.Series, annotations.Annotations, []ulid.ULID, []labels.Labels, uint64, bool, bool, error) { + resp, err := stream.Recv() + if err != nil { + if errors.Is(err, io.EOF) { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err + } + + if shouldRetry(err) { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, true, nil + } + + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err + } + defer resp.FreeBuffer() + + // Response may either contain series, streaming series, warning or hints. + if s := resp.GetSeries(); s != nil { + // Take a safe copy of every label. + for i, l := range s.Labels { + s.Labels[i].Name = strings.Clone(l.Name) + s.Labels[i].Value = strings.Clone(l.Value) + } + mySeries = append(mySeries, s) + + // Add series fingerprint to query limiter; will return error if we are over the limit + if err := queryLimiter.AddSeries(mimirpb.FromLabelAdaptersToLabels(s.Labels)); err != nil { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err + } + + chunksCount, chunksSize := countChunksAndBytes(s) + q.metrics.chunksTotal.Add(float64(chunksCount)) + if err := queryLimiter.AddChunkBytes(chunksSize); err != nil { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err + } + if err := queryLimiter.AddChunks(chunksCount); err != nil { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err + } + if err := queryLimiter.AddEstimatedChunks(chunksCount); err != nil { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, err + } + } + + if w := resp.GetWarning(); w != "" { + myWarnings.Add(errors.New(w)) + } + + if h := resp.GetHints(); h != nil { + hints := hintspb.SeriesResponseHints{} + if err := types.UnmarshalAny(h, &hints); err != nil { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, errors.Wrapf(err, "failed to unmarshal series hints from %s", c.RemoteAddress()) + } + + ids, err := convertBlockHintsToULIDs(hints.QueriedBlocks) + if err != nil { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, errors.Wrapf(err, "failed to parse queried block IDs from received hints") + } + + myQueriedBlocks = append(myQueriedBlocks, ids...) + } + + if s := resp.GetStats(); s != nil { + indexBytesFetched += s.FetchedIndexBytes + } + + if ss := resp.GetStreamingSeries(); ss != nil { + myStreamingSeriesLabels = slices.Grow(myStreamingSeriesLabels, len(ss.Series)) + + for _, s := range ss.Series { + l := mimirpb.FromLabelAdaptersToLabelsWithCopy(s.Labels) + + // Add series fingerprint to query limiter; will return error if we are over the limit + if limitErr := queryLimiter.AddSeries(l); limitErr != nil { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, limitErr + } + + myStreamingSeriesLabels = append(myStreamingSeriesLabels, l) + } + + if ss.IsEndOfSeriesStream { + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, true, false, nil + } + } + + return mySeries, myWarnings, myQueriedBlocks, myStreamingSeriesLabels, indexBytesFetched, false, false, nil +} + func shouldRetry(err error) bool { if errors.Is(err, context.Canceled) || errors.Is(err, context.DeadlineExceeded) { return false