Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

ingest: add long polling to TestConcurrentFetchers #9971

Merged
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
178 changes: 77 additions & 101 deletions pkg/storage/ingest/fetcher_test.go
Original file line number Diff line number Diff line change
Expand Up @@ -352,7 +352,7 @@ func TestConcurrentFetchers(t *testing.T) {

fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, concurrency, 0)

fetches, _ := fetchers.PollFetches(ctx)
fetches := longPollFetches(fetchers, 5, 2*time.Second)
assert.Equal(t, fetches.NumRecords(), 5)

// We expect no more records returned by PollFetches() and no buffered records.
Expand All @@ -375,7 +375,7 @@ func TestConcurrentFetchers(t *testing.T) {
produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
}

fetches, _ := fetchers.PollFetches(ctx)
fetches := longPollFetches(fetchers, 3, 2*time.Second)
assert.Equal(t, fetches.NumRecords(), 3)

// We expect no more records returned by PollFetches() and no buffered records.
Expand Down Expand Up @@ -448,11 +448,8 @@ func TestConcurrentFetchers(t *testing.T) {
}

// Consume all expected records.
consumedRecords := 0
for consumedRecords < 10 {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
fetches := longPollFetches(fetchers, 10, 2*time.Second)
consumedRecords := fetches.NumRecords()
assert.Equal(t, 10, consumedRecords)

// We expect no more records returned by PollFetches() and no buffered records.
Expand Down Expand Up @@ -481,11 +478,8 @@ func TestConcurrentFetchers(t *testing.T) {
produceRecord(ctx, t, client, topicName, partitionID, []byte(fmt.Sprintf("record-%d", i)))
}

var totalRecords int
for totalRecords < 20 {
fetches, _ := fetchers.PollFetches(ctx)
totalRecords += fetches.NumRecords()
}
fetches := longPollFetches(fetchers, 20, 2*time.Second)
totalRecords := fetches.NumRecords()

assert.Equal(t, 20, totalRecords)

Expand Down Expand Up @@ -522,12 +516,10 @@ func TestConcurrentFetchers(t *testing.T) {

const expectedRecords = 5
fetchedRecordsContents := make([]string, 0, expectedRecords)
for len(fetchedRecordsContents) < expectedRecords {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsContents = append(fetchedRecordsContents, string(r.Value))
})
}
fetches := longPollFetches(fetchers, expectedRecords, 2*time.Second)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsContents = append(fetchedRecordsContents, string(r.Value))
})

assert.Equal(t, []string{
"record-4",
Expand Down Expand Up @@ -566,13 +558,11 @@ func TestConcurrentFetchers(t *testing.T) {

// Poll for fetches and verify
fetchedRecords := make([]string, 0, recordsPerRound)
for len(fetchedRecords) < recordsPerRound {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, string(r.Value))
t.Log("fetched", r.Offset, string(r.Value))
})
}
fetches := longPollFetches(fetchers, recordsPerRound, 2*time.Second)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, string(r.Value))
t.Log("fetched", r.Offset, string(r.Value))
})

// Verify fetched records
assert.Equal(t, expectedRecords, fetchedRecords, "Fetched records in round %d do not match expected", round)
Expand All @@ -599,11 +589,7 @@ func TestConcurrentFetchers(t *testing.T) {
producedOffset := produceRecord(ctx, t, client, topicName, partitionID, record)
// verify that the record is fetched.

var fetches kgo.Fetches
require.Eventually(t, func() bool {
fetches, _ = fetchers.PollFetches(ctx)
return len(fetches.Records()) == 1
}, 5*time.Second, 100*time.Millisecond)
fetches := longPollFetches(fetchers, 1, 5*time.Second)

require.Equal(t, fetches.Records()[0].Value, record)
require.Equal(t, fetches.Records()[0].Offset, producedOffset)
Expand Down Expand Up @@ -667,41 +653,31 @@ func TestConcurrentFetchers(t *testing.T) {
fetchers := createConcurrentFetchers(ctx, t, client, topicName, partitionID, 0, initialConcurrency, 0)

fetchedRecords := make([]*kgo.Record, 0)
fetchedCount := atomic.NewInt64(0)

fetchRecords := func(duration time.Duration) {
deadline := time.Now().Add(duration)
for time.Now().Before(deadline) {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, r)
fetchedCount.Inc()
})
}
}

// Initial fetch with starting concurrency
fetchRecords(2 * time.Second)
initialFetched := fetchedCount.Load()
fetches := longPollFetches(fetchers, math.MaxInt, 2*time.Second)
initialFetched := fetches.NumRecords()

