in src/types/redis_bloom_chain.cc [153:246]
rocksdb::Status BloomChain::InsertCommon(engine::Context &ctx, const Slice &user_key,
const std::vector<std::string> &items,
const BloomFilterInsertOptions &insert_options,
std::vector<BloomFilterAddResult> *rets) {
std::string ns_key = AppendNamespacePrefix(user_key);
BloomChainMetadata metadata;
rocksdb::Status s = getBloomChainMetadata(ctx, ns_key, &metadata);
if (s.IsNotFound() && insert_options.auto_create) {
s = createBloomChain(ctx, ns_key, insert_options.error_rate, insert_options.capacity, insert_options.expansion,
&metadata);
}
if (!s.ok()) return s;
std::vector<std::string> bf_key_list;
getBFKeyList(ns_key, metadata, &bf_key_list);
std::vector<rocksdb::PinnableSlice> bf_data_list;
s = getBFDataList(ctx, bf_key_list, &bf_data_list);
if (!s.ok()) return s;
std::vector<uint64_t> item_hash_list;
getItemHashList(items, &item_hash_list);
uint64_t origin_size = metadata.size;
auto batch = storage_->GetWriteBatchBase();
WriteBatchLogData log_data(kRedisBloomFilter, {"insert"});
s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
for (size_t i = 0; i < items.size(); ++i) {
// check
bool exist = false;
// TODO: to test which direction for searching is better
for (int ii = static_cast<int>(bf_data_list.size()) - 1; ii >= 0; --ii) {
std::string_view data = bf_data_list[ii].ToStringView();
exist = bloomCheck(item_hash_list[i], data);
if (exist) break;
}
// insert
if (exist) {
(*rets)[i] = BloomFilterAddResult::kExist;
} else {
auto pinnable_slice_from_string = [](std::string data) -> rocksdb::PinnableSlice {
// This is a workaround for the issue that PinnableSlice does not support
// constructing from a temporary string.
rocksdb::PinnableSlice slice;
*slice.GetSelf() = std::move(data);
slice.PinSelf();
return slice;
};
auto strip_string_from_pinnable_slice = [](rocksdb::PinnableSlice &slice) -> std::string {
if (!slice.IsPinned()) {
// Only a "PinSelf" slice ( which is !IsPinned )
// can operate in this way.
return std::move(*slice.GetSelf());
}
return slice.ToString();
};
if (metadata.size + 1 > metadata.GetCapacity()) {
if (metadata.IsScaling()) {
s = batch->Put(bf_key_list.back(), bf_data_list.back().ToStringView());
if (!s.ok()) return s;
std::string bf_data;
s = createBloomFilterInBatch(ns_key, &metadata, batch, &bf_data);
if (!s.ok()) return s;
bf_data_list.push_back(pinnable_slice_from_string(std::move(bf_data)));
bf_key_list.push_back(getBFKey(ns_key, metadata, metadata.n_filters - 1));
} else {
(*rets)[i] = BloomFilterAddResult::kFull;
continue;
}
}
auto &bf_data = bf_data_list.back();
std::string data = strip_string_from_pinnable_slice(bf_data);
bloomAdd(item_hash_list[i], data);
bf_data = pinnable_slice_from_string(std::move(data));
(*rets)[i] = BloomFilterAddResult::kOk;
metadata.size += 1;
}
}
if (metadata.size != origin_size) {
std::string bloom_chain_metadata_bytes;
metadata.Encode(&bloom_chain_metadata_bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bloom_chain_metadata_bytes);
if (!s.ok()) return s;
s = batch->Put(bf_key_list.back(), bf_data_list.back().ToStringView());
if (!s.ok()) return s;
}
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}