src/consumer/DefaultMQPushConsumerImpl.h (127 lines of code) (raw):

/* * Licensed to the Apache Software Foundation (ASF) under one or more * contributor license agreements. See the NOTICE file distributed with * this work for additional information regarding copyright ownership. * The ASF licenses this file to You under the Apache License, Version 2.0 * (the "License"); you may not use this file except in compliance with * the License. You may obtain a copy of the License at * * http://www.apache.org/licenses/LICENSE-2.0 * * Unless required by applicable law or agreed to in writing, software * distributed under the License is distributed on an "AS IS" BASIS, * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. * See the License for the specific language governing permissions and * limitations under the License. */ #ifndef __DEFAULTMQPUSHCONSUMERIMPL_H__ #define __DEFAULTMQPUSHCONSUMERIMPL_H__ #include <boost/asio.hpp> #include <boost/asio/io_service.hpp> #include <boost/bind.hpp> #include <boost/date_time/posix_time/posix_time.hpp> #include <boost/scoped_ptr.hpp> #include <boost/thread/thread.hpp> #include <string> #include "AsyncCallback.h" #include "ConsumeMessageContext.h" #include "ConsumeMessageHook.h" #include "DefaultMQProducerImpl.h" #include "MQConsumer.h" #include "MQMessageListener.h" #include "MQMessageQueue.h" namespace rocketmq { class Rebalance; class SubscriptionData; class OffsetStore; class PullAPIWrapper; class PullRequest; class ConsumeMsgService; class TaskQueue; class TaskThread; class AsyncPullCallback; class ConsumerRunningInfo; //<!*************************************************************************** class DefaultMQPushConsumerImpl : public MQConsumer { public: DefaultMQPushConsumerImpl(); DefaultMQPushConsumerImpl(const std::string& groupname); void boost_asio_work(); virtual ~DefaultMQPushConsumerImpl(); //<!begin mqadmin; virtual void start(); virtual void shutdown(); //<!end mqadmin; //<!begin MQConsumer virtual bool sendMessageBack(MQMessageExt& msg, int delayLevel, std::string& brokerName); virtual void fetchSubscribeMessageQueues(const std::string& topic, std::vector<MQMessageQueue>& mqs); virtual void doRebalance(); virtual void persistConsumerOffset(); virtual void persistConsumerOffsetByResetOffset(); virtual void updateTopicSubscribeInfo(const std::string& topic, std::vector<MQMessageQueue>& info); virtual ConsumeType getConsumeType(); virtual ConsumeFromWhere getConsumeFromWhere(); void setConsumeFromWhere(ConsumeFromWhere consumeFromWhere); virtual void getSubscriptions(std::vector<SubscriptionData>&); virtual void updateConsumeOffset(const MQMessageQueue& mq, int64 offset); virtual void removeConsumeOffset(const MQMessageQueue& mq); virtual PullResult pull(const MQMessageQueue& mq, const std::string& subExpression, int64 offset, int maxNums) { return PullResult(); } virtual void pull(const MQMessageQueue& mq, const std::string& subExpression, int64 offset, int maxNums, PullCallback* pPullCallback) {} virtual ConsumerRunningInfo* getConsumerRunningInfo(); //<!end MQConsumer; void registerMessageListener(MQMessageListener* pMessageListener); MessageListenerType getMessageListenerType(); void subscribe(const std::string& topic, const std::string& subExpression); OffsetStore* getOffsetStore() const; virtual Rebalance* getRebalance() const; ConsumeMsgService* getConsumerMsgService() const; virtual bool producePullMsgTask(boost::weak_ptr<PullRequest>); virtual bool producePullMsgTaskLater(boost::weak_ptr<PullRequest>, int millis); static void static_triggerNextPullRequest(void* context, boost::asio::deadline_timer* t, boost::weak_ptr<PullRequest>); void triggerNextPullRequest(boost::asio::deadline_timer* t, boost::weak_ptr<PullRequest>); void runPullMsgQueue(TaskQueue* pTaskQueue); void pullMessage(boost::weak_ptr<PullRequest> pullrequest); void pullMessageAsync(boost::weak_ptr<PullRequest> pullrequest); void setAsyncPull(bool asyncFlag); AsyncPullCallback* getAsyncPullCallBack(boost::weak_ptr<PullRequest>, MQMessageQueue msgQueue); void shutdownAsyncPullCallBack(); /* for orderly consume, set the pull num of message size by each pullMsg, default value is 1; */ void setConsumeMessageBatchMaxSize(int consumeMessageBatchMaxSize); int getConsumeMessageBatchMaxSize() const; /* set consuming thread count, default value is cpu cores */ void setConsumeThreadCount(int threadCount); int getConsumeThreadCount() const; void setMaxReconsumeTimes(int maxReconsumeTimes); int getMaxReconsumeTimes() const; /* set pullMsg thread count, default value is cpu cores */ void setPullMsgThreadPoolCount(int threadCount); int getPullMsgThreadPoolCount() const; /* set max cache msg size perQueue in memory if consumer could not consume msgs immediately default maxCacheMsgSize perQueue is 1000, set range is:1~65535 */ void setMaxCacheMsgSizePerQueue(int maxCacheSize); int getMaxCacheMsgSizePerQueue() const; void submitSendTraceRequest(MQMessage& msg, SendCallback* pSendCallback); bool hasConsumeMessageHook(); void registerConsumeMessageHook(std::shared_ptr<ConsumeMessageHook>& hook); void setDefaultMqProducerImpl(DefaultMQProducerImpl* DefaultMqProducerImpl); void executeConsumeMessageHookBefore(ConsumeMessageContext* context); void executeConsumeMessageHookAfter(ConsumeMessageContext* context); private: void checkConfig(); void copySubscription(); void updateTopicSubscribeInfoWhenSubscriptionChanged(); bool dealWithNameSpace(); void logConfigs(); bool dealWithMessageTrace(); void createMessageTraceInnerProducer(); void shutdownMessageTraceInnerProducer(); private: uint64_t m_startTime; ConsumeFromWhere m_consumeFromWhere; std::map<std::string, std::string> m_subTopics; int m_consumeThreadCount; OffsetStore* m_pOffsetStore; Rebalance* m_pRebalance; PullAPIWrapper* m_pPullAPIWrapper; ConsumeMsgService* m_consumerService; MQMessageListener* m_pMessageListener; int m_consumeMessageBatchMaxSize; int m_maxMsgCacheSize; int m_maxReconsumeTimes = -1; boost::asio::io_service m_async_ioService; boost::scoped_ptr<boost::thread> m_async_service_thread; typedef std::map<MQMessageQueue, AsyncPullCallback*> PullMAP; PullMAP m_PullCallback; bool m_asyncPull; int m_asyncPullTimeout; int m_pullMsgThreadPoolNum; private: TaskQueue* m_pullmsgQueue; std::unique_ptr<boost::thread> m_pullmsgThread; // used for trace std::vector<std::shared_ptr<ConsumeMessageHook> > m_consumeMessageHookList; std::shared_ptr<DefaultMQProducerImpl> m_DefaultMQProducerImpl; }; //<!*************************************************************************** } // namespace rocketmq #endif