virtual Status DelCommand()

in tensorflow_recommenders_addons/dynamic_embedding/core/kernels/redis_impl/redis_cluster_connection_pool.hpp [1247:1325]


  // Removes the keys in the index range [begin, max_i) of `keys` from Redis by
  // sending one HDEL command per storage slice.
  //
  // For every slice i an argument vector is assembled inside `thread_context`:
  //   "HDEL", keys_prefix_name_slices[i], <key bytes>, <key bytes>, ...
  // Each key is routed to the slice chosen by KBucketNum, and that slice index
  // is also written to thread_context->bucket_locs in key order.
  //
  // Args:
  //   keys: tensor whose raw buffer is read as an array of K.
  //   thread_context: per-thread scratch storage holding the argument vectors.
  //   begin, max_i: half-open range of key indices to delete.
  //   keys_prefix_name_slices: hash name to use for each storage slice; must
  //     provide at least `storage_slice` entries.
  //
  // Returns Status::OK() on success, or errors::Unknown carrying the message of
  // any std::exception raised while dispatching or recorded in `error_ptr`.
  virtual Status DelCommand(
      const Tensor &keys, ThreadContext *thread_context, const int64 begin,
      const int64 max_i,
      const std::vector<std::string> &keys_prefix_name_slices) override {
    const int total = static_cast<int>(max_i - begin);
    // Two leading arguments per command: the verb and the hash name.
    const int argc = total + 2;

    // Static storage: the pointer is stored in the argument vectors and must
    // stay valid while the commands are being sent.
    static constexpr char redis_command[] = "HDEL";
    static constexpr std::size_t redis_command_byte =
        sizeof(redis_command) - 1;

    const K *const pk_raw_begin =
        reinterpret_cast<const K *>(keys.tensor_data().data());
    const K *const pk_raw_end = pk_raw_begin + max_i;
    const K *pk_raw = pk_raw_begin + begin;

    const unsigned &storage_slice = redis_connection_params.storage_slice;
    // Rough per-slice capacity: an even share of the arguments plus the two
    // fixed leading entries.
    const unsigned vector_len =
        static_cast<unsigned>(static_cast<int64>(argc) / storage_slice + 2);

    thread_context->HandleReserve(storage_slice, vector_len, total);

    for (unsigned i = 0; i < storage_slice; ++i) {
      thread_context->HandlePushBack(i, redis_command, redis_command_byte);
      thread_context->HandlePushBack(i, keys_prefix_name_slices[i].data(),
                                     keys_prefix_name_slices[i].size());
    }

    unsigned *pbucket_loc = thread_context->bucket_locs->data();
    for (; pk_raw != pk_raw_end; ++pk_raw, ++pbucket_loc) {
      const unsigned key_bucket_locs = KBucketNum<K>(pk_raw, storage_slice);
      // The bucket to which the key belongs is recorded to facilitate future
      // memory writes that do not recompute the redis hash
      *pbucket_loc = key_bucket_locs;

      // Direct access to Tensor data in TensorFlow
      thread_context->HandlePushBack(
          key_bucket_locs, KContentPointer<K>(pk_raw), KTypeSize<K>(pk_raw));
    }

    auto cmd = [](::sw::redis::Connection &connection,
                  const ::sw::redis::StringView hkey,
                  const std::vector<const char *> *ptrs_i,
                  const std::vector<std::size_t> *sizes_i) {
      assert(strcmp(ptrs_i->front(), "HDEL") == 0);
      assert(sizes_i->front() == 4);
      // The hash name handed in must be the second argument of the command.
      // Compare with explicit lengths: a string view need not be
      // NUL-terminated.
      assert(std::string(hkey.data(), hkey.size()) ==
             std::string((*ptrs_i)[1], (*sizes_i)[1]));

      connection.send(static_cast<int>(ptrs_i->size()),
                      const_cast<const char **>(ptrs_i->data()),
                      sizes_i->data());
    };

    std::vector<
        std::future<std::unique_ptr<redisReply, ::sw::redis::ReplyDeleter>>>
        results;
    try {
      for (unsigned i = 0; i < storage_slice; ++i) {
        // Capture by value: the task may still be queued or running if this
        // function leaves early through the catch below, so it must not hold
        // references to this frame's locals.
        results.emplace_back(
            network_worker_pool->enqueue([this, cmd, thread_context, i] {
              // 3U: skip slices that only hold the verb and the hash name.
              return PipeExecWrite(cmd, 3U, thread_context->buckets[i]);
            }));
      }
      for (auto &&result : results) {
        result.wait();
      }
      if (error_ptr) {
        std::rethrow_exception(error_ptr);
      }
    } catch (const std::exception &err) {
      // Clear the recorded error before reporting it to the caller.
      error_ptr = nullptr;
      return errors::Unknown(err.what());
    }

    return Status::OK();
  }