in src/consumer/DefaultMQPushConsumerImpl.cpp [306:404]
void DefaultMQPushConsumerImpl::start() {
#ifndef WIN32
/* Ignore the SIGPIPE */
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
LOG_WARN("###Current Push Consumer@%s", getClientVersionString().c_str());
// deal with name space before start
dealWithNameSpace();
logConfigs();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
// Start status server
StatsServerManager::getInstance()->getConsumeStatServer()->start();
DefaultMQClient::start();
dealWithMessageTrace();
LOG_INFO("DefaultMQPushConsumerImpl:%s start", m_GroupName.c_str());
//<!data;
checkConfig();
//<!create rebalance;
m_pRebalance = new RebalancePush(this, getFactory());
string groupname = getGroupName();
m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
if (m_pMessageListener) {
if (m_pMessageListener->getMessageListenerType() == messageListenerOrderly) {
LOG_INFO("start orderly consume service:%s", getGroupName().c_str());
m_consumerService = new ConsumeMessageOrderlyService(this, m_consumeThreadCount, m_pMessageListener);
} else // for backward compatible, defaultly and concurrently listeners
// are allocating ConsumeMessageConcurrentlyService
{
LOG_INFO("start concurrently consume service:%s", getGroupName().c_str());
m_consumerService = new ConsumeMessageConcurrentlyService(this, m_consumeThreadCount, m_pMessageListener);
}
}
m_pullmsgQueue = new TaskQueue(m_pullMsgThreadPoolNum);
m_pullmsgThread.reset(
new boost::thread(boost::bind(&DefaultMQPushConsumerImpl::runPullMsgQueue, this, m_pullmsgQueue)));
copySubscription();
//<! registe;
bool registerOK = getFactory()->registerConsumer(this);
if (!registerOK) {
m_serviceState = CREATE_JUST;
THROW_MQEXCEPTION(
MQClientException,
"The cousumer group[" + getGroupName() + "] has been created before, specify another name please.", -1);
}
//<!msg model;
switch (getMessageModel()) {
case BROADCASTING:
m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
break;
case CLUSTERING:
m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
break;
}
bool bStartFailed = false;
string errorMsg;
try {
m_pOffsetStore->load();
} catch (MQClientException& e) {
bStartFailed = true;
errorMsg = std::string(e.what());
}
m_consumerService->start();
getFactory()->start();
updateTopicSubscribeInfoWhenSubscriptionChanged();
getFactory()->sendHeartbeatToAllBroker();
m_serviceState = RUNNING;
if (bStartFailed) {
shutdown();
THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
}
break;
}
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
break;
default:
break;
}
getFactory()->rebalanceImmediately();
}