Skip to content

Commit 086624a

Browse files
author
jodzga
committed
Changed priorities of tasks scheduled by TimeoutWithErrorTask to execute
related tasks close together in order to avoid impact of SerialExecutor's queue on timeouts.
1 parent 76654d1 commit 086624a

File tree

2 files changed

+82
-9
lines changed

2 files changed

+82
-9
lines changed

src/com/linkedin/parseq/TimeoutWithErrorTask.java

Lines changed: 20 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -48,6 +48,7 @@ public TimeoutWithErrorTask(final String name, final long time,
4848
_time = time;
4949
_unit = unit;
5050
_task = task;
51+
setPriority(task.getPriority());
5152
}
5253

5354
@Override
@@ -68,7 +69,14 @@ public void run()
6869
}
6970
});
7071

72+
//timeout tasks should run as early as possible
73+
timeoutTask.setPriority(Priority.MAX_PRIORITY);
7174
context.createTimer(_time, _unit, timeoutTask);
75+
76+
//set priority of the task for which we just scheduled timeout
77+
//to be executed next, unless there exist other tasks with higher priority
78+
//e.g. other timeouts
79+
_task.setPriority(higherThan(getPriority()));
7280
context.run(_task);
7381

7482
_task.addListener(new PromiseListener<T>()
@@ -85,4 +93,16 @@ public void onResolved(Promise<T> resolvedPromise)
8593

8694
return result;
8795
}
96+
97+
/**
98+
* Returns priority higher than given priority. If given
99+
* priority has max value, then max is returned.
100+
*/
101+
private static int higherThan(int priority) {
102+
if (priority == Priority.MAX_PRIORITY) {
103+
return Priority.MAX_PRIORITY;
104+
} else {
105+
return priority + 1;
106+
}
107+
}
88108
}

test/com/linkedin/parseq/TestTasks.java

Lines changed: 62 additions & 9 deletions
Original file line numberDiff line numberDiff line change
@@ -16,21 +16,28 @@
1616

1717
package com.linkedin.parseq;
1818

19-
import com.linkedin.parseq.promise.Promise;
20-
import com.linkedin.parseq.promise.PromiseListener;
21-
import com.linkedin.parseq.promise.Promises;
22-
import org.testng.annotations.Test;
19+
import static com.linkedin.parseq.TestUtil.value;
20+
import static org.testng.AssertJUnit.assertEquals;
21+
import static org.testng.AssertJUnit.assertFalse;
22+
import static org.testng.AssertJUnit.assertTrue;
23+
import static org.testng.AssertJUnit.fail;
2324

25+
import java.io.IOException;
26+
import java.util.ArrayList;
27+
import java.util.List;
2428
import java.util.concurrent.Callable;
29+
import java.util.concurrent.Executors;
30+
import java.util.concurrent.ScheduledExecutorService;
2531
import java.util.concurrent.TimeUnit;
2632
import java.util.concurrent.TimeoutException;
2733
import java.util.concurrent.atomic.AtomicReference;
2834

29-
import static com.linkedin.parseq.TestUtil.value;
30-
import static org.testng.AssertJUnit.assertEquals;
31-
import static org.testng.AssertJUnit.assertFalse;
32-
import static org.testng.AssertJUnit.assertTrue;
33-
import static org.testng.AssertJUnit.fail;
35+
import org.testng.annotations.Test;
36+
37+
import com.linkedin.parseq.promise.Promise;
38+
import com.linkedin.parseq.promise.PromiseListener;
39+
import com.linkedin.parseq.promise.Promises;
40+
import com.linkedin.parseq.promise.SettablePromise;
3441

3542
/**
3643
* @author Chris Pettitt ([email protected])
@@ -171,6 +178,52 @@ public String call() throws Exception
171178
assertEquals(error, timeoutTask.getError());
172179
}
173180

181+
/**
182+
* Test scenario in which there are many TimeoutWithErrorTasks scheduled for execution
183+
* e.g. by using Tasks.par().
184+
*/
185+
@Test
186+
public void testManyTimeoutTaskWithoutTimeoutOnAQueeu() throws InterruptedException, IOException
187+
{
188+
final String value = "value";
189+
final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
190+
191+
List<Task<String>> tasks = new ArrayList<Task<String>>();
192+
for (int i = 0; i < 50; i++) {
193+
194+
// Task which simulates doing something for 0.5ms and setting response
195+
// asynchronously after 5ms.
196+
Task<String> t = new BaseTask<String>("test") {
197+
@Override
198+
protected Promise<? extends String> run(Context context) throws Throwable {
199+
final SettablePromise<String> result = Promises.settable();
200+
Thread.sleep(0, 500000);
201+
scheduler.schedule(new Runnable() {
202+
@Override
203+
public void run() {
204+
result.done(value);
205+
}
206+
}, 5, TimeUnit.MILLISECONDS);
207+
return result;
208+
}
209+
};
210+
// add 50ms timeout for the task
211+
tasks.add(Tasks.timeoutWithError(50, TimeUnit.MILLISECONDS, t));
212+
}
213+
214+
// final task runs all the tasks in parallel
215+
final Task<?> timeoutTask = Tasks.par(tasks);
216+
217+
getEngine().run(timeoutTask);
218+
219+
assertTrue(timeoutTask.await(5, TimeUnit.SECONDS));
220+
221+
scheduler.shutdown();
222+
223+
//tasks should not time out
224+
assertEquals(false, timeoutTask.isFailed());
225+
}
226+
174227
@Test
175228
public void testSetPriorityBelowMinValue()
176229
{

0 commit comments

Comments
 (0)