private void initOffsets()

in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [373:439]


    private void initOffsets(List<MessageQueue> messageQueues) throws MQClientException {
        for (MessageQueue mq : messageQueues) {
            long offset;
            switch (startMode) {
                case LATEST:
                    consumer.seekToEnd(mq);
                    offset = consumer.committed(mq);
                    break;
                case EARLIEST:
                    consumer.seekToBegin(mq);
                    offset = consumer.committed(mq);
                    break;
                case GROUP_OFFSETS:
                    offset = consumer.committed(mq);
                    // the min offset return if consumer group first join,return a negative number
                    // if
                    // catch exception when fetch from broker.
                    // If you want consumer from earliest,please use OffsetResetStrategy.EARLIEST
                    if (offset <= 0) {
                        switch (offsetResetStrategy) {
                            case LATEST:
                                consumer.seekToEnd(mq);
                                offset = consumer.committed(mq);
                                log.info(
                                        "current consumer thread:{} has no committed offset,use Strategy:{} instead",
                                        mq,
                                        offsetResetStrategy);
                                break;
                            case EARLIEST:
                                log.info(
                                        "current consumer thread:{} has no committed offset,use Strategy:{} instead",
                                        mq,
                                        offsetResetStrategy);
                                consumer.seekToBegin(mq);
                                offset = consumer.committed(mq);
                                break;
                            default:
                                break;
                        }
                    }
                    break;
                case TIMESTAMP:
                    offset = consumer.offsetForTimestamp(mq, specificTimeStamp);
                    break;
                case SPECIFIC_OFFSETS:
                    if (specificStartupOffsets == null) {
                        throw new RuntimeException(
                                "StartMode is specific_offsets.But none offsets has been specified");
                    }
                    Long specificOffset = specificStartupOffsets.get(mq);
                    if (specificOffset != null) {
                        offset = specificOffset;
                    } else {
                        offset = consumer.committed(mq);
                    }
                    break;
                default:
                    throw new IllegalArgumentException(
                            "current startMode is not supported" + startMode);
            }
            log.info(
                    "current consumer queue:{} start from offset of: {}",
                    mq.getBrokerName() + "-" + mq.getQueueId(),
                    offset);
            offsetTable.put(mq, offset);
        }
    }