diff --git a/pkg/dataobj/internal/encoding/decoder_range.go b/pkg/dataobj/internal/encoding/decoder_range.go index 19d5847d2f1db..8c802917d72e9 100644 --- a/pkg/dataobj/internal/encoding/decoder_range.go +++ b/pkg/dataobj/internal/encoding/decoder_range.go @@ -1,6 +1,7 @@ package encoding import ( + "bytes" "context" "fmt" "io" @@ -12,6 +13,16 @@ import ( "github.com/grafana/loki/v3/pkg/dataobj/internal/result" ) +// windowSize specifies the maximum amount of data to download at once from +// object storage. 16MB is chosen based on S3's [recommendations] for +// Byte-Range fetches, which recommends either 8MB or 16MB. +// +// As windowing is designed to reduce the number of requests made to object +// storage, 16MB is chosen over 8MB, as it will lead to fewer requests. +// +// [recommendations]: https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html +const windowSize = 16_000_000 + // rangeReader is an interface that can read a range of bytes from an object. type rangeReader interface { // Size returns the full size of the object. 
@@ -114,31 +125,56 @@ func (rd *rangeStreamsDecoder) Columns(ctx context.Context, section *filemd.Sect } func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc] { - getPages := func(ctx context.Context, column *streamsmd.ColumnDesc) ([]*streamsmd.PageDesc, error) { - rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize)) - if err != nil { - return nil, fmt.Errorf("reading column metadata: %w", err) + return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error { + results := make([][]*streamsmd.PageDesc, len(columns)) + + columnInfo := func(c *streamsmd.ColumnDesc) (uint64, uint64) { + return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize } - defer rc.Close() - br, release := getBufioReader(rc) - defer release() + for window := range iterWindows(columns, columnInfo, windowSize) { + if len(window) == 0 { + continue + } - md, err := decodeStreamsColumnMetadata(br) - if err != nil { - return nil, err - } - return md.Pages, nil - } + var ( + windowOffset = window.Start().GetInfo().MetadataOffset + windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset + ) - // TODO(rfratto): this retrieves all pages for all columns individually; we - // may be able to batch requests to minimize roundtrips. - return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error { - for _, column := range columns { - pages, err := getPages(ctx, column) + rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) if err != nil { - return err - } else if !yield(pages) { + return fmt.Errorf("reading column data: %w", err) + } + defer rc.Close() + + data := make([]byte, windowSize) + if _, err := io.ReadFull(rc, data); err != nil { + return fmt.Errorf("read column data: %w", err) + } + + for _, wp := range window { + // Find the slice in the data for this column. 
+ var ( + columnOffset = wp.Data.GetInfo().MetadataOffset + dataOffset = columnOffset - windowOffset + ) + + r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize]) + + md, err := decodeStreamsColumnMetadata(r) + if err != nil { + return err + } + + // wp.Position is the position of the column in the original columns + // slice; this retains the proper order of data in results. + results[wp.Position] = md.Pages + } + } + + for _, data := range results { + if !yield(data) { return nil } } @@ -148,31 +184,51 @@ func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.C } func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] { - getPageData := func(ctx context.Context, page *streamsmd.PageDesc) (dataset.PageData, error) { - rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize)) - if err != nil { - return nil, fmt.Errorf("reading page data: %w", err) + return result.Iter(func(yield func(dataset.PageData) bool) error { + results := make([]dataset.PageData, len(pages)) + + pageInfo := func(p *streamsmd.PageDesc) (uint64, uint64) { + return p.GetInfo().DataOffset, p.GetInfo().DataSize } - defer rc.Close() - br, release := getBufioReader(rc) - defer release() + // TODO(rfratto): If there are many windows, it may make sense to read them + // in parallel. 
+ for window := range iterWindows(pages, pageInfo, windowSize) { + if len(window) == 0 { + continue + } - data := make([]byte, page.Info.DataSize) - if _, err := io.ReadFull(br, data); err != nil { - return nil, fmt.Errorf("read page data: %w", err) - } - return dataset.PageData(data), nil - } + var ( + windowOffset = window.Start().GetInfo().DataOffset + windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset + ) - // TODO(rfratto): this retrieves all pages for all columns individually; we - // may be able to batch requests to minimize roundtrips. - return result.Iter(func(yield func(dataset.PageData) bool) error { - for _, page := range pages { - data, err := getPageData(ctx, page) + rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) if err != nil { - return err - } else if !yield(data) { + return fmt.Errorf("reading page data: %w", err) + } + defer rc.Close() + + data := make([]byte, windowSize) + if _, err := io.ReadFull(rc, data); err != nil { + return fmt.Errorf("read page data: %w", err) + } + + for _, wp := range window { + // Find the slice in the data for this page. + var ( + pageOffset = wp.Data.GetInfo().DataOffset + dataOffset = pageOffset - windowOffset + ) + + // wp.Position is the position of the page in the original pages slice; + // this retains the proper order of data in results. 
+ results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize]) + } + } + + for _, data := range results { + if !yield(data) { return nil } } @@ -206,31 +262,56 @@ func (rd *rangeLogsDecoder) Columns(ctx context.Context, section *filemd.Section } func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] { - getPages := func(ctx context.Context, column *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) { - rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize)) - if err != nil { - return nil, fmt.Errorf("reading column metadata: %w", err) + return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error { + results := make([][]*logsmd.PageDesc, len(columns)) + + columnInfo := func(c *logsmd.ColumnDesc) (uint64, uint64) { + return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize } - defer rc.Close() - br, release := getBufioReader(rc) - defer release() + for window := range iterWindows(columns, columnInfo, windowSize) { + if len(window) == 0 { + continue + } - md, err := decodeLogsColumnMetadata(br) - if err != nil { - return nil, err - } - return md.Pages, nil - } + var ( + windowOffset = window.Start().GetInfo().MetadataOffset + windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset + ) - // TODO(rfratto): this retrieves all pages for all columns individually; we - // may be able to batch requests to minimize roundtrips. 
- return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error { - for _, column := range columns { - pages, err := getPages(ctx, column) + rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) if err != nil { - return err - } else if !yield(pages) { + return fmt.Errorf("reading column data: %w", err) + } + defer rc.Close() + + data := make([]byte, windowSize) + if _, err := io.ReadFull(rc, data); err != nil { + return fmt.Errorf("read column data: %w", err) + } + + for _, wp := range window { + // Find the slice in the data for this column. + var ( + columnOffset = wp.Data.GetInfo().MetadataOffset + dataOffset = columnOffset - windowOffset + ) + + r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize]) + + md, err := decodeLogsColumnMetadata(r) + if err != nil { + return err + } + + // wp.Position is the position of the column in the original columns + // slice; this retains the proper order of data in results. + results[wp.Position] = md.Pages + } + } + + for _, data := range results { + if !yield(data) { return nil } } @@ -240,31 +321,51 @@ func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnD } func (rd *rangeLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] { - getPageData := func(ctx context.Context, page *logsmd.PageDesc) (dataset.PageData, error) { - rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize)) - if err != nil { - return nil, fmt.Errorf("reading page data: %w", err) + return result.Iter(func(yield func(dataset.PageData) bool) error { + results := make([]dataset.PageData, len(pages)) + + pageInfo := func(p *logsmd.PageDesc) (uint64, uint64) { + return p.GetInfo().DataOffset, p.GetInfo().DataSize } - defer rc.Close() - br, release := getBufioReader(rc) - defer release() + // TODO(rfratto): If there are many windows, it may make sense to read them + // in parallel. 
+ for window := range iterWindows(pages, pageInfo, windowSize) { + if len(window) == 0 { + continue + } - data := make([]byte, page.Info.DataSize) - if _, err := io.ReadFull(br, data); err != nil { - return nil, fmt.Errorf("read page data: %w", err) - } - return dataset.PageData(data), nil - } + var ( + windowOffset = window.Start().GetInfo().DataOffset + windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset + ) - // TODO(rfratto): this retrieves all pages for all columns individually; we - // may be able to batch requests to minimize roundtrips. - return result.Iter(func(yield func(dataset.PageData) bool) error { - for _, page := range pages { - data, err := getPageData(ctx, page) + rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize)) if err != nil { - return err - } else if !yield(data) { + return fmt.Errorf("reading page data: %w", err) + } + defer rc.Close() + + data := make([]byte, windowSize) + if _, err := io.ReadFull(rc, data); err != nil { + return fmt.Errorf("read page data: %w", err) + } + + for _, wp := range window { + // Find the slice in the data for this page. + var ( + pageOffset = wp.Data.GetInfo().DataOffset + dataOffset = pageOffset - windowOffset + ) + + // wp.Position is the position of the page in the original pages slice; + // this retains the proper order of data in results. + results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize]) + } + } + + for _, data := range results { + if !yield(data) { return nil } }