in gobblin-modules/gobblin-kafka-common/src/main/java/org/apache/gobblin/source/extractor/extract/kafka/KafkaSource.java [544:676]
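/**
 * Builds the work unit for a single topic partition. The watermarks persisted by the previous run are loaded
 * first; then, depending on whether Kafka offsets could be fetched, whether a previous offset exists, and the
 * configured bootstrap and offset-out-of-range policies, the partition is either consumed from a computed start
 * offset, given an empty work unit (so that previousOffset is persisted), or skipped (null is returned).
 */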
private WorkUnit getWorkUnitForTopicPartition(KafkaPartition partition, SourceState state,
Optional<State> topicSpecificState, Offsets offsets, boolean failedToGetKafkaOffsets) {
long previousOffset = 0;
long previousOffsetFetchEpochTime = 0;
boolean previousOffsetNotFound = false;
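// Load the watermarks and fetch epoch times persisted by the previous run; a PreviousOffsetNotFoundException
// means this partition has no prior state to resume from.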
try {
previousOffset = getPreviousOffsetForPartition(partition, state);
offsets.setPreviousEndOffset(previousOffset);
offsets.setPreviousStartOffset(getPreviousLowWatermark(partition, state));
offsets.setPreviousStartFetchEpochTime(getPreviousStartFetchEpochTimeForPartition(partition, state));
offsets.setPreviousStopFetchEpochTime(getPreviousStopFetchEpochTimeForPartition(partition, state));
offsets.setPreviousLatestOffset(getPreviousExpectedHighWatermark(partition, state));
previousOffsetFetchEpochTime = getPreviousOffsetFetchEpochTimeForPartition(partition, state);
offsets.setPreviousOffsetFetchEpochTime(previousOffsetFetchEpochTime);
} catch (PreviousOffsetNotFoundException e) {
previousOffsetNotFound = true;
}
if (failedToGetKafkaOffsets) {
// Increment counts, which will be reported as job metrics
this.failToGetOffsetCount.incrementAndGet();
// When unable to get the earliest/latest offsets from Kafka, skip the partition: return null if there is no
// previous offset, otherwise create an empty workunit so that previousOffset is persisted.
LOG.warn(String
.format("Failed to retrieve earliest and/or latest offset for partition %s. This partition will be skipped.",
partition));
return previousOffsetNotFound ? null : createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime,
topicSpecificState);
}
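// Three cases follow: (1) the job is configured to move to the latest offset, (2) no previous offset exists and
// the partition must be bootstrapped, or (3) consumption resumes from the previous offset.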
if (shouldMoveToLatestOffset(partition, state)) {
offsets.startAtLatestOffset();
} else if (previousOffsetNotFound) {
/*
 * When the previous offset cannot be found, either start at the earliest offset, start at the latest offset,
 * start at (latest - lookback), or skip the partition (no empty workunit is needed in this case, since there is
 * no offset to persist). Bootstrapping with OFFSET_LOOKBACK avoids both consuming a huge backlog (earliest
 * offset) and losing data (latest offset): the lookback is a long value subtracted from each partition's latest
 * offset to compute the start offset. If the computed offset is out of range, the partition falls back to the
 * offset-out-of-range handling below.
 */
String offsetNotFoundMsg = String.format("Previous offset for partition %s does not exist. ", partition);
String offsetOption = state.getProp(BOOTSTRAP_WITH_OFFSET, DEFAULT_BOOTSTRAP_WITH_OFFSET).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET)) {
LOG.warn(offsetNotFoundMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET)) {
LOG.warn(
offsetNotFoundMsg + "This partition will start from the earliest offset: " + offsets.getEarliestOffset());
offsets.startAtEarliestOffset();
} else if (offsetOption.equals(OFFSET_LOOKBACK)) {
long lookbackOffsetRange = state.getPropAsLong(KAFKA_OFFSET_LOOKBACK, 0L);
long latestOffset = offsets.getLatestOffset();
long offset = latestOffset - lookbackOffsetRange;
LOG.warn(offsetNotFoundMsg + "This partition will start from (latest - lookback) = (" + latestOffset + " - "
    + lookbackOffsetRange + "), i.e. offset " + offset);
try {
offsets.startAt(offset);
} catch (StartOffsetOutOfRangeException e) {
// Increment counts, which will be reported as job metrics
if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
this.offsetTooEarlyCount.incrementAndGet();
} else {
this.offsetTooLateCount.incrementAndGet();
}
// When above computed offset (latest-lookback) is out of range, either start at earliest, latest or nearest offset, or skip the
// partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
offsetOption =
state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
&& offsets.getStartOffset() >= offsets.getLatestOffset())) {
LOG.warn(
offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
}
}
} else {
LOG.warn(offsetNotFoundMsg + "This partition will be skipped.");
return null;
}
} else {
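// A previous offset exists: resume consumption from where the previous run stopped.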
try {
offsets.startAt(previousOffset);
} catch (StartOffsetOutOfRangeException e) {
// Increment counts, which will be reported as job metrics
if (offsets.getStartOffset() <= offsets.getLatestOffset()) {
this.offsetTooEarlyCount.incrementAndGet();
} else {
this.offsetTooLateCount.incrementAndGet();
}
// When previous offset is out of range, either start at earliest, latest or nearest offset, or skip the
// partition. If skipping, need to create an empty workunit so that previousOffset is persisted.
String offsetOutOfRangeMsg = String.format(
"Start offset for partition %s is out of range. Start offset = %d, earliest offset = %d, latest offset = %d.",
partition, offsets.getStartOffset(), offsets.getEarliestOffset(), offsets.getLatestOffset());
String offsetOption =
state.getProp(RESET_ON_OFFSET_OUT_OF_RANGE, DEFAULT_RESET_ON_OFFSET_OUT_OF_RANGE).toLowerCase();
if (offsetOption.equals(LATEST_OFFSET) || (offsetOption.equals(NEAREST_OFFSET)
&& offsets.getStartOffset() >= offsets.getLatestOffset())) {
LOG.warn(
offsetOutOfRangeMsg + "This partition will start from the latest offset: " + offsets.getLatestOffset());
offsets.startAtLatestOffset();
} else if (offsetOption.equals(EARLIEST_OFFSET) || offsetOption.equals(NEAREST_OFFSET)) {
LOG.warn(offsetOutOfRangeMsg + "This partition will start from the earliest offset: " + offsets
.getEarliestOffset());
offsets.startAtEarliestOffset();
} else {
LOG.warn(offsetOutOfRangeMsg + "This partition will be skipped.");
return createEmptyWorkUnit(partition, previousOffset, previousOffsetFetchEpochTime, topicSpecificState);
}
}
}
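// The start and expected end offsets are now settled; build the work unit and copy source-state properties onto it.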
WorkUnit workUnit = getWorkUnitForTopicPartition(partition, offsets, topicSpecificState);
addSourceStatePropsToWorkUnit(workUnit, state);
return workUnit;
}