in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [529:550]
/**
 * Stops this task: closes every open writer, records the bytes they sent, and shuts down the
 * executor.
 *
 * <p>Each writer in {@code writers} is closed in turn. If closing a writer throws an
 * {@link IOException}, the failure is logged and {@code resetOffset} is called for that
 * partition; the remaining writers are still closed. Afterwards the total bytes of all writers
 * are added to {@code totalBytesSentByClosedWriters} and the writer map is cleared.
 *
 * <p>The executor (when present) is shut down in a {@code finally} block, so its threads are
 * released even if an unexpected unchecked exception interrupts the writer cleanup. Such an
 * exception still propagates to the caller.
 */
public void stop() {
  LOGGER.info("Thread(" + Thread.currentThread().getId() + ") Enter STOP");
  try {
    for (Entry<TopicPartition, MaxComputeSinkWriter> entry : writers.entrySet()) {
      TopicPartition partition = entry.getKey();
      MaxComputeSinkWriter writer = entry.getValue();
      try {
        writer.close();
      } catch (IOException e) {
        LOGGER.error("Failed to close writer " + partition.toString() + ":" + e.getMessage(),
                     e);
        // Closing failed for this partition: hand it to resetOffset before moving on.
        resetOffset(partition, writer);
      }
    }
    // Account for the bytes of every writer (including those whose close failed) before
    // dropping the references.
    for (MaxComputeSinkWriter writer : writers.values()) {
      totalBytesSentByClosedWriters += writer.getTotalBytes();
    }
    writers.clear();
  } finally {
    // Always release the executor, even when the cleanup above fails unexpectedly;
    // otherwise its worker threads would be leaked.
    if (executor != null) {
      executor.shutdown();
    }
  }
  LOGGER.info("Thread(" + Thread.currentThread().getId() + ") Total bytes sent: " +
              totalBytesSentByClosedWriters +
              ", elapsed time: " + ((System.currentTimeMillis()) - startTimestamp));
}