in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [524:562]
/**
 * Writes every record in {@code recordBuffer}, then flushes and closes this writer.
 *
 * <p>While writing, the offset of the first record written and the largest offset written so far
 * are tracked. The flush/close step runs even when the write loop was cut short by an
 * {@link IOException}. If at least one record was written and flush/close complete without an
 * {@link IOException}, the tracked offset interval and the value of {@code getTotalBytes()} are
 * reported to {@code sinkStatusContext}.
 *
 * @return {@code true} when no {@link IOException} occurred during write, flush, or close;
 *     {@code false} otherwise
 * @throws RuntimeException wrapping any {@link RuntimeException} raised inside the write loop
 */
public Boolean call() throws RuntimeException {
  // Current wall-clock time in whole seconds, handed to every write() call of this batch.
  long time = System.currentTimeMillis() / 1000;
  // -1 means "no record written yet".
  long start = -1;
  long end = -1;
  processedRecordsEachEcho = 0;
  boolean ok = true;
  try {
    for (SinkRecord record : recordBuffer) {
      write(record, time);
      // Offsets are only recorded after write() returned normally for this record.
      if (start == -1) {
        start = record.kafkaOffset();
      }
      end = Math.max(end, record.kafkaOffset());
      processedRecordsEachEcho++;
    }
  } catch (IOException e) {
    // Caused by tunnel fluctuations; it will be retried continuously.
    LOGGER.warn("something error in tunnel write,Please check tunnel environment! {}",
        e.getMessage());
    ok = false;
  } catch (RuntimeException e) {
    // Internal data error and the user chose not to skip it: hand it straight to the
    // upper-level framework. The throwable is passed as the trailing argument so the
    // stack trace is logged as well, not just the message.
    LOGGER.error("something error in MaxComputerSinkWriter : {}", e.getMessage(), e);
    throw new RuntimeException(e);
  }
  try {
    flush();
    close();
    // Only report progress when at least one record was actually written.
    if (start != -1) {
      sinkStatusContext.addOffsetInterval(start, end);
      sinkStatusContext.addTotalBytesSentByWriter(getTotalBytes());
    }
  } catch (IOException e) {
    LOGGER.warn("something error in tunnel close,Please check tunnel environment! {}",
        e.getMessage());
    ok = false;
  }
  return ok;
}