// Update to higher concurrency
fetchers.Update(ctx, 4)
fetchRecords(3 * time.Second)
highConcurrencyFetched := fetchedCount.Load() - initialFetched
fetches = longPollFetches(fetchers, math.MaxInt, 3*time.Second)
highConcurrencyFetched := fetches.NumRecords()

// Update to lower concurrency
fetchers.Update(ctx, 1)
fetchRecords(3 * time.Second)
fetches = longPollFetches(fetchers, math.MaxInt, 3*time.Second)
lowerConcurrentFetched := fetches.NumRecords()

cancelProduce()
// Produce everything that's left now.
fetchRecords(time.Second)
// Consume everything that's left now.
fetches = longPollFetches(fetchers, math.MaxInt, 2*time.Second)
finalFetched := fetches.NumRecords()
totalProduced := producedCount.Load()
totalFetched := fetchedCount.Load()
totalFetched := initialFetched + highConcurrencyFetched + lowerConcurrentFetched + finalFetched

// Verify fetched records
assert.True(t, totalFetched > 0, "Expected to fetch some records")
assert.Equal(t, totalFetched, totalProduced, "Should not fetch more records than produced")
assert.Equal(t, int64(totalFetched), totalProduced, "Should not fetch more records than produced")
assert.True(t, highConcurrencyFetched > initialFetched, "Expected to fetch more records with higher concurrency")

// Verify record contents
Expand Down Expand Up @@ -754,16 +730,9 @@ func TestConcurrentFetchers(t *testing.T) {
produceRecord(ctx, t, client, topicName, partitionID, []byte(record))
}

fetchedRecords := make([]*kgo.Record, 0, additionalRecords)
fetchDeadline := time.Now().Add(5 * time.Second)

// Fetch records
for len(fetchedRecords) < additionalRecords && time.Now().Before(fetchDeadline) {
fetches, _ := fetchers.PollFetches(ctx)
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecords = append(fetchedRecords, r)
})
}
fetches := longPollFetches(fetchers, additionalRecords, 5*time.Second)
fetchedRecords := fetches.Records()

// Verify fetched records
assert.LessOrEqual(t, len(fetchedRecords), additionalRecords,
Expand Down Expand Up @@ -813,13 +782,11 @@ func TestConcurrentFetchers(t *testing.T) {

// Expect that we've received all records.
var fetchedRecordsBytes [][]byte
for len(fetchedRecordsBytes) < initiallyProducedRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}
fetches := longPollFetches(fetchers, initiallyProducedRecords, 2*time.Second)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})

// Produce a few more records
const additionalRecords = 3
Expand All @@ -830,13 +797,11 @@ func TestConcurrentFetchers(t *testing.T) {
}

// Fetchers shouldn't be stalled and should continue fetching as the HWM moves forward.
for len(fetchedRecordsBytes) < initiallyProducedRecords+additionalRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}
fetches = longPollFetches(fetchers, additionalRecords, 2*time.Second)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})

assert.Equal(t, producedRecordsBytes, fetchedRecordsBytes)

Expand Down Expand Up @@ -878,13 +843,11 @@ func TestConcurrentFetchers(t *testing.T) {

// Fetch and verify records; this should unblock the fetchers.
var fetchedRecordsBytes [][]byte
for len(fetchedRecordsBytes) < initialRecords {
fetches, _ := fetchers.PollFetches(ctx)
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})
}
fetches := longPollFetches(fetchers, initialRecords, 2*time.Second) // Ensure no more records are fetched.
assert.NoError(t, fetches.Err())
fetches.EachRecord(func(r *kgo.Record) {
fetchedRecordsBytes = append(fetchedRecordsBytes, r.Value)
})

