static Map getPreviousWatermarksForAllTables()

in gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java [388:465]


  /**
   * Computes, for every source entity found in the previous run's work unit states, the watermark
   * value to carry over from that run.
   *
   * <p>For each entity the lowest low watermark and the highest actual high watermark across all of
   * its previous work units are collected. The value returned for an entity is:
   * <ul>
   *   <li>the lowest previous low watermark, if the commit policy is
   *       {@link JobCommitPolicy#COMMIT_ON_FULL_SUCCESS} and at least one of its work units was not
   *       successful or committed (as decided by {@code isSuccessfulOrCommited});</li>
   *   <li>the lowest previous low watermark, if at least one of its work units processed no data (as
   *       decided by {@code isAnyDataProcessed}) and
   *       {@link ConfigurationKeys#SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK} is enabled;</li>
   *   <li>the highest previous actual high watermark otherwise.</li>
   * </ul>
   *
   * <p>Work unit states without a resolvable source entity are skipped with a warning. A watermark
   * that is found neither in the JSON format nor under the legacy property key falls back to
   * {@link ConfigurationKeys#DEFAULT_WATERMARK_VALUE}.
   *
   * @param state source state giving access to the previous work unit states and job configuration
   * @return a new mutable map from source entity to the selected watermark; empty when no previous
   *         work unit state has a source entity
   * @throws NumberFormatException if a legacy watermark property is present but is not a valid long
   */
  static Map<SourceEntity, Long> getPreviousWatermarksForAllTables(SourceState state) {
    Map<SourceEntity, Long> result = Maps.newHashMap();
    Map<SourceEntity, Long> prevLowWatermarksByTable = Maps.newHashMap();
    Map<SourceEntity, Long> prevActualHighWatermarksByTable = Maps.newHashMap();
    Set<SourceEntity> tablesWithFailedTasks = Sets.newHashSet();
    Set<SourceEntity> tablesWithNoUpdatesOnPreviousRun = Sets.newHashSet();
    boolean commitOnFullSuccess = JobCommitPolicy.getCommitPolicy(state) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;
    // Job-level setting: read it once instead of once per table in the second loop.
    boolean resetEmptyPartitionWatermark =
        state.getPropAsBoolean(ConfigurationKeys.SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK,
            ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK);

    // Pass 1: aggregate per-table watermarks and status flags over all previous work units.
    for (WorkUnitState previousWus : state.getPreviousWorkUnitStates()) {
      Optional<SourceEntity> sourceEntity = SourceEntity.fromState(previousWus);
      if (!sourceEntity.isPresent()) {
        log.warn("Missing source entity for WorkUnit state: {}", previousWus);
        continue;
      }
      SourceEntity table = sourceEntity.get();

      long lowWm = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
      LongWatermark waterMarkObj = previousWus.getWorkunit().getLowWatermark(LongWatermark.class);
      if (waterMarkObj != null) {
        // New job state file (version 0.2.1270), watermark format:
        // "watermark.interval.value": "{\"low.watermark.to.json\":{\"value\":20160101000000},\"expected.watermark.to.json\":{\"value\":20160715230234}}",
        lowWm = waterMarkObj.getValue();
      } else if (previousWus.getProperties().containsKey(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY)) {
        // Old job state file (version 0.2.805):
        // "workunit.low.water.mark": "20160711000000",
        // "workunit.state.runtime.high.water.mark": "20160716140338",
        lowWm = Long.parseLong(previousWus.getProperties().getProperty(ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY));
        log.warn("can not find low water mark in json format, getting value from {} low water mark {}",
            ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, lowWm);
      }

      // Keep the smallest low watermark seen for this table.
      if (!prevLowWatermarksByTable.containsKey(table)) {
        prevLowWatermarksByTable.put(table, lowWm);
      } else {
        prevLowWatermarksByTable.put(table, Math.min(prevLowWatermarksByTable.get(table), lowWm));
      }

      long highWm = ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
      waterMarkObj = previousWus.getActualHighWatermark(LongWatermark.class);
      if (waterMarkObj != null) {
        highWm = waterMarkObj.getValue();
      } else if (previousWus.getProperties().containsKey(ConfigurationKeys.WORK_UNIT_STATE_RUNTIME_HIGH_WATER_MARK)) {
        // Same legacy fallback as for the low watermark above.
        highWm = Long.parseLong(
            previousWus.getProperties().getProperty(ConfigurationKeys.WORK_UNIT_STATE_RUNTIME_HIGH_WATER_MARK));
        log.warn("can not find high water mark in json format, getting value from {} high water mark {}",
            ConfigurationKeys.WORK_UNIT_STATE_RUNTIME_HIGH_WATER_MARK, highWm);
      }

      // Keep the largest actual high watermark seen for this table.
      if (!prevActualHighWatermarksByTable.containsKey(table)) {
        prevActualHighWatermarksByTable.put(table, highWm);
      } else {
        prevActualHighWatermarksByTable.put(table, Math.max(prevActualHighWatermarksByTable.get(table), highWm));
      }

      // A single unsuccessful work unit marks the whole table as failed under COMMIT_ON_FULL_SUCCESS.
      if (commitOnFullSuccess && !isSuccessfulOrCommited(previousWus)) {
        tablesWithFailedTasks.add(table);
      }

      // A single work unit without processed data marks the table as having had no updates.
      if (!isAnyDataProcessed(previousWus)) {
        tablesWithNoUpdatesOnPreviousRun.add(table);
      }
    }

    // Pass 2: choose the watermark per table. Both maps are filled together in pass 1, so every
    // key of the low-watermark map also has a high-watermark entry.
    for (Map.Entry<SourceEntity, Long> entry : prevLowWatermarksByTable.entrySet()) {
      SourceEntity table = entry.getKey();
      Long prevLowWm = entry.getValue();
      if (tablesWithFailedTasks.contains(table)) {
        log.info("Resetting low watermark to {} because previous run failed.", prevLowWm);
        result.put(table, prevLowWm);
      } else if (resetEmptyPartitionWatermark && tablesWithNoUpdatesOnPreviousRun.contains(table)) {
        log.info("Resetting low watermark to {} because previous run processed no data.", prevLowWm);
        result.put(table, prevLowWm);
      } else {
        result.put(table, prevActualHighWatermarksByTable.get(table));
      }
    }

    return result;
  }