Skip to content

Commit c637098

Browse files
Test BaseTask as a SettablePromise
1 parent 10083bb commit c637098

File tree

5 files changed

+513
-374
lines changed

5 files changed

+513
-374
lines changed

src/com/linkedin/parseq/BaseTask.java

Lines changed: 3 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -464,7 +464,7 @@ private void purgeListeners(State<T> state)
464464
{
465465
taskListener.onUpdate(this, trace);
466466
}
467-
catch (Exception e)
467+
catch (Throwable e)
468468
{
469469
LOG.warn("Promise listener threw exception. Ignoring and continuing. Listener: " + taskListener, e);
470470
}
@@ -478,7 +478,7 @@ private void purgeListeners(State<T> state)
478478
{
479479
promiseListener.onResolved(this);
480480
}
481-
catch (Exception e)
481+
catch (Throwable e)
482482
{
483483
LOG.warn("Promise listener threw exception. Ignoring and continuing. Listener: " + promiseListener, e);
484484
}
@@ -496,7 +496,7 @@ private void notifyTaskListeners(State<T> state)
496496
{
497497
taskListener.onUpdate(this, trace);
498498
}
499-
catch (Exception e)
499+
catch (Throwable e)
500500
{
501501
LOG.warn("Promise listener threw exception. Ignoring and continuing. Listener: " + taskListener, e);
502502
}

src/com/linkedin/parseq/promise/SettablePromiseImpl.java

Lines changed: 14 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -94,7 +94,7 @@ public void addListener(final PromiseListener<T> listener)
9494
{
9595
if (isDone())
9696
{
97-
listener.onResolved(this);
97+
safeNotifyListener(listener);
9898
return;
9999
}
100100

@@ -140,14 +140,19 @@ private void purgeListeners()
140140
PromiseListener<T> listener;
141141
while((listener = _listeners.poll()) != null)
142142
{
143-
try
144-
{
145-
listener.onResolved(this);
146-
}
147-
catch (Throwable e)
148-
{
149-
LOGGER.warn("An exception was thrown by listener: " + listener.getClass(), e);
150-
}
143+
safeNotifyListener(listener);
144+
}
145+
}
146+
147+
private void safeNotifyListener(PromiseListener<T> listener)
148+
{
149+
try
150+
{
151+
listener.onResolved(this);
152+
}
153+
catch (Throwable e)
154+
{
155+
LOGGER.warn("An exception was thrown by listener: " + listener.getClass(), e);
151156
}
152157
}
153158

Lines changed: 95 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,95 @@
1+
package com.linkedin.parseq;
2+
3+
import com.linkedin.parseq.promise.AbstractSettablePromiseTest;
4+
import com.linkedin.parseq.promise.Promise;
5+
import com.linkedin.parseq.promise.Promises;
6+
import com.linkedin.parseq.promise.SettablePromise;
7+
import org.testng.annotations.AfterMethod;
8+
import org.testng.annotations.BeforeMethod;
9+
10+
import java.util.concurrent.Executors;
11+
import java.util.concurrent.ScheduledExecutorService;
12+
import java.util.concurrent.TimeUnit;
13+
14+
public class TestTaskAsSettablePromise extends AbstractSettablePromiseTest
15+
{
16+
ScheduledExecutorService _scheduler;
17+
Engine _engine;
18+
19+
@BeforeMethod
20+
@Override
21+
public void setUp() throws Exception
22+
{
23+
super.setUp();
24+
25+
_scheduler = Executors.newScheduledThreadPool(5);
26+
EngineBuilder engineBuilder = new EngineBuilder()
27+
.setTaskExecutor(_scheduler)
28+
.setTimerScheduler(_scheduler);
29+
_engine = engineBuilder.build();
30+
}
31+
32+
@AfterMethod
33+
@Override
34+
public void tearDown() throws Exception
35+
{
36+
_engine.shutdown();
37+
_engine.awaitTermination(200, TimeUnit.MILLISECONDS);
38+
_engine = null;
39+
_scheduler.shutdownNow();
40+
_scheduler = null;
41+
42+
super.tearDown();
43+
}
44+
45+
@Override
46+
protected <T> Promise<T> createPromise()
47+
{
48+
return new TaskAndPromise<T>();
49+
}
50+
51+
@Override
52+
protected <T> void setPromiseValue(Promise<T> promise, T value)
53+
{
54+
final TaskAndPromise<T> task = (TaskAndPromise<T>)promise;
55+
_engine.run(task);
56+
task._promise.done(value);
57+
58+
try
59+
{
60+
task.await(5, TimeUnit.SECONDS);
61+
}
62+
catch (InterruptedException e)
63+
{
64+
// Do nothing
65+
}
66+
}
67+
68+
@Override
69+
protected <T> void setPromiseError(Promise<T> promise, Throwable error)
70+
{
71+
final TaskAndPromise<T> task = (TaskAndPromise<T>)promise;
72+
_engine.run(task);
73+
task._promise.fail(error);
74+
75+
try
76+
{
77+
task.await(5, TimeUnit.SECONDS);
78+
}
79+
catch (InterruptedException e)
80+
{
81+
// Do nothing
82+
}
83+
}
84+
85+
private static class TaskAndPromise<T> extends BaseTask<T>
86+
{
87+
private final SettablePromise<T> _promise = Promises.settable();
88+
89+
@Override
90+
protected Promise<? extends T> run(Context context) throws Throwable
91+
{
92+
return _promise;
93+
}
94+
}
95+
}

0 commit comments

Comments
 (0)