// Set up control function to monitor fetch requests
var checkRequestOffset func(req kmsg.Request) (kmsg.Response, error, bool)
Expand Down Expand Up @@ -1057,10 +1020,9 @@ func TestConcurrentFetchers(t *testing.T) {
assert.LessOrEqualf(t, fetchers.BufferedRecords(), int64(maxInflightBytes), "Should still not buffer more than %d bytes after consuming some records", maxInflightBytes)

// Consume all remaining records and verify total
for totalConsumedRecords < totalProducedRecords {
fetches, _ = fetchers.PollFetches(ctx)
totalConsumedRecords += fetches.NumRecords()
}
// We produce a lot of data, give enough time so that the slow CI doesn't flake
fetches = longPollFetches(fetchers, totalProducedRecords-totalConsumedRecords, 20*time.Second)
totalConsumedRecords += fetches.NumRecords()

// Allow time for more fetches
waitForStableBufferedRecords(t, fetchers)
Expand All @@ -1085,6 +1047,8 @@ func TestConcurrentFetchers(t *testing.T) {
smallRecordSize = 1000
)

require.True(t, smallRecordsCount%2 == 0, "we divide the smallRecordsCount by 2 later on, it must be divisible by 2")

ctx, cancel := context.WithTimeout(context.Background(), 30*time.Second)
defer cancel()

Expand All @@ -1107,11 +1071,8 @@ func TestConcurrentFetchers(t *testing.T) {

assert.LessOrEqualf(t, fetchers.BufferedBytes(), int64(maxInflightBytes), "Should not buffer more than %d bytes of large records", maxInflightBytes)
// Consume all large records
consumedRecords := 0
for consumedRecords < largeRecordsCount {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
fetches := longPollFetches(fetchers, largeRecordsCount, 10*time.Second)
consumedRecords := fetches.NumRecords()

pollFetchesAndAssertNoRecords(t, fetchers)
t.Log("Consumed all large records")
Expand All @@ -1125,10 +1086,8 @@ func TestConcurrentFetchers(t *testing.T) {
t.Logf("Produced %d small records", smallRecordsCount)

// Consume half of the small records. This should be enough to stabilize the records size estimation.
for consumedRecords < largeRecordsCount+smallRecordsCount/2 {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second)
consumedRecords += fetches.NumRecords()
t.Log("Consumed half of the small records")

// Assert that the buffer is well utilized.
Expand All @@ -1139,14 +1098,13 @@ func TestConcurrentFetchers(t *testing.T) {
assert.GreaterOrEqual(t, fetchers.BufferedBytes(), int64(maxInflightBytes/2), "Should still buffer a decent number of records")

// Consume the rest of the small records.
const totalProducedRecords = largeRecordsCount + smallRecordsCount
for consumedRecords < totalProducedRecords {
fetches, _ := fetchers.PollFetches(ctx)
consumedRecords += fetches.NumRecords()
}
// Consume half of the small records. This should be enough to stabilize the records size estimation.
fetches = longPollFetches(fetchers, smallRecordsCount/2, 10*time.Second)
consumedRecords += fetches.NumRecords()
t.Log("Consumed rest of the small records")

// Verify we received correct number of records
const totalProducedRecords = largeRecordsCount + smallRecordsCount
assert.Equal(t, totalProducedRecords, consumedRecords, "Should have consumed all records")

// Verify no more records are buffered. First wait for the buffered records to stabilize.
Expand Down Expand Up @@ -1374,7 +1332,8 @@ func TestFetchResult_Merge(t *testing.T) {
}

func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Client, topic string, partition int32, startOffset int64, concurrency int, maxInflightBytes int32) *concurrentFetchers {
logger := log.NewNopLogger()
logger := testingLogger.WithT(t)

reg := prometheus.NewPedanticRegistry()
metrics := newReaderMetrics(partition, reg, noopReaderMetricsSource{})

Expand Down Expand Up @@ -1419,6 +1378,23 @@ func createConcurrentFetchers(ctx context.Context, t *testing.T, client *kgo.Cli
return f
}

// longPollFetches polls fetches until the timeout is reached or the number of records is at least minRecords.
func longPollFetches(fetchers *concurrentFetchers, minRecords int, timeout time.Duration) kgo.Fetches {
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()

allFetches := make(kgo.Fetches, 0)
for ctx.Err() == nil && allFetches.NumRecords() < minRecords {
fetches, _ := fetchers.PollFetches(ctx)
if fetches.Err() != nil {
continue
}
allFetches = append(allFetches, fetches...)
}

return allFetches
}

// pollFetchesAndAssertNoRecords ensures that PollFetches() returns 0 records and there are
// no buffered records in fetchers. Since some records are discarded in the PollFetches(),
// we may have to call it multiple times to process all buffered records that need to be
Expand All @@ -1439,7 +1415,7 @@ func pollFetchesAndAssertNoRecords(t *testing.T, fetchers *concurrentFetchers) {
}

// We always expect that PollFetches() returns zero records.
require.Len(t, fetches.Records(), 0)
assert.Len(t, fetches.Records(), 0)

// If there are no buffered records, we're good. We can end the assertion.
if fetchers.BufferedRecords() == 0 {
Expand Down
Loading