5
5
import java .util .ArrayList ;
6
6
import java .util .List ;
7
7
import java .util .concurrent .Callable ;
8
+ import java .util .concurrent .CountDownLatch ;
8
9
import java .util .concurrent .Executors ;
9
10
import java .util .concurrent .ScheduledExecutorService ;
10
11
import java .util .concurrent .TimeUnit ;
11
- import java .util .concurrent .atomic . AtomicInteger ;
12
+ import java .util .concurrent .TimeoutException ;
12
13
13
14
import static com .linkedin .parseq .Tasks .par ;
14
15
import static org .testng .AssertJUnit .assertEquals ;
16
+ import static org .testng .AssertJUnit .assertFalse ;
15
17
import static org .testng .AssertJUnit .assertTrue ;
16
18
17
19
/**
20
22
*/
21
23
public class TestAsyncCallableTask extends BaseEngineTest
22
24
{
23
- protected static final String SUCCESS = "success" ;
24
- protected static final String FAIL_TIME_OUT = "Failure - Timed out" ;
25
-
26
25
@ Test
27
26
public void testConcurrentTasks () throws InterruptedException
28
27
{
29
- final int size = 2 ; // Should be <= CallableWrapperTask size
28
+ // This test ensures that a single plan can run more than one blocking task
29
+ // at a time if the AsyncCallableTask feature is used.
30
+
31
+ final int size = 2 ; // Degree of parallelism
32
+ final CountDownLatch latch = new CountDownLatch (size );
30
33
31
- final List <AsyncCallableTask <String >> tasks = new ArrayList <AsyncCallableTask <String >>(size );
34
+ final List <AsyncCallableTask <Void >> tasks = new ArrayList <AsyncCallableTask <Void >>(size );
32
35
for (int counter = 0 ; counter < size ; counter ++)
33
36
{
34
- tasks .add (counter , new AsyncCallableTask <String >(new ConcurrentCallable (size )));
37
+ tasks .add (counter , new AsyncCallableTask <Void >(new Callable <Void >() {
38
+ @ Override
39
+ public Void call () throws Exception
40
+ {
41
+ latch .countDown ();
42
+ if (!latch .await (5 , TimeUnit .SECONDS ))
43
+ {
44
+ throw new TimeoutException ("Latch should have reached 0 before timeout" );
45
+ }
46
+ return null ;
47
+ }
48
+ }));
35
49
}
36
50
37
- final ParTask <? > par = par (tasks );
51
+ final ParTask <Void > par = par (tasks );
38
52
getEngine ().run (par );
39
53
40
- par .await ();
54
+ assertTrue ( par .await (5 , TimeUnit . SECONDS ) );
41
55
42
56
assertEquals (2 , par .getSuccessful ().size ());
43
57
assertEquals (2 , par .getTasks ().size ());
44
58
assertEquals (2 , par .get ().size ());
59
+
45
60
for (int counter = 0 ; counter < size ; counter ++)
46
61
{
47
- assertEquals (SUCCESS , tasks .get (counter ).get ());
62
+ assertTrue (tasks .get (counter ).isDone ());
63
+ assertFalse (tasks .get (counter ).isFailed ());
48
64
}
49
65
}
50
66
67
+ @ Test
68
+ public void testThrowingCallable () throws InterruptedException
69
+ {
70
+ // Ensures that if a callable wrapped in an AsyncCallableTask throws that
71
+ // the wrapping task correctly reports the error state.
72
+ final Error error = new Error ();
73
+ final Task <Void > task = new AsyncCallableTask <Void >(new Callable <Void >()
74
+ {
75
+ @ Override
76
+ public Void call () throws Exception
77
+ {
78
+ throw error ;
79
+ }
80
+ });
81
+
82
+ getEngine ().run (task );
83
+
84
+ assertTrue (task .await (5 , TimeUnit .SECONDS ));
85
+
86
+ assertTrue (task .isDone ());
87
+ assertTrue (task .isFailed ());
88
+ assertEquals (error , task .getError ());
89
+ }
90
+
51
91
@ Test
52
92
public void testTaskWithoutExecutor () throws InterruptedException
53
93
{
@@ -70,7 +110,7 @@ public Integer call() throws Exception
70
110
});
71
111
engine .run (task );
72
112
73
- task .await ();
113
+ assertTrue ( task .await (5 , TimeUnit . SECONDS ) );
74
114
75
115
assertTrue (task .isFailed ());
76
116
assertTrue (task .getError () instanceof IllegalStateException );
@@ -82,32 +122,4 @@ public Integer call() throws Exception
82
122
scheduler .shutdownNow ();
83
123
}
84
124
}
85
- }
86
-
87
- class ConcurrentCallable implements Callable <String >
88
- {
89
- private static final long TEN_SECONDS = 10 * 1000 ;
90
- private static AtomicInteger _counter = new AtomicInteger (0 );
91
-
92
- private Integer _gate ;
93
-
94
- protected ConcurrentCallable (int gate )
95
- {
96
- _gate = gate ;
97
- }
98
- @ Override
99
- public String call () throws Exception
100
- {
101
- _counter .incrementAndGet ();
102
- long end = System .currentTimeMillis () + TEN_SECONDS ; // Prevent the test for running more than 10 seconds.
103
- while (_counter .get () < _gate )
104
- {
105
- if (end < System .currentTimeMillis ())
106
- {
107
- return TestAsyncCallableTask .FAIL_TIME_OUT ;
108
- }
109
- Thread .sleep (100 );
110
- }
111
- return TestAsyncCallableTask .SUCCESS ;
112
- }
113
125
}
0 commit comments