in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [120:150]
public void open(Collection<TopicPartition> partitions) {
LOGGER.debug("Thread(" + Thread.currentThread().getId() + ") Enter OPEN");
for (TopicPartition partition : partitions) {
LOGGER.info("Thread(" + Thread.currentThread().getId() + ") OPEN (topic: " +
partition.topic() + ", partition: " + partition.partition() + ")");
}
initOrRebuildOdps();
writers.clear();
sinkStatus.clear();
writerTasks.clear();
for (TopicPartition partition : partitions) {
// TODO: Consider a way to resume when running in key or value mode
// resumeCheckPoint(partition);
MaxComputeSinkWriter writer = new MaxComputeSinkWriter(
config,
project,
table,
converterBuilder.build(),
bufferSizeKB,
partitionWindowType,
tz, useStreamTunnel,
retryTimes,
tunnelEndpoint);
writers.put(partition, writer);
partitionRecordsNumPerEpoch.put(partition, 0L);
LOGGER.info("Thread(" + Thread.currentThread().getId() +
") Initialize writer successfully for (topic: " + partition.topic() +
", partition: " + partition.partition() + ", failRetry: " + retryTimes + ")");
}
}