public final boolean processMetricMessage()

in runtime/common/src/main/java/org/apache/nemo/runtime/common/metric/TaskMetric.java [297:367]


  public final boolean processMetricMessage(final String metricField, final byte[] metricValue) {
    LOG.debug("metric {} has just arrived!", metricField);
    switch (metricField) {
      case "streamMetric":
        setStreamMetric(SerializationUtils.deserialize(metricValue));
        break;
      case "latencymark":
        setLatencyMetric(SerializationUtils.deserialize(metricValue));
        break;
      case "taskDuration":
        setTaskDuration(SerializationUtils.deserialize(metricValue));
        break;
      case "schedulingOverhead":
        setSchedulingOverhead(SerializationUtils.deserialize(metricValue));
        break;
      case "serializedReadBytes":
        setSerializedReadBytes(SerializationUtils.deserialize(metricValue));
        break;
      case "encodedReadBytes":
        setEncodedReadBytes(SerializationUtils.deserialize(metricValue));
        break;
      case "boundedSourceReadTime":
        setBoundedSourceReadTime(SerializationUtils.deserialize(metricValue));
        break;
      case "taskOutputBytes":
        setTaskOutputBytes(SerializationUtils.deserialize(metricValue));
        break;
      case "taskDeserializationTime":
        setTaskDeserializationTime(SerializationUtils.deserialize(metricValue));
        break;
      case "stateTransitionEvent":
        final StateTransitionEvent<TaskState.State> newStateTransitionEvent =
          SerializationUtils.deserialize(metricValue);
        addEvent(newStateTransitionEvent);
        break;
      case "scheduleAttempt":
        setScheduleAttempt(SerializationUtils.deserialize(metricValue));
        break;
      case "containerId":
        setContainerId(SerializationUtils.deserialize(metricValue));
        break;
      case "taskCPUTime":
        setTaskCPUTime(SerializationUtils.deserialize(metricValue));
        break;
      case "taskSerializationTime":
        setTaskSerializationTime(SerializationUtils.deserialize(metricValue));
        break;
      case "peakExecutionMemory":
        setPeakExecutionMemory(SerializationUtils.deserialize(metricValue));
        break;
      case "taskSizeRatio":
        setTaskSizeRatio(SerializationUtils.deserialize(metricValue));
        break;
      case "shuffleReadBytes":
        setShuffleReadBytes(SerializationUtils.deserialize(metricValue));
        break;
      case "shuffleReadTime":
        setShuffleReadTime(SerializationUtils.deserialize(metricValue));
        break;
      case "shuffleWriteBytes":
        setShuffleWriteBytes(SerializationUtils.deserialize(metricValue));
        break;
      case "shuffleWriteTime":
        setShuffleWriteTime(SerializationUtils.deserialize(metricValue));
        break;
      default:
        LOG.warn("metricField {} is not supported.", metricField);
        return false;
    }
    return true;
  }