in tez-runtime-library/src/main/java/org/apache/tez/dag/library/vertexmanager/InputReadyVertexManager.java [138:205]
/**
 * Records that a task of a source vertex has finished and schedules whichever
 * tasks of this vertex have become ready as a result.
 *
 * <p>A first-time completion marks the task as finished and bumps the source
 * vertex's finished-task counter. For a ONE_TO_ONE edge it additionally bumps
 * the per-task 1-1 completion counter and remembers the container the source
 * task ran in, which is later used as the location hint.
 *
 * <p>Nothing is scheduled until every source vertex connected by a
 * non-ONE_TO_ONE edge has all of its tasks finished. Once that holds:
 * <ul>
 *   <li>with no 1-1 edges, every task of this vertex is submitted;</li>
 *   <li>otherwise only the not-yet-started tasks whose 1-1 completion count
 *       equals {@code numOneToOneEdges} are submitted.</li>
 * </ul>
 *
 * @param vertex key into {@code srcVertexInfo} identifying the source vertex
 * @param taskId index of the finished task; used to index the per-task arrays
 */
void handleSourceTaskFinished(String vertex, Integer taskId) {
  SourceVertexInfo srcInfo = srcVertexInfo.get(vertex);
  // Unbox once instead of at every array access.
  final int taskIndex = taskId.intValue();

  if (srcInfo.taskIsFinished[taskIndex] == null) {
    // not a duplicate completion
    srcInfo.taskIsFinished[taskIndex] = Boolean.TRUE;
    srcInfo.numFinishedTasks++;
    if (srcInfo.edgeProperty.getDataMovementType() == DataMovementType.ONE_TO_ONE) {
      oneToOneSrcTasksDoneCount[taskIndex]++;
      // keep the latest container that completed as the location hint
      // After there is standard data size info available then use it
      oneToOneLocationHints[taskIndex] = context.getTaskContainer(vertex, taskId);
    }
  }

  // custom edge needs to tell us which of our tasks its connected to
  // for now only-built in edges supported
  // Check if current source task's vertex is completed.
  if (srcInfo.edgeProperty.getDataMovementType() != DataMovementType.ONE_TO_ONE
      && srcInfo.numTasks != srcInfo.numFinishedTasks) {
    // we depend on all tasks to finish. So nothing to do now.
    return;
  }

  // currently finished vertex task may trigger us to schedule
  for (SourceVertexInfo vInfo : srcVertexInfo.values()) {
    if (vInfo.edgeProperty.getDataMovementType() != DataMovementType.ONE_TO_ONE
        && vInfo.numTasks != vInfo.numFinishedTasks) {
      // we depend on all tasks to finish. So nothing to do now.
      return;
    }
  }

  // all source vertices with full dependencies are done
  List<TaskWithLocationHint> tasksToStart;
  if (numOneToOneEdges == 0) {
    // no 1-1 dependency. Start all tasks
    // NOTE(review): this branch never consults taskIsStarted, so a duplicate
    // completion that arrives after all sources are done reaches this point
    // again and resubmits every task - confirm scheduleVertexTasks tolerates it.
    int numTasks = taskIsStarted.length;
    LOG.info("Starting all " + numTasks + " tasks for vertex: " + context.getVertexName());
    tasksToStart = Lists.newArrayListWithCapacity(numTasks);
    for (int i = 0; i < numTasks; ++i) {
      taskIsStarted[i] = true;
      tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), null));
    }
  } else {
    // start only the ready 1-1 tasks
    tasksToStart = Lists.newArrayList();
    for (int i = 0; i < taskIsStarted.length; ++i) {
      if (!taskIsStarted[i] && oneToOneSrcTasksDoneCount[i] == numOneToOneEdges) {
        taskIsStarted[i] = true;
        TaskLocationHint locationHint = null;
        if (oneToOneLocationHints[i] != null) {
          locationHint = new TaskLocationHint(oneToOneLocationHints[i].getId());
        }
        LOG.info("Starting task " + i + " for vertex: "
            + context.getVertexName() + " with location: "
            + ((locationHint != null) ? locationHint.getAffinitizedContainer() : "null"));
        tasksToStart.add(new TaskWithLocationHint(Integer.valueOf(i), locationHint));
      }
    }
  }

  // Both branches assign tasksToStart, so only emptiness needs checking.
  if (!tasksToStart.isEmpty()) {
    context.scheduleVertexTasks(tasksToStart);
  }
}