Chore: simplify runAsync() and runAsyncAfter() usage in ingest storage tests (#7147)

Signed-off-by: Marco Pracucci <[email protected]>
pracucci authored Jan 17, 2024
1 parent 099a06e commit ac3cb58
Showing 4 changed files with 8 additions and 20 deletions.
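
The pattern behind this cleanup: runAsync() now calls wg.Add(1) itself, immediately before spawning the goroutine, so call sites no longer carry a separate wg.Add(N) whose count must be kept in sync with the number of spawned goroutines by hand. A minimal, self-contained sketch of the resulting usage follows; the runAsync body matches the helper in the diff below, while the main function and its loop are illustrative, not part of the commit:

package main

import (
	"fmt"
	"sync"
)

// runAsync mirrors the helper in writer_test.go after this commit: it
// increments the WaitGroup itself before spawning the goroutine, so the
// Add(1) is guaranteed to happen before any wg.Wait() in the caller.
func runAsync(wg *sync.WaitGroup, fn func()) {
	wg.Add(1)

	go func() {
		defer wg.Done()
		fn()
	}()
}

func main() {
	wg := sync.WaitGroup{}

	// Before this commit, call sites needed a matching wg.Add(2) here,
	// which had to be updated whenever the loop bound changed.
	for i := 0; i < 2; i++ {
		i := i // capture the loop variable (pre-Go 1.22 semantics)
		runAsync(&wg, func() {
			fmt.Println("goroutine", i)
		})
	}

	wg.Wait()
}

Because wg.Add(1) runs synchronously in the caller's goroutine, it is ordered before the caller's wg.Wait(), which is what makes folding it into the helper safe.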
3 changes: 0 additions & 3 deletions pkg/storage/ingest/partition_offset_reader_test.go
@@ -45,7 +45,6 @@ func TestPartitionOffsetReader(t *testing.T) {
 
 	// Run few goroutines waiting for the last produced offset.
 	wg := sync.WaitGroup{}
-	wg.Add(2)
 
 	for i := 0; i < 2; i++ {
 		runAsync(&wg, func() {
@@ -151,7 +150,6 @@ func TestPartitionOffsetReader_getLastProducedOffset(t *testing.T) {
 	})
 
 	wg := sync.WaitGroup{}
-	wg.Add(2)
 
 	// Run the 1st getLastProducedOffset() with a timeout which is expected to expire
 	// before the request will succeed.
@@ -257,7 +255,6 @@ func TestPartitionOffsetReader_FetchLastProducedOffset(t *testing.T) {
 	})
 
 	wg := sync.WaitGroup{}
-	wg.Add(2)
 
 	// The 1st FetchLastProducedOffset() is called before the service start so it's expected
 	// to wait the result of the 1st request.
4 changes: 0 additions & 4 deletions pkg/storage/ingest/partition_offset_watcher_test.go
@@ -166,7 +166,6 @@ func TestPartitionOffsetWatcher(t *testing.T) {
 	require.NoError(t, services.StartAndAwaitRunning(ctx, w))
 
 	wg := sync.WaitGroup{}
-	wg.Add(2)
 
 	runAsync(&wg, func() {
 		assert.Equal(t, errPartitionOffsetWatcherStopped, w.Wait(ctx, 1))
@@ -208,7 +207,6 @@ func TestPartitionOffsetWatcher_Concurrency(t *testing.T) {
 	t.Cleanup(cancelCtx)
 
 	wg := sync.WaitGroup{}
-	wg.Add(numWatchingGoroutines + numNotifyingGoroutines)
 
 	// Start all watching goroutines.
 	for i := 0; i < numWatchingGoroutines; i++ {
@@ -264,7 +262,6 @@ func BenchmarkPartitionOffsetWatcher(b *testing.B) {
 
 	// Start all watching goroutines.
 	wg := sync.WaitGroup{}
-	wg.Add(numWatchingGoroutines)
 
 	for i := 0; i < numWatchingGoroutines; i++ {
 		runAsync(&wg, func() {
@@ -305,7 +302,6 @@ func BenchmarkPartitionOffsetWatcher(b *testing.B) {
 
 	// Start all watching goroutines.
 	wg := sync.WaitGroup{}
-	wg.Add(numWatchingGoroutines)
 
 	for i := 0; i < numWatchingGoroutines; i++ {
 		runAsync(&wg, func() {
4 changes: 0 additions & 4 deletions pkg/storage/ingest/util_test.go
@@ -106,8 +106,6 @@ func TestResultPromise(t *testing.T) {
 	)
 
 	// Spawn few goroutines waiting for the result.
-	wg.Add(3)
-
 	for i := 0; i < 3; i++ {
 		runAsync(&wg, func() {
 			actual, err := rw.wait(ctx)
@@ -132,8 +130,6 @@
 	)
 
 	// Spawn few goroutines waiting for the result.
-	wg.Add(3)
-
 	for i := 0; i < 3; i++ {
 		runAsync(&wg, func() {
 			actual, err := rw.wait(ctx)
17 changes: 8 additions & 9 deletions pkg/storage/ingest/writer_test.go
@@ -133,7 +133,6 @@ func TestWriter_WriteSync(t *testing.T) {
 	})
 
 	wg := sync.WaitGroup{}
-	wg.Add(3)
 
 	// Write the first record, which is expected to be sent immediately.
 	runAsync(&wg, func() {
@@ -198,7 +197,6 @@ func TestWriter_WriteSync(t *testing.T) {
 	})
 
 	wg := sync.WaitGroup{}
-	wg.Add(3)
 
 	runAsync(&wg, func() {
 		require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series1, Metadata: nil, Source: mimirpb.API}))
@@ -266,11 +264,10 @@ func TestWriter_WriteSync(t *testing.T) {
 	var (
 		firstRequest         = atomic.NewBool(true)
 		firstRequestReceived = make(chan struct{})
+		wg                   = sync.WaitGroup{}
 	)
 
-	wg := sync.WaitGroup{}
-	wg.Add(3)
-
+	wg.Add(1)
 	cluster.ControlKey(int16(kmsg.Produce), func(request kmsg.Request) (kmsg.Response, error, bool) {
 		// Ensure the test waits for this too, since the client request will fail earlier
 		// (if we don't wait, the test will end before this function and then goleak will
@@ -349,13 +346,17 @@ func getProduceRequestRecordsCount(req *kmsg.ProduceRequest) (int, error) {
 }
 
 func runAsync(wg *sync.WaitGroup, fn func()) {
+	wg.Add(1)
+
 	go func() {
 		defer wg.Done()
 		fn()
 	}()
 }
 
 func runAsyncAfter(wg *sync.WaitGroup, waitFor chan struct{}, fn func()) {
+	wg.Add(1)
+
 	go func() {
 		defer wg.Done()
 		<-waitFor
@@ -366,14 +367,12 @@ func runAsyncAfter(wg *sync.WaitGroup, waitFor chan struct{}, fn func()) {
 // runAsyncAndAssertCompletionOrder runs all executors functions concurrently and asserts that the functions
 // completes in the given order.
 func runAsyncAndAssertCompletionOrder(t *testing.T, executors ...func()) {
-	wg := sync.WaitGroup{}
-	wg.Add(len(executors))
-
-	// Keep track of the actual execution order.
 	var (
+		// Keep track of the actual execution order.
		actualOrderMx = sync.Mutex{}
		actualOrder   = make([]int, 0, len(executors))
		expectedOrder = make([]int, 0, len(executors))
+		wg            = sync.WaitGroup{}
 	)
 
 	for i, executor := range executors {
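
For reference, a runnable sketch of how the two helpers read after this change and how they compose; the helper bodies match the diff above, while the main function, its print statements, and the firstDone channel name are illustrative:

package main

import (
	"fmt"
	"sync"
)

// runAsync spawns fn in a goroutine and tracks it on the WaitGroup.
func runAsync(wg *sync.WaitGroup, fn func()) {
	wg.Add(1)

	go func() {
		defer wg.Done()
		fn()
	}()
}

// runAsyncAfter is like runAsync, but the goroutine blocks until waitFor
// is closed (or receives a value) before running fn.
func runAsyncAfter(wg *sync.WaitGroup, waitFor chan struct{}, fn func()) {
	wg.Add(1)

	go func() {
		defer wg.Done()
		<-waitFor
		fn()
	}()
}

func main() {
	var (
		wg        = sync.WaitGroup{}
		firstDone = make(chan struct{})
	)

	runAsync(&wg, func() {
		fmt.Println("first")
		close(firstDone)
	})

	// Runs only after the first goroutine has closed firstDone.
	runAsyncAfter(&wg, firstDone, func() {
		fmt.Println("second")
	})

	wg.Wait()
}

Note the one remaining manual wg.Add(1) in the writer_test.go diff: the cluster.ControlKey callback is invoked by the fake Kafka cluster rather than spawned through runAsync, so the test still has to register it on the WaitGroup explicitly.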
