in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [284:299]
private void flushRecordBuffer() {
for (Entry<TopicPartition, SinkStatusContext> item : sinkStatus.entrySet()) {
TopicPartition partition = item.getKey();
SinkStatusContext curContext = item.getValue();
List<SinkRecord> left = curContext.getRecordQueue();
if (!left.isEmpty()) {
MaxComputeSinkWriter curWriter = genSinkWriter(partition);
curWriter.setRecordBuffer(left);
curWriter.setErrorReporter(runtimeErrorWriter);
curWriter.setSinkStatusContext(curContext);
curWriter.setSkipError(skipErrorRecords);
writerTasks.add(executor.submit(curWriter));
sinkStatus.get(partition).resetRecordQueue();
}
}
}