in src/main/java/org/apache/doris/kafka/connector/writer/StreamLoadWriter.java [88:103]
public void fetchOffset() {
Map<String, String> label2Status = fetchLabel2Status();
long maxOffset = -1;
for (Map.Entry<String, String> entry : label2Status.entrySet()) {
String label = entry.getKey();
String status = entry.getValue();
if (status.equalsIgnoreCase("VISIBLE")) {
long offset = FileNameUtils.labelToEndOffset(label);
if (offset > maxOffset) {
maxOffset = offset;
}
}
}
this.offsetPersistedInDoris.set(maxOffset);
LOG.info("init topic {} partition {} offset {}", topic, partition, maxOffset);
}