Skip to content

Commit

Permalink
Fix usages of some async utilities
Browse files Browse the repository at this point in the history
  • Loading branch information
Vampire committed Mar 21, 2024
1 parent 9ecb29e commit afd35f8
Show file tree
Hide file tree
Showing 5 changed files with 51 additions and 55 deletions.
Original file line number Diff line number Diff line change
Expand Up @@ -37,12 +37,8 @@ public void runIterations(IDataIterator dataIterator, IIterationRunner iteration
dataIterator.forEachRemaining(arguments::add);
for (int attempt = 0; attempt < maxAttempts; attempt++) {
for (Object[] args : arguments) {
try {
ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).get();
if (executionResult == ExecutionResult.FAILED) {
return;
}
} catch (InterruptedException | ExecutionException e) {
ExecutionResult executionResult = iterationRunner.runIteration(args, maxIterations).join();
if (executionResult == ExecutionResult.FAILED) {
return;
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -70,33 +70,57 @@ public void run() {
long timeoutAt = 0;
int unsuccessfulInterruptAttempts = 0;

syncWithThread(startLatch, "feature", methodName);
boolean syncedWithFeature = false;
try {
startLatch.countDown();
syncedWithFeature = startLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
// this is our own thread, so we can ignore the interruption safely
}
if (!syncedWithFeature) {
System.out.printf("[spock.lang.Timeout] Could not sync with Feature for method '%s'", methodName);
}

while (waitMillis > 0) {
long waitStart = System.nanoTime();
try {
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
// this is our own thread, so we can ignore the interruption safely and continue the remaining waiting
waitMillis -= TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - waitStart);
continue;
}
break;
}
if (!synced) {
stackTrace = mainThread.getStackTrace();
waitMillis = 250;
}
while (!synced) {
mainThread.interrupt();
try {
synced = sync.offer(stackTrace, waitMillis, TimeUnit.MILLISECONDS);
} catch (InterruptedException ignored) {
// The mission of this thread is to repeatedly interrupt the main thread until
// the latter returns. Once this mission has been accomplished, this thread will die quickly
}
if (!synced) {
long now = System.nanoTime();
if (stackTrace.length == 0) {
logMethodTimeout(methodName, timeoutSeconds);
stackTrace = mainThread.getStackTrace();
waitMillis = 250;
timeoutAt = now;
} else {
waitMillis *= 2;
logUnsuccessfulInterrupt(methodName, now, timeoutAt, waitMillis, ++unsuccessfulInterruptAttempts);
}
mainThread.interrupt();
System.out.printf("[spock.lang.Timeout] Method '%s' has not yet returned - interrupting. Next try in %1.2f seconds.\n",
methodName, waitMillis / 1000.);
}
}
}
}.start();

syncWithThread(startLatch, "watcher", methodName);
boolean syncedWithWatcher = false;
try {
startLatch.countDown();
syncedWithWatcher = startLatch.await(5, TimeUnit.SECONDS);
} finally {
if (!syncedWithWatcher) {
System.out.printf("[spock.lang.Timeout] Could not sync with Watcher for method '%s'", invocation.getMethod().getName());
}
}

Throwable saved = null;
try {
Expand Down Expand Up @@ -216,13 +240,4 @@ private static Pair<Integer, Integer> findThreadSection(List<String> lines, Stri

return null;
}

private static void syncWithThread(CountDownLatch startLatch, String threadName, String methodName) {
try {
startLatch.countDown();
startLatch.await(5, TimeUnit.SECONDS);
} catch (InterruptedException ignored) {
System.out.printf("[spock.lang.Timeout] Could not sync with %s thread for method '%s'", threadName, methodName);
}
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -131,15 +131,14 @@ public void await() throws Throwable {
* @throws Throwable the first exception thrown by an evaluate block
*/
public void await(double seconds) throws Throwable {
latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
boolean evalBlocksFinished = latch.await((long) (seconds * 1000), TimeUnit.MILLISECONDS);
if (!exceptions.isEmpty())
throw exceptions.poll();

long pendingEvalBlocks = latch.getCount();
if (pendingEvalBlocks > 0) {
if (!evalBlocksFinished) {
String msg = String.format("Async conditions timed out " +
"after %1.2f seconds; %d out of %d evaluate blocks did not complete in time",
seconds, pendingEvalBlocks, numEvalBlocks);
seconds, latch.getCount(), numEvalBlocks);
throw new SpockTimeoutError(seconds, msg);
}
}
Expand Down
Original file line number Diff line number Diff line change
Expand Up @@ -265,7 +265,7 @@ class ParallelSpec extends EmbeddedSpecification {
@ResourceLock(value = "a", mode = ResourceAccessMode.READ)
def writeA() {
when:
incrementAndBlock(atomicInteger, latch)
incrementAndBlock(atomicInteger, latch, 10000)
then:
atomicInteger.get() == 3
Expand Down Expand Up @@ -519,15 +519,15 @@ class ParallelSpec extends EmbeddedSpecification {
throws InterruptedException {
int value = sharedResource.incrementAndGet()
countDownLatch.countDown()
countDownLatch.await(timeout, MILLISECONDS)
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
return value
}

static void storeAndBlockAndCheck(AtomicInteger sharedResource, CountDownLatch countDownLatch, long timeout = 100)
throws InterruptedException {
int value = sharedResource.get()
countDownLatch.countDown()
countDownLatch.await(timeout, MILLISECONDS)
assert countDownLatch.await(timeout, MILLISECONDS) : 'Timeout expired'
assert value == sharedResource.get()
}
}
Original file line number Diff line number Diff line change
Expand Up @@ -33,14 +33,11 @@ class InvokingMocksFromMultipleThreads extends Specification {
def numThreads = 10
def list = Mock(List)
def latch = new CountDownLatch(numThreads)
@AutoCleanup("shutdownNow")
@Shared
def executorService = createExecutorService()

def "invoking a mock from multiple threads"() {
when:
numThreads.times { threadId ->
executorService.submit {
Thread.start {
try {
100.times { count ->
list.add(count)
Expand All @@ -51,7 +48,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
awaitLatch()
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
Expand All @@ -63,7 +60,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
def "invoking a mock from multiple threads - too many invocations"() {
when:
numThreads.times { threadId ->
executorService.submit {
Thread.start {
try {
100.times { count ->
list.add(count)
Expand All @@ -77,7 +74,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
awaitLatch()
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
Expand All @@ -89,7 +86,7 @@ class InvokingMocksFromMultipleThreads extends Specification {
def "invoking a mock from multiple threads - too few invocations"() {
when:
numThreads.times { threadId ->
executorService.submit {
Thread.start {
try {
100.times { count ->
if (!(threadId == 0 && count == 99)) list.add(count)
Expand All @@ -102,22 +99,11 @@ class InvokingMocksFromMultipleThreads extends Specification {
}
}
}
awaitLatch()
assert latch.await(10, TimeUnit.SECONDS) : 'Timeout expired'

then:
interaction {
100.times { count -> numThreads * list.add(count) }
}
}


private static ExecutorService createExecutorService() {
return Jvm.current.java21Compatible ? Executors."newVirtualThreadPerTaskExecutor"() : Executors.newCachedThreadPool() { new Thread(it).tap { it.daemon = true } }
}

private void awaitLatch() {
if (!latch.await(WAIT_TIME_S, TimeUnit.SECONDS)) {
throw new IllegalStateException("The test threads did not terminate in ${WAIT_TIME_S}s.")
}
}
}

0 comments on commit afd35f8

Please sign in to comment.