private void flushRecordBuffer()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [284:299]


  private void flushRecordBuffer() {
    for (Entry<TopicPartition, SinkStatusContext> item : sinkStatus.entrySet()) {
      TopicPartition partition = item.getKey();
      SinkStatusContext curContext = item.getValue();
      List<SinkRecord> left = curContext.getRecordQueue();
      if (!left.isEmpty()) {
        MaxComputeSinkWriter curWriter = genSinkWriter(partition);
        curWriter.setRecordBuffer(left);
        curWriter.setErrorReporter(runtimeErrorWriter);
        curWriter.setSinkStatusContext(curContext);
        curWriter.setSkipError(skipErrorRecords);
        writerTasks.add(executor.submit(curWriter));
        sinkStatus.get(partition).resetRecordQueue();
      }
    }
  }