in rocketmq-spark/src/main/java/org/apache/rocketmq/spark/RocketMQConfig.java [128:164]
public static void buildConsumerConfigs(Properties props, DefaultMQPushConsumer consumer) {
buildCommonConfigs(props, consumer);
String group = props.getProperty(CONSUMER_GROUP);
Validate.notEmpty(group);
consumer.setConsumerGroup(group);
consumer.setPersistConsumerOffsetInterval(getInteger(props,
CONSUMER_OFFSET_PERSIST_INTERVAL, DEFAULT_CONSUMER_OFFSET_PERSIST_INTERVAL));
consumer.setConsumeThreadMin(getInteger(props,
CONSUMER_MIN_THREADS, DEFAULT_CONSUMER_MIN_THREADS));
consumer.setConsumeThreadMax(getInteger(props,
CONSUMER_MAX_THREADS, DEFAULT_CONSUMER_MAX_THREADS));
String initOffset = props.getProperty(CONSUMER_OFFSET_RESET_TO, CONSUMER_OFFSET_LATEST);
switch (initOffset) {
case CONSUMER_OFFSET_EARLIEST:
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_FIRST_OFFSET);
break;
case CONSUMER_OFFSET_LATEST:
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
break;
case CONSUMER_OFFSET_TIMESTAMP:
consumer.setConsumeTimestamp(initOffset);
break;
default:
consumer.setConsumeFromWhere(ConsumeFromWhere.CONSUME_FROM_LAST_OFFSET);
}
String topic = props.getProperty(CONSUMER_TOPIC);
Validate.notEmpty(topic);
try {
consumer.subscribe(topic, props.getProperty(CONSUMER_TAG, DEFAULT_TAG));
} catch (MQClientException e) {
throw new IllegalArgumentException(e);
}
}