Skip to content

Commit

Permalink
chore(dataobj): use windowing for downloading pages and column metadata
Browse files Browse the repository at this point in the history
  • Loading branch information
rfratto committed Mar 11, 2025
1 parent 10bfa34 commit 53fa52c
Showing 1 changed file with 181 additions and 80 deletions.
261 changes: 181 additions & 80 deletions pkg/dataobj/internal/encoding/decoder_range.go
Original file line number Diff line number Diff line change
@@ -1,6 +1,7 @@
package encoding

import (
"bytes"
"context"
"fmt"
"io"
Expand All @@ -12,6 +13,16 @@ import (
"github.com/grafana/loki/v3/pkg/dataobj/internal/result"
)

// windowSize specifies the maximum amount of data to download at once from
// object storage. 16MB is chosen based on S3's [recommendations] for
// Byte-Range fetches, which recommend either 8MB or 16MB.
//
// As windowing is designed to reduce the number of requests made to object
// storage, 16MB is chosen over 8MB, as it will lead to fewer requests.
//
// [recommendations]: https://docs.aws.amazon.com/whitepapers/latest/s3-optimizing-performance-best-practices/use-byte-range-fetches.html
const windowSize = 16_000_000

// rangeReader is an interface that can read a range of bytes from an object.
type rangeReader interface {
// Size returns the full size of the object.
Expand Down Expand Up @@ -114,31 +125,56 @@ func (rd *rangeStreamsDecoder) Columns(ctx context.Context, section *filemd.Sect
}

func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.ColumnDesc) result.Seq[[]*streamsmd.PageDesc] {
getPages := func(ctx context.Context, column *streamsmd.ColumnDesc) ([]*streamsmd.PageDesc, error) {
rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize))
if err != nil {
return nil, fmt.Errorf("reading column metadata: %w", err)
return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error {
results := make([][]*streamsmd.PageDesc, len(columns))

columnInfo := func(c *streamsmd.ColumnDesc) (uint64, uint64) {
return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize
}
defer rc.Close()

br, release := getBufioReader(rc)
defer release()
for window := range iterWindows(columns, columnInfo, windowSize) {
if len(window) == 0 {
continue
}

md, err := decodeStreamsColumnMetadata(br)
if err != nil {
return nil, err
}
return md.Pages, nil
}
var (
windowOffset = window.Start().GetInfo().MetadataOffset
windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset
)

// TODO(rfratto): this retrieves all pages for all columns individually; we
// may be able to batch requests to minimize roundtrips.
return result.Iter(func(yield func([]*streamsmd.PageDesc) bool) error {
for _, column := range columns {
pages, err := getPages(ctx, column)
rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
if err != nil {
return err
} else if !yield(pages) {
return fmt.Errorf("reading column data: %w", err)
}
defer rc.Close()

data := make([]byte, windowSize)
if _, err := io.ReadFull(rc, data); err != nil {
return fmt.Errorf("read column data: %w", err)
}

for _, wp := range window {
// Find the slice in the data for this column.
var (
columnOffset = wp.Data.GetInfo().MetadataOffset
dataOffset = columnOffset - windowOffset
)

r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize])

md, err := decodeStreamsColumnMetadata(r)
if err != nil {
return err
}

// wp.Position is the position of the column in the original columns
// slice; this retains the proper order of data in results.
results[wp.Position] = md.Pages
}
}

for _, data := range results {
if !yield(data) {
return nil
}
}
Expand All @@ -148,31 +184,51 @@ func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.C
}

func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] {
getPageData := func(ctx context.Context, page *streamsmd.PageDesc) (dataset.PageData, error) {
rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize))
if err != nil {
return nil, fmt.Errorf("reading page data: %w", err)
return result.Iter(func(yield func(dataset.PageData) bool) error {
results := make([]dataset.PageData, len(pages))

pageInfo := func(p *streamsmd.PageDesc) (uint64, uint64) {
return p.GetInfo().DataOffset, p.GetInfo().DataSize
}
defer rc.Close()

br, release := getBufioReader(rc)
defer release()
// TODO(rfratto): If there are many windows, it may make sense to read them
// in parallel.
for window := range iterWindows(pages, pageInfo, windowSize) {
if len(window) == 0 {
continue
}

data := make([]byte, page.Info.DataSize)
if _, err := io.ReadFull(br, data); err != nil {
return nil, fmt.Errorf("read page data: %w", err)
}
return dataset.PageData(data), nil
}
var (
windowOffset = window.Start().GetInfo().DataOffset
windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset
)

// TODO(rfratto): this retrieves all pages for all columns individually; we
// may be able to batch requests to minimize roundtrips.
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := getPageData(ctx, page)
rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
if err != nil {
return err
} else if !yield(data) {
return fmt.Errorf("reading page data: %w", err)
}
defer rc.Close()

data := make([]byte, windowSize)
if _, err := io.ReadFull(rc, data); err != nil {
return fmt.Errorf("read page data: %w", err)
}

for _, wp := range window {
// Find the slice in the data for this page.
var (
pageOffset = wp.Data.GetInfo().DataOffset
dataOffset = pageOffset - windowOffset
)

// wp.Position is the position of the page in the original pages slice;
// this retains the proper order of data in results.
results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize])
}
}

for _, data := range results {
if !yield(data) {
return nil
}
}
Expand Down Expand Up @@ -206,31 +262,56 @@ func (rd *rangeLogsDecoder) Columns(ctx context.Context, section *filemd.Section
}

func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnDesc) result.Seq[[]*logsmd.PageDesc] {
getPages := func(ctx context.Context, column *logsmd.ColumnDesc) ([]*logsmd.PageDesc, error) {
rc, err := rd.rr.ReadRange(ctx, int64(column.Info.MetadataOffset), int64(column.Info.MetadataSize))
if err != nil {
return nil, fmt.Errorf("reading column metadata: %w", err)
return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
results := make([][]*logsmd.PageDesc, len(columns))

columnInfo := func(c *logsmd.ColumnDesc) (uint64, uint64) {
return c.GetInfo().MetadataOffset, c.GetInfo().MetadataSize
}
defer rc.Close()

br, release := getBufioReader(rc)
defer release()
for window := range iterWindows(columns, columnInfo, windowSize) {
if len(window) == 0 {
continue
}

md, err := decodeLogsColumnMetadata(br)
if err != nil {
return nil, err
}
return md.Pages, nil
}
var (
windowOffset = window.Start().GetInfo().MetadataOffset
windowSize = (window.End().GetInfo().MetadataOffset + window.End().GetInfo().MetadataSize) - windowOffset
)

// TODO(rfratto): this retrieves all pages for all columns individually; we
// may be able to batch requests to minimize roundtrips.
return result.Iter(func(yield func([]*logsmd.PageDesc) bool) error {
for _, column := range columns {
pages, err := getPages(ctx, column)
rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
if err != nil {
return err
} else if !yield(pages) {
return fmt.Errorf("reading column data: %w", err)
}
defer rc.Close()

data := make([]byte, windowSize)
if _, err := io.ReadFull(rc, data); err != nil {
return fmt.Errorf("read column data: %w", err)
}

for _, wp := range window {
// Find the slice in the data for this column.
var (
columnOffset = wp.Data.GetInfo().MetadataOffset
dataOffset = columnOffset - windowOffset
)

r := bytes.NewReader(data[dataOffset : dataOffset+wp.Data.GetInfo().MetadataSize])

md, err := decodeLogsColumnMetadata(r)
if err != nil {
return err
}

// wp.Position is the position of the column in the original columns
// slice; this retains the proper order of data in results.
results[wp.Position] = md.Pages
}
}

for _, data := range results {
if !yield(data) {
return nil
}
}
Expand All @@ -240,31 +321,51 @@ func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnD
}

func (rd *rangeLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageDesc) result.Seq[dataset.PageData] {
getPageData := func(ctx context.Context, page *logsmd.PageDesc) (dataset.PageData, error) {
rc, err := rd.rr.ReadRange(ctx, int64(page.Info.DataOffset), int64(page.Info.DataSize))
if err != nil {
return nil, fmt.Errorf("reading page data: %w", err)
return result.Iter(func(yield func(dataset.PageData) bool) error {
results := make([]dataset.PageData, len(pages))

pageInfo := func(p *logsmd.PageDesc) (uint64, uint64) {
return p.GetInfo().DataOffset, p.GetInfo().DataSize
}
defer rc.Close()

br, release := getBufioReader(rc)
defer release()
// TODO(rfratto): If there are many windows, it may make sense to read them
// in parallel.
for window := range iterWindows(pages, pageInfo, windowSize) {
if len(window) == 0 {
continue
}

data := make([]byte, page.Info.DataSize)
if _, err := io.ReadFull(br, data); err != nil {
return nil, fmt.Errorf("read page data: %w", err)
}
return dataset.PageData(data), nil
}
var (
windowOffset = window.Start().GetInfo().DataOffset
windowSize = (window.End().GetInfo().DataOffset + window.End().GetInfo().DataSize) - windowOffset
)

// TODO(rfratto): this retrieves all pages for all columns individually; we
// may be able to batch requests to minimize roundtrips.
return result.Iter(func(yield func(dataset.PageData) bool) error {
for _, page := range pages {
data, err := getPageData(ctx, page)
rc, err := rd.rr.ReadRange(ctx, int64(windowOffset), int64(windowSize))
if err != nil {
return err
} else if !yield(data) {
return fmt.Errorf("reading page data: %w", err)
}
defer rc.Close()

data := make([]byte, windowSize)
if _, err := io.ReadFull(rc, data); err != nil {
return fmt.Errorf("read page data: %w", err)
}

for _, wp := range window {
// Find the slice in the data for this page.
var (
pageOffset = wp.Data.GetInfo().DataOffset
dataOffset = pageOffset - windowOffset
)

// wp.Position is the position of the page in the original pages slice;
// this retains the proper order of data in results.
results[wp.Position] = dataset.PageData(data[dataOffset : dataOffset+wp.Data.GetInfo().DataSize])
}
}

for _, data := range results {
if !yield(data) {
return nil
}
}
Expand Down

0 comments on commit 53fa52c

Please sign in to comment.