private void onEventFromDataFetcher()

in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java [476:520]


  /**
   * Handles one event handed over by a data fetcher, dispatching on the event's runtime type.
   * <ul>
   *   <li>{@code Finishmark}: accumulates the fetcher's read statistics into this task's counters.</li>
   *   <li>{@code LatencyMark}: reports a latency metric and, if the mark is newer than the last one
   *       forwarded for its creating task, stamps it and passes it on.</li>
   *   <li>{@code Watermark}: passes the watermark on through the fetcher's output collector.</li>
   *   <li>Anything else: treated as a data element, processed, and counted as a read tuple.</li>
   * </ul>
   *
   * @param event       the event to handle.
   * @param dataFetcher the data fetcher associated with the event; its output collector and
   *                    data source are used for processing and bookkeeping.
   */
  private void onEventFromDataFetcher(final Object event,
                                      final DataFetcher dataFetcher) {
    if (event instanceof Finishmark) {
      // All data from this fetcher has been consumed; collect its read statistics by fetcher type.
      if (dataFetcher instanceof SourceVertexDataFetcher) {
        final SourceVertexDataFetcher sourceFetcher = (SourceVertexDataFetcher) dataFetcher;
        boundedSourceReadTime += sourceFetcher.getBoundedSourceReadTime();
      } else if (dataFetcher instanceof ParentTaskDataFetcher) {
        final ParentTaskDataFetcher parentFetcher = (ParentTaskDataFetcher) dataFetcher;
        serializedReadBytes += parentFetcher.getSerializedBytes();
        encodedReadBytes += parentFetcher.getEncodedBytes();
      } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) {
        final MultiThreadParentTaskDataFetcher multiThreadFetcher = (MultiThreadParentTaskDataFetcher) dataFetcher;
        serializedReadBytes += multiThreadFetcher.getSerializedBytes();
        encodedReadBytes += multiThreadFetcher.getEncodedBytes();
      }
      return;
    }

    if (event instanceof LatencyMark) {
      final LatencyMark mark = (LatencyMark) event;
      final long now = System.currentTimeMillis();

      // Report the latency metric to the RuntimeMaster, but only when the latency is positive.
      final LatencyMetric latencyMetric = new LatencyMetric(mark, now);
      if (latencyMetric.getLatency() > 0) {
        metricMessageSender.send(
          TASK_METRIC_ID, taskId, "latencymark", SerializationUtils.serialize(latencyMetric));
      }

      // Skip forwarding unless this mark is strictly newer than the last one sent for its creating task.
      final long lastForwardedTimestamp =
        latestSentLatencymarkTimestamp.getOrDefault(mark.getCreatedTaskId(), -1L);
      if (lastForwardedTimestamp >= mark.getCreatedTimestamp()) {
        return;
      }
      latestSentLatencymarkTimestamp.put(mark.getCreatedTaskId(), mark.getCreatedTimestamp());

      // Record this task as the previous hop (id and send time) for the next task.
      mark.setPreviousTaskId(taskId);
      mark.setPreviousSentTimestamp(now);

      // Hand the latency mark over for downstream tasks.
      processLatencymark(dataFetcher.getOutputCollector(), mark);
      return;
    }

    if (event instanceof Watermark) {
      processWatermark(dataFetcher.getOutputCollector(), (Watermark) event);
      return;
    }

    // Not a control event: process it as a data element.
    processElement(dataFetcher.getOutputCollector(), event);

    // Bump the read-tuple counter kept for this fetcher's data source.
    numOfReadTupleMap.get(dataFetcher.getDataSource().getId()).incrementAndGet();
  }