in samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java [75:116]
/**
 * Folds an incoming watermark into this state and, when allowed, advances the aggregated watermark.
 *
 * <p>A {@code null} {@code taskName} means the watermark came either from the source or from the
 * aggregator task, so it is applied directly to {@code watermarkTime}.
 *
 * <p>Otherwise the watermark is recorded per upstream task. Per-task watermarks never move backwards:
 * a smaller incoming value is logged and dropped, and that task's last-update time is not refreshed.
 * If {@code canUpdateWatermark(currentTime)} permits, the aggregated watermark is recomputed as the
 * minimum of the per-task watermarks. When that minimum would not advance the current watermark and
 * {@code watermarkIdleTimeout > 0}, tasks idle longer than the timeout are excluded from the minimum.
 * In that case a candidate is only produced if at least {@code quorumSize} tasks are still active;
 * otherwise the candidate is {@code WATERMARK_NOT_EXIST}. {@code quorumCount} is set to the number
 * of tasks considered.
 *
 * <p>{@code watermarkTime} is only ever updated through {@code Math.max}, so it never decreases.
 *
 * @param timestamp the incoming watermark
 * @param taskName the upstream task that emitted the watermark, or {@code null} when the watermark
 *     comes from the source or the aggregator task
 */
synchronized void update(long timestamp, String taskName) {
  final long currentTime = systemTimeFunc.getAsLong();

  if (taskName == null) {
    // we get watermark either from the source or from the aggregator task
    watermarkTime = Math.max(watermarkTime, timestamp);
    return;
  }

  final Long existing = timestamps.get(taskName);
  if (existing != null && existing > timestamp) {
    // Per-task watermarks must not regress: keep the existing value and its last-update time.
    LOG.warn("Incoming watermark {} is smaller than existing watermark {} for upstream task {}",
        timestamp, existing, taskName);
  } else {
    timestamps.put(taskName, timestamp);
    lastUpdateTime.put(taskName, currentTime);
  }

  if (!canUpdateWatermark(currentTime)) {
    return;
  }

  // Falls back to the incoming timestamp only if no per-task watermarks are recorded.
  long minWatermark = timestamps.values().stream()
      .mapToLong(Long::longValue)
      .min()
      .orElse(timestamp);

  if (minWatermark <= watermarkTime && watermarkIdleTimeout > 0) {
    // The minimum is not advancing: exclude tasks that have been idle in watermark emission.
    // A task counts as active only if its last update is strictly after the idle threshold.
    final long watermarkIdleThreshold = currentTime - watermarkIdleTimeout;
    long activeMin = Long.MAX_VALUE;
    int activeCount = 0;
    for (Map.Entry<String, Long> entry : timestamps.entrySet()) {
      // NOTE(review): assumes every key in timestamps also has a lastUpdateTime entry (this method
      // always writes both together); a missing entry would throw on unboxing.
      if (lastUpdateTime.get(entry.getKey()) > watermarkIdleThreshold) {
        activeMin = Math.min(activeMin, entry.getValue());
        activeCount++;
      }
    }
    // The number of active tasks must be at least the quorum size. Otherwise fall back to
    // WATERMARK_NOT_EXIST, presumably below any real watermark so the max below keeps the
    // current value — confirm.
    minWatermark = (activeCount >= quorumSize && activeMin != Long.MAX_VALUE)
        ? activeMin
        : WATERMARK_NOT_EXIST;
    quorumCount = activeCount;
  } else {
    quorumCount = timestamps.size();
  }

  watermarkTime = Math.max(watermarkTime, minWatermark);
}