private void executeMultiWrite()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [357:389]


  /**
   * Buffers the given records per topic-partition and submits a writer task to
   * {@code executor} each time a partition's queue reaches {@code batchSize}.
   *
   * <p>For every record:
   * <ul>
   *   <li>a {@link SinkStatusContext} is registered in {@code sinkStatus} the first time its
   *       partition is seen, seeded with the record's offset minus one and an empty queue;</li>
   *   <li>the record is skipped when the context reports that it already contains the
   *       record's offset;</li>
   *   <li>otherwise the record is appended to the partition's queue, and once the queue holds
   *       at least {@code batchSize} records a writer is configured with that queue, submitted
   *       to {@code executor}, its future is stored in {@code writerTasks}, and the context's
   *       queue is reset.</li>
   * </ul>
   * Records left in a queue that has not reached {@code batchSize} are not submitted by this
   * method. After the loop a commit is requested from the task context when
   * {@code needSyncCommit} is set.
   *
   * @param collection records delivered to the task in this call
   * @throws IOException declared by the signature; presumably raised while obtaining a
   *                     writer via {@code genSinkWriter} -- confirm against that method
   */
  private void executeMultiWrite(Collection<SinkRecord> collection)
      throws IOException {
    // Multi-threaded sink path to MaxCompute, kept separate from the original mode.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Enter multiWriterExecute!~");
    }

    for (SinkRecord record : collection) {
      TopicPartition topicPartition =
          new TopicPartition(record.topic(), record.kafkaPartition());

      // First record seen for this partition: start tracking it from the preceding offset.
      if (!sinkStatus.containsKey(topicPartition)) {
        SinkStatusContext initial =
            new SinkStatusContext(record.kafkaOffset() - 1, new ArrayList<>());
        sinkStatus.put(topicPartition, initial);
      }
      SinkStatusContext status = sinkStatus.get(topicPartition);

      // The context already knows this offset, so the record is not queued again.
      if (status.containsOffset(record.kafkaOffset())) {
        continue;
      }

      // Original note: the minimum offset received for the current partition needs to be
      // recorded.
      List<SinkRecord> pending = status.getRecordQueue();
      pending.add(record);
      if (pending.size() < batchSize) {
        continue;
      }

      // Batch is full: obtain a writer for the partition (original note: taken from the
      // writer pool) and hand the queued records over to it.
      MaxComputeSinkWriter writer = genSinkWriter(topicPartition);
      writer.setRecordBuffer(pending);
      writer.setErrorReporter(runtimeErrorWriter);
      writer.setSinkStatusContext(status);
      writer.setSkipError(skipErrorRecords);
      writerTasks.add(executor.submit(writer));

      // Reset only after submission, matching the original ordering.
      status.resetRecordQueue();
    }

    if (needSyncCommit) {
      context.requestCommit();
    }
  }