in src/main/java/org/apache/flink/connector/rocketmq/legacy/RocketMQSourceFunction.java [372:438]
/**
 * Resolves the initial consume offset of every given queue according to {@code startMode} and
 * records it in {@code offsetTable}.
 *
 * <ul>
 *   <li>{@code LATEST} / {@code EARLIEST}: seeks the consumer to the end / beginning of the
 *       queue, then reads the offset back through {@code consumer.committed(mq)}.
 *   <li>{@code GROUP_OFFSETS}: uses the committed offset; when that value is {@code <= 0},
 *       {@code offsetResetStrategy} decides whether to seek to the end or to the beginning.
 *   <li>{@code TIMESTAMP}: uses {@code consumer.offsetForTimestamp(mq, specificTimeStamp)}.
 *   <li>{@code SPECIFIC_OFFSETS}: uses the queue's entry in {@code specificStartupOffsets},
 *       falling back to the committed offset when the queue has no entry.
 * </ul>
 *
 * @param messageQueues the queues whose start offsets should be initialised
 * @throws MQClientException declared for the consumer operations used to resolve offsets
 * @throws RuntimeException if {@code startMode} is {@code SPECIFIC_OFFSETS} but
 *     {@code specificStartupOffsets} is {@code null}
 * @throws IllegalArgumentException if {@code startMode} is none of the handled modes
 */
private void initOffsets(List<MessageQueue> messageQueues) throws MQClientException {
    for (MessageQueue mq : messageQueues) {
        long offset;
        switch (startMode) {
            case LATEST:
                consumer.seekToEnd(mq);
                offset = consumer.committed(mq);
                break;
            case EARLIEST:
                consumer.seekToBegin(mq);
                offset = consumer.committed(mq);
                break;
            case GROUP_OFFSETS:
                offset = consumer.committed(mq);
                // Per the original author's note: committed() yields the queue's min offset
                // when the consumer group joins for the first time, and a negative number
                // when fetching from the broker failed. Both cases are treated as "no usable
                // committed offset" and handed to the reset strategy.
                // To consume from the earliest offset, use OffsetResetStrategy.EARLIEST.
                if (offset <= 0) {
                    switch (offsetResetStrategy) {
                        case LATEST:
                            consumer.seekToEnd(mq);
                            offset = consumer.committed(mq);
                            log.info(
                                    "current consumer thread:{} has no committed offset,use Strategy:{} instead",
                                    mq,
                                    offsetResetStrategy);
                            break;
                        case EARLIEST:
                            log.info(
                                    "current consumer thread:{} has no committed offset,use Strategy:{} instead",
                                    mq,
                                    offsetResetStrategy);
                            consumer.seekToBegin(mq);
                            offset = consumer.committed(mq);
                            break;
                        default:
                            // Any other strategy keeps the (non-positive) committed offset.
                            break;
                    }
                }
                break;
            case TIMESTAMP:
                offset = consumer.offsetForTimestamp(mq, specificTimeStamp);
                break;
            case SPECIFIC_OFFSETS:
                if (specificStartupOffsets == null) {
                    throw new RuntimeException(
                            "StartMode is specific_offsets.But none offsets has been specified");
                }
                Long specificOffset = specificStartupOffsets.get(mq);
                if (specificOffset != null) {
                    offset = specificOffset;
                } else {
                    // No explicit offset for this queue: fall back to the committed offset.
                    // NOTE(review): unlike GROUP_OFFSETS, a non-positive committed offset is
                    // not passed through offsetResetStrategy here - confirm this is intended.
                    offset = consumer.committed(mq);
                }
                break;
            default:
                // Separator added so the mode name is not glued to the message text.
                throw new IllegalArgumentException(
                        "current startMode is not supported: " + startMode);
        }
        log.info(
                "current consumer queue:{} start from offset of: {}",
                mq.getBrokerName() + "-" + mq.getQueueId(),
                offset);
        offsetTable.put(mq, offset);
    }
}