in runtime/master/src/main/java/org/apache/nemo/runtime/master/scheduler/SimulationScheduler.java [539:587]
/**
 * Dispatches a control message according to its type instead of sending it over a network.
 * Master-bound messages are forwarded to the corresponding {@code scheduler} callbacks;
 * a {@code ScheduleTask} message is handed to the simulated task executor registered under
 * {@code executorId} (resolved from the enclosing scope, not visible in this block).
 * Metric flush messages are accepted and ignored.
 *
 * @param message the control message to dispatch.
 * @throws SimulationException      when an {@code ExecutorFailed} message is received; wraps the
 *                                  exception deserialized from the message.
 * @throws IllegalMessageException  when the message type is not one of the handled types.
 */
public void send(final ControlMessage.Message message) {
  switch (message.getType()) {
    // Messages sent to the master
    case TaskStateChanged:
      final ControlMessage.TaskStateChangedMsg taskStateChangedMsg = message.getTaskStateChangedMsg();
      scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
        taskStateChangedMsg.getTaskId(),
        taskStateChangedMsg.getAttemptIdx(),
        MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
        taskStateChangedMsg.getVertexPutOnHoldId(),
        MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
      break;
    case ExecutorFailed:
      // Executor failed due to user code.
      final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
      final String failedExecutorId = executorFailedMsg.getExecutorId();
      final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
      // Keep the format string constant: the executor id is an argument, and the trailing
      // exception is logged as the throwable (with its stack trace).
      LOG.error("{} failed, Stack Trace: ", failedExecutorId, exception);
      // No break needed: the failure is propagated to the caller as a SimulationException.
      throw new SimulationException(exception);
    case RunTimePassMessage:
      scheduler.onRunTimePassMessage(
        // TODO #436: Dynamic task resizing.
        message.getRunTimePassMessageMsg().getTaskId(),
        message.getRunTimePassMessageMsg().getEntryList());
      break;
    case MetricMessageReceived:
      // Forward each metric entry of the message to the scheduler individually.
      final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
      metricList.forEach(metric ->
        scheduler.handleMetricMessage(
          metric.getMetricType(), metric.getMetricId(),
          metric.getMetricField(), metric.getMetricValue().toByteArray()));
      break;
    // Messages sent to the executor
    case ScheduleTask:
      // The task arrives serialized inside the message; rebuild it before delivering it.
      final ControlMessage.ScheduleTaskMsg scheduleTaskMsg = message.getScheduleTaskMsg();
      final Task task =
        SerializationUtils.deserialize(scheduleTaskMsg.getTask().toByteArray());
      scheduler.simulatedTaskExecutorMap.get(executorId).onTaskReceived(task);
      break;
    // No metric messaging in simulation.
    case MetricFlushed:
    case RequestMetricFlush:
      break;
    default:
      throw new IllegalMessageException(
        new Exception("This message should not be received by Master or the Executor :" + message.getType()));
  }
}