public void onStart()

in rocketmq-spark/src/main/java/org/apache/rocketmq/spark/streaming/RocketMQReceiver.java [62:108]


    public void onStart() {
        Validate.notEmpty(properties, "Consumer properties can not be empty");
        ordered = RocketMQConfig.getBoolean(properties, RocketMQConfig.CONSUMER_MESSAGES_ORDERLY, false);

        consumer = new DefaultMQPushConsumer();
        RocketMQConfig.buildConsumerConfigs(properties, (DefaultMQPushConsumer) consumer);

        if (ordered) {
            consumer.registerMessageListener(new MessageListenerOrderly() {
                @Override
                public ConsumeOrderlyStatus consumeMessage(List<MessageExt> msgs,
                                                           ConsumeOrderlyContext context) {
                    if (process(msgs)) {
                        return ConsumeOrderlyStatus.SUCCESS;
                    } else {
                        return ConsumeOrderlyStatus.SUSPEND_CURRENT_QUEUE_A_MOMENT;
                    }
                }
            });
        } else {
            consumer.registerMessageListener(new MessageListenerConcurrently() {
                @Override
                public ConsumeConcurrentlyStatus consumeMessage(List<MessageExt> msgs,
                                                                ConsumeConcurrentlyContext context) {
                    if (process(msgs)) {
                        return ConsumeConcurrentlyStatus.CONSUME_SUCCESS;
                    } else {
                        return ConsumeConcurrentlyStatus.RECONSUME_LATER;
                    }
                }
            });
        }

        try {
            consumer.start();
        } catch (MQClientException e) {
            // TODO add retry
            LOG.error("Failed to start rocketmq consumer because of client exception", e);
            throw new InternalError(e);
        } catch (Exception e) {
            // should not throw spark NonFatal error here
            LOG.error("Failed to start rocketmq consumer because of other exception", e);
            throw new InternalError(e);
        } finally {
            LOG.error("error when consumer start", "xx");
        }
    }