in rocketmq-spark/src/main/java/org/apache/rocketmq/spark/streaming/RocketMQReceiver.java [62:108]
/**
 * Creates, configures and starts the RocketMQ push consumer backing this receiver.
 *
 * <p>The consumer properties must be non-empty. Depending on the
 * {@code RocketMQConfig.CONSUMER_MESSAGES_ORDERLY} flag (default {@code false}) either an
 * orderly or a concurrent message listener is registered; both delegate to
 * {@code process(msgs)} and translate its boolean result into the matching consume status.
 *
 * @throws InternalError if the consumer fails to start; the original exception is kept as
 *     the cause
 */
public void onStart() {
    Validate.notEmpty(properties, "Consumer properties can not be empty");
    ordered = RocketMQConfig.getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);

    consumer = new DefaultMQPushConsumer();
    RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer) consumer);

    if (ordered) {
        consumer.registerMessageListener(new MessageListenerOrderly() {
            @Override
            public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                       ConsumeOrderlyContext context) {
                // A failed batch suspends the current queue briefly so it is retried later.
                if (process(msgs)) {
                    return ConsumeOrderlyStatus.SUCCESS;
                } else {
                    return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                }
            }
        });
    } else {
        consumer.registerMessageListener(new MessageListenerConcurrently() {
            @Override
            public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                            ConsumeConcurrentlyContext context) {
                // A failed batch is handed back to the broker for later redelivery.
                if (process(msgs)) {
                    return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                } else {
                    return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                }
            }
        });
    }

    // No finally block on purpose: an ERROR-level log on the success path would be
    // misleading, and each catch block already records the failure with its cause.
    try {
        consumer.start();
    } catch (MQClientException e) {
        // TODO add retry
        LOG.error("Failed to start rocketmq consumer because of client exception", e);
        throw new InternalError(e);
    } catch (Exception e) {
        // should not throw spark NonFatal error here
        LOG.error("Failed to start rocketmq consumer because of other exception", e);
        throw new InternalError(e);
    }
}