Skip to content

Commit 078a485

Browse files
Better garbage collection
Previously each task had a reference to other tasks in its plan, causing memory for all tasks to be held as long as a reference to one task was strongly reachable. This change uses indirection to reduce the amount of memory reachable: we uniquely identify each task in a plan and use this for building relationships for trace purposes.
1 parent cb3dc78 commit 078a485

Some content is hidden

Large Commits have some content hidden by default. Use the searchbox below for content that may be hidden.

42 files changed

+1643
-2351
lines changed

src/com/linkedin/parseq/BaseTask.java

Lines changed: 312 additions & 214 deletions
Large diffs are not rendered by default.

src/com/linkedin/parseq/Engine.java

Lines changed: 7 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -23,6 +23,8 @@
2323
import com.linkedin.parseq.internal.SerialExecutor;
2424
import com.linkedin.parseq.internal.PlanContext;
2525
import com.linkedin.parseq.internal.TaskLogger;
26+
import com.linkedin.parseq.internal.trace.TraceCapturerImpl;
27+
import com.linkedin.parseq.internal.trace.TraceCapturer;
2628
import com.linkedin.parseq.promise.Promise;
2729
import com.linkedin.parseq.promise.PromiseListener;
2830
import org.slf4j.ILoggerFactory;
@@ -132,9 +134,12 @@ public void run(final Task<?> task)
132134

133135
final long planId = NEXT_PLAN_ID.getAndIncrement();
134136
final Logger planLogger = _loggerFactory.getLogger(LOGGER_BASE + ":planClass=" + task.getClass().getName());
135-
final TaskLogger taskLogger = new TaskLogger(task, _allLogger, _rootLogger, planLogger);
137+
final TaskLogger taskLogger = new TaskLogger(planId, task, _allLogger, _rootLogger, planLogger);
138+
final TraceCapturer traceCapturer = new TraceCapturerImpl(task);
139+
final int taskId = traceCapturer.registerTask(task);
136140
final Executor taskExecutor = new SerialExecutor(_taskExecutor, new CancelPlanRejectionHandler(task));
137-
new ContextImpl(new PlanContext(planId, this, taskExecutor, _timerExecutor, taskLogger), task).runTask();
141+
final ContextImpl context = new ContextImpl(new PlanContext(planId, this, taskExecutor, _timerExecutor, traceCapturer, taskLogger), task, taskId);
142+
context.runTask();
138143

139144
InternalUtil.unwildcardTask(task).addListener(_taskDoneListener);
140145
}

src/com/linkedin/parseq/ParTaskImpl.java

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -82,7 +82,7 @@ public void onResolved(Promise<Object> resolvedPromise)
8282
{
8383
if (task.isFailed())
8484
{
85-
if (allEarlyFinish && ResultType.fromTask(task) != ResultType.EARLY_FINISH)
85+
if (allEarlyFinish && task.getShallowTrace().getResultType() != ResultType.EARLY_FINISH)
8686
{
8787
allEarlyFinish = false;
8888
}

src/com/linkedin/parseq/Task.java

Lines changed: 26 additions & 33 deletions
Original file line numberDiff line numberDiff line change
@@ -16,15 +16,11 @@
1616

1717
package com.linkedin.parseq;
1818

19-
import com.linkedin.parseq.internal.TaskLogger;
19+
import com.linkedin.parseq.internal.TaskListener;
2020
import com.linkedin.parseq.promise.Promise;
21-
import com.linkedin.parseq.trace.Related;
2221
import com.linkedin.parseq.trace.ShallowTrace;
2322
import com.linkedin.parseq.trace.Trace;
2423

25-
import java.util.Collection;
26-
import java.util.Set;
27-
2824
/**
2925
* A task represents a deferred execution that also contains its resulting
3026
* value. In addition, tasks include some tracing information that can be
@@ -38,11 +34,11 @@
3834
public interface Task<T> extends Promise<T>, Cancellable
3935
{
4036
/**
41-
* Returns the name of this task.
42-
*
43-
* @return the name of this task
37+
* Returns the name of this task. If no name was set during construction
38+
* this method will return the value of {@link #toString()}. In most
39+
* cases it is preferable to explicitly set a name.
4440
*/
45-
public String getName();
41+
String getName();
4642

4743
/**
4844
* Returns the priority for this task.
@@ -71,41 +67,38 @@ public interface Task<T> extends Promise<T>, Cancellable
7167
boolean setPriority(int priority);
7268

7369
/**
74-
* Attempts to run the task with the given context. This method is
75-
* reserved for use by {@link Engine} and {@link Context}.
76-
*
77-
* @param context the context to use while running this step
78-
* @param taskLogger the logger used for task events
79-
* @param parent the parent of this task
80-
* @param predecessors that lead to the execution of this task
81-
*/
82-
void contextRun(Context context, TaskLogger taskLogger,
83-
Task<?> parent, Collection<Task<?>> predecessors);
84-
85-
86-
/**
87-
* Returns the ShallowTrace for this task. The ShallowTrace will be
88-
* a point-in-time snapshot and may change over time until the task is
89-
* completed.
90-
*
91-
* @return the ShallowTrace related to this task
70+
* Returns a trace for this task alone. The trace is a point-in-time
71+
* snapshot, so successive invocations of {@code getShallowTrace} may
72+
* return different results.
9273
*/
9374
ShallowTrace getShallowTrace();
9475

9576
/**
96-
* Returns the Trace for this task. The Trace will be a point-in-time snapshot
77+
* Returns the trace for this task. The trace will be a point-in-time snapshot
9778
* and may change over time until the task is completed.
9879
*
99-
* @return the Trace related to this task
80+
* @return the trace related to this task
10081
*/
10182
Trace getTrace();
10283

10384
/**
104-
* Returns the set of relationships of this task. The parent relationships are not included.
85+
* Attempts to bind this task to the given context. A task may only be bound
86+
* once. This method is reserved for use by the ParSeq framework only.
10587
*
106-
* @see com.linkedin.parseq.trace.Relationship the available relationships
107-
* @return the set of relationships of this task.
88+
* @param context the context to assign to this task.
89+
* @return {@code true} if the assignment was successful.
90+
*/
91+
boolean assignContext(Context context);
92+
93+
/**
94+
* Attempts to run the task with the given context. This method is for use by
95+
* the ParSeq framework only.
10896
*/
109-
Set<Related<Task<?>>> getRelationships();
97+
void contextRun();
11098

99+
/**
100+
* Adds a listener to this task that is notified each time the task changes
101+
* state. This method is for use by the ParSeq framework only.
102+
*/
103+
void addTaskListener(TaskListener tracingTaskListener);
111104
}

0 commit comments

Comments
 (0)