in runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java [297:367]
public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
  // Routes one serialized metric update to the setter that matches the given field name.
  //
  // metricField: name of the metric being updated; compared against the literals below.
  // metricValue: serialized payload, deserialized and handed to the matching setter.
  // Returns true when the field name is recognized and its setter was invoked,
  // and false (after logging a warning) when the field name matches none of the names below.
  //
  // metricField is deliberately the receiver of every equals() call, so a null field name
  // fails with a NullPointerException instead of being reported as "not supported".
  LOG.debug("metric {} has just arrived!", metricField);

  if (metricField.equals("streamMetric")) {
    setStreamMetric(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("latencymark")) {
    setLatencyMetric(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("taskDuration")) {
    setTaskDuration(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("schedulingOverhead")) {
    setSchedulingOverhead(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("serializedReadBytes")) {
    setSerializedReadBytes(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("encodedReadBytes")) {
    setEncodedReadBytes(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("boundedSourceReadTime")) {
    setBoundedSourceReadTime(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("taskOutputBytes")) {
    setTaskOutputBytes(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("taskDeserializationTime")) {
    setTaskDeserializationTime(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("stateTransitionEvent")) {
    // State transitions are appended as events rather than overwriting a single value.
    final StateTransitionEvent<TaskState.State> transition =
      SerializationUtils.deserialize(metricValue);
    addEvent(transition);
  } else if (metricField.equals("scheduleAttempt")) {
    setScheduleAttempt(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("containerId")) {
    setContainerId(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("taskCPUTime")) {
    setTaskCPUTime(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("taskSerializationTime")) {
    setTaskSerializationTime(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("peakExecutionMemory")) {
    setPeakExecutionMemory(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("taskSizeRatio")) {
    setTaskSizeRatio(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("shuffleReadBytes")) {
    setShuffleReadBytes(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("shuffleReadTime")) {
    setShuffleReadTime(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("shuffleWriteBytes")) {
    setShuffleWriteBytes(SerializationUtils.deserialize(metricValue));
  } else if (metricField.equals("shuffleWriteTime")) {
    setShuffleWriteTime(SerializationUtils.deserialize(metricValue));
  } else {
    // Unknown field name: report it and tell the caller nothing was updated.
    LOG.warn("metricField {} is not supported.", metricField);
    return false;
  }

  return true;
}