public void close()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [497:526]


  /**
   * Closes the writers associated with the given partitions.
   *
   * <p>Every partition is logged first. Then, for each partition that has an entry in
   * {@code writers}, the writer is closed; if closing throws an {@link IOException}, the
   * failure is logged and {@code resetOffset} is invoked for that partition and writer.
   * Whether or not closing succeeded, the writer is removed from {@code writers} and its
   * total byte count is added to {@code totalBytesSentByClosedWriters}. Finally, the
   * accumulated byte count and the time elapsed since {@code startTimestamp} are logged.
   *
   * @param partitions the partitions whose writers should be closed
   */
  public void close(Collection<TopicPartition> partitions) {
    final long threadId = Thread.currentThread().getId();
    final String threadTag = "Thread(" + threadId + ")";

    LOGGER.debug(threadTag + " Enter CLOSE");

    // First pass: announce every partition that is about to be closed.
    for (TopicPartition tp : partitions) {
      String closeMessage = threadTag
                            + " CLOSE (topic: " + tp.topic()
                            + ", partition: " + tp.partition() + ")";
      LOGGER.info(closeMessage);
    }

    // Second pass: close and unregister the writer of each partition.
    for (TopicPartition tp : partitions) {
      MaxComputeSinkWriter partitionWriter = writers.get(tp);

      // Partitions without a registered writer have nothing to close.
      if (partitionWriter != null) {
        try {
          partitionWriter.close();
        } catch (IOException closeFailure) {
          String failureMessage =
              "Failed to close writer " + tp.toString() + ":" + closeFailure.getMessage();
          LOGGER.error(failureMessage, closeFailure);
          resetOffset(tp, partitionWriter);
        }

        // The writer is dropped and accounted for even when close() failed.
        writers.remove(tp);
        totalBytesSentByClosedWriters += partitionWriter.getTotalBytes();
      }
    }

    String summary = threadTag
                     + " Total bytes sent: " + totalBytesSentByClosedWriters
                     + ", elapsed time: " + ((System.currentTimeMillis()) - startTimestamp);
    LOGGER.info(summary);
  }