in src/consumer/DefaultMQPullConsumerImpl.cpp [57:127]
void DefaultMQPullConsumerImpl::start() {
#ifndef WIN32
/* Ignore the SIGPIPE */
struct sigaction sa;
memset(&sa, 0, sizeof(struct sigaction));
sa.sa_handler = SIG_IGN;
sa.sa_flags = 0;
sigaction(SIGPIPE, &sa, 0);
#endif
LOG_INFO("###Current Pull Consumer@%s", getClientVersionString().c_str());
dealWithNameSpace();
showClientConfigs();
switch (m_serviceState) {
case CREATE_JUST: {
m_serviceState = START_FAILED;
DefaultMQClient::start();
LOG_INFO("DefaultMQPullConsumerImpl:%s start", m_GroupName.c_str());
//<!create rebalance;
m_pRebalance = new RebalancePull(this, getFactory());
string groupname = getGroupName();
m_pPullAPIWrapper = new PullAPIWrapper(getFactory(), groupname);
//<!data;
checkConfig();
copySubscription();
//<! registe;
bool registerOK = getFactory()->registerConsumer(this);
if (!registerOK) {
m_serviceState = CREATE_JUST;
THROW_MQEXCEPTION(
MQClientException,
"The cousumer group[" + getGroupName() + "] has been created before, specify another name please.", -1);
}
//<!msg model;
switch (getMessageModel()) {
case BROADCASTING:
m_pOffsetStore = new LocalFileOffsetStore(groupname, getFactory());
break;
case CLUSTERING:
m_pOffsetStore = new RemoteBrokerOffsetStore(groupname, getFactory());
break;
}
bool bStartFailed = false;
string errorMsg;
try {
m_pOffsetStore->load();
} catch (MQClientException& e) {
bStartFailed = true;
errorMsg = std::string(e.what());
}
getFactory()->start();
m_serviceState = RUNNING;
if (bStartFailed) {
shutdown();
THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
}
break;
}
case RUNNING:
case START_FAILED:
case SHUTDOWN_ALREADY:
break;
default:
break;
}
}