private void handleControlMessage()

in runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java [446:491]


  /**
   * Routes a control message to the handler that matches its type.
   *
   * <ul>
   *   <li>{@code TaskStateChanged}: forwards the reported task state to the scheduler.</li>
   *   <li>{@code ExecutorFailed}: logs the deserialized exception and rethrows it wrapped.</li>
   *   <li>{@code RunTimePassMessage}: forwards the task id and entries to the scheduler.</li>
   *   <li>{@code MetricMessageReceived}: passes every metric to the metric message handler.</li>
   *   <li>{@code ExecutorDataCollected}: relays the collected data through {@code clientRPC}.</li>
   *   <li>{@code MetricFlushed}: counts down {@code metricCountDownLatch}.</li>
   * </ul>
   *
   * @param message the control message to handle.
   * @throws RuntimeException wrapping the deserialized exception, for {@code ExecutorFailed}.
   * @throws IllegalMessageException for any message type not listed above.
   */
  private void handleControlMessage(final ControlMessage.Message message) {
    switch (message.getType()) {
      case TaskStateChanged: {
        final ControlMessage.TaskStateChangedMsg stateReport = message.getTaskStateChangedMsg();
        scheduler.onTaskStateReportFromExecutor(
          stateReport.getExecutorId(),
          stateReport.getTaskId(),
          stateReport.getAttemptIdx(),
          MessageUtils.convertTaskState(stateReport.getState()),
          stateReport.getVertexPutOnHoldId(),
          MessageUtils.convertFailureCause(stateReport.getFailureCause()));
        break;
      }
      case ExecutorFailed: {
        // Executor failed due to user code.
        final ControlMessage.ExecutorFailedMsg failureReport = message.getExecutorFailedMsg();
        final String executorId = failureReport.getExecutorId();
        // NOTE(review): the exception bytes are deserialized as sent by the executor;
        // confirm that executors are a trusted source for native deserialization.
        final Exception cause = SerializationUtils.deserialize(failureReport.getException().toByteArray());
        LOG.error(executorId + " failed, Stack Trace: ", cause);
        throw new RuntimeException(cause);
      }
      case RunTimePassMessage: {
        // NOTE(review): this cast fails with a ClassCastException if the scheduler
        // is not a BatchScheduler; verify that this message type implies one.
        final BatchScheduler batchScheduler = (BatchScheduler) scheduler;
        batchScheduler.onRunTimePassMessage(
          message.getRunTimePassMessageMsg().getTaskId(),
          message.getRunTimePassMessageMsg().getEntryList());
        break;
      }
      case MetricMessageReceived: {
        for (final ControlMessage.Metric metric : message.getMetricMsg().getMetricList()) {
          metricMessageHandler.onMetricMessageReceived(
            metric.getMetricType(), metric.getMetricId(),
            metric.getMetricField(), metric.getMetricValue().toByteArray());
        }
        break;
      }
      case ExecutorDataCollected: {
        final String collectedData = message.getDataCollected().getData();
        final ControlMessage.DriverToClientMessage reply = ControlMessage.DriverToClientMessage.newBuilder()
          .setType(ControlMessage.DriverToClientMessageType.DataCollected)
          .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(collectedData).build())
          .build();
        clientRPC.send(reply);
        break;
      }
      case MetricFlushed: {
        metricCountDownLatch.countDown();
        break;
      }
      default:
        throw new IllegalMessageException(
          new Exception("This message should not be received by Master :" + message.getType()));
    }
  }