in tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/redis_connection_pool.hpp [531:676]
virtual Status RestoreFromDisk(
const std::vector<std::string> &keys_prefix_name_slices,
std::vector<aiocb> &rds, const std::vector<int> &fds,
const std::vector<unsigned long> &buf_sizes) override {
if (fds.size() == 0) {
Status::OK();
}
// std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> reply;
const unsigned &&storage_slice = fds.size();
aiocb *rd;
int ret; // int fd;
auto cmd = [](::sw::redis::Connection &connection,
const ::sw::redis::StringView hkey,
const std::vector<const char *> &ptrs_i,
const std::vector<std::size_t> &sizes_i) {
assert(strcmp(ptrs_i.front(), "RESTORE") == 0);
assert(sizes_i.front() == 7);
connection.send(static_cast<int>(ptrs_i.size()),
const_cast<const char **>(ptrs_i.data()), sizes_i.data());
};
size_t buf_len;
volatile void *tem_aio_buf;
std::vector<std::vector<const char *>> ptrs_i_i(storage_slice);
std::vector<std::vector<std::size_t>> sizes_i_i(storage_slice);
const static char *redis_command = "RESTORE";
const static std::size_t &&redis_command_byte = 7;
const static char *redis_command_param = "0";
const static std::size_t &&redis_command_byte_param = 1;
const static char *replace_command = "REPLACE";
const static std::size_t &&replace_command_byte = 7;
for (size_t i = 0; i < storage_slice; ++i) {
rd = &rds[i];
buf_len = buf_sizes[i];
tem_aio_buf = rd->aio_buf;
rd->aio_buf = realloc((void *)tem_aio_buf,
buf_len); // Be careful! The memory requested here
// should be freed somewhere!
rd->aio_nbytes = buf_len;
rd->aio_fildes = fds[i];
rd->aio_offset = 0;
ret = aio_read(rd);
if (ret < 0) perror("aio_read");
ptrs_i_i[i].reserve(5);
ptrs_i_i[i].clear();
ptrs_i_i[i].emplace_back(redis_command);
ptrs_i_i[i].emplace_back(keys_prefix_name_slices[i].data());
ptrs_i_i[i].emplace_back(redis_command_param);
ptrs_i_i[i].emplace_back((const char *)rd->aio_buf);
ptrs_i_i[i].emplace_back(replace_command);
sizes_i_i[i].reserve(5);
sizes_i_i[i].clear();
sizes_i_i[i].emplace_back(redis_command_byte);
sizes_i_i[i].emplace_back(keys_prefix_name_slices[i].size());
sizes_i_i[i].emplace_back(redis_command_byte_param);
sizes_i_i[i].emplace_back(rd->aio_nbytes);
sizes_i_i[i].emplace_back(replace_command_byte);
}
int count_down = static_cast<int>(storage_slice);
int reread_countdown[storage_slice];
for (unsigned i = 0; i < storage_slice; ++i) {
reread_countdown[i] = 4;
}
bool no_errors = true;
while (count_down > 0) {
for (size_t i = 0; i < storage_slice; ++i) {
rd = &rds[i];
if (reread_countdown[i] > 1) {
if (rd->aio_nbytes > 0) {
if (aio_error(rd) != EINPROGRESS) {
if ((ret = aio_return(rd)) > 0) {
try {
/*reply = */ redis_conn_write->command(
cmd, keys_prefix_name_slices[i], ptrs_i_i[i],
sizes_i_i[i]);
} catch (const std::exception &err) {
LOG(ERROR)
<< "RedisHandler error in RestoreFromDisk for slices "
<< keys_prefix_name_slices[i] << " -- " << err.what();
if (rd->aio_buf) {
free((void *)rd->aio_buf);
rd->aio_buf = nullptr;
rd->aio_nbytes = 0;
}
return errors::Unknown(err.what());
}
if (rd->aio_buf) {
free((void *)rd->aio_buf);
rd->aio_buf = nullptr;
rd->aio_nbytes = 0;
}
--count_down;
} else {
LOG(WARNING) << "File handle " << rd->aio_fildes
<< " did not finish reading last round. "
<< "Try to read " << reread_countdown[i] - 1
<< " more times";
ret = aio_read(rd);
if (ret < 0) perror("aio_read");
--reread_countdown[i];
}
}
} else {
LOG(WARNING) << "File handle " << rd->aio_fildes << " for slice "
<< keys_prefix_name_slices[i]
<< " has nbytes 0. Ignore.";
reread_countdown[i] = 0;
--count_down;
if (rd->aio_buf) {
free((void *)rd->aio_buf);
rd->aio_buf = nullptr;
rd->aio_nbytes = 0;
}
}
} else if (reread_countdown[i] == 1) {
LOG(ERROR) << "File handle " << rd->aio_fildes << " for slice "
<< keys_prefix_name_slices[i]
<< " has some troubles! Given up.";
--reread_countdown[i];
--count_down;
if (rd->aio_buf) {
free((void *)rd->aio_buf);
rd->aio_buf = nullptr;
rd->aio_nbytes = 0;
}
no_errors = false;
}
}
}
return no_errors
? Status::OK()
: errors::Unknown("Unknown errors happen in file handles.");
}