Skip to content

Commit b4a0793

Browse files
Introduce PlanContext for plan-level configuration and facilities
1 parent 2499904 commit b4a0793

File tree

3 files changed

+93
-39
lines changed

3 files changed

+93
-39
lines changed

src/com/linkedin/parseq/Engine.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -21,6 +21,7 @@
2121
import com.linkedin.parseq.internal.RejectedSerialExecutionHandler;
2222
import com.linkedin.parseq.internal.SerialExecutionException;
2323
import com.linkedin.parseq.internal.SerialExecutor;
24+
import com.linkedin.parseq.internal.PlanContext;
2425
import com.linkedin.parseq.internal.TaskLogger;
2526
import com.linkedin.parseq.promise.Promise;
2627
import com.linkedin.parseq.promise.PromiseListener;
@@ -32,6 +33,7 @@
3233
import java.util.concurrent.CountDownLatch;
3334
import java.util.concurrent.Executor;
3435
import java.util.concurrent.TimeUnit;
36+
import java.util.concurrent.atomic.AtomicLong;
3537
import java.util.concurrent.atomic.AtomicReference;
3638

3739
/**
@@ -50,6 +52,8 @@ public class Engine
5052

5153
private static enum StateName { RUN, SHUTDOWN, TERMINATED }
5254

55+
private final AtomicLong NEXT_PLAN_ID = new AtomicLong();
56+
5357
private final Executor _taskExecutor;
5458
private final DelayedExecutor _timerExecutor;
5559
private final ILoggerFactory _loggerFactory;
@@ -126,10 +130,11 @@ public void run(final Task<?> task)
126130
newState = new State(StateName.RUN, currState._pendingCount + 1);
127131
} while (!_stateRef.compareAndSet(currState, newState));
128132

133+
final long planId = NEXT_PLAN_ID.getAndIncrement();
129134
final Logger planLogger = _loggerFactory.getLogger(LOGGER_BASE + ":planClass=" + task.getClass().getName());
130135
final TaskLogger taskLogger = new TaskLogger(task, _allLogger, _rootLogger, planLogger);
131-
new ContextImpl(new SerialExecutor(_taskExecutor, new CancelPlanRejectionHandler(task)),
132-
_timerExecutor, task, taskLogger, this).runTask();
136+
final Executor taskExecutor = new SerialExecutor(_taskExecutor, new CancelPlanRejectionHandler(task));
137+
new ContextImpl(new PlanContext(planId, this, taskExecutor, _timerExecutor, taskLogger), task).runTask();
133138

134139
InternalUtil.unwildcardTask(task).addListener(_taskDoneListener);
135140
}

src/com/linkedin/parseq/internal/ContextImpl.java

Lines changed: 15 additions & 37 deletions
Original file line numberDiff line numberDiff line change
@@ -19,9 +19,7 @@
1919
import com.linkedin.parseq.After;
2020
import com.linkedin.parseq.Cancellable;
2121
import com.linkedin.parseq.Context;
22-
import com.linkedin.parseq.DelayedExecutor;
2322
import com.linkedin.parseq.EarlyFinishException;
24-
import com.linkedin.parseq.Engine;
2523
import com.linkedin.parseq.Task;
2624
import com.linkedin.parseq.promise.Promise;
2725
import com.linkedin.parseq.promise.PromiseListener;
@@ -32,7 +30,6 @@
3230
import java.util.Iterator;
3331
import java.util.List;
3432
import java.util.concurrent.ConcurrentLinkedQueue;
35-
import java.util.concurrent.Executor;
3633
import java.util.concurrent.TimeUnit;
3734

3835
/**
@@ -57,17 +54,10 @@ public class ContextImpl implements Context, Cancellable
5754
}
5855
}
5956

60-
private final DelayedExecutor _timerScheduler;
61-
62-
/* An executor that provides two guarantees:
63-
*
64-
* 1. Only one task is executed at a time
65-
* 2. The completion of a task happens-before the execution of the next task
66-
*
67-
* For more on the happens-before constraint see the java.util.concurrent
68-
* package documentation.
57+
/**
58+
* Plan level configuration and facilities.
6959
*/
70-
private final Executor _taskExecutor;
60+
private final PlanContext _planContext;
7161

7262
private final Task<Object> _task;
7363

@@ -78,36 +68,24 @@ public class ContextImpl implements Context, Cancellable
7868

7969
private final Task<?> _parent;
8070
private final List<Task<?>> _predecessorTasks;
81-
private final TaskLogger _taskLogger;
82-
83-
private final Engine _engine;
8471

8572
private final ConcurrentLinkedQueue<Cancellable> _cancellables = new ConcurrentLinkedQueue<Cancellable>();
8673

87-
public ContextImpl(final Executor taskExecutor,
88-
final DelayedExecutor timerScheduler,
89-
final Task<?> task,
90-
final TaskLogger taskLogger,
91-
final Engine engine)
74+
public ContextImpl(final PlanContext planContext,
75+
final Task<?> task)
9276
{
93-
this(taskExecutor, timerScheduler, task, NO_PARENT, NO_PREDECESSORS, taskLogger, engine);
77+
this(planContext, task, NO_PARENT, NO_PREDECESSORS);
9478
}
9579

