From ac3cb5895912207b98e96729883c843d53a6e361 Mon Sep 17 00:00:00 2001
From: Marco Pracucci
Date: Wed, 17 Jan 2024 11:45:44 +0100
Subject: [PATCH] Chore: simplify runAsync() and runAsyncAfter() usage in
 ingest storage tests (#7147)

Signed-off-by: Marco Pracucci
---
 .../ingest/partition_offset_reader_test.go  |  3 ---
 .../ingest/partition_offset_watcher_test.go |  4 ----
 pkg/storage/ingest/util_test.go             |  4 ----
 pkg/storage/ingest/writer_test.go           | 17 ++++++++---------
 4 files changed, 8 insertions(+), 20 deletions(-)

diff --git a/pkg/storage/ingest/partition_offset_reader_test.go b/pkg/storage/ingest/partition_offset_reader_test.go
index d59def778a3..372e29c3051 100644
--- a/pkg/storage/ingest/partition_offset_reader_test.go
+++ b/pkg/storage/ingest/partition_offset_reader_test.go
@@ -45,7 +45,6 @@ func TestPartitionOffsetReader(t *testing.T) {
 
 		// Run few goroutines waiting for the last produced offset.
 		wg := sync.WaitGroup{}
-		wg.Add(2)
 
 		for i := 0; i < 2; i++ {
 			runAsync(&wg, func() {
@@ -151,7 +150,6 @@ func TestPartitionOffsetReader_getLastProducedOffset(t *testing.T) {
 		})
 
 		wg := sync.WaitGroup{}
-		wg.Add(2)
 
 		// Run the 1st getLastProducedOffset() with a timeout which is expected to expire
 		// before the request will succeed.
@@ -257,7 +255,6 @@ func TestPartitionOffsetReader_FetchLastProducedOffset(t *testing.T) {
 		})
 
 		wg := sync.WaitGroup{}
-		wg.Add(2)
 
 		// The 1st FetchLastProducedOffset() is called before the service start so it's expected
 		// to wait the result of the 1st request.
diff --git a/pkg/storage/ingest/partition_offset_watcher_test.go b/pkg/storage/ingest/partition_offset_watcher_test.go
index 33d34da1b30..fec61716b15 100644
--- a/pkg/storage/ingest/partition_offset_watcher_test.go
+++ b/pkg/storage/ingest/partition_offset_watcher_test.go
@@ -166,7 +166,6 @@ func TestPartitionOffsetWatcher(t *testing.T) {
 		require.NoError(t, services.StartAndAwaitRunning(ctx, w))
 
 		wg := sync.WaitGroup{}
-		wg.Add(2)
 
 		runAsync(&wg, func() {
 			assert.Equal(t, errPartitionOffsetWatcherStopped, w.Wait(ctx, 1))
@@ -208,7 +207,6 @@ func TestPartitionOffsetWatcher_Concurrency(t *testing.T) {
 	t.Cleanup(cancelCtx)
 
 	wg := sync.WaitGroup{}
-	wg.Add(numWatchingGoroutines + numNotifyingGoroutines)
 
 	// Start all watching goroutines.
 	for i := 0; i < numWatchingGoroutines; i++ {
@@ -264,7 +262,6 @@ func BenchmarkPartitionOffsetWatcher(b *testing.B) {
 
 		// Start all watching goroutines.
 		wg := sync.WaitGroup{}
-		wg.Add(numWatchingGoroutines)
 
 		for i := 0; i < numWatchingGoroutines; i++ {
 			runAsync(&wg, func() {
@@ -305,7 +302,6 @@ func BenchmarkPartitionOffsetWatcher(b *testing.B) {
 
 		// Start all watching goroutines.
 		wg := sync.WaitGroup{}
-		wg.Add(numWatchingGoroutines)
 
 		for i := 0; i < numWatchingGoroutines; i++ {
 			runAsync(&wg, func() {
diff --git a/pkg/storage/ingest/util_test.go b/pkg/storage/ingest/util_test.go
index 794352896dd..2acee7b66df 100644
--- a/pkg/storage/ingest/util_test.go
+++ b/pkg/storage/ingest/util_test.go
@@ -106,8 +106,6 @@ func TestResultPromise(t *testing.T) {
 		)
 
 		// Spawn few goroutines waiting for the result.
-		wg.Add(3)
-
 		for i := 0; i < 3; i++ {
 			runAsync(&wg, func() {
 				actual, err := rw.wait(ctx)
@@ -132,8 +130,6 @@ func TestResultPromise(t *testing.T) {
 		)
 
 		// Spawn few goroutines waiting for the result.
-		wg.Add(3)
-
 		for i := 0; i < 3; i++ {
 			runAsync(&wg, func() {
 				actual, err := rw.wait(ctx)
diff --git a/pkg/storage/ingest/writer_test.go b/pkg/storage/ingest/writer_test.go
index 5684ad770fa..2284d29312a 100644
--- a/pkg/storage/ingest/writer_test.go
+++ b/pkg/storage/ingest/writer_test.go
@@ -133,7 +133,6 @@ func TestWriter_WriteSync(t *testing.T) {
 		})
 
 		wg := sync.WaitGroup{}
-		wg.Add(3)
 
 		// Write the first record, which is expected to be sent immediately.
 		runAsync(&wg, func() {
@@ -198,7 +197,6 @@ func TestWriter_WriteSync(t *testing.T) {
 		})
 
 		wg := sync.WaitGroup{}
-		wg.Add(3)
 
 		runAsync(&wg, func() {
 			require.NoError(t, writer.WriteSync(ctx, partitionID, tenantID, &mimirpb.WriteRequest{Timeseries: series1, Metadata: nil, Source: mimirpb.API}))
@@ -266,11 +264,10 @@ func TestWriter_WriteSync(t *testing.T) {
 		var (
 			firstRequest         = atomic.NewBool(true)
 			firstRequestReceived = make(chan struct{})
+			wg                   = sync.WaitGroup{}
 		)
 
-		wg := sync.WaitGroup{}
-		wg.Add(3)
-
+		wg.Add(1)
 		cluster.ControlKey(int16(kmsg.Produce), func(request kmsg.Request) (kmsg.Response, error, bool) {
 			// Ensure the test waits for this too, since the client request will fail earlier
 			// (if we don't wait, the test will end before this function and then goleak will
@@ -349,6 +346,8 @@ func getProduceRequestRecordsCount(req *kmsg.ProduceRequest) (int, error) {
 }
 
 func runAsync(wg *sync.WaitGroup, fn func()) {
+	wg.Add(1)
+
 	go func() {
 		defer wg.Done()
 		fn()
@@ -356,6 +355,8 @@ func runAsync(wg *sync.WaitGroup, fn func()) {
 }
 
 func runAsyncAfter(wg *sync.WaitGroup, waitFor chan struct{}, fn func()) {
+	wg.Add(1)
+
 	go func() {
 		defer wg.Done()
 		<-waitFor
@@ -366,14 +367,12 @@ func runAsyncAfter(wg *sync.WaitGroup, waitFor chan struct{}, fn func()) {
 // runAsyncAndAssertCompletionOrder runs all executors functions concurrently and asserts that the functions
 // completes in the given order.
 func runAsyncAndAssertCompletionOrder(t *testing.T, executors ...func()) {
-	wg := sync.WaitGroup{}
-	wg.Add(len(executors))
-
-	// Keep track of the actual execution order.
 	var (
+		// Keep track of the actual execution order.
 		actualOrderMx = sync.Mutex{}
 		actualOrder   = make([]int, 0, len(executors))
 		expectedOrder = make([]int, 0, len(executors))
+		wg            = sync.WaitGroup{}
 	)
 
 	for i, executor := range executors {
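
Note on the pattern this patch establishes: moving wg.Add(1) into runAsync() and runAsyncAfter() is race-free because Add() still executes synchronously on the caller's goroutine, before the caller can possibly reach wg.Wait(); only wg.Done() runs inside the spawned goroutine. Below is a minimal, self-contained sketch of the resulting caller-side usage. The runAsync() body is taken from the patch; the package clause, test name, and loop count are illustrative only, not part of the change.

package ingest

import (
	"sync"
	"testing"
)

// runAsync registers each spawned goroutine on the WaitGroup itself, so the
// caller cannot forget wg.Add() or pass a count that drifts out of sync.
func runAsync(wg *sync.WaitGroup, fn func()) {
	wg.Add(1)

	go func() {
		defer wg.Done()
		fn()
	}()
}

// TestRunAsyncUsage (hypothetical, not part of the patch) shows the simplified
// caller side: no up-front wg.Add(n) before the spawning loop.
func TestRunAsyncUsage(t *testing.T) {
	wg := sync.WaitGroup{}

	for i := 0; i < 3; i++ {
		runAsync(&wg, func() {
			// The goroutine's work (e.g. a blocking wait on a result) goes here.
		})
	}

	// Safe: every wg.Add(1) has already run on this goroutine by the time
	// Wait() is reached, so the counter cannot be observed at zero too early.
	wg.Wait()
}

This removes the failure mode the patch cleans up: a hand-maintained wg.Add(n) whose count silently stops matching the number of runAsync() calls when a test gains or loses a goroutine.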