1
1
package se .krka .futures ;
2
2
3
+ import com .google .common .util .concurrent .MoreExecutors ;
3
4
import org .junit .Test ;
4
5
5
6
import java .io .Closeable ;
6
- import java .io .IOException ;
7
7
import java .util .concurrent .CompletableFuture ;
8
8
import java .util .concurrent .ExecutionException ;
9
+ import java .util .concurrent .Executor ;
9
10
import java .util .concurrent .TimeUnit ;
10
11
import java .util .concurrent .TimeoutException ;
12
+ import java .util .concurrent .atomic .AtomicBoolean ;
11
13
12
14
import static org .junit .Assert .assertEquals ;
13
15
import static org .junit .Assert .assertTrue ;
14
- import static org .junit .Assert .fail ;
15
16
16
17
public class OnTimeoutTest {
17
18
@ Test
18
19
public void testTimeoutNotBlocking () throws Exception {
20
+ waitForNotBlocking ();
19
21
assertTrue (measureTimeout () < 200 );
20
22
}
21
23
22
24
@ Test
23
25
public void testTimeoutNotBlockingTwice () throws Exception {
26
+ waitForNotBlocking ();
24
27
assertTrue (measureTimeout () < 200 );
25
28
assertTrue (measureTimeout () < 200 );
26
29
}
27
30
28
31
@ Test
29
32
public void testTimeoutBlocking () throws Exception {
33
+ waitForNotBlocking ();
30
34
try (Killer killer = new Killer (killTimeout ())) {
31
35
expectFailure ();
32
36
}
33
37
}
34
38
35
39
@ Test
36
40
public void testTimeoutBlocking2 () throws Exception {
41
+ waitForNotBlocking ();
37
42
try (Killer killer = new Killer (killTimeout2 ())) {
38
43
expectFailure ();
39
44
}
40
45
}
41
46
47
+ @ Test
48
+ public void testTimeoutBlockingWithDelayedExecutor () throws Exception {
49
+ waitForNotBlocking ();
50
+ try (Killer killer = new Killer (killTimeout3 ())) {
51
+ expectFailure ();
52
+ }
53
+ }
54
+
42
55
private void expectFailure () throws Exception {
43
56
long time = measureTimeout ();
44
- assertTrue ("time was " + time , time >= 1000 );
57
+ assertTrue ("time was " + time , time >= 900 );
45
58
}
46
59
47
60
private long measureTimeout () throws InterruptedException , TimeoutException {
@@ -59,36 +72,63 @@ private long measureTimeout() throws InterruptedException, TimeoutException {
59
72
}
60
73
61
74
private CompletableFuture <String > killTimeout () {
62
- return new CompletableFuture <String >()
63
- .orTimeout (1 , TimeUnit .MILLISECONDS )
64
- .handle ((s , t ) -> {
65
- try {
66
- sleepOnThread ();
67
- return "" ;
68
- } catch (InterruptedException e ) {
69
- throw new RuntimeException (e );
70
- }
71
- })
72
- .exceptionally (Throwable ::getMessage );
75
+ waitForNotBlocking ();
76
+ CompletableFuture <String > future = new CompletableFuture <String >()
77
+ .orTimeout (10 , TimeUnit .MILLISECONDS )
78
+ .exceptionally (Throwable ::getMessage )
79
+ .thenApply (s -> sleepOnThread (s , 1000 ));
80
+ waitForBlocking ();
81
+ return future ;
73
82
}
74
83
75
84
private CompletableFuture <String > killTimeout2 () {
76
- return new CompletableFuture <String >()
77
- .completeOnTimeout ("" , 1 , TimeUnit .MILLISECONDS )
78
- .thenApply (s -> {
79
- try {
80
- sleepOnThread ();
81
- return s ;
82
- } catch (InterruptedException e ) {
83
- return s ;
84
- }
85
- });
85
+ CompletableFuture <String > future = new CompletableFuture <String >()
86
+ .completeOnTimeout ("" , 10 , TimeUnit .MILLISECONDS )
87
+ .thenApply (s -> sleepOnThread (s , 1000 ));
88
+ waitForBlocking ();
89
+ return future ;
86
90
}
87
91
88
- private void sleepOnThread () throws InterruptedException {
89
- int millis = 1000 ;
90
- System .out .println ("Sleeping on " + Util .currThread () + " for " + millis + " ms" );
91
- Thread .sleep (millis );
92
+ private CompletableFuture <String > killTimeout3 () {
93
+ Executor executor = CompletableFuture .delayedExecutor (1 , TimeUnit .MILLISECONDS , MoreExecutors .directExecutor ());
94
+ CompletableFuture <String > future = new CompletableFuture <>();
95
+ executor .execute (() -> {
96
+ sleepOnThread (null , 1000 );
97
+ future .complete ("value" );
98
+ });
99
+ waitForBlocking ();
100
+ return future ;
101
+ }
102
+
103
+ private static final AtomicBoolean BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER = new AtomicBoolean (false );
104
+
105
+ private <T > T sleepOnThread (T value , int millis ) {
106
+ if (Util .currThread ().equals ("CompletableFutureDelayScheduler" )) {
107
+ BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER .set (true );
108
+ }
109
+ try {
110
+ System .out .println ("Sleeping on " + Util .currThread () + " for " + millis + " ms" );
111
+ Thread .sleep (millis );
112
+ } catch (InterruptedException e ) {
113
+ e .printStackTrace ();
114
+ } finally {
115
+ if (Util .currThread ().equals ("CompletableFutureDelayScheduler" )) {
116
+ BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER .set (false );
117
+ }
118
+ }
119
+ return value ;
120
+ }
121
+
122
+ private void waitForBlocking () {
123
+ while (!BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER .get ()) {
124
+ Thread .yield ();
125
+ }
126
+ }
127
+
128
+ private void waitForNotBlocking () {
129
+ while (BLOCKING_COMPLETABLE_FUTURE_DELAY_SCHEDULER .get ()) {
130
+ Thread .yield ();
131
+ }
92
132
}
93
133
94
134
private static class Killer implements Closeable {
0 commit comments