Skip to content

Commit 9644000

Browse files
committed
chore(dataobj): close window readers immediately after use
1 parent 53fa52c commit 9644000

File tree

1 file changed

+20
-17
lines changed

1 file changed

+20
-17
lines changed

pkg/dataobj/internal/encoding/decoder_range.go

+20-17
Original file line numberDiff line numberDiff line change
@@ -146,10 +146,8 @@ func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.C
146146
if err != nil {
147147
return fmt.Errorf("reading column data: %w", err)
148148
}
149-
defer rc.Close()
150-
151-
data := make([]byte, windowSize)
152-
if _, err := io.ReadFull(rc, data); err != nil {
149+
data, err := readAndClose(rc, windowSize)
150+
if err != nil {
153151
return fmt.Errorf("read column data: %w", err)
154152
}
155153

@@ -183,6 +181,17 @@ func (rd *rangeStreamsDecoder) Pages(ctx context.Context, columns []*streamsmd.C
183181
})
184182
}
185183

184+
// readAndClose reads exactly size bytes from rc and then closes it.
185+
func readAndClose(rc io.ReadCloser, size uint64) ([]byte, error) {
186+
defer rc.Close()
187+
188+
data := make([]byte, size)
189+
if _, err := io.ReadFull(rc, data); err != nil {
190+
return nil, fmt.Errorf("read column data: %w", err)
191+
}
192+
return data, nil
193+
}
194+
186195
func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd.PageDesc) result.Seq[dataset.PageData] {
187196
return result.Iter(func(yield func(dataset.PageData) bool) error {
188197
results := make([]dataset.PageData, len(pages))
@@ -207,10 +216,8 @@ func (rd *rangeStreamsDecoder) ReadPages(ctx context.Context, pages []*streamsmd
207216
if err != nil {
208217
return fmt.Errorf("reading page data: %w", err)
209218
}
210-
defer rc.Close()
211-
212-
data := make([]byte, windowSize)
213-
if _, err := io.ReadFull(rc, data); err != nil {
219+
data, err := readAndClose(rc, windowSize)
220+
if err != nil {
214221
return fmt.Errorf("read page data: %w", err)
215222
}
216223

@@ -283,11 +290,9 @@ func (rd *rangeLogsDecoder) Pages(ctx context.Context, columns []*logsmd.ColumnD
283290
if err != nil {
284291
return fmt.Errorf("reading column data: %w", err)
285292
}
286-
defer rc.Close()
287-
288-
data := make([]byte, windowSize)
289-
if _, err := io.ReadFull(rc, data); err != nil {
290-
return fmt.Errorf("read column data: %w", err)
293+
data, err := readAndClose(rc, windowSize)
294+
if err != nil {
295+
return fmt.Errorf("read page data: %w", err)
291296
}
292297

293298
for _, wp := range window {
@@ -344,10 +349,8 @@ func (rd *rangeLogsDecoder) ReadPages(ctx context.Context, pages []*logsmd.PageD
344349
if err != nil {
345350
return fmt.Errorf("reading page data: %w", err)
346351
}
347-
defer rc.Close()
348-
349-
data := make([]byte, windowSize)
350-
if _, err := io.ReadFull(rc, data); err != nil {
352+
data, err := readAndClose(rc, windowSize)
353+
if err != nil {
351354
return fmt.Errorf("read page data: %w", err)
352355
}
353356

0 commit comments

Comments
 (0)