in tez-runtime-internals/src/main/java/org/apache/tez/runtime/LogicalIOProcessorRuntimeTask.java [168:300]
/**
 * Initializes the task's inputs, outputs and processor, auto-starts eligible inputs, and
 * populates the maps that are later handed to the processor's run method.
 *
 * <p>Steps, in order:
 * <ol>
 *   <li>Submit one initializer per input spec and per output spec to the initializer
 *       completion service, and initialize the processor in the calling thread meanwhile.</li>
 *   <li>Wait for all input/output initializers, then build the group inputs (they depend on
 *       their member inputs already being initialized) and register them with the
 *       input-ready tracker.</li>
 *   <li>Make the initial memory allocations.</li>
 *   <li>Auto-start every ungrouped input and every group input for which
 *       {@code inputAlreadyStarted} returns false for this vertex, recording each one in
 *       {@code startedInputsMap}. Inputs that belong to a group are skipped here; their group
 *       controls their start.</li>
 *   <li>Wait for the auto-starts, fill the run input/output maps, and start the router
 *       thread.</li>
 * </ol>
 *
 * <p>The state is moved to INITED before any work is done, so a failed call cannot be
 * retried. The initializer executor is shut down whether or not initialization succeeds.
 *
 * @throws IllegalStateException if the task is not in the NEW state
 * @throws Exception the cause of the first failed initializer/start task taken from the
 *     completion service (or an Exception wrapping the ExecutionException when that cause is
 *     not an Exception), or an InterruptedException if interrupted while waiting
 */
public void initialize() throws Exception {
  LOG.info("Initializing LogicalProcessorIORuntimeTask");
  Preconditions.checkState(this.state == State.NEW, "Already initialized");
  this.state = State.INITED;
  try {
    int numTasks = 0;
    int inputIndex = 0;
    for (InputSpec inputSpec : taskSpec.getInputs()) {
      this.initializerCompletionService.submit(
          new InitializeInputCallable(inputSpec, inputIndex++));
      numTasks++;
    }
    int outputIndex = 0;
    for (OutputSpec outputSpec : taskSpec.getOutputs()) {
      this.initializerCompletionService.submit(
          new InitializeOutputCallable(outputSpec, outputIndex++));
      numTasks++;
    }
    // Initialize processor in the current thread while inputs/outputs initialize in the pool.
    initializeLogicalIOProcessor();
    awaitSubmittedTasks(numTasks, "initializers to finish");
    LOG.info("All initializers finished");

    // Group inputs depend on inputs being initialized, so this must come afterwards.
    initializeGroupInputs();
    // Register the groups so that appropriate calls can be made.
    this.inputReadyTracker
        .setGroupedInputs(groupInputsMap == null ? null : groupInputsMap.values());

    // Grouped input start is controlled by the start of the GroupedInput, so collect the
    // member vertices up front to keep start from being invoked on them directly.
    Set<String> groupInputs = Sets.newHashSet();
    // Construct the Inputs/Outputs map arguments for processor.run(); group inputs go first.
    if (groupInputSpecs != null && !groupInputSpecs.isEmpty()) {
      for (GroupInputSpec groupInputSpec : groupInputSpecs) {
        runInputMap.put(groupInputSpec.getGroupName(),
            groupInputsMap.get(groupInputSpec.getGroupName()));
        groupInputs.addAll(groupInputSpec.getGroupVertices());
      }
    }

    initialMemoryDistributor.makeInitialAllocations();

    LOG.info("Starting Inputs/Outputs");
    int numAutoStarts = 0;
    for (InputSpec inputSpec : inputSpecs) {
      String srcVertexName = inputSpec.getSourceVertexName();
      if (groupInputs.contains(srcVertexName)) {
        LOG.info("Ignoring " + srcVertexName
            + " for start, since it will be controlled via it's Group");
        continue;
      }
      if (!inputAlreadyStarted(taskSpec.getVertexName(), srcVertexName)) {
        startedInputsMap.put(taskSpec.getVertexName(), srcVertexName);
        numAutoStarts++;
        this.initializerCompletionService.submit(
            new StartInputCallable(inputsMap.get(srcVertexName), srcVertexName));
        LOG.info("Input: " + srcVertexName
            + " being auto started by the framework. Subsequent instances will not be auto-started");
      }
    }
    if (groupInputSpecs != null) {
      for (GroupInputSpec group : groupInputSpecs) {
        String groupName = group.getGroupName();
        if (!inputAlreadyStarted(taskSpec.getVertexName(), groupName)) {
          // Record the auto-start exactly like the ungrouped branch above; without this the
          // inputAlreadyStarted check for groups could never be true.
          startedInputsMap.put(taskSpec.getVertexName(), groupName);
          numAutoStarts++;
          this.initializerCompletionService.submit(
              new StartInputCallable(groupInputsMap.get(groupName), groupName));
          LOG.info("InputGroup: " + groupName
              + " being auto started by the framework. Subsequent instance will not be auto-started");
        }
      }
    }

    // Nothing further is submitted by this method; already-submitted tasks still run.
    this.initializerExecutor.shutdown();
    LOG.info("Num IOs determined for AutoStart: " + numAutoStarts);
    awaitSubmittedTasks(numAutoStarts, "IOs to start");
    LOG.info("AutoStartComplete");

    // Then add the non-grouped inputs.
    for (InputSpec inputSpec : inputSpecs) {
      String srcVertexName = inputSpec.getSourceVertexName();
      if (!groupInputs.contains(srcVertexName)) {
        LogicalInput input = inputsMap.get(srcVertexName);
        runInputMap.put(srcVertexName, input);
      }
    }
    for (OutputSpec outputSpec : outputSpecs) {
      String outputName = outputSpec.getDestinationVertexName();
      LogicalOutput output = outputsMap.get(outputName);
      runOutputMap.put(outputName, output);
    }
    // TODO Maybe close initialized inputs / outputs in case of failure to initialize.
    startRouterThread();
  } finally {
    // On success the executor was already shut down above and a repeated shutdown() has no
    // additional effect. On failure this stops the pool from accepting new work so its threads
    // can exit once in-flight tasks finish, instead of leaking.
    this.initializerExecutor.shutdown();
  }
}

/**
 * Takes {@code numTasks} completed futures from the initializer completion service. Before each
 * take it logs how many remain, and it rethrows the first failure it sees.
 *
 * @param numTasks number of previously submitted tasks whose completion must be awaited
 * @param pendingDescription text appended to the "Waiting for N" log line
 * @throws Exception the failure cause if it is an Exception, otherwise an Exception wrapping the
 *     ExecutionException; InterruptedException if interrupted while waiting
 */
private void awaitSubmittedTasks(int numTasks, String pendingDescription) throws Exception {
  for (int completed = 0; completed < numTasks; completed++) {
    LOG.info("Waiting for " + (numTasks - completed) + " " + pendingDescription);
    Future<Void> future = initializerCompletionService.take();
    try {
      future.get();
    } catch (ExecutionException e) {
      Throwable cause = e.getCause();
      if (cause instanceof Exception) {
        throw (Exception) cause;
      }
      throw new Exception(e);
    }
  }
}