private void resumeCheckPoint()

in src/main/java/com/aliyun/odps/kafka/connect/MaxComputeSinkTask.java [155:180]


  private void resumeCheckPoint(TopicPartition partition) {
    StringBuilder queryBuilder = new StringBuilder();
    queryBuilder.append("SELECT MAX(offset) as offset ")
        .append("FROM ").append(table).append(" ")
        .append("WHERE topic=\"").append(partition.topic()).append("\" ")
        .append("AND partition=").append(partition.partition()).append(";");

    try {
      Instance findLastCommittedOffset = SQLTask.run(odps, queryBuilder.toString());
      findLastCommittedOffset.waitForSuccess();
      ResultSet res = SQLTask.getResultSet(findLastCommittedOffset);
      Long lastCommittedOffset = res.next().getBigint("offset");
      LOGGER.info("Thread(" + Thread.currentThread().getId() +
                  ") Last committed offset for (topic: " + partition.topic() + ", partition: "
                  + partition.partition() + "): " + lastCommittedOffset);
      if (lastCommittedOffset != null) {
        // Offset should be reset to the last committed plus one, otherwise the last committed
        // record will be duplicated
        context.offset(partition, lastCommittedOffset + 1);
      }
    } catch (OdpsException | IOException e) {
      LOGGER
          .error("Thread(" + Thread.currentThread().getId() + ") Resume from checkpoint failed", e);
      throw new RuntimeException(e);
    }
  }