virtual Status RestoreFromDisk()

in tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/redis_connection_pool.hpp [531:676]


  virtual Status RestoreFromDisk(
      const std::vector<std::string> &keys_prefix_name_slices,
      std::vector<aiocb> &rds, const std::vector<int> &fds,
      const std::vector<unsigned long> &buf_sizes) override {
    if (fds.size() == 0) {
      Status::OK();
    }

    // std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> reply;
    const unsigned &&storage_slice = fds.size();
    aiocb *rd;
    int ret;  // int fd;

    auto cmd = [](::sw::redis::Connection &connection,
                  const ::sw::redis::StringView hkey,
                  const std::vector<const char *> &ptrs_i,
                  const std::vector<std::size_t> &sizes_i) {
      assert(strcmp(ptrs_i.front(), "RESTORE") == 0);
      assert(sizes_i.front() == 7);
      connection.send(static_cast<int>(ptrs_i.size()),
                      const_cast<const char **>(ptrs_i.data()), sizes_i.data());
    };

    size_t buf_len;
    volatile void *tem_aio_buf;

    std::vector<std::vector<const char *>> ptrs_i_i(storage_slice);
    std::vector<std::vector<std::size_t>> sizes_i_i(storage_slice);

    const static char *redis_command = "RESTORE";
    const static std::size_t &&redis_command_byte = 7;
    const static char *redis_command_param = "0";
    const static std::size_t &&redis_command_byte_param = 1;
    const static char *replace_command = "REPLACE";
    const static std::size_t &&replace_command_byte = 7;

    for (size_t i = 0; i < storage_slice; ++i) {
      rd = &rds[i];

      buf_len = buf_sizes[i];

      tem_aio_buf = rd->aio_buf;
      rd->aio_buf = realloc((void *)tem_aio_buf,
                            buf_len);  // Be careful! The memory requested here
                                       // should be freed somewhere!
      rd->aio_nbytes = buf_len;
      rd->aio_fildes = fds[i];
      rd->aio_offset = 0;
      ret = aio_read(rd);
      if (ret < 0) perror("aio_read");

      ptrs_i_i[i].reserve(5);
      ptrs_i_i[i].clear();
      ptrs_i_i[i].emplace_back(redis_command);
      ptrs_i_i[i].emplace_back(keys_prefix_name_slices[i].data());
      ptrs_i_i[i].emplace_back(redis_command_param);
      ptrs_i_i[i].emplace_back((const char *)rd->aio_buf);
      ptrs_i_i[i].emplace_back(replace_command);

      sizes_i_i[i].reserve(5);
      sizes_i_i[i].clear();
      sizes_i_i[i].emplace_back(redis_command_byte);
      sizes_i_i[i].emplace_back(keys_prefix_name_slices[i].size());
      sizes_i_i[i].emplace_back(redis_command_byte_param);
      sizes_i_i[i].emplace_back(rd->aio_nbytes);
      sizes_i_i[i].emplace_back(replace_command_byte);
    }

    int count_down = static_cast<int>(storage_slice);
    int reread_countdown[storage_slice];
    for (unsigned i = 0; i < storage_slice; ++i) {
      reread_countdown[i] = 4;
    }
    bool no_errors = true;

    while (count_down > 0) {
      for (size_t i = 0; i < storage_slice; ++i) {
        rd = &rds[i];

        if (reread_countdown[i] > 1) {
          if (rd->aio_nbytes > 0) {
            if (aio_error(rd) != EINPROGRESS) {
              if ((ret = aio_return(rd)) > 0) {
                try {
                  /*reply = */ redis_conn_write->command(
                      cmd, keys_prefix_name_slices[i], ptrs_i_i[i],
                      sizes_i_i[i]);
                } catch (const std::exception &err) {
                  LOG(ERROR)
                      << "RedisHandler error in RestoreFromDisk for slices "
                      << keys_prefix_name_slices[i] << " -- " << err.what();
                  if (rd->aio_buf) {
                    free((void *)rd->aio_buf);
                    rd->aio_buf = nullptr;
                    rd->aio_nbytes = 0;
                  }
                  return errors::Unknown(err.what());
                }
                if (rd->aio_buf) {
                  free((void *)rd->aio_buf);
                  rd->aio_buf = nullptr;
                  rd->aio_nbytes = 0;
                }
                --count_down;
              } else {
                LOG(WARNING) << "File handle " << rd->aio_fildes
                             << " did not finish reading last round. "
                             << "Try to read " << reread_countdown[i] - 1
                             << " more times";
                ret = aio_read(rd);
                if (ret < 0) perror("aio_read");
                --reread_countdown[i];
              }
            }
          } else {
            LOG(WARNING) << "File handle " << rd->aio_fildes << " for slice "
                         << keys_prefix_name_slices[i]
                         << " has nbytes 0. Ignore.";
            reread_countdown[i] = 0;
            --count_down;
            if (rd->aio_buf) {
              free((void *)rd->aio_buf);
              rd->aio_buf = nullptr;
              rd->aio_nbytes = 0;
            }
          }
        } else if (reread_countdown[i] == 1) {
          LOG(ERROR) << "File handle " << rd->aio_fildes << " for slice "
                     << keys_prefix_name_slices[i]
                     << " has some troubles! Given up.";
          --reread_countdown[i];
          --count_down;
          if (rd->aio_buf) {
            free((void *)rd->aio_buf);
            rd->aio_buf = nullptr;
            rd->aio_nbytes = 0;
          }
          no_errors = false;
        }
      }
    }

    return no_errors
               ? Status::OK()
               : errors::Unknown("Unknown errors happen in file handles.");
  }