96-
private ContextImpl(final Executor taskExecutor,
97-
final DelayedExecutor timerScheduler,
80+
private ContextImpl(final PlanContext planContext,
9881
final Task<?> task,
9982
final Task<?> parent,
100-
final List<Task<?>> predecessorTasks,
101-
final TaskLogger taskLogger,
102-
final Engine engine)
83+
final List<Task<?>> predecessorTasks)
10384
{
104-
_timerScheduler = timerScheduler;
105-
_taskExecutor = taskExecutor;
85+
_planContext = planContext;
10686
_task = InternalUtil.unwildcardTask(task);
10787
_parent = parent;
10888
_predecessorTasks = predecessorTasks;
109-
_taskLogger = taskLogger;
110-
_engine = engine;
11189
}
11290

11391
public void runTask()
@@ -127,15 +105,15 @@ public void onResolved(Promise<Object> resolvedPromise)
127105
}
128106
});
129107

130-
_taskExecutor.execute(new PrioritizableRunnable()
108+
_planContext.execute(new PrioritizableRunnable()
131109
{
132110
@Override
133111
public void run()
134112
{
135113
_inTask.set(_task);
136114
try
137115
{
138-
_task.contextRun(ContextImpl.this, _taskLogger, _parent, _predecessorTasks);
116+
_task.contextRun(ContextImpl.this, _planContext.getTaskLogger(), _parent, _predecessorTasks);
139117
}
140118
finally
141119
{
@@ -156,7 +134,7 @@ public Cancellable createTimer(final long time, final TimeUnit unit,
156134
final Task<?> task)
157135
{
158136
checkInTask();
159-
final Cancellable cancellable = _timerScheduler.schedule(time, unit, new Runnable()
137+
final Cancellable cancellable = _planContext.schedule(time, unit, new Runnable()
160138
{
161139
@Override
162140
public void run()
@@ -214,19 +192,19 @@ public boolean cancel(Exception reason)
214192
{
215193
boolean result = _task.cancel(reason);
216194
//run the task to capture the trace data
217-
_task.contextRun(this, _taskLogger, _parent, _predecessorTasks);
195+
_task.contextRun(this, _planContext.getTaskLogger(), _parent, _predecessorTasks);
218196
return result;
219197
}
220198

221199
@Override
222200
public Object getEngineProperty(String key)
223201
{
224-
return _engine.getProperty(key);
202+
return _planContext.getEngineProperty(key);
225203
}
226204

227205
private ContextImpl createSubContext(final Task<?> task, final List<Task<?>> predecessors)
228206
{
229-
return new ContextImpl(_taskExecutor, _timerScheduler, task, _task, predecessors, _taskLogger, _engine);
207+
return new ContextImpl(_planContext, task, _task, predecessors);
230208
}
231209

232210
private void runSubTask(final Task<?> task, final List<Task<?>> predecessors)
Lines changed: 71 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,71 @@
1+
package com.linkedin.parseq.internal;
2+
3+
import com.linkedin.parseq.Cancellable;
4+
import com.linkedin.parseq.DelayedExecutor;
5+
import com.linkedin.parseq.Engine;
6+
7+
import java.util.concurrent.Executor;
8+
import java.util.concurrent.TimeUnit;
9+
10+
public class PlanContext
11+
{
12+
/** Unique identifier for this plan. */
13+
private final long _id;
14+
15+
/** The engine used to execute this plan. */
16+
private final Engine _engine;
17+
18+
/**
19+
* An executor that provides two guarantees:
20+
*
21+
* 1. Only one task is executed at a time
22+
* 2. The completion of a task happens-before the execution of the next task
23+
*
24+
* For more on the happens-before constraint see the java.util.concurrent
25+
* package documentation.
26+
*/
27+
private final Executor _taskExecutor;
28+
29+
/** Scheduler for running time delayed tasks. */
30+
private final DelayedExecutor _timerScheduler;
31+
32+
private final TaskLogger _taskLogger;
33+
34+
public PlanContext(final long id,
35+
final Engine engine,
36+
final Executor taskExecutor,
37+
final DelayedExecutor timerScheduler,
38+
final TaskLogger taskLogger)
39+
{
40+
_id = id;
41+
_engine = engine;
42+
_taskExecutor = taskExecutor;
43+
_timerScheduler = timerScheduler;
44+
_taskLogger = taskLogger;
45+
}
46+
47+
public long getId()
48+
{
49+
return _id;
50+
}
51+
52+
public void execute(Runnable runnable)
53+
{
54+
_taskExecutor.execute(runnable);
55+
}
56+
57+
public Cancellable schedule(long time, TimeUnit unit, Runnable runnable)
58+
{
59+
return _timerScheduler.schedule(time, unit, runnable);
60+
}
61+
62+
public Object getEngineProperty(String key)
63+
{
64+
return _engine.getProperty(key);
65+
}
66+
67+
public TaskLogger getTaskLogger()
68+
{
69+
return _taskLogger;
70+
}
71+
}

0 commit comments

Comments
 (0)