From 4d7c7f3d750eeae0306b1de912febc77ea8eb67d Mon Sep 17 00:00:00 2001 From: =?UTF-8?q?Bj=C3=B6rn=20Kautler?= Date: Sun, 26 Mar 2023 20:00:39 +0200 Subject: [PATCH] Fix usages of some async utilities --- .../builtin/RepeatUntilFailureExtension.java | 8 +-- .../extension/builtin/TimeoutInterceptor.java | 59 ++++++++++++------- .../util/concurrent/AsyncConditions.java | 7 +-- .../smoke/extension/ParallelSpec.groovy | 6 +- .../InvokingMocksFromMultipleThreads.groovy | 26 ++------ 5 files changed, 51 insertions(+), 55 deletions(-) diff --git a/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/RepeatUntilFailureExtension.java b/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/RepeatUntilFailureExtension.java index 52af45d0fc..41ddf83202 100644 --- a/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/RepeatUntilFailureExtension.java +++ b/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/RepeatUntilFailureExtension.java @@ -37,12 +37,8 @@ public void runIterations(IDataIterator dataIterator, IIterationRunner iteration dataIterator.forEachRemaining(arguments::add); for (int attempt = 0; attempt < maxAttempts; attempt++) { for (Object[] args : arguments) { - try { - ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).get(); - if (executionResult == ExecutionResult.FAILED) { - return; - } - } catch (InterruptedException | ExecutionException e) { + ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).join(); + if (executionResult == ExecutionResult.FAILED) { return; } } diff --git a/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/TimeoutInterceptor.java b/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/TimeoutInterceptor.java index 28bc6da492..2c0e8050fe 100644 --- a/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/TimeoutInterceptor.java +++ b/spock-core/src/main/java/org/spockframework/runtime/extension/builtin/TimeoutInterceptor.java @@ -70,9 +70,34 @@ public void run() { long timeoutAt = 0; int unsuccessfulInterruptAttempts = 0; - syncWithThread(startLatch, "feature", methodName); + boolean syncedWithFeature = false; + try { + startLatch.countDown(); + syncedWithFeature = startLatch.await(5, TimeUnit.SECONDS); + } catch (InterruptedException ignored) { + // this is our own thread, so we can ignore the interruption safely + } + if (!syncedWithFeature) { + System.out.printf("[spock.lang.Timeout] Could not sync with Feature for method '%s'", methodName); + } + while (waitMillis > 0) { + long waitStart = System.nanoTime(); + try { + synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS); + } catch (InterruptedException ignored) { + // this is our own thread, so we can ignore the interruption safely and continue the remaining waiting + waitMillis -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - waitStart); + continue; + } + break; + } + if (!synced) { + stackTrace = mainThread.getStackTrace(); + waitMillis = 250; + } while (!synced) { + mainThread.interrupt(); try { synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS); } catch (InterruptedException ignored) { @@ -80,23 +105,22 @@ public void run() { // the latter returns. Once this mission has been accomplished, this thread will die quickly } if (!synced) { - long now = System.nanoTime(); - if (stackTrace.length == 0) { - logMethodTimeout(methodName, timeoutSeconds); - stackTrace = mainThread.getStackTrace(); - waitMillis = 250; - timeoutAt = now; - } else { - waitMillis *= 2; - logUnsuccessfulInterrupt(methodName, now, timeoutAt, waitMillis, ++unsuccessfulInterruptAttempts); - } - mainThread.interrupt(); + System.out.printf("[spock.lang.Timeout] Method '%s' has not yet returned - interrupting. Next try in %1.2f seconds.\n", + methodName, waitMillis / 1000.); } } } }.start(); - syncWithThread(startLatch, "watcher", methodName); + boolean syncedWithWatcher = false; + try { + startLatch.countDown(); + syncedWithWatcher = startLatch.await(5, TimeUnit.SECONDS); + } finally { + if (!syncedWithWatcher) { + System.out.printf("[spock.lang.Timeout] Could not sync with Watcher for method '%s'", invocation.getMethod().getName()); + } + } Throwable saved = null; try { @@ -216,13 +240,4 @@ private static Pair findThreadSection(List lines, Stri return null; } - - private static void syncWithThread(CountDownLatch startLatch, String threadName, String methodName) { - try { - startLatch.countDown(); - startLatch.await(5, TimeUnit.SECONDS); - } catch (InterruptedException ignored) { - System.out.printf("[spock.lang.Timeout] Could not sync with %s thread for method '%s'", threadName, methodName); - } - } } diff --git a/spock-core/src/main/java/spock/util/concurrent/AsyncConditions.java b/spock-core/src/main/java/spock/util/concurrent/AsyncConditions.java index b010f78793..a56657a979 100644 --- a/spock-core/src/main/java/spock/util/concurrent/AsyncConditions.java +++ b/spock-core/src/main/java/spock/util/concurrent/AsyncConditions.java @@ -131,15 +131,14 @@ public void await() throws Throwable { * @throws Throwable the first exception thrown by an evaluate block */ public void await(double seconds) throws Throwable { - latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS); + boolean evalBlocksFinished = latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS); if (!exceptions.isEmpty()) throw exceptions.poll(); - long pendingEvalBlocks = latch.getCount(); - if (pendingEvalBlocks > 0) { + if (!evalBlocksFinished) { String msg = String.format("Async conditions timed out " + "after %1.2f seconds; %d out of %d evaluate blocks did not complete in time", - seconds, pendingEvalBlocks, numEvalBlocks); + seconds, latch.getCount(), numEvalBlocks); throw new SpockTimeoutError(seconds, msg); } } diff --git a/spock-specs/src/test/groovy/org/spockframework/smoke/extension/ParallelSpec.groovy b/spock-specs/src/test/groovy/org/spockframework/smoke/extension/ParallelSpec.groovy index 8e24026c35..fdc9fdc2d1 100644 --- a/spock-specs/src/test/groovy/org/spockframework/smoke/extension/ParallelSpec.groovy +++ b/spock-specs/src/test/groovy/org/spockframework/smoke/extension/ParallelSpec.groovy @@ -265,7 +265,7 @@ class ParallelSpec extends EmbeddedSpecification { @ResourceLock(value = "a", mode = ResourceAccessMode.READ) def writeA() { when: - incrementAndBlock(atomicInteger, latch) + incrementAndBlock(atomicInteger, latch, 10000) then: atomicInteger.get() == 3 @@ -519,7 +519,7 @@ class ParallelSpec extends EmbeddedSpecification { throws InterruptedException { int value = sharedResource.incrementAndGet() countDownLatch.countDown() - countDownLatch.await(timeout, MILLISECONDS) + assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired' return value } @@ -527,7 +527,7 @@ class ParallelSpec extends EmbeddedSpecification { throws InterruptedException { int value = sharedResource.get() countDownLatch.countDown() - countDownLatch.await(timeout, MILLISECONDS) + assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired' assert value == sharedResource.get() } } diff --git a/spock-specs/src/test/groovy/org/spockframework/smoke/mock/InvokingMocksFromMultipleThreads.groovy b/spock-specs/src/test/groovy/org/spockframework/smoke/mock/InvokingMocksFromMultipleThreads.groovy index 8a2cc0d927..8976e695f8 100644 --- a/spock-specs/src/test/groovy/org/spockframework/smoke/mock/InvokingMocksFromMultipleThreads.groovy +++ b/spock-specs/src/test/groovy/org/spockframework/smoke/mock/InvokingMocksFromMultipleThreads.groovy @@ -33,14 +33,11 @@ class InvokingMocksFromMultipleThreads extends Specification { def numThreads = 10 def list = Mock(List) def latch = new CountDownLatch(numThreads) - @AutoCleanup("shutdownNow") - @Shared - def executorService = createExecutorService() def "invoking a mock from multiple threads"() { when: numThreads.times { threadId -> - executorService.submit { + Thread.start { try { 100.times { count -> list.add(count) @@ -51,7 +48,7 @@ class InvokingMocksFromMultipleThreads extends Specification { } } } - awaitLatch() + assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired' then: interaction { @@ -63,7 +60,7 @@ class InvokingMocksFromMultipleThreads extends Specification { def "invoking a mock from multiple threads - too many invocations"() { when: numThreads.times { threadId -> - executorService.submit { + Thread.start { try { 100.times { count -> list.add(count) @@ -77,7 +74,7 @@ class InvokingMocksFromMultipleThreads extends Specification { } } } - awaitLatch() + assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired' then: interaction { @@ -89,7 +86,7 @@ class InvokingMocksFromMultipleThreads extends Specification { def "invoking a mock from multiple threads - too few invocations"() { when: numThreads.times { threadId -> - executorService.submit { + Thread.start { try { 100.times { count -> if (!(threadId == 0 && count == 99)) list.add(count) @@ -102,22 +99,11 @@ class InvokingMocksFromMultipleThreads extends Specification { } } } - awaitLatch() + assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired' then: interaction { 100.times { count -> numThreads * list.add(count) } } } - - - private static ExecutorService createExecutorService() { - return Jvm.current.java21Compatible ? Executors."newVirtualThreadPerTaskExecutor"() : Executors.newCachedThreadPool() { new Thread(it).tap { it.daemon = true } } - } - - private void awaitLatch() { - if (!latch.await(WAIT_TIME_S, TimeUnit.SECONDS)) { - throw new IllegalStateException("The test threads did not terminate in ${WAIT_TIME_S}s.") - } - } }