in src/consumer/OffsetStore.cpp [232:265]
// Looks up the consume offset for `mq` according to `type`.
//
//   READ_FROM_MEMORY        - consult only m_offsetTable; returns -1 on a miss.
//   MEMORY_FIRST_THEN_STORE - consult m_offsetTable first; on a miss, continue
//                             with the broker fetch below.
//   READ_FROM_STORE         - skip m_offsetTable and fetch from the broker.
//   any other value         - returns -1.
//
// Broker fetch: the value returned by fetchConsumeOffsetFromBroker() is passed
// to updateOffset() and then returned. If an MQBrokerException is caught the
// result is -1; if any other MQException is caught the result is -2. Both are
// logged through LOG_ERROR.
int64 RemoteBrokerOffsetStore::readOffset(const MQMessageQueue& mq,
                                          ReadOffsetType type,
                                          const SessionCredentials& session_credentials) {
  const bool memoryOnly = (READ_FROM_MEMORY == type);
  const bool memoryFirst = (MEMORY_FIRST_THEN_STORE == type);

  if (memoryOnly || memoryFirst) {
    // m_lock is held only for the table lookup; it is released at the end of
    // this block, before the broker fetch and updateOffset() call below.
    boost::lock_guard<boost::mutex> guard(m_lock);
    MQ2OFFSET::iterator cached = m_offsetTable.find(mq);
    if (cached != m_offsetTable.end()) {
      return cached->second;
    }
    if (memoryOnly) {
      return -1;
    }
    // MEMORY_FIRST_THEN_STORE with no cached entry: continue to the broker.
  } else if (READ_FROM_STORE != type) {
    // Unrecognised read type.
    return -1;
  }

  try {
    const int64 fetched = fetchConsumeOffsetFromBroker(mq, session_credentials);
    // Hand the freshly fetched value to updateOffset() before returning it.
    updateOffset(mq, fetched);
    return fetched;
  } catch (MQBrokerException& e) {
    LOG_ERROR("%s", e.what());
    return -1;
  } catch (MQException& e) {
    LOG_ERROR("%s", e.what());
    return -2;
  }
}