in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [392:422]
/**
 * Writes a batch of sink records.
 *
 * <p>Flow, in order:
 * <ol>
 *   <li>Capture the current time as epoch seconds; this value is passed to every
 *       per-record {@code write} call of the batch.</li>
 *   <li>Call {@code initOrRebuildOdps()} (always, even for an empty batch).</li>
 *   <li>Return immediately if the batch is empty.</li>
 *   <li>If {@code multiWriteMode} is set, hand the whole batch to
 *       {@code executeMultiWrite} and return.</li>
 *   <li>Otherwise look up the writer registered for each record's topic/partition,
 *       write the record, and increment that partition's per-epoch record counter.</li>
 * </ol>
 *
 * @param collection the records to write; may be empty
 * @throws RuntimeException if {@code executeMultiWrite} fails with an {@link IOException}
 *         (the {@code IOException} is kept as the cause)
 * @throws IllegalStateException if a record belongs to a topic/partition for which no
 *         writer is registered in {@code writers}
 */
public void put(Collection<SinkRecord> collection) {
  final String threadTag = "Thread(" + Thread.currentThread().getId() + ")";
  LOGGER.debug(threadTag + " Enter PUT");
  // Epoch second
  long time = System.currentTimeMillis() / 1000;
  initOrRebuildOdps();

  if (collection.isEmpty()) {
    LOGGER.info(threadTag + " Enter empty put records");
    return;
  }
  LOGGER.info(threadTag + " putted records size " + collection.size());

  if (multiWriteMode) {
    try {
      executeMultiWrite(collection);
    } catch (IOException e) {
      throw new RuntimeException(e);
    }
    return;
  }

  for (SinkRecord r : collection) {
    TopicPartition partition = new TopicPartition(r.topic(), r.kafkaPartition());
    MaxComputeSinkWriter writer = writers.get(partition);
    if (writer == null) {
      // Fail with a message that names the partition instead of a bare NullPointerException.
      throw new IllegalStateException(
          "No writer registered for topic partition " + partition);
    }
    try {
      writer.write(r, time);
      // Single map operation; a missing counter starts at 1 rather than failing on
      // unboxing null after the record has already been written.
      partitionRecordsNumPerEpoch.compute(
          partition, (key, count) -> count == null ? 1 : count + 1);
    } catch (IOException e) {
      // NOTE(review): reportRuntimeError is defined elsewhere; whether it rethrows or only
      // records the failure is not visible here. If it returns, the loop moves on to the
      // next record.
      reportRuntimeError(r, e);
    }
  }
}