rocksdb::Status Bitmap::BitOp()

Defined in src/types/redis_bitmap.cc, lines 459-651.


rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const std::string &op_name,
                              const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len) {
  std::string raw_value;
  std::string ns_key = AppendNamespacePrefix(user_key);

  std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
  uint64_t max_bitmap_size = 0;
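  // The loop below collects metadata for every existing source bitmap. Keys that
  // don't exist are simply skipped, and max_bitmap_size ends up as the byte length
  // of the longest source bitmap, which is also the length of the result.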

  for (const auto &op_key : op_keys) {
    BitmapMetadata metadata(false);
    std::string ns_op_key = AppendNamespacePrefix(op_key);
    auto s = GetMetadata(ctx, ns_op_key, &metadata, &raw_value);
    if (!s.ok()) {
      if (s.IsNotFound()) {
        continue;
      }
      return s;
    }
    if (metadata.Type() == kRedisString) {
      // BITOP between a bitmap and a string-encoded bitmap isn't supported yet.
      return rocksdb::Status::NotSupported(kErrMsgWrongType);
    }
    if (metadata.size > max_bitmap_size) max_bitmap_size = metadata.size;
    meta_pairs.emplace_back(std::move(ns_op_key), metadata);
  }
  size_t num_keys = meta_pairs.size();

  auto batch = storage_->GetWriteBatchBase();
  if (max_bitmap_size == 0) {
    /* All source bitmaps are empty or missing, so the result is empty: just delete the destination bitmap. */
    auto s = batch->Delete(metadata_cf_handle_, ns_key);
    if (!s.ok()) return s;
    return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
  }
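  // Attach log data (command id, op name, and the source keys) to the write batch
  // so that downstream consumers of the WAL can tell which command produced it.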
  std::vector<std::string> log_args = {std::to_string(kRedisCmdBitOp), op_name};
  for (const auto &op_key : op_keys) {
    log_args.emplace_back(op_key.ToString());
  }
  WriteBatchLogData log_data(kRedisBitmap, std::move(log_args));
  auto s = batch->PutLogData(log_data.Encode());
  if (!s.ok()) return s;

  BitmapMetadata res_metadata;
  // For AND, if any source key is missing (num_keys < op_keys.size()), every byte of
  // the result is zero, so we can skip writing the result's subkeys and only write its metadata.
  const bool can_skip_op = op_flag == kBitOpAnd && num_keys != op_keys.size();
  if (!can_skip_op) {
    uint64_t stop_index = (max_bitmap_size - 1) / kBitmapSegmentBytes;
    std::unique_ptr<unsigned char[]> frag_res(new unsigned char[kBitmapSegmentBytes]);
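    // frag_res is a scratch buffer for one result segment; the loop below computes
    // the result segment by segment, from index 0 up to stop_index (the last segment
    // covered by the longest source bitmap).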

    rocksdb::ReadOptions read_options = ctx.GetReadOptions();
    for (uint64_t frag_index = 0; frag_index <= stop_index; frag_index++) {
      std::vector<rocksdb::PinnableSlice> fragments;
      uint16_t frag_maxlen = 0, frag_minlen = 0;
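      // frag_maxlen/frag_minlen track the longest and shortest fragment read for this
      // segment across the existing sources; a frag_minlen of 0 means it has not been
      // set yet.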
      for (const auto &meta_pair : meta_pairs) {
        std::string sub_key = InternalKey(meta_pair.first, std::to_string(frag_index * kBitmapSegmentBytes),
                                          meta_pair.second.version, storage_->IsSlotIdEncoded())
                                  .Encode();
        rocksdb::PinnableSlice fragment;
        auto s = storage_->Get(ctx, read_options, sub_key, &fragment);
        if (!s.ok() && !s.IsNotFound()) {
          return s;
        }
        if (s.IsNotFound()) {
          if (op_flag == kBitOpAnd) {
            // If any input bitmap has no data for this fragment, the AND result
            // for this fragment is all zeros, so stop reading and skip writing it.
            frag_maxlen = 0;
            break;
          }
        } else {
          if (frag_maxlen < fragment.size()) frag_maxlen = fragment.size();
          if (fragment.size() < frag_minlen || frag_minlen == 0) frag_minlen = fragment.size();
          fragments.emplace_back(std::move(fragment));
        }
      }

      size_t frag_numkeys = fragments.size();
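      // Only write this result segment if at least one source had data for it. NOT is
      // the exception: segments missing from its single input must still come out as
      // all 0xff bytes.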
      if (frag_maxlen != 0 || op_flag == kBitOpNot) {
        uint16_t j = 0;
        if (op_flag == kBitOpNot) {
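          // For NOT, bytes with no source data must come out as 0xff, so pre-fill
          // the whole segment.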
          memset(frag_res.get(), UCHAR_MAX, kBitmapSegmentBytes);
        } else {
          memset(frag_res.get(), 0, frag_maxlen);
        }

        /* Fast path: as long as we have data for all the input bitmaps, we
         * can take a fast path that performs much better than the
         * vanilla algorithm. On ARM we skip the fast path since it would
         * result in GCC compiling the code with multi-word load/store
         * operations that are not supported even on ARM >= v6. */
#ifndef USE_ALIGNED_ACCESS
        if (frag_minlen >= sizeof(uint64_t) * 4 && frag_numkeys <= 16) {
          auto *lres = reinterpret_cast<uint64_t *>(frag_res.get());
          const uint64_t *lp[16];
          for (uint64_t i = 0; i < frag_numkeys; i++) {
            lp[i] = reinterpret_cast<const uint64_t *>(fragments[i].data());
          }
          memcpy(frag_res.get(), fragments[0].data(), frag_minlen);
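          // The helper below folds each remaining input into the result 32 bytes
          // (4 uint64 words) at a time, advancing j and frag_minlen so the byte-wise
          // tail loop picks up exactly where the fast path stopped.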
          auto apply_fast_path_op = [&](auto op) {
            // Note: kBitOpNot cannot use this helper; it only applies
            // to kBitOpAnd, kBitOpOr, and kBitOpXor.
            CHECK(op_flag != kBitOpNot);
            while (frag_minlen >= sizeof(uint64_t) * 4) {
              for (uint64_t i = 1; i < frag_numkeys; i++) {
                op(lres[0], lp[i][0]);
                op(lres[1], lp[i][1]);
                op(lres[2], lp[i][2]);
                op(lres[3], lp[i][3]);
                lp[i] += 4;
              }
              lres += 4;
              j += sizeof(uint64_t) * 4;
              frag_minlen -= sizeof(uint64_t) * 4;
            }
          };

          if (op_flag == kBitOpAnd) {
            apply_fast_path_op([](uint64_t &a, uint64_t b) { a &= b; });
          } else if (op_flag == kBitOpOr) {
            apply_fast_path_op([](uint64_t &a, uint64_t b) { a |= b; });
          } else if (op_flag == kBitOpXor) {
            apply_fast_path_op([](uint64_t &a, uint64_t b) { a ^= b; });
          } else if (op_flag == kBitOpNot) {
            while (frag_minlen >= sizeof(uint64_t) * 4) {
              lres[0] = ~lres[0];
              lres[1] = ~lres[1];
              lres[2] = ~lres[2];
              lres[3] = ~lres[3];
              lres += 4;
              j += sizeof(uint64_t) * 4;
              frag_minlen -= sizeof(uint64_t) * 4;
            }
          }
        }
#endif

        uint8_t output = 0, byte = 0;
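        // Portable byte-at-a-time path: it handles whatever the fast path did not
        // cover (or the whole fragment when the fast path is unavailable), treating
        // missing bytes of shorter inputs as zero.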
        for (; j < frag_maxlen; j++) {
          output = (fragments[0].size() <= j) ? 0 : fragments[0][j];
          if (op_flag == kBitOpNot) output = ~output;
          for (uint64_t i = 1; i < frag_numkeys; i++) {
            byte = (fragments[i].size() <= j) ? 0 : fragments[i][j];
            switch (op_flag) {
              case kBitOpAnd:
                output &= byte;
                break;
              case kBitOpOr:
                output |= byte;
                break;
              case kBitOpXor:
                output ^= byte;
                break;
              default:
                break;
            }
          }
          frag_res[j] = output;
        }

        if (op_flag == kBitOpNot) {
          if (frag_index == stop_index) {
            // The bytes beyond the source length must not be written as 0xff,
            // so limit `frag_maxlen` for the last segment.
            if (max_bitmap_size == (frag_index + 1) * kBitmapSegmentBytes) {
              // If the last fragment is full, `max_bitmap_size % kBitmapSegmentBytes`
              // would be 0. In this case, we should set `frag_maxlen` to
              // `kBitmapSegmentBytes` to avoid writing an empty fragment.
              frag_maxlen = kBitmapSegmentBytes;
            } else {
              frag_maxlen = max_bitmap_size % kBitmapSegmentBytes;
            }
          } else {
            frag_maxlen = kBitmapSegmentBytes;
          }
        }
        std::string sub_key = InternalKey(ns_key, std::to_string(frag_index * kBitmapSegmentBytes),
                                          res_metadata.version, storage_->IsSlotIdEncoded())
                                  .Encode();
        auto s = batch->Put(sub_key, Slice(reinterpret_cast<char *>(frag_res.get()), frag_maxlen));
        if (!s.ok()) return s;
      }
    }
  }

  std::string bytes;
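  // The destination's metadata records the result size: the byte length of the longest
  // source bitmap, which is also what *len reports back to the caller.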
  res_metadata.size = max_bitmap_size;
  res_metadata.Encode(&bytes);
  s = batch->Put(metadata_cf_handle_, ns_key, bytes);
  if (!s.ok()) return s;
  *len = static_cast<int64_t>(max_bitmap_size);
  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}