in src/consumer/OffsetStore.cpp [143:175]
// Looks up the consume offset recorded for `mq`.
//
// `type` selects where to look:
//   - READ_FROM_MEMORY:        only the in-memory offset table.
//   - READ_FROM_STORE:         call load() first, then the offset table.
//   - MEMORY_FIRST_THEN_STORE: the offset table as-is; on a miss, call load()
//                              and look again.
//
// Returns the stored offset, or -1 when no entry is found, when load() throws
// MQException, or when `type` is none of the values above.
// `session_credentials` is not used by this implementation.
int64 LocalFileOffsetStore::readOffset(const MQMessageQueue& mq,
                                       ReadOffsetType type,
                                       const SessionCredentials& session_credentials) {
  const bool memoryOnly = (type == READ_FROM_MEMORY);
  const bool consultMemory = memoryOnly || (type == MEMORY_FIRST_THEN_STORE);
  const bool consultStore = (type == MEMORY_FIRST_THEN_STORE) || (type == READ_FROM_STORE);

  if (consultMemory) {
    // The guard is scoped to this block so the mutex is released before
    // load() is invoked below.
    boost::lock_guard<boost::mutex> guard(m_lock);
    MQ2OFFSET::iterator cached = m_offsetTable.find(mq);
    if (cached != m_offsetTable.end()) {
      return cached->second;
    }
    if (memoryOnly) {
      // A memory-only miss is reported silently; no error is logged.
      return -1;
    }
  }

  if (consultStore) {
    // NOTE(review): load() presumably repopulates m_offsetTable from the local
    // offset file -- confirm against its definition.
    try {
      load();
    } catch (const MQException&) {
      LOG_ERROR("catch exception when load local file");
      return -1;
    }

    boost::lock_guard<boost::mutex> guard(m_lock);
    MQ2OFFSET::iterator persisted = m_offsetTable.find(mq);
    if (persisted != m_offsetTable.end()) {
      return persisted->second;
    }
  }

  // Reached on a store miss or an unrecognised `type`.
  LOG_ERROR("can not readOffset from offsetStore.json, maybe first time consumation");
  return -1;
}