in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/task/TaskExecutor.java [476:520]
/**
 * Routes one event obtained from a data fetcher to the matching handler.
 *
 * <p>The event is classified in this order: {@code Finishmark}, {@code LatencyMark},
 * {@code Watermark}, and finally anything else, which is treated as a data element.
 *
 * @param event       the event emitted by {@code dataFetcher}.
 * @param dataFetcher the fetcher the event came from; its output collector receives
 *                    whatever is forwarded downstream.
 */
private void onEventFromDataFetcher(final Object event,
                                    final DataFetcher dataFetcher) {
  if (event instanceof Finishmark) {
    // This fetcher has nothing more to deliver: fold its read statistics into the task totals.
    if (dataFetcher instanceof SourceVertexDataFetcher) {
      final SourceVertexDataFetcher sourceFetcher = (SourceVertexDataFetcher) dataFetcher;
      boundedSourceReadTime += sourceFetcher.getBoundedSourceReadTime();
    } else if (dataFetcher instanceof ParentTaskDataFetcher) {
      final ParentTaskDataFetcher parentFetcher = (ParentTaskDataFetcher) dataFetcher;
      serializedReadBytes += parentFetcher.getSerializedBytes();
      encodedReadBytes += parentFetcher.getEncodedBytes();
    } else if (dataFetcher instanceof MultiThreadParentTaskDataFetcher) {
      final MultiThreadParentTaskDataFetcher multiThreadFetcher = (MultiThreadParentTaskDataFetcher) dataFetcher;
      serializedReadBytes += multiThreadFetcher.getSerializedBytes();
      encodedReadBytes += multiThreadFetcher.getEncodedBytes();
    }
    return;
  }

  if (event instanceof LatencyMark) {
    final LatencyMark mark = (LatencyMark) event;
    final long arrivalTimestamp = System.currentTimeMillis();

    // Report the latency metric through the metric message sender, but only when it is positive.
    final LatencyMetric latencyMetric = new LatencyMetric(mark, arrivalTimestamp);
    if (latencyMetric.getLatency() > 0) {
      metricMessageSender.send(
        TASK_METRIC_ID, taskId, "latencymark", SerializationUtils.serialize(latencyMetric));
    }

    // Forward the mark only if it is newer than the last one forwarded for its creating task.
    final long lastForwardedTimestamp =
      latestSentLatencymarkTimestamp.getOrDefault(mark.getCreatedTaskId(), -1L);
    if (lastForwardedTimestamp >= mark.getCreatedTimestamp()) {
      return;
    }
    latestSentLatencymarkTimestamp.put(mark.getCreatedTaskId(), mark.getCreatedTimestamp());
    // Stamp this task's id and the arrival time on the mark before handing it on.
    mark.setPreviousTaskId(taskId);
    mark.setPreviousSentTimestamp(arrivalTimestamp);
    processLatencymark(dataFetcher.getOutputCollector(), mark);
    return;
  }

  if (event instanceof Watermark) {
    processWatermark(dataFetcher.getOutputCollector(), (Watermark) event);
    return;
  }

  // Anything else is a regular data element: process it, then count it against its data source.
  processElement(dataFetcher.getOutputCollector(), event);
  numOfReadTupleMap
    .get(dataFetcher.getDataSource().getId())
    .incrementAndGet();
}