void LocalFileOffsetStore::load()

in src/consumer/OffsetStore.cpp [70:136]


void LocalFileOffsetStore::load() {
  std::ifstream ifs(m_storeFile.c_str(), std::ios::in);
  if (ifs.good()) {
    if (ifs.is_open()) {
      if (ifs.peek() != std::ifstream::traits_type::eof()) {
        map<string, int64> m_offsetTable_tmp;
        boost::system::error_code e;
        try {
          boost::archive::text_iarchive ia(ifs);
          ia >> m_offsetTable_tmp;
        } catch (...) {
          LOG_ERROR(
              "load offset store file failed, please check whether file: %s is "
              "cleared by operator, if so, delete this offsets.Json file and "
              "then restart consumer",
              m_storeFile.c_str());
          ifs.close();
          string errorMsg("load offset store file: ");
          errorMsg.append(m_storeFile)
              .append(
                  " failed, please check whether offsets.Json is cleared by "
                  "operator, if so, delete this offsets.Json file and then "
                  "restart consumer");
          THROW_MQEXCEPTION(MQClientException, errorMsg, -1);
        }
        ifs.close();

        for (map<string, int64>::iterator it = m_offsetTable_tmp.begin(); it != m_offsetTable_tmp.end(); ++it) {
          // LOG_INFO("it->first:%s, it->second:%lld", it->first.c_str(),
          // it->second);
          Json::Reader reader;
          Json::Value object;
          reader.parse(it->first.c_str(), object);
          MQMessageQueue mq(object["topic"].asString(), object["brokerName"].asString(), object["queueId"].asInt());
          updateOffset(mq, it->second);
        }
        m_offsetTable_tmp.clear();
      } else {
        LOG_ERROR(
            "open offset store file failed, please check whether file: %s is "
            "cleared by operator, if so, delete this offsets.Json file and "
            "then restart consumer",
            m_storeFile.c_str());
        THROW_MQEXCEPTION(MQClientException,
                          "open offset store file failed, please check whether "
                          "offsets.Json is cleared by operator, if so, delete "
                          "this offsets.Json file and then restart consumer",
                          -1);
      }
    } else {
      LOG_ERROR(
          "open offset store file failed, please check whether file:%s is "
          "deleted by operator and then restart consumer",
          m_storeFile.c_str());
      THROW_MQEXCEPTION(MQClientException,
                        "open offset store file failed, please check "
                        "directory:%s is deleted by operator or offset.Json "
                        "file is cleared by operator, and then restart "
                        "consumer",
                        -1);
    }
  } else {
    LOG_WARN(
        "offsets.Json file not exist, maybe this is the first time "
        "consumation");
  }
}