public void send()

in runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java [539:587]


    /**
     * Dispatches a control message according to its type.
     *
     * <p>Master-bound messages (task state changes, executor failures, run-time pass messages and
     * metric messages) are forwarded to the scheduler. A {@code ScheduleTask} message is delivered
     * to the simulated task executor registered under this sender's {@code executorId}.
     * {@code MetricFlushed} and {@code RequestMetricFlush} are accepted but ignored.
     *
     * @param message the control message to dispatch.
     * @throws SimulationException if the message reports an executor failure; the deserialized
     *                             exception carried by the message is used as the cause.
     * @throws IllegalMessageException if the message type is not one of the types handled here.
     */
    public void send(final ControlMessage.Message message) {
      switch (message.getType()) {
        // Messages sent to the master
        case TaskStateChanged:
          final ControlMessage.TaskStateChangedMsg taskStateChangedMsg = message.getTaskStateChangedMsg();

          // Convert the protobuf state / failure cause before handing the report to the scheduler.
          scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
            taskStateChangedMsg.getTaskId(),
            taskStateChangedMsg.getAttemptIdx(),
            MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
            taskStateChangedMsg.getVertexPutOnHoldId(),
            MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
          break;
        case ExecutorFailed:
          // Executor failed due to user code.
          final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
          final String failedExecutorId = executorFailedMsg.getExecutorId();
          final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
          // The executor id must be a format argument, not the format string itself; otherwise the
          // descriptive text is silently dropped from the log output. The trailing exception is
          // logged as the throwable (with its stack trace).
          LOG.error("{} failed, Stack Trace: ", failedExecutorId, exception);
          // Abort the simulation: this case intentionally ends with a throw instead of a break.
          throw new SimulationException(exception);
        case RunTimePassMessage:
          scheduler.onRunTimePassMessage(
            // TODO #436: Dynamic task resizing.
            message.getRunTimePassMessageMsg().getTaskId(),
            message.getRunTimePassMessageMsg().getEntryList());
          break;
        case MetricMessageReceived:
          // Forward every metric contained in the message to the scheduler, one at a time.
          final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
          metricList.forEach(metric ->
            scheduler.handleMetricMessage(
              metric.getMetricType(), metric.getMetricId(),
              metric.getMetricField(), metric.getMetricValue().toByteArray()));
          break;
        //  Messages sent to the executor
        case ScheduleTask:
          final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
          final Task task =
            SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
          // NOTE(review): assumes a simulated executor is registered under executorId;
          // a missing entry would surface as a NullPointerException here - confirm with callers.
          scheduler.simulatedTaskExecutorMap.get(executorId).onTaskReceived(task);
          break;
        // No metric messaging in simulation.
        case MetricFlushed:
        case RequestMetricFlush:
          break;
        default:
          throw new IllegalMessageException(
            new Exception("This message should not be received by Master or the Executor :" + message.getType()));
      }
    }