public void flush()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [437:494]


  /**
   * Flushes and closes the writer of every partition listed in {@code map}, then resets it
   * so that it can be reused.
   *
   * <p>{@code initOrRebuildOdps()} is always invoked first. When {@code multiWriteMode} is
   * enabled the method returns right after that, because writers are closed and reset
   * elsewhere in that mode. Otherwise, for each partition that has a writer and a non-zero
   * record count for the current epoch, the writer is flushed and closed, the bytes it
   * reports are added to {@code totalBytesSentByClosedWriters}, and the writer is reset.
   *
   * <p>An {@link IOException} raised while closing a writer is logged and handed to
   * {@code resetOffset(partition, writer)}; it is not propagated to the caller.
   *
   * <p>NOTE(review): presumably this is the Kafka Connect {@code SinkTask#flush} hook —
   * confirm against the class declaration.
   *
   * @param map partitions (and their offsets) to flush; only the keys are read
   */
  public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
    long threadId = Thread.currentThread().getId();

    // Trace output is guarded by the debug level, so it is emitted at the debug level too.
    if (LOGGER.isDebugEnabled()) {
      LOGGER.debug("Thread(" + threadId + ") Enter FLUSH");
      for (TopicPartition partition : map.keySet()) {
        LOGGER.debug("Thread(" + threadId + ") FLUSH "
                     + "(topic: " + partition.topic() +
                     ", partition: " + partition.partition() + ")");
      }
    }

    initOrRebuildOdps();
    if (multiWriteMode) {
      // In the new (multi-write) mode the writers are already closed and reset elsewhere.
      return;
    }

    for (TopicPartition partition : map.keySet()) {
      MaxComputeSinkWriter writer = writers.get(partition);
      // Read the counter once as a boxed value: a partition without an entry yields null,
      // and unboxing that null inside a comparison would throw a NullPointerException.
      Long recordsInEpoch = partitionRecordsNumPerEpoch.get(partition);

      // No writer, no counter, or a zero counter all mean there is nothing to commit.
      if (writer == null || recordsInEpoch == null || recordsInEpoch == 0L) {
        LOGGER.debug(String.format("There is %d records to write! continue!",
                                   recordsInEpoch));
        continue;
      }
      writer.flush();

      // Close the writer; the per-epoch counter is cleared only when the close succeeds.
      try {
        writer.close();
        partitionRecordsNumPerEpoch.put(partition, 0L);
      } catch (IOException e) {
        LOGGER.error(e.getMessage(), e);
        // Must run before writer.reset() below, since it is handed the un-reset writer.
        resetOffset(partition, writer);
      }

      // Update bytes sent.
      // NOTE(review): this also runs after a failed close — confirm that counting those
      // bytes as sent is intended.
      totalBytesSentByClosedWriters += writer.getTotalBytes();

      // Reset the writer so it can be reused; per the original note, its tunnel session is
      // cleared here and re-initialized the next time data arrives.
      writer.reset();
    }

    LOGGER.info("Thread(" + threadId + ") Total bytes sent: " +
                totalBytesSentByClosedWriters +
                ", elapsed time: " + ((System.currentTimeMillis()) - startTimestamp));
  }