in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkWriter.java [142:171]
public void write(SinkRecord sinkRecord, Long timestamp) throws IOException {
if (minOffset == null) {
minOffset = sinkRecord.kafkaOffset();
}
try {
resetUploadSessionIfNeeded(timestamp);
} catch (OdpsException e) {
throw new IOException(e);
}
try {
converter.convert(sinkRecord, reusedRecord);
} catch (Exception e) {
if (errorReporter != null) {
errorReporter.write(sinkRecord);
return;
} else {
if (skipError) {
return;
}
throw new RuntimeException(e);
}
}
if (useStreamingTunnel) {
writeToStreamWriter();
} else {
writeToBatchWriter();
}
}