public void put()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [392:422]


  /**
   * Processes one batch of sink records.
   *
   * <p>Steps, in order:
   * <ol>
   *   <li>Captures the current wall-clock time in epoch seconds; the same value is passed to
   *       every {@code writer.write} call in this batch.</li>
   *   <li>Calls {@code initOrRebuildOdps()} — this happens even when the batch is empty.</li>
   *   <li>Returns immediately if the batch is empty.</li>
   *   <li>When {@code multiWriteMode} is set, delegates the whole batch to
   *       {@code executeMultiWrite}.</li>
   *   <li>Otherwise hands each record to the {@code MaxComputeSinkWriter} registered for the
   *       record's topic/partition and bumps that partition's record counter.</li>
   * </ol>
   *
   * <p>NOTE(review): the per-record path dereferences the results of
   * {@code writers.get(...)} and {@code partitionRecordsNumPerEpoch.get(...)} without a null
   * check, so it presumably relies on both being populated for every assigned partition before
   * this method runs — confirm against the code that fills them.
   *
   * @param collection the records to write; may be empty
   * @throws RuntimeException wrapping the {@link IOException} raised by
   *                          {@code executeMultiWrite} in multi-write mode
   */
  public void put(Collection<SinkRecord> collection) {
    final String threadTag = "Thread(" + Thread.currentThread().getId() + ")";
    LOGGER.debug(threadTag + " Enter PUT");

    // Epoch seconds, sampled before the ODPS (re)initialisation so ordering matches the log line.
    final long epochSeconds = System.currentTimeMillis() / 1000;
    initOrRebuildOdps();

    // Guard clause: nothing to write.
    if (collection.isEmpty()) {
      LOGGER.info(threadTag + " Enter empty put records");
      return;
    }
    LOGGER.info(threadTag + " putted records size " + collection.size());

    if (!multiWriteMode) {
      // Per-record path: route each record to the writer of its topic/partition.
      for (SinkRecord sinkRecord : collection) {
        TopicPartition topicPartition =
            new TopicPartition(sinkRecord.topic(), sinkRecord.kafkaPartition());
        MaxComputeSinkWriter partitionWriter = writers.get(topicPartition);
        try {
          partitionWriter.write(sinkRecord, epochSeconds);
          // Only counted when the write did not throw an IOException.
          partitionRecordsNumPerEpoch.put(
              topicPartition, partitionRecordsNumPerEpoch.get(topicPartition) + 1);
        } catch (IOException writeFailure) {
          // A failed record is reported; the loop continues unless the reporter itself throws.
          reportRuntimeError(sinkRecord, writeFailure);
        }
      }
      return;
    }

    // Multi-write path: the whole batch is handled in one call.
    try {
      executeMultiWrite(collection);
    } catch (IOException multiWriteFailure) {
      throw new RuntimeException(multiWriteFailure);
    }
  }