in src/consumer/OffsetStore.cpp [70:136]
void LocalFileOffsetStore::load() {
std::ifstream ifs(m_storeFile.c_str(), std::ios::in);
if (ifs.good()) {
if (ifs.is_open()) {
if (ifs.peek() != std::ifstream::traits_type::eof()) {
map<string, int64> m_offsetTable_tmp;
boost::system::error_code e;
try {
boost::archive::text_iarchive ia(ifs);
ia >> m_offsetTable_tmp;
} catch (...) {
LOG_ERROR(
"load offset store file failed, please check whether file: %s is "
"cleared by operator, if so, delete this offsets.Json file and "
"then restart consumer",
m_storeFile.c_str());
ifs.close();
string errorMsg("load offset store file: ");
errorMsg.append(m_storeFile)
.append(
" failed, please check whether offsets.Json is cleared by "
"operator, if so, delete this offsets.Json file and then "
"restart consumer");
THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
}
ifs.close();
for (map<string, int64>::iterator it = m_offsetTable_tmp.begin(); it != m_offsetTable_tmp.end(); ++it) {
// LOG_INFO("it->first:%s, it->second:%lld", it->first.c_str(),
// it->second);
Json::Reader reader;
Json::Value object;
reader.parse(it->first.c_str(), object);
MQMessageQueue mq(object["topic"].asString(), object["brokerName"].asString(), object["queueId"].asInt());
updateOffset(mq, it->second);
}
m_offsetTable_tmp.clear();
} else {
LOG_ERROR(
"open offset store file failed, please check whether file: %s is "
"cleared by operator, if so, delete this offsets.Json file and "
"then restart consumer",
m_storeFile.c_str());
THROW_MQEXCEPTION(MQClientException,
"open offset store file failed, please check whether "
"offsets.Json is cleared by operator, if so, delete "
"this offsets.Json file and then restart consumer",
-1);
}
} else {
LOG_ERROR(
"open offset store file failed, please check whether file:%s is "
"deleted by operator and then restart consumer",
m_storeFile.c_str());
THROW_MQEXCEPTION(MQClientException,
"open offset store file failed, please check "
"directory:%s is deleted by operator or offset.Json "
"file is cleared by operator, and then restart "
"consumer",
-1);
}
} else {
LOG_WARN(
"offsets.Json file not exist, maybe this is the first time "
"consumation");
}
}