in runtime/executor/src/main/java/org/apache/nemo/runtime/executor/datatransfer/OperatorVertexOutputCollector.java [108:134]
public void emitWatermark(final Watermark watermark) {
  // Propagates the given watermark to every consumer registered on this collector.
  // Fan-out order: internal main outputs, internal additional outputs,
  // external main writers, and finally external additional writers.
  if (LOG.isDebugEnabled()) {
    LOG.debug("{} emits watermark {}", irVertex.getId(), watermark);
  }

  // Internal consumers: each one's watermark manager receives the watermark
  // together with that consumer's edge index.
  for (final NextIntraTaskOperatorInfo nextOperator : internalMainOutputs) {
    nextOperator.getWatermarkManager()
      .trackAndEmitWatermarks(nextOperator.getEdgeIndex(), watermark);
  }
  for (final List<NextIntraTaskOperatorInfo> taggedOperators : internalAdditionalOutputs.values()) {
    for (final NextIntraTaskOperatorInfo nextOperator : taggedOperators) {
      nextOperator.getWatermarkManager()
        .trackAndEmitWatermarks(nextOperator.getEdgeIndex(), watermark);
    }
  }

  // External consumers: the watermark is handed directly to each output writer.
  for (final OutputWriter mainWriter : externalMainOutputs) {
    mainWriter.writeWatermark(watermark);
  }
  for (final List<OutputWriter> taggedWriters : externalAdditionalOutputs.values()) {
    for (final OutputWriter additionalWriter : taggedWriters) {
      additionalWriter.writeWatermark(watermark);
    }
  }
}