private void launchTask()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/Executor.java [142:187]


  private void launchTask(final Task task) {
    LOG.info("Launch task: {}", task.getTaskId());
    try {
      final long deserializationStartTime = System.currentTimeMillis();
      final DAG<IRVertex, RuntimeEdge<IRVertex>> irDag =
        SerializationUtils.deserialize(task.getSerializedIRDag());
      metricMessageSender.send("TaskMetric", task.getTaskId(), "taskDeserializationTime",
        SerializationUtils.serialize(System.currentTimeMillis() - deserializationStartTime));
      final TaskStateManager taskStateManager =
        new TaskStateManager(task, executorId, persistentConnectionToMasterMap, metricMessageSender);

      task.getTaskIncomingEdges().forEach(e -> serializerManager.register(e.getId(),
        getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
        getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
        e.getPropertyValue(CompressionProperty.class).orElse(null),
        e.getPropertyValue(DecompressionProperty.class).orElse(null)));
      task.getTaskOutgoingEdges().forEach(e -> serializerManager.register(e.getId(),
        getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
        getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
        e.getPropertyValue(CompressionProperty.class).orElse(null),
        e.getPropertyValue(DecompressionProperty.class).orElse(null)));
      irDag.getVertices().forEach(v ->
        irDag.getOutgoingEdgesOf(v).forEach(e -> serializerManager.register(e.getId(),
          getEncoderFactory(e.getPropertyValue(EncoderProperty.class).get()),
          getDecoderFactory(e.getPropertyValue(DecoderProperty.class).get()),
          e.getPropertyValue(CompressionProperty.class).orElse(null),
          e.getPropertyValue(DecompressionProperty.class).orElse(null))));

      final TaskExecutor executor = new TaskExecutor(task, irDag, taskStateManager, intermediateDataIOFactory,
        broadcastManagerWorker, metricMessageSender, persistentConnectionToMasterMap, latencyMarkSendPeriod);
      taskExecutorList.add(executor);
      executor.execute();
    } catch (final Exception e) {
      persistentConnectionToMasterMap.getMessageSender(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID).send(
        ControlMessage.Message.newBuilder()
          .setId(RuntimeIdManager.generateMessageId())
          .setListenerId(MessageEnvironment.RUNTIME_MASTER_MESSAGE_LISTENER_ID)
          .setType(ControlMessage.MessageType.ExecutorFailed)
          .setExecutorFailedMsg(ControlMessage.ExecutorFailedMsg.newBuilder()
            .setExecutorId(executorId)
            .setException(ByteString.copyFrom(SerializationUtils.serialize(e)))
            .build())
          .build());
      throw e;
    }
  }