in tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_table_op.cc [507:615]
Status ReCreateTableBuckets(OpKernelContext *ctx,
const std::string &keys_prefix_name_from) {
std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> hscan_reply;
const redisReply *kvs_reply;
std::vector<std::string> keys_prefix_name_slices_in_redis =
_table_instance->GetKeyBucketsAndOptimizerParamsWithName(
keys_prefix_name_from, false);
Tensor keys_temp;
const K *pk_raw;
Tensor values_temp;
const V *pv_raw;
int64 slice_keys_size = 0;
long long cursor = 0;
redisReply *temp_reply;
for (size_t i = 0; i < keys_prefix_name_slices_in_redis.size(); ++i) {
slice_keys_size = _table_instance->TableSizeInBucket(
keys_prefix_name_slices_in_redis[i]);
// fill Tensor keys_temp
try {
TF_RETURN_IF_ERROR(ctx->allocate_temp(DataTypeToEnum<K>::v(),
TensorShape({slice_keys_size}),
&keys_temp));
TF_RETURN_IF_ERROR(ctx->allocate_temp(
DataTypeToEnum<V>::v(),
TensorShape({slice_keys_size, runtime_value_dim_}), &values_temp));
pk_raw = reinterpret_cast<const K *>(keys_temp.tensor_data().data());
pv_raw = reinterpret_cast<const V *>(values_temp.tensor_data().data());
cursor = 0;
while (true) {
hscan_reply.reset();
hscan_reply = std::move(_table_instance->HscanGetKeysValsInBucket(
keys_prefix_name_slices_in_redis[i], &cursor,
multi_redis_cmd_max_argc));
if (hscan_reply == nullptr) {
return errors::Unknown(
"Unknown errors happen when HscanGetKeysValsInBucket in "
"ReCreateTableBuckets");
}
if (hscan_reply->type == REDIS_REPLY_ARRAY &&
hscan_reply->elements > 1) {
kvs_reply = hscan_reply->element[1];
// fill Tensor keys and values
for (size_t j = 0; j < kvs_reply->elements; ++j) {
temp_reply = kvs_reply->element[j];
if (temp_reply->type ==
REDIS_REPLY_STRING) { // #define REDIS_REPLY_STRING 1
ReplyMemcpyToKeyTensor<K>(
pk_raw, temp_reply->str,
temp_reply
->len); // Direct access to Tensor data in TensorFlow
}
++pk_raw;
++j;
temp_reply = kvs_reply->element[j];
if (temp_reply->type ==
REDIS_REPLY_STRING) { // #define REDIS_REPLY_STRING 1
ReplyMemcpyToValTensor<V>(
pv_raw, temp_reply->str,
runtime_value_dim_); // Direct access to Tensor data in
// TensorFlow
}
pv_raw += runtime_value_dim_;
}
}
LOG(INFO) << "The cursor of scanning "
<< keys_prefix_name_slices_in_redis[i]
<< " in ReCreateTableBuckets is " << cursor << " now.";
if (cursor == 0) {
break;
}
}
} catch (const std::exception &err) {
LOG(ERROR) << "Some errors happened when try to copy Redis old buckets "
"data reply into tensor for preparing to insert"
<< " -- " << err.what();
return errors::Unknown(err.what());
}
try {
// insert KV pair into new Redis with new storage_slice
launchInsert(ctx, keys_prefix_name_slices, keys_temp, values_temp,
slice_keys_size, runtime_value_dim_, threads_Insert);
} catch (const std::exception &err) {
LOG(ERROR)
<< "Some errors happened when try to insert new buckets into Redis"
<< " -- " << err.what();
return errors::Unknown(err.what());
}
}
auto statu = Status::OK();
for (auto keys_prefix_name_slice_in_redis :
keys_prefix_name_slices_in_redis) {
LOG(INFO) << "Now try to delet old bucket "
<< keys_prefix_name_slice_in_redis;
auto iter = std::find(keys_prefix_name_slices.begin(),
keys_prefix_name_slices.end(),
keys_prefix_name_slice_in_redis);
if (iter == keys_prefix_name_slices.end()) {
statu = _table_instance->RemoveHkeysInBuckets(
keys_prefix_name_slice_in_redis);
if (statu != Status::OK()) {
return statu;
}
}
}
return Status::OK();
}