in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [497:526]
public void close(Collection<TopicPartition> partitions) {
  // Closes the writer registered for each of the given partitions, removes it from
  // `writers`, and adds its byte count to `totalBytesSentByClosedWriters`.
  // A failed close is logged and followed by resetOffset(...) for that partition;
  // the writer is still removed and its bytes are still counted.
  final String threadTag = "Thread(" + Thread.currentThread().getId() + ")";
  LOGGER.debug(threadTag + " Enter CLOSE");

  // First pass: report every partition being closed before any writer is touched.
  for (TopicPartition tp : partitions) {
    LOGGER.info(threadTag + " CLOSE (topic: " + tp.topic()
        + ", partition: " + tp.partition() + ")");
  }

  // Second pass: close and unregister the writers one by one.
  for (TopicPartition tp : partitions) {
    MaxComputeSinkWriter current = writers.get(tp);
    // A partition with no registered writer has nothing to commit, so only
    // partitions that actually have a writer are processed.
    if (current != null) {
      try {
        current.close();
      } catch (IOException closeFailure) {
        LOGGER.error("Failed to close writer " + tp + ":" + closeFailure.getMessage(),
                     closeFailure);
        resetOffset(tp, current);
      }
      // Unregister first, then account for the bytes (same order as before).
      writers.remove(tp);
      totalBytesSentByClosedWriters += current.getTotalBytes();
    }
  }

  // Summary line: accumulated byte total and time elapsed since startTimestamp.
  LOGGER.info(threadTag + " Total bytes sent: " + totalBytesSentByClosedWriters
      + ", elapsed time: " + (System.currentTimeMillis() - startTimestamp));
}