in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [437:494]
/**
 * Flushes buffered records for the partitions whose offsets Kafka is about to commit.
 *
 * <p>In multi-write mode this is a no-op (beyond refreshing the ODPS client), because
 * writers are closed and reset elsewhere. Otherwise, for every partition that has a
 * writer and a non-zero record count this epoch, the writer is flushed and closed; if
 * the close fails, {@code resetOffset} is invoked (presumably rewinding the consumer
 * offset so the records are re-delivered — behavior defined elsewhere in this class).
 *
 * @param map partitions and the offset/metadata pairs being committed
 */
public void flush(Map<TopicPartition, OffsetAndMetadata> map) {
  if (LOGGER.isDebugEnabled()) {
    // Use debug level to match the isDebugEnabled() guard (original logged at info).
    LOGGER.debug("Thread(" + Thread.currentThread().getId() + ") Enter FLUSH");
    for (Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
      LOGGER.debug("Thread(" + Thread.currentThread().getId() + ") FLUSH "
          + "(topic: " + entry.getKey().topic() +
          ", partition: " + entry.getKey().partition() + ")");
    }
  }
  initOrRebuildOdps();
  if (multiWriteMode) {
    // In multi-write mode the writers are closed and reset elsewhere; nothing to do here.
    return;
  }
  for (Entry<TopicPartition, OffsetAndMetadata> entry : map.entrySet()) {
    TopicPartition partition = entry.getKey();
    MaxComputeSinkWriter writer = writers.get(partition);
    // getOrDefault avoids an NPE from unboxing when the partition has no counter yet
    // (the original unconditionally unboxed partitionRecordsNumPerEpoch.get(partition)).
    long recordsThisEpoch = partitionRecordsNumPerEpoch.getOrDefault(partition, 0L);
    // If the writer is not initialized or nothing was buffered, there is nothing to commit.
    if (writer == null || recordsThisEpoch == 0) {
      LOGGER.debug(String.format("There is %d records to write! continue!", recordsThisEpoch));
      continue;
    }
    writer.flush();
    // Close the writer; only reset the epoch counter on a successful close so a failed
    // close leaves the count intact for the retry after resetOffset.
    try {
      writer.close();
      partitionRecordsNumPerEpoch.put(partition, 0L);
    } catch (IOException e) {
      LOGGER.error(e.getMessage(), e);
      resetOffset(partition, writer);
    }
    // Account for the bytes this writer sent before it is reset.
    totalBytesSentByClosedWriters += writer.getTotalBytes();
    // Reset tunnel session to null; it is re-initialized the next time data arrives.
    writer.reset();
  }
  LOGGER.info("Thread(" + Thread.currentThread().getId() + ") Total bytes sent: " +
      totalBytesSentByClosedWriters +
      ", elapsed time: " + ((System.currentTimeMillis()) - startTimestamp));
}