in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [357:389]
private void executeMultiWrite(Collection<SinkRecord> collection)
throws IOException {
// use multi-thread to sink data to MC ; decouple from origin mode
if (LOGGER.isDebugEnabled()) {
LOGGER.debug("Enter multiWriterExecute!~");
}
for (SinkRecord r : collection) {
TopicPartition partition = new TopicPartition(r.topic(), r.kafkaPartition());
if (!sinkStatus.containsKey(partition)) {
sinkStatus.put(partition,
new SinkStatusContext(r.kafkaOffset() - 1, new ArrayList<>()));
}
if (sinkStatus.get(partition).containsOffset(r.kafkaOffset())) {
continue;
}
List<SinkRecord> cur = sinkStatus.get(partition).getRecordQueue();
// 需要记录当前收到的分区 offset 最小值
cur.add(r);
if (cur.size() >= batchSize) {
// 从writerPool中获得writer
MaxComputeSinkWriter curWriter = genSinkWriter(partition);
curWriter.setRecordBuffer(cur);
curWriter.setErrorReporter(runtimeErrorWriter);
curWriter.setSinkStatusContext(sinkStatus.get(partition));
curWriter.setSkipError(skipErrorRecords);
writerTasks.add(executor.submit(curWriter));
sinkStatus.get(partition).resetRecordQueue();
}
}
if (needSyncCommit) {
context.requestCommit();
}
}