in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java [142:187]
private void launchTask(final Task task) {
LOG.info("Launch task: {}", task.getTaskId());
try {
final long deserializationStartTime = System.currentTimeMillis();
final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
SerializationUtils.deserialize(task.getSerializedIRDag());
metricMessageSender.send("TaskMetric", task.getTaskId(), "taskDeserializationTime",
SerializationUtils.serialize(System.currentTimeMillis() - deserializationStartTime));
final TaskStateManager taskStateManager =
new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);
task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null)));
irDag.getVertices().forEach(v ->
irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
e.getPropertyValue(CompressionProperty.class).orElse(null),
e.getPropertyValue(DecompressionProperty.class).orElse(null))));
final TaskExecutor executor = new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory,
broadcastManagerWorker, metricMessageSender, persistentConnectionToMasterMap, latencyMarkSendPeriod);
taskExecutorList.add(executor);
executor.execute();
} catch (final Exception e) {
persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
ControlMessage.Message.newBuilder()
.setId(RuntimeIdManager.generateMessageId())
.setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
.setType(ControlMessage.MessageType.ExecutorFailed)
.setExecutorFailedMsg(ControlMessage.ExecutorFailedMsg.newBuilder()
.setExecutorId(executorId)
.setException(ByteString.copyFrom(SerializationUtils.serialize(e)))
.build())
.build());
throw e;
}
}