public void open()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [120:150]


  /**
   * Prepares this task to write to the given topic partitions.
   *
   * <p>The method logs every partition it receives, calls {@code initOrRebuildOdps()},
   * empties the {@code writers}, {@code sinkStatus} and {@code writerTasks} collections,
   * and then creates one {@code MaxComputeSinkWriter} per partition while setting that
   * partition's per-epoch record counter to zero.
   *
   * @param partitions the topic partitions to create writers for
   */
  public void open(Collection<TopicPartition> partitions) {
    // The thread id is only used to tag the log lines emitted below.
    final long threadId = Thread.currentThread().getId();

    LOGGER.debug("Thread(" + threadId + ") Enter OPEN");
    for (TopicPartition assigned : partitions) {
      final String topicName = assigned.topic();
      final int partitionNo = assigned.partition();
      LOGGER.info("Thread(" + threadId + ") OPEN (topic: " +
                  topicName + ", partition: " + partitionNo + ")");
    }

    // Must run before any writer is constructed below.
    initOrRebuildOdps();

    // Discard the state kept for the previous set of writers.
    writers.clear();
    sinkStatus.clear();
    writerTasks.clear();

    for (TopicPartition assigned : partitions) {
      // TODO: Consider a way to resume when running in key or value mode.
      //       Checkpoint resuming (resumeCheckPoint) is currently disabled for every partition.
      final String topicName = assigned.topic();
      final int partitionNo = assigned.partition();

      MaxComputeSinkWriter sinkWriter = new MaxComputeSinkWriter(
          config,
          project,
          table,
          converterBuilder.build(),
          bufferSizeKB,
          partitionWindowType,
          tz,
          useStreamTunnel,
          retryTimes,
          tunnelEndpoint);
      writers.put(assigned, sinkWriter);

      // Unlike the maps cleared above, this one is only overwritten per given partition;
      // entries for partitions not passed to this call are left untouched.
      partitionRecordsNumPerEpoch.put(assigned, 0L);

      LOGGER.info("Thread(" + threadId +
                  ") Initialize writer successfully for (topic: " + topicName +
                  ", partition: " + partitionNo + ", failRetry: " + retryTimes + ")");
    }
  }