Status ReCreateTableBuckets()

in tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_table_op.cc [507:615]


  /**
   * Copies every key/value pair found in the Redis buckets associated with
   * `keys_prefix_name_from` into the buckets listed in
   * `keys_prefix_name_slices`, then removes each old bucket whose name is not
   * part of `keys_prefix_name_slices`.
   *
   * For each old bucket the pairs are read with HSCAN into two temporary
   * tensors (keys: [n], values: [n, runtime_value_dim_]) and handed to
   * launchInsert().
   *
   * @param ctx                    Kernel context used to allocate the
   *                               temporary tensors and passed to
   *                               launchInsert().
   * @param keys_prefix_name_from  Prefix name used to look up the existing
   *                               buckets in Redis.
   * @return Status::OK() on success; the allocation error if a temporary
   *         tensor cannot be allocated; errors::Unknown if an HSCAN reply is
   *         null or malformed, if a bucket yields more pairs than its reported
   *         size, or if an exception is caught; otherwise the first non-OK
   *         status returned by RemoveHkeysInBuckets().
   */
  Status ReCreateTableBuckets(OpKernelContext *ctx,
                              const std::string &keys_prefix_name_from) {
    std::vector<std::string> keys_prefix_name_slices_in_redis =
        _table_instance->GetKeyBucketsAndOptimizerParamsWithName(
            keys_prefix_name_from, false);
    std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter> hscan_reply;
    Tensor keys_temp;
    Tensor values_temp;

    for (size_t i = 0; i < keys_prefix_name_slices_in_redis.size(); ++i) {
      // Capacity of the temporary tensors for this bucket.
      const int64 slice_keys_size = _table_instance->TableSizeInBucket(
          keys_prefix_name_slices_in_redis[i]);
      // Number of pairs actually written into the temporary tensors.
      int64 scanned_pairs = 0;

      // fill Tensor keys_temp and values_temp
      try {
        TF_RETURN_IF_ERROR(ctx->allocate_temp(DataTypeToEnum<K>::v(),
                                              TensorShape({slice_keys_size}),
                                              &keys_temp));
        TF_RETURN_IF_ERROR(ctx->allocate_temp(
            DataTypeToEnum<V>::v(),
            TensorShape({slice_keys_size, runtime_value_dim_}), &values_temp));
        const K *pk_raw =
            reinterpret_cast<const K *>(keys_temp.tensor_data().data());
        const V *pv_raw =
            reinterpret_cast<const V *>(values_temp.tensor_data().data());

        long long cursor = 0;
        while (true) {
          // Move-assignment releases the previous reply, if any.
          hscan_reply = std::move(_table_instance->HscanGetKeysValsInBucket(
              keys_prefix_name_slices_in_redis[i], &cursor,
              multi_redis_cmd_max_argc));
          if (hscan_reply == nullptr) {
            return errors::Unknown(
                "Unknown errors happen when HscanGetKeysValsInBucket in "
                "ReCreateTableBuckets");
          }
          if (hscan_reply->type == REDIS_REPLY_ARRAY &&
              hscan_reply->elements > 1) {
            const redisReply *kvs_reply = hscan_reply->element[1];
            // The payload is consumed as a flat [key, value, key, value, ...]
            // array; an odd length would make the value lookup below read
            // past the end of the element array.
            if (kvs_reply->elements % 2 != 0) {
              return errors::Unknown(
                  "Malformed HSCAN reply with an odd number of elements in "
                  "ReCreateTableBuckets");
            }
            for (size_t j = 0; j + 1 < kvs_reply->elements; j += 2) {
              // HSCAN may return the same element more than once and the
              // bucket may have grown since its size was read, so never write
              // past the rows that were allocated above.
              if (scanned_pairs >= slice_keys_size) {
                return errors::Unknown(
                    "HSCAN returned more key-value pairs than the bucket size "
                    "reported before scanning in ReCreateTableBuckets");
              }

              const redisReply *key_reply = kvs_reply->element[j];
              if (key_reply->type == REDIS_REPLY_STRING) {
                // Direct access to Tensor data in TensorFlow
                ReplyMemcpyToKeyTensor<K>(pk_raw, key_reply->str,
                                          key_reply->len);
              }
              ++pk_raw;

              const redisReply *val_reply = kvs_reply->element[j + 1];
              if (val_reply->type == REDIS_REPLY_STRING) {
                // Direct access to Tensor data in TensorFlow
                ReplyMemcpyToValTensor<V>(pv_raw, val_reply->str,
                                          runtime_value_dim_);
              }
              pv_raw += runtime_value_dim_;

              ++scanned_pairs;
            }
          }

          LOG(INFO) << "The cursor of scanning "
                    << keys_prefix_name_slices_in_redis[i]
                    << " in ReCreateTableBuckets is " << cursor << " now.";
          // A zero cursor marks the end of the HSCAN iteration.
          if (cursor == 0) {
            break;
          }
        }
      } catch (const std::exception &err) {
        LOG(ERROR) << "Some errors happened when try to copy Redis old buckets "
                      "data reply into tensor for preparing to insert"
                   << " -- " << err.what();
        return errors::Unknown(err.what());
      }

      if (scanned_pairs < slice_keys_size) {
        LOG(WARNING) << "Scanned " << scanned_pairs << " key-value pairs from "
                     << keys_prefix_name_slices_in_redis[i] << " but "
                     << slice_keys_size
                     << " were expected; only the scanned pairs are inserted.";
      }

      try {
        // insert KV pair into new Redis with new storage_slice.
        // Only the rows that were filled are passed on, so uninitialized
        // trailing rows of the temporary tensors are never inserted.
        // NOTE(review): assumes the fifth argument of launchInsert is the
        // number of rows to insert -- confirm against its definition.
        launchInsert(ctx, keys_prefix_name_slices, keys_temp, values_temp,
                     scanned_pairs, runtime_value_dim_, threads_Insert);
      } catch (const std::exception &err) {
        LOG(ERROR)
            << "Some errors happened when try to insert new buckets into Redis"
            << " -- " << err.what();
        return errors::Unknown(err.what());
      }
    }

    // Remove the old buckets, skipping any name that is also a new bucket.
    for (const auto &keys_prefix_name_slice_in_redis :
         keys_prefix_name_slices_in_redis) {
      LOG(INFO) << "Now try to delet old bucket "
                << keys_prefix_name_slice_in_redis;
      const auto iter = std::find(keys_prefix_name_slices.begin(),
                                  keys_prefix_name_slices.end(),
                                  keys_prefix_name_slice_in_redis);
      if (iter == keys_prefix_name_slices.end()) {
        const Status statu = _table_instance->RemoveHkeysInBuckets(
            keys_prefix_name_slice_in_redis);
        if (statu != Status::OK()) {
          return statu;
        }
      }
    }
    return Status::OK();
  }