Skip to content

Commit 87af02b

Browse files
Cancel plan execution if its serial executor loop was rejected
If the engine's task executor is configured to reject execution upon saturation (e.g. with AbortPolicy) we will now abort the plan that is being executed. We do not recommend rejecting work at this level since it will cause in-flight plans to be aborted. Instead, we recommend using standard techniques for managing at the engine level, e.g. back-pressure, load shedding, or degraded responses. The time spent on plan execution can also be bounded using a timeout task.
1 parent 55ea575 commit 87af02b

File tree

7 files changed

+339
-5
lines changed

7 files changed

+339
-5
lines changed

CHANGELOG.md

Lines changed: 6 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1,3 +1,9 @@
1+
v1.3.7
2+
------
3+
4+
* We now cancel a plan if its execution fails due to a
5+
RejectedExecutionException being raised from the engine's task executor.
6+
17
v1.3.6
28
------
39

src/com/linkedin/parseq/Engine.java

Lines changed: 25 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -18,18 +18,19 @@
1818

1919
import com.linkedin.parseq.internal.ContextImpl;
2020
import com.linkedin.parseq.internal.InternalUtil;
21+
import com.linkedin.parseq.internal.RejectedSerialExecutionHandler;
22+
import com.linkedin.parseq.internal.SerialExecutionException;
2123
import com.linkedin.parseq.internal.SerialExecutor;
2224
import com.linkedin.parseq.internal.TaskLogImpl;
2325
import com.linkedin.parseq.promise.Promise;
2426
import com.linkedin.parseq.promise.PromiseListener;
2527
import org.slf4j.ILoggerFactory;
2628
import org.slf4j.Logger;
29+
import org.slf4j.LoggerFactory;
2730

28-
import java.util.HashMap;
2931
import java.util.Map;
3032
import java.util.concurrent.CountDownLatch;
3133
import java.util.concurrent.Executor;
32-
import java.util.concurrent.ExecutorService;
3334
import java.util.concurrent.TimeUnit;
3435
import java.util.concurrent.atomic.AtomicReference;
3536

@@ -42,6 +43,7 @@
4243
public class Engine
4344
{
4445
private static final String LOGGER_BASE = Engine.class.getName();
46+
private static final Logger LOG = LoggerFactory.getLogger(LOGGER_BASE);
4547

4648
private static final State INIT = new State(StateName.RUN, 0);
4749
private static final State TERMINATED = new State(StateName.TERMINATED, 0);
@@ -126,7 +128,8 @@ public void run(final Task<?> task)
126128

127129
final Logger planLogger = _loggerFactory.getLogger(LOGGER_BASE + ":planClass=" + task.getClass().getName());
128130
final TaskLog taskLog = new TaskLogImpl(task, _allLogger, _rootLogger, planLogger);
129-
new ContextImpl(new SerialExecutor(_taskExecutor), _timerExecutor, task, taskLog, this).runTask();
131+
new ContextImpl(new SerialExecutor(_taskExecutor, new CancelPlanRejectionHandler(task)),
132+
_timerExecutor, task, taskLog, this).runTask();
130133

131134
InternalUtil.unwildcardTask(task).addListener(_taskDoneListener);
132135
}
@@ -234,4 +237,23 @@ private State(final StateName stateName, final long pendingCount)
234237
_stateName = stateName;
235238
}
236239
}
240+
241+
private static class CancelPlanRejectionHandler implements RejectedSerialExecutionHandler
242+
{
243+
private final Task<?> _task;
244+
245+
private CancelPlanRejectionHandler(Task<?> task)
246+
{
247+
_task = task;
248+
}
249+
250+
@Override
251+
public void rejectedExecution(Throwable error)
252+
{
253+
final String msg = "Serial executor loop failed for plan: " + _task.getName();
254+
final SerialExecutionException ex = new SerialExecutionException(msg, error);
255+
final boolean wasCancelled = _task.cancel(ex);
256+
LOG.error(msg + ". The plan was " + (wasCancelled ? "" : "not ") + "cancelled.", ex);
257+
}
258+
}
237259
}
Lines changed: 16 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,16 @@
1+
package com.linkedin.parseq.internal;
2+
3+
/**
4+
* A handler that is invoked if the {@link SerialExecutor}'s execution loop
5+
* fails during resubmission to the underlying executor.
6+
*/
7+
public interface RejectedSerialExecutionHandler
8+
{
9+
/**
10+
* This method is invoked if a {@link SerialExecutor}'s execution loop cannot
11+
* be resubmitted to the underlying executor.
12+
*
13+
* @param error the error that was raised by the underlying executor.
14+
*/
15+
void rejectedExecution(Throwable error);
16+
}
Lines changed: 11 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,11 @@
1+
package com.linkedin.parseq.internal;
2+
3+
public class SerialExecutionException extends Exception
4+
{
5+
private static final long serialVersionUID = 0L;
6+
7+
public SerialExecutionException(String msg, Throwable error)
8+
{
9+
super(msg, error);
10+
}
11+
}

src/com/linkedin/parseq/internal/SerialExecutor.java

Lines changed: 26 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -27,28 +27,52 @@
2727
* <p/>
2828
* For more on the happens-before constraint see the {@code java.util.concurrent}
2929
* package documentation.
30+
* <p/>
31+
* It is possible for the underlying executor to throw an exception signaling
32+
* that it is not able to accept new work. For example, this can occur with an
33+
* executor that has a bounded queue size and an
34+
* {@link java.util.concurrent.ThreadPoolExecutor.AbortPolicy}. If this occurs
35+
* the executor will run the {@code rejectionHandler} to signal this failure
36+
* to a layer that can more appropriate handle this event.
3037
*
3138
* @author Chris Pettitt ([email protected])
3239
*/
3340
public class SerialExecutor implements Executor
3441
{
3542
private final Executor _executor;
43+
private final RejectedSerialExecutionHandler _rejectionHandler;
3644
private final ExecutorLoop _executorLoop = new ExecutorLoop();
3745
private final FIFOPriorityQueue<Runnable> _queue = new FIFOPriorityQueue<Runnable>();
3846
private final AtomicInteger _pendingCount = new AtomicInteger();
3947

40-
public SerialExecutor(final Executor executor)
48+
public SerialExecutor(final Executor executor, final RejectedSerialExecutionHandler rejectionHandler)
4149
{
50+
assert executor != null;
51+
assert rejectionHandler != null;
52+
4253
_executor = executor;
54+
_rejectionHandler = rejectionHandler;
4355
}
4456

4557
public void execute(final Runnable runnable)
4658
{
4759
_queue.add(runnable);
4860
if (_pendingCount.getAndIncrement() == 0)
61+
{
62+
tryExecuteLoop();
63+
}
64+
}
65+
66+
private void tryExecuteLoop()
67+
{
68+
try
4969
{
5070
_executor.execute(_executorLoop);
5171
}
72+
catch (Throwable t)
73+
{
74+
_rejectionHandler.rejectedExecution(t);
75+
}
5276
}
5377

5478
private class ExecutorLoop implements Runnable
@@ -78,7 +102,7 @@ public void run()
78102
// above for more details.
79103
if (_pendingCount.decrementAndGet() > 0)
80104
{
81-
_executor.execute(this);
105+
tryExecuteLoop();
82106
}
83107
}
84108
}

test/com/linkedin/parseq/TestEngine.java

Lines changed: 66 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -22,9 +22,13 @@
2222
import org.testng.annotations.BeforeMethod;
2323
import org.testng.annotations.Test;
2424

25+
import java.util.concurrent.ArrayBlockingQueue;
2526
import java.util.concurrent.CountDownLatch;
27+
import java.util.concurrent.ExecutorService;
2628
import java.util.concurrent.Executors;
29+
import java.util.concurrent.RejectedExecutionException;
2730
import java.util.concurrent.ScheduledExecutorService;
31+
import java.util.concurrent.ThreadPoolExecutor;
2832
import java.util.concurrent.TimeUnit;
2933

3034
import static org.testng.AssertJUnit.assertEquals;
@@ -155,4 +159,66 @@ protected Promise<? extends String> run(final Context context) throws Exception
155159
assertTrue(sucTask.await(50, TimeUnit.MILLISECONDS));
156160
assertEquals(sucValue, sucTask.get());
157161
}
162+
163+
@Test
164+
public void testFailPlanExecution() throws InterruptedException
165+
{
166+
// This test ensures that if execution of a plan's serial executor loop
167+
// fails, e.g. in the case that the underlying executor is saturated, that
168+
// we fail the plan. To simplify this test, we constructor our own executor
169+
// instead of using the default executor set up for test.
170+
final ExecutorService executorService = new ThreadPoolExecutor(1, 1,
171+
0, TimeUnit.SECONDS,
172+
new ArrayBlockingQueue<Runnable>(1),
173+
new ThreadPoolExecutor.AbortPolicy());
174+
final ScheduledExecutorService scheduledExecutorService = Executors.newSingleThreadScheduledExecutor();
175+
176+
try
177+
{
178+
final Engine engine = new EngineBuilder()
179+
.setTaskExecutor(executorService)
180+
.setTimerScheduler(scheduledExecutorService)
181+
.build();
182+
183+
// First we submit two tasks that will never finish. This saturates the
184+
// underlying executor by using its only thread and saturating its
185+
// single slot queue.
186+
engine.run(neverEndingBlockingTask());
187+
engine.run(neverEndingBlockingTask());
188+
189+
// Now we submit another task. The execution loop for this task will fail
190+
// during submit to the underlying executor. We expect that it will be
191+
// cancelled.
192+
final Task<?> task = neverEndingBlockingTask();
193+
engine.run(task);
194+
assertTrue(task.await(5, TimeUnit.SECONDS));
195+
assertTrue(task.isFailed());
196+
assertTrue("Expected underlying exception to be instance of RejectedExecutionException, but was: " + task.getError().getCause(),
197+
task.getError().getCause() instanceof RejectedExecutionException);
198+
199+
engine.shutdown();
200+
}
201+
finally
202+
{
203+
scheduledExecutorService.shutdownNow();
204+
executorService.shutdownNow();
205+
}
206+
}
207+
208+
/**
209+
* A task that blocks forever when it is executed, tying up whatever thread
210+
* executes it.
211+
*/
212+
private Task<?> neverEndingBlockingTask()
213+
{
214+
return new BaseTask<Object>()
215+
{
216+
@Override
217+
protected Promise<?> run(Context context) throws Throwable
218+
{
219+
new CountDownLatch(1).await();
220+
return Promises.value("A value that should never be seen!");
221+
}
222+
};
223+
}
158224
}

0 commit comments

Comments
 (0)