synchronized void update()

in samza-core/src/main/java/org/apache/samza/operators/impl/WatermarkStates.java [75:116]


    synchronized void update(long timestamp, String taskName) {
      long currentTime = systemTimeFunc.getAsLong();
      if (taskName != null) {
        Long ts = timestamps.get(taskName);
        if (ts != null && ts > timestamp) {
          LOG.warn(String.format("Incoming watermark %s is smaller than existing watermark %s for upstream task %s",
              timestamp, ts, taskName));
        } else {
          timestamps.put(taskName, timestamp);
          lastUpdateTime.put(taskName, currentTime);
        }
      }

      if (taskName == null) {
        // we get watermark either from the source or from the aggregator task
        watermarkTime = Math.max(watermarkTime, timestamp);
      } else if (canUpdateWatermark(currentTime)) {
        long minWatermark = timestamps.values().stream().min(Long::compare).orElse(timestamp);

        if (minWatermark <= watermarkTime && watermarkIdleTimeout > 0) {
          // Exclude the tasks that have been idle in watermark emission.
          long min = Long.MAX_VALUE;
          long watermarkIdleThreshold = currentTime - watermarkIdleTimeout;
          int updateCount = 0;
          for (Map.Entry<String, Long> entry : timestamps.entrySet()) {
            // Check the update happens before the idle timeout
            if (lastUpdateTime.get(entry.getKey()) > watermarkIdleThreshold) {
              min = Math.min(min, entry.getValue());
              updateCount++;
            }
          }

          // Active tasks must exceed the quorum size
          minWatermark = (updateCount >= quorumSize && min != Long.MAX_VALUE) ? min : WATERMARK_NOT_EXIST;
          quorumCount = updateCount;
        } else {
          quorumCount = timestamps.size();
        }

        watermarkTime = Math.max(watermarkTime, minWatermark);
      }
    }