in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java [137:163]
public void emitLatencymark(final LatencyMark latencymark) {
  // Forwards one latencymark to every downstream consumer of this collector:
  // first the operators reached through the internal outputs (main, then additional),
  // then the output writers (main, then additional).
  if (LOG.isDebugEnabled()) {
    LOG.debug("{} emits latencymark {}", irVertex.getId(), latencymark);
  }

  // Internal outputs: hand the latencymark to the transform of each next operator.
  for (final NextIntraTaskOperatorInfo nextInfo : internalMainOutputs) {
    nextInfo.getNextOperator().getTransform().onLatencymark(latencymark);
  }
  for (final List<NextIntraTaskOperatorInfo> nextInfoGroup : internalAdditionalOutputs.values()) {
    nextInfoGroup.forEach(nextInfo ->
      nextInfo.getNextOperator().getTransform().onLatencymark(latencymark));
  }

  // External outputs: write the latencymark through each output writer.
  for (final OutputWriter mainWriter : externalMainOutputs) {
    mainWriter.writeLatencymark(latencymark);
  }
  for (final List<OutputWriter> writerGroup : externalAdditionalOutputs.values()) {
    writerGroup.forEach(writer -> writer.writeLatencymark(latencymark));
  }
}