in runtime/master/src/main/java/org/apache/nemo/runtime/master/RuntimeMaster.java [446:491]
/**
 * Dispatches a control message received by the master according to its type.
 *
 * <p>Task state changes and run-time pass messages are forwarded to the scheduler, metrics are
 * forwarded to the metric message handler, collected executor data is relayed to the client,
 * and a metric-flush notification counts down the metric latch.
 *
 * @param message the control message to handle.
 * @throws RuntimeException        if the message reports an executor failure; the exception
 *                                 deserialized from the message is attached as the cause.
 * @throws IllegalMessageException if the message type is not handled here, or if a
 *                                 RunTimePassMessage arrives while the scheduler is not a
 *                                 {@code BatchScheduler}.
 */
private void handleControlMessage(final ControlMessage.Message message) {
  switch (message.getType()) {
    case TaskStateChanged:
      final ControlMessage.TaskStateChangedMsg taskStateChangedMsg
        = message.getTaskStateChangedMsg();
      scheduler.onTaskStateReportFromExecutor(taskStateChangedMsg.getExecutorId(),
        taskStateChangedMsg.getTaskId(),
        taskStateChangedMsg.getAttemptIdx(),
        MessageUtils.convertTaskState(taskStateChangedMsg.getState()),
        taskStateChangedMsg.getVertexPutOnHoldId(),
        MessageUtils.convertFailureCause(taskStateChangedMsg.getFailureCause()));
      break;
    case ExecutorFailed:
      // Executor failed due to user code.
      final ControlMessage.ExecutorFailedMsg executorFailedMsg = message.getExecutorFailedMsg();
      final String failedExecutorId = executorFailedMsg.getExecutorId();
      // NOTE(review): this deserializes an object from bytes carried in the message;
      // confirm that executors are trusted senders, or restrict the accepted classes.
      final Exception exception = SerializationUtils.deserialize(executorFailedMsg.getException().toByteArray());
      LOG.error(failedExecutorId + " failed, Stack Trace: ", exception);
      // Rethrow so the executor's failure is propagated to the caller of this handler.
      throw new RuntimeException(exception);
    case RunTimePassMessage:
      // Check the runtime type before downcasting, so that a scheduler of another type is
      // reported with a descriptive error instead of a bare ClassCastException.
      if (!(scheduler instanceof BatchScheduler)) {
        throw new IllegalMessageException(new Exception(
          "RunTimePassMessage can only be handled by a BatchScheduler, but the scheduler is: "
            + (scheduler == null ? "null" : scheduler.getClass().getName())));
      }
      ((BatchScheduler) scheduler).onRunTimePassMessage(
        message.getRunTimePassMessageMsg().getTaskId(),
        message.getRunTimePassMessageMsg().getEntryList());
      break;
    case MetricMessageReceived:
      final List<ControlMessage.Metric> metricList = message.getMetricMsg().getMetricList();
      // Each metric entry is handed to the metric handler individually.
      metricList.forEach(metric -> metricMessageHandler.onMetricMessageReceived(
        metric.getMetricType(), metric.getMetricId(),
        metric.getMetricField(), metric.getMetricValue().toByteArray()));
      break;
    case ExecutorDataCollected:
      // Relay the collected data to the client, re-wrapped as a driver-to-client message.
      final String serializedData = message.getDataCollected().getData();
      clientRPC.send(ControlMessage.DriverToClientMessage.newBuilder()
        .setType(ControlMessage.DriverToClientMessageType.DataCollected)
        .setDataCollected(ControlMessage.DataCollectMessage.newBuilder().setData(serializedData).build())
        .build());
      break;
    case MetricFlushed:
      metricCountDownLatch.countDown();
      break;
    default:
      throw new IllegalMessageException(
        new Exception("This message should not be received by Master :" + message.getType()));
  }
}