in gobblin-core/src/main/java/org/apache/gobblin/source/extractor/extract/QueryBasedSource.java [388:465]
/**
 * Computes, for every table (source entity) seen in the previous run, the watermark the next run
 * should resume from.
 *
 * <p>The previous work unit states of each table are aggregated into the minimum low watermark and
 * the maximum actual high watermark. The result for a table is normally that maximum actual high
 * watermark. It falls back to the minimum low watermark, so the previous range is pulled again, when
 * either of the following holds:
 * <ul>
 *   <li>the job commit policy is {@link JobCommitPolicy#COMMIT_ON_FULL_SUCCESS} and at least one of
 *       the table's previous work units fails {@code isSuccessfulOrCommited}; or</li>
 *   <li>{@link ConfigurationKeys#SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK} is enabled and
 *       at least one of the table's previous work units fails {@code isAnyDataProcessed}.</li>
 * </ul>
 *
 * <p>Previous work unit states from which no {@link SourceEntity} can be resolved are skipped with a
 * warning.
 *
 * @param state source state carrying the previous run's work unit states and the job configuration
 * @return a new map from table to the watermark to resume from; empty when there are no usable
 *     previous work unit states
 * @throws NumberFormatException if a legacy (non-JSON) watermark property holds a non-numeric value
 */
static Map<SourceEntity, Long> getPreviousWatermarksForAllTables(SourceState state) {
  Map<SourceEntity, Long> prevLowWatermarksByTable = Maps.newHashMap();
  Map<SourceEntity, Long> prevActualHighWatermarksByTable = Maps.newHashMap();
  Set<SourceEntity> tablesWithFailedTasks = Sets.newHashSet();
  Set<SourceEntity> tablesWithNoUpdatesOnPreviousRun = Sets.newHashSet();
  boolean commitOnFullSuccess = JobCommitPolicy.getCommitPolicy(state) == JobCommitPolicy.COMMIT_ON_FULL_SUCCESS;

  for (WorkUnitState previousWus : state.getPreviousWorkUnitStates()) {
    Optional<SourceEntity> sourceEntity = SourceEntity.fromState(previousWus);
    if (!sourceEntity.isPresent()) {
      log.warn("Missing source entity for WorkUnit state: {}", previousWus);
      continue;
    }
    SourceEntity table = sourceEntity.get();

    // A table's previous range spans the smallest low watermark and the largest actual high
    // watermark among all of its work units.
    long lowWm = resolvePreviousWatermark(previousWus,
        previousWus.getWorkunit().getLowWatermark(LongWatermark.class),
        ConfigurationKeys.WORK_UNIT_LOW_WATER_MARK_KEY, "low");
    prevLowWatermarksByTable.merge(table, lowWm, Long::min);

    long highWm = resolvePreviousWatermark(previousWus,
        previousWus.getActualHighWatermark(LongWatermark.class),
        ConfigurationKeys.WORK_UNIT_STATE_RUNTIME_HIGH_WATER_MARK, "high");
    prevActualHighWatermarksByTable.merge(table, highWm, Long::max);

    if (commitOnFullSuccess && !isSuccessfulOrCommited(previousWus)) {
      tablesWithFailedTasks.add(table);
    }
    if (!isAnyDataProcessed(previousWus)) {
      tablesWithNoUpdatesOnPreviousRun.add(table);
    }
  }

  // The reset flag is loop-invariant, so read it once rather than once per table.
  boolean resetEmptyPartitionWatermark = state.getPropAsBoolean(
      ConfigurationKeys.SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK,
      ConfigurationKeys.DEFAULT_SOURCE_QUERYBASED_RESET_EMPTY_PARTITION_WATERMARK);

  Map<SourceEntity, Long> result = Maps.newHashMap();
  for (Map.Entry<SourceEntity, Long> entry : prevLowWatermarksByTable.entrySet()) {
    SourceEntity table = entry.getKey();
    Long prevLowWm = entry.getValue();
    if (tablesWithFailedTasks.contains(table)) {
      log.info("Resetting low watermark of {} to {} because previous run failed.", table, prevLowWm);
      result.put(table, prevLowWm);
    } else if (resetEmptyPartitionWatermark && tablesWithNoUpdatesOnPreviousRun.contains(table)) {
      log.info("Resetting low watermark of {} to {} because previous run processed no data.", table, prevLowWm);
      result.put(table, prevLowWm);
    } else {
      // Both maps are filled in the same loop iteration, so every table here has a high watermark.
      result.put(table, prevActualHighWatermarksByTable.get(table));
    }
  }
  return result;
}

/**
 * Resolves one watermark of a previous work unit, supporting both job state file formats.
 *
 * <p>A non-null JSON watermark is used as-is. Newer job state files (e.g. version 0.2.1270) store
 * watermarks as JSON, for example
 * {@code "watermark.interval.value": "{\"low.watermark.to.json\":{\"value\":20160101000000},\"expected.watermark.to.json\":{\"value\":20160715230234}}"}.
 * Otherwise the value of the flat {@code legacyKey} property is parsed. Older job state files
 * (e.g. version 0.2.805) store watermarks this way, for example
 * {@code "workunit.low.water.mark": "20160711000000"}.
 *
 * @param previousWus the previous work unit state whose properties hold the legacy value
 * @param jsonWatermark the watermark decoded from the JSON form, or {@code null} if absent
 * @param legacyKey the flat property key consulted when {@code jsonWatermark} is {@code null}
 * @param kind "low" or "high"; only used in the fallback warning
 * @return the resolved watermark, or {@link ConfigurationKeys#DEFAULT_WATERMARK_VALUE} if neither form is present
 * @throws NumberFormatException if the legacy property is present but not a valid long
 */
private static long resolvePreviousWatermark(WorkUnitState previousWus, LongWatermark jsonWatermark,
    String legacyKey, String kind) {
  if (jsonWatermark != null) {
    return jsonWatermark.getValue();
  }
  if (previousWus.getProperties().containsKey(legacyKey)) {
    long legacyWatermark = Long.parseLong(previousWus.getProperties().getProperty(legacyKey));
    log.warn("can not find {} water mark in json format, getting value from {} {} water mark {}",
        kind, legacyKey, kind, legacyWatermark);
    return legacyWatermark;
  }
  return ConfigurationKeys.DEFAULT_WATERMARK_VALUE;
}