Skip to content

Commit 10083bb

Browse files
Cooperative notification on setting a promise's value
1 parent 078a485 commit 10083bb

File tree

1 file changed

+56
-63
lines changed

1 file changed

+56
-63
lines changed

src/com/linkedin/parseq/promise/SettablePromiseImpl.java

Lines changed: 56 additions & 63 deletions
Original file line numberDiff line numberDiff line change
@@ -19,12 +19,10 @@
1919
import org.slf4j.Logger;
2020
import org.slf4j.LoggerFactory;
2121

22-
import java.util.ArrayList;
23-
import java.util.List;
22+
import java.util.concurrent.ConcurrentLinkedQueue;
2423
import java.util.concurrent.CountDownLatch;
2524
import java.util.concurrent.TimeUnit;
26-
import java.util.concurrent.locks.Lock;
27-
import java.util.concurrent.locks.ReentrantLock;
25+
import java.util.concurrent.atomic.AtomicReference;
2826

2927
/**
3028
* @author Chris Pettitt ([email protected])
@@ -36,11 +34,8 @@
3634

3735
private final CountDownLatch _awaitLatch = new CountDownLatch(1);
3836

39-
private final Lock _lock = new ReentrantLock();
40-
private List<PromiseListener<T>> _listeners = new ArrayList<PromiseListener<T>>();
41-
private volatile T _value;
42-
private volatile Throwable _error;
43-
private volatile boolean _done;
37+
private final AtomicReference<Result<T>> _result = new AtomicReference<Result<T>>();
38+
private final ConcurrentLinkedQueue<PromiseListener<T>> _listeners = new ConcurrentLinkedQueue<PromiseListener<T>>();
4439

4540
@Override
4641
public void done(final T value) throws PromiseResolvedException
@@ -57,30 +52,29 @@ public void fail(final Throwable error) throws PromiseResolvedException
5752
@Override
5853
public T get() throws PromiseException
5954
{
60-
ensureDone();
61-
if (_error != null)
55+
Result<T> result = getResult();
56+
if (result._error != null)
6257
{
63-
throw new PromiseException(_error);
58+
throw new PromiseException(result._error);
6459
}
65-
return _value;
60+
return result._value;
6661
}
6762

6863
@Override
6964
public Throwable getError() throws PromiseUnresolvedException
7065
{
71-
ensureDone();
72-
return _error;
66+
return getResult()._error;
7367
}
7468

7569
@Override
7670
public T getOrDefault(final T defaultValue) throws PromiseUnresolvedException
7771
{
78-
ensureDone();
79-
if (_error != null)
72+
Result<T> result = getResult();
73+
if (result._error != null)
8074
{
8175
return defaultValue;
8276
}
83-
return _value;
77+
return result._value;
8478
}
8579

8680
@Override
@@ -98,93 +92,92 @@ public boolean await(final long time, final TimeUnit unit) throws InterruptedExc
9892
@Override
9993
public void addListener(final PromiseListener<T> listener)
10094
{
101-
_lock.lock();
102-
try
103-
{
104-
if (!isDone())
105-
{
106-
_listeners.add(listener);
107-
return;
108-
}
109-
}
110-
finally
95+
if (isDone())
11196
{
112-
_lock.unlock();
97+
listener.onResolved(this);
98+
return;
11399
}
114100

115-
notifyListener(listener);
101+
_listeners.add(listener);
102+
103+
if (isDone())
104+
purgeListeners();
116105
}
117106

118107
@Override
119108
public boolean isDone()
120109
{
121-
return _done;
110+
return _result.get() != null;
122111
}
123112

124113
@Override
125114
public boolean isFailed()
126115
{
127-
return isDone() && _error != null;
116+
final Result<T> voe = _result.get();
117+
return voe != null && voe._error != null;
128118
}
129119

130120
private void doFinish(T value, Throwable error) throws PromiseResolvedException
131121
{
132-
final List<PromiseListener<T>> listeners;
133-
134-
_lock.lock();
135-
try
136-
{
137-
ensureNotDone();
138-
_value = value;
139-
_error = error;
140-
listeners = _listeners;
141-
_listeners = null;
142-
_done = true;
143-
}
144-
finally
122+
if (!_result.compareAndSet(null, new Result<T>(value, error)))
145123
{
146-
_lock.unlock();
124+
throw new PromiseResolvedException("Promise has already been satisfied");
147125
}
148126

149-
for (int i = listeners.size() - 1; i >= 0; i--)
150-
{
151-
notifyListener(listeners.get(i));
152-
}
127+
purgeListeners();
153128

154129
_awaitLatch.countDown();
155130
}
156131

157-
private void notifyListener(final PromiseListener<T> listener)
132+
private void purgeListeners()
158133
{
159134
// We intentionally catch Throwable around the listener invocation because
160135
// it will cause the notifier loop and subsequent count down in doFinish to
161136
// be skipped, which will certainly lead to bad behavior. It could be argued
162137
// that the catch should not apply for use of notifyListener from
163138
// addListener, but it seems better to err on the side of consistency and
164139
// least surprise.
165-
try
166-
{
167-
listener.onResolved(this);
168-
}
169-
catch (Throwable e)
140+
PromiseListener<T> listener;
141+
while((listener = _listeners.poll()) != null)
170142
{
171-
LOGGER.warn("An exception was thrown by listener: " + listener.getClass(), e);
143+
try
144+
{
145+
listener.onResolved(this);
146+
}
147+
catch (Throwable e)
148+
{
149+
LOGGER.warn("An exception was thrown by listener: " + listener.getClass(), e);
150+
}
172151
}
173152
}
174153

175-
private void ensureNotDone() throws PromiseResolvedException
154+
private Result<T> getResult() throws PromiseUnresolvedException
176155
{
177-
if (isDone())
156+
final Result<T> voe = _result.get();
157+
if (voe == null)
178158
{
179-
throw new PromiseResolvedException("Promise has already been satisfied");
159+
throw new PromiseUnresolvedException("Promise has not yet been satisfied");
180160
}
161+
return voe;
181162
}
182163

183-
private void ensureDone() throws PromiseUnresolvedException
164+
private static class Result<T>
184165
{
185-
if (!isDone())
166+
private final T _value;
167+
private final Throwable _error;
168+
169+
public Result(T value, Throwable error)
186170
{
187-
throw new PromiseUnresolvedException("Promise has not yet been satisfied");
171+
if (error != null)
172+
{
173+
_value = null;
174+
_error = error;
175+
}
176+
else
177+
{
178+
_value = value;
179+
_error = null;
180+
}
188181
}
189182
}
190183
}

0 commit comments

Comments
 (0)