rocksdb::Status BloomChain::InsertCommon()

in src/types/redis_bloom_chain.cc [153:246]


// Inserts each entry of `items` into the bloom chain stored under `user_key`
// and records one outcome per item in `rets`:
//   - kExist: one of the chain's filters already reports the item;
//   - kOk:    the item was added to the last filter of the chain;
//   - kFull:  the chain is at capacity and is not a scaling chain.
//
// If the metadata lookup reports NotFound and `insert_options.auto_create` is
// set, the chain is created first from the error rate, capacity and expansion
// in `insert_options`. Any other non-OK status is returned as-is. All changes
// are staged in one write batch, which is written once at the end.
//
// `rets` is indexed by item position and is not resized in this function, so
// the caller must pass a vector holding at least `items.size()` elements.
rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user_key,
                                         const std::vector<std::string> &items,
                                         const BloomFilterInsertOptions &insert_options,
                                         std::vector<BloomFilterAddResult> *rets) {
  // Workaround: PinnableSlice does not support construction from a temporary
  // string, so the bytes are moved into the slice's own buffer and self-pinned.
  const auto wrap_in_slice = [](std::string bytes) -> rocksdb::PinnableSlice {
    rocksdb::PinnableSlice wrapped;
    *wrapped.GetSelf() = std::move(bytes);
    wrapped.PinSelf();
    return wrapped;
  };
  // Returns the slice's content as a std::string. Only a "PinSelf" slice
  // (which reports !IsPinned) may give up its own buffer by move; a pinned
  // slice is copied instead.
  const auto take_bytes = [](rocksdb::PinnableSlice &slice) -> std::string {
    if (slice.IsPinned()) {
      return slice.ToString();
    }
    return std::move(*slice.GetSelf());
  };

  std::string ns_key = AppendNamespacePrefix(user_key);

  // Load the chain metadata, creating the chain on demand when allowed.
  BloomChainMetadata metadata;
  rocksdb::Status status = getBloomChainMetadata(ctx, ns_key, &metadata);
  if (status.IsNotFound() && insert_options.auto_create) {
    status = createBloomChain(ctx, ns_key, insert_options.error_rate, insert_options.capacity,
                              insert_options.expansion, &metadata);
  }
  if (!status.ok()) return status;

  // Fetch the key and current content of every filter in the chain.
  std::vector<std::string> filter_keys;
  getBFKeyList(ns_key, metadata, &filter_keys);

  std::vector<rocksdb::PinnableSlice> filter_blobs;
  status = getBFDataList(ctx, filter_keys, &filter_blobs);
  if (!status.ok()) return status;

  std::vector<uint64_t> hashes;
  getItemHashList(items, &hashes);

  // Remembered so the final persist step can tell whether anything was added.
  const uint64_t size_before = metadata.size;

  auto batch = storage_->GetWriteBatchBase();
  WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
  status = batch->PutLogData(log_data.Encode());
  if (!status.ok()) return status;

  for (size_t idx = 0; idx < items.size(); ++idx) {
    // Membership check, walking the filters from the newest to the oldest.
    // TODO: to test which direction for searching is better
    bool already_present = false;
    for (auto blob = filter_blobs.rbegin(); blob != filter_blobs.rend() && !already_present; ++blob) {
      std::string_view view = blob->ToStringView();
      already_present = bloomCheck(hashes[idx], view);
    }
    if (already_present) {
      (*rets)[idx] = BloomFilterAddResult::kExist;
      continue;
    }

    // One more item would exceed the capacity: either grow the chain or give up on this item.
    if (metadata.size + 1 > metadata.GetCapacity()) {
      if (!metadata.IsScaling()) {
        (*rets)[idx] = BloomFilterAddResult::kFull;
        continue;
      }
      // Stage the current last filter before a new one takes its place at the tail.
      status = batch->Put(filter_keys.back(), filter_blobs.back().ToStringView());
      if (!status.ok()) return status;

      std::string fresh_filter;
      status = createBloomFilterInBatch(ns_key, &metadata, batch, &fresh_filter);
      if (!status.ok()) return status;

      filter_blobs.push_back(wrap_in_slice(std::move(fresh_filter)));
      filter_keys.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
    }

    // Add the item to the last filter: pull the bytes out, update them, put them back.
    rocksdb::PinnableSlice &tail = filter_blobs.back();
    std::string tail_bytes = take_bytes(tail);
    bloomAdd(hashes[idx], tail_bytes);
    tail = wrap_in_slice(std::move(tail_bytes));

    (*rets)[idx] = BloomFilterAddResult::kOk;
    metadata.size += 1;
  }

  // Persist the metadata and the last filter only if at least one item was added.
  if (metadata.size != size_before) {
    std::string encoded_metadata;
    metadata.Encode(&encoded_metadata);
    status = batch->Put(metadata_cf_handle_, ns_key, encoded_metadata);
    if (!status.ok()) return status;

    status = batch->Put(filter_keys.back(), filter_blobs.back().ToStringView());
    if (!status.ok()) return status;
  }

  return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}