in src/types/redis_bitmap.cc [459:651]
rocksdb::Status Bitmap::BitOp(engine::Context &ctx, BitOpFlags op_flag, const std::string &op_name,
const Slice &user_key, const std::vector<Slice> &op_keys, int64_t *len) {
std::string raw_value;
std::string ns_key = AppendNamespacePrefix(user_key);
std::vector<std::pair<std::string, BitmapMetadata>> meta_pairs;
uint64_t max_bitmap_size = 0;
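  // Collect the metadata of every source key. Missing keys are simply skipped,
  // and `max_bitmap_size` tracks the longest source bitmap, which determines
  // the size of the result.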
for (const auto &op_key : op_keys) {
BitmapMetadata metadata(false);
std::string ns_op_key = AppendNamespacePrefix(op_key);
auto s = GetMetadata(ctx, ns_op_key, &metadata, &raw_value);
if (!s.ok()) {
if (s.IsNotFound()) {
continue;
}
return s;
}
if (metadata.Type() == kRedisString) {
      // Currently, we don't support BITOP between a bitmap and a bitmap string (a string-typed key).
return rocksdb::Status::NotSupported(kErrMsgWrongType);
}
if (metadata.size > max_bitmap_size) max_bitmap_size = metadata.size;
meta_pairs.emplace_back(std::move(ns_op_key), metadata);
}
size_t num_keys = meta_pairs.size();
auto batch = storage_->GetWriteBatchBase();
if (max_bitmap_size == 0) {
    /* If all source bitmaps are empty, the result is empty as well: just delete the destination bitmap. */
auto s = batch->Delete(metadata_cf_handle_, ns_key);
if (!s.ok()) return s;
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}
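  // Record the BITOP sub-command and its source keys as write-batch log data.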
std::vector<std::string> log_args = {std::to_string(kRedisCmdBitOp), op_name};
for (const auto &op_key : op_keys) {
log_args.emplace_back(op_key.ToString());
}
WriteBatchLogData log_data(kRedisBitmap, std::move(log_args));
auto s = batch->PutLogData(log_data.Encode());
if (!s.ok()) return s;
BitmapMetadata res_metadata;
  // If the operation is AND and some of the source keys are missing
  // (num_keys < op_keys.size()), the result is all zeros, so we can skip
  // writing the subkeys of the result bitmap and only write the metadata.
const bool can_skip_op = op_flag == kBitOpAnd && num_keys != op_keys.size();
if (!can_skip_op) {
uint64_t stop_index = (max_bitmap_size - 1) / kBitmapSegmentBytes;
std::unique_ptr<unsigned char[]> frag_res(new unsigned char[kBitmapSegmentBytes]);
rocksdb::ReadOptions read_options = ctx.GetReadOptions();
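    // Compute the result one segment (kBitmapSegmentBytes) at a time;
    // `frag_res` is the scratch buffer holding the current result fragment.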
for (uint64_t frag_index = 0; frag_index <= stop_index; frag_index++) {
std::vector<rocksdb::PinnableSlice> fragments;
uint16_t frag_maxlen = 0, frag_minlen = 0;
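      // Fetch this segment's fragment from every source bitmap. `frag_maxlen`
      // and `frag_minlen` track the longest and shortest fragments found,
      // bounding the byte-wise loop and the word-wise fast path respectively.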
for (const auto &meta_pair : meta_pairs) {
std::string sub_key = InternalKey(meta_pair.first, std::to_string(frag_index * kBitmapSegmentBytes),
meta_pair.second.version, storage_->IsSlotIdEncoded())
.Encode();
rocksdb::PinnableSlice fragment;
auto s = storage_->Get(ctx, read_options, sub_key, &fragment);
if (!s.ok() && !s.IsNotFound()) {
return s;
}
if (s.IsNotFound()) {
if (op_flag == kBitOpAnd) {
            // If any of the source fragments is missing, the AND result for
            // this fragment is all zeros, so we can stop scanning the
            // remaining sources.
frag_maxlen = 0;
break;
}
} else {
if (frag_maxlen < fragment.size()) frag_maxlen = fragment.size();
if (fragment.size() < frag_minlen || frag_minlen == 0) frag_minlen = fragment.size();
fragments.emplace_back(std::move(fragment));
}
}
size_t frag_numkeys = fragments.size();
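      // If no source fragment is available for this segment (all missing, or
      // AND hit a missing fragment), nothing needs to be written unless the
      // operation is NOT, which still has to produce 0xff bytes.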
if (frag_maxlen != 0 || op_flag == kBitOpNot) {
uint16_t j = 0;
if (op_flag == kBitOpNot) {
memset(frag_res.get(), UCHAR_MAX, kBitmapSegmentBytes);
} else {
memset(frag_res.get(), 0, frag_maxlen);
}
        /* Fast path: as long as we have data for all the input bitmaps, we
         * can take a fast path that performs much better than the vanilla
         * algorithm. On ARM we skip the fast path since it would make GCC
         * compile the code with multi-word load/store operations that are
         * not supported even on ARM >= v6. */
#ifndef USE_ALIGNED_ACCESS
if (frag_minlen >= sizeof(uint64_t) * 4 && frag_numkeys <= 16) {
auto *lres = reinterpret_cast<uint64_t *>(frag_res.get());
const uint64_t *lp[16];
for (uint64_t i = 0; i < frag_numkeys; i++) {
lp[i] = reinterpret_cast<const uint64_t *>(fragments[i].data());
}
memcpy(frag_res.get(), fragments[0].data(), frag_minlen);
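          // The result is seeded with the first fragment above; the remaining
          // fragments are folded into it four 64-bit words per iteration by
          // the operation-specific helper below.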
auto apply_fast_path_op = [&](auto op) {
            // Note: kBitOpNot cannot use this helper; it only applies to
            // kBitOpAnd, kBitOpOr and kBitOpXor.
CHECK(op_flag != kBitOpNot);
while (frag_minlen >= sizeof(uint64_t) * 4) {
for (uint64_t i = 1; i < frag_numkeys; i++) {
op(lres[0], lp[i][0]);
op(lres[1], lp[i][1]);
op(lres[2], lp[i][2]);
op(lres[3], lp[i][3]);
lp[i] += 4;
}
lres += 4;
j += sizeof(uint64_t) * 4;
frag_minlen -= sizeof(uint64_t) * 4;
}
};
if (op_flag == kBitOpAnd) {
apply_fast_path_op([](uint64_t &a, uint64_t b) { a &= b; });
} else if (op_flag == kBitOpOr) {
apply_fast_path_op([](uint64_t &a, uint64_t b) { a |= b; });
} else if (op_flag == kBitOpXor) {
apply_fast_path_op([](uint64_t &a, uint64_t b) { a ^= b; });
} else if (op_flag == kBitOpNot) {
while (frag_minlen >= sizeof(uint64_t) * 4) {
lres[0] = ~lres[0];
lres[1] = ~lres[1];
lres[2] = ~lres[2];
lres[3] = ~lres[3];
lres += 4;
j += sizeof(uint64_t) * 4;
frag_minlen -= sizeof(uint64_t) * 4;
}
}
}
#endif
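        // Slow path: combine the fragments byte by byte, starting at the
        // offset `j` the fast path may already have advanced to. Bytes past a
        // fragment's end are treated as zero.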
uint8_t output = 0, byte = 0;
for (; j < frag_maxlen; j++) {
output = (fragments[0].size() <= j) ? 0 : fragments[0][j];
if (op_flag == kBitOpNot) output = ~output;
for (uint64_t i = 1; i < frag_numkeys; i++) {
byte = (fragments[i].size() <= j) ? 0 : fragments[i][j];
switch (op_flag) {
case kBitOpAnd:
output &= byte;
break;
case kBitOpOr:
output |= byte;
break;
case kBitOpXor:
output ^= byte;
break;
default:
break;
}
}
frag_res[j] = output;
}
if (op_flag == kBitOpNot) {
if (frag_index == stop_index) {
            // NOT must not set the extra bytes beyond the original bitmap size
            // to 0xff, so limit `frag_maxlen` for the last segment.
if (max_bitmap_size == (frag_index + 1) * kBitmapSegmentBytes) {
// If the last fragment is full, `max_bitmap_size % kBitmapSegmentBytes`
// would be 0. In this case, we should set `frag_maxlen` to
// `kBitmapSegmentBytes` to avoid writing an empty fragment.
frag_maxlen = kBitmapSegmentBytes;
} else {
frag_maxlen = max_bitmap_size % kBitmapSegmentBytes;
}
} else {
frag_maxlen = kBitmapSegmentBytes;
}
}
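        // Write the computed fragment of the result bitmap.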
std::string sub_key = InternalKey(ns_key, std::to_string(frag_index * kBitmapSegmentBytes),
res_metadata.version, storage_->IsSlotIdEncoded())
.Encode();
auto s = batch->Put(sub_key, Slice(reinterpret_cast<char *>(frag_res.get()), frag_maxlen));
if (!s.ok()) return s;
}
}
}
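  // Finally, write the result metadata: its size is the size of the longest
  // source bitmap.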
std::string bytes;
res_metadata.size = max_bitmap_size;
res_metadata.Encode(&bytes);
s = batch->Put(metadata_cf_handle_, ns_key, bytes);
if (!s.ok()) return s;
*len = static_cast<int64_t>(max_bitmap_size);
return storage_->Write(ctx, storage_->DefaultWriteOptions(), batch->GetWriteBatch());
}