in src/types/redis_bitmap.cc [309:457]
// Finds the position of the first bit equal to `bit` inside [start, stop] and stores it in `*pos`.
//
// - `start` / `stop` are byte offsets, or bit offsets when `is_bit_index` is true (which requires
//   `stop_given`). They are normalized by BitmapString::NormalizeRange against the bitmap size.
// - If the key does not exist, `*pos` is -1 when looking for a set bit and 0 when looking for a clear bit.
// - If the value is stored as a plain string (kRedisString), the lookup is delegated to BitmapString::BitPos.
// - If no matching bit exists in the range, `*pos` is -1, except when looking for a clear bit without an
//   explicit `stop`, in which case the first bit past the end of the bitmap (metadata.size * 8) is reported.
//
// Returns a non-OK status only when reading the metadata or a segment fails with an error other than NotFound.
rocksdb::Status Bitmap::BitPos(engine::Context &ctx, const Slice &user_key, bool bit, int64_t start, int64_t stop,
                               bool stop_given, int64_t *pos, bool is_bit_index) {
  if (is_bit_index) CHECK(stop_given);

  std::string raw_value;
  std::string ns_key = AppendNamespacePrefix(user_key);
  BitmapMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
  if (!s.ok() && !s.IsNotFound()) return s;
  if (s.IsNotFound()) {
    *pos = bit ? -1 : 0;
    return rocksdb::Status::OK();
  }

  if (metadata.Type() == kRedisString) {
    redis::BitmapString bitmap_string_db(storage_, namespace_);
    return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos, is_bit_index);
  }

  // With a bit index the range is expressed in bits, so the size used for normalization is in bits too.
  uint32_t to_bit_factor = is_bit_index ? 8 : 1;
  auto size = static_cast<int64_t>(metadata.size) * static_cast<int64_t>(to_bit_factor);
  std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, size);
  auto u_start = static_cast<uint64_t>(start);
  auto u_stop = static_cast<uint64_t>(stop);
  if (u_start > u_stop) {
    *pos = -1;
    return rocksdb::Status::OK();
  }

  // Absolute position (in bits) of the first bit that belongs to the requested range.
  // A reported position must never be smaller than this.
  const uint64_t range_first_bit = is_bit_index ? u_start : u_start * 8;

  // Scans a whole byte, lowest bit first, and returns the index of the first bit equal to `bit`, or -1.
  auto bit_pos_in_byte = [](char byte, bool bit) -> int {
    for (int i = 0; i < 8; i++) {
      if (bit && (byte & (1 << i)) != 0) return i;
      if (!bit && (byte & (1 << i)) == 0) return i;
    }
    return -1;
  };

  // Same as above but restricted to the bits [start, stop] (both inclusive, each in 0..7) of the byte.
  auto bit_pos_in_byte_startstop = [](char byte, bool bit, uint32_t start, uint32_t stop) -> int {
    for (uint32_t i = start; i <= stop; i++) {
      if (bit && (byte & (1 << i)) != 0) return static_cast<int>(i);
      if (!bit && (byte & (1 << i)) == 0) return static_cast<int>(i);
    }
    return -1;
  };

  rocksdb::ReadOptions read_options = ctx.GetReadOptions();
  // if bit index, (Eg start = 1, stop = 35), then
  // u_start = 1/8 = 0, u_stop = 35/8 = 4 (in bytes)
  uint32_t start_segment_index = (u_start / to_bit_factor) / kBitmapSegmentBytes;
  uint32_t stop_segment_index = (u_stop / to_bit_factor) / kBitmapSegmentBytes;
  uint32_t start_bit_pos_in_byte = 0;
  uint32_t stop_bit_pos_in_byte = 0;
  if (is_bit_index) {
    start_bit_pos_in_byte = u_start % 8;
    stop_bit_pos_in_byte = u_stop % 8;
  }

  // Returns the inclusive bit window to inspect inside `curr_byte`: only the first and the last byte of the
  // range are partially covered, every byte in between is covered entirely.
  auto range_in_byte = [start_bit_pos_in_byte, stop_bit_pos_in_byte](
                           uint32_t byte_start, uint32_t byte_end,
                           uint32_t curr_byte) -> std::pair<uint32_t, uint32_t> {
    if (curr_byte == byte_start && curr_byte == byte_end) return {start_bit_pos_in_byte, stop_bit_pos_in_byte};
    if (curr_byte == byte_start) return {start_bit_pos_in_byte, 7};
    if (curr_byte == byte_end) return {0, stop_bit_pos_in_byte};
    return {0, 7};
  };

  // Don't use multi get to prevent large range query, and take too much memory
  // Searching bits in segments [start_index, stop_index].
  for (uint32_t i = start_segment_index; i <= stop_segment_index; i++) {
    rocksdb::PinnableSlice pin_value;
    std::string sub_key =
        InternalKey(ns_key, std::to_string(i * kBitmapSegmentBytes), metadata.version, storage_->IsSlotIdEncoded())
            .Encode();
    s = storage_->Get(ctx, read_options, sub_key, &pin_value);
    if (!s.ok() && !s.IsNotFound()) return s;
    if (s.IsNotFound()) {
      if (!bit) {
        // A missing segment is treated as all zeros, so the first clear bit is the first bit of this segment
        // that lies inside the range. It cannot be past `stop` because i <= stop_segment_index and
        // start <= stop. Clamp to the range start so a position before `start` is never reported.
        const uint64_t segment_first_bit = static_cast<uint64_t>(i) * kBitmapSegmentBits;
        *pos = static_cast<int64_t>(segment_first_bit > range_first_bit ? segment_first_bit : range_first_bit);
        return rocksdb::Status::OK();
      }
      continue;
    }

    size_t byte_pos_in_segment = 0;
    // Sentinels that can never match a real byte index of a segment (and differ from each other): they mean
    // "the first/last byte of the range is not in this segment".
    size_t byte_with_bit_start = -1;
    size_t byte_with_bit_stop = -2;
    // if bit index, (Eg start = 1, stop = 35), then
    // byte_pos_in_segment should be calculated in bytes, hence divide by 8
    if (i == start_segment_index) {
      byte_pos_in_segment = (u_start / to_bit_factor) % kBitmapSegmentBytes;
      byte_with_bit_start = byte_pos_in_segment;
    }
    size_t stop_byte_in_segment = pin_value.size();
    if (i == stop_segment_index) {
      // NOTE(review): this assumes the stored stop segment is at least as long as the requested stop byte.
      // The invariant below only states that for the last segment of the bitmap - confirm it also holds
      // when `stop` falls into a shorter, non-last segment.
      CHECK((u_stop / to_bit_factor) % kBitmapSegmentBytes + 1 <= pin_value.size());
      stop_byte_in_segment = (u_stop / to_bit_factor) % kBitmapSegmentBytes + 1;
      // `stop_byte_in_segment` is the exclusive loop bound; the byte that holds the stop bit is the one
      // right before it. Using the exclusive bound here would never match and the stop bit would be ignored.
      byte_with_bit_stop = stop_byte_in_segment - 1;
    }

    // Invariant:
    // 1. pin_value.size() <= kBitmapSegmentBytes.
    // 2. If it's the last segment, metadata.size % kBitmapSegmentBytes <= pin_value.size().
    for (; byte_pos_in_segment < stop_byte_in_segment; byte_pos_in_segment++) {
      int bit_pos_in_byte_value = -1;
      if (is_bit_index) {
        uint32_t start_bit = 0, stop_bit = 7;
        std::tie(start_bit, stop_bit) = range_in_byte(byte_with_bit_start, byte_with_bit_stop, byte_pos_in_segment);
        bit_pos_in_byte_value = bit_pos_in_byte_startstop(pin_value[byte_pos_in_segment], bit, start_bit, stop_bit);
      } else {
        bit_pos_in_byte_value = bit_pos_in_byte(pin_value[byte_pos_in_segment], bit);
      }
      if (bit_pos_in_byte_value != -1) {
        *pos = static_cast<int64_t>(i * kBitmapSegmentBits + byte_pos_in_segment * 8 + bit_pos_in_byte_value);
        return rocksdb::Status::OK();
      }
    }

    if (bit) {
      continue;
    }

    // There're two cases that `pin_value.size() < kBitmapSegmentBytes`:
    // 1. If it's the last segment, we've done searching in the above loop.
    // 2. If it's not the last segment, we can check if the segment is all 0.
    if (pin_value.size() < kBitmapSegmentBytes) {
      if (i == stop_segment_index) {
        continue;
      }
      // Bytes past the stored value are treated as zeros. The first of them is the answer unless the range
      // starts even later inside this segment, in which case the range start itself is the first clear bit.
      const uint64_t first_unstored_bit = static_cast<uint64_t>(i) * kBitmapSegmentBits + pin_value.size() * 8;
      *pos = static_cast<int64_t>(first_unstored_bit > range_first_bit ? first_unstored_bit : range_first_bit);
      return rocksdb::Status::OK();
    }
  }

  // bit was not found
  /* If we are looking for clear bits, and the user specified an exact
   * range with start-end, we can't consider the right of the range as
   * zero padded (as we do when no explicit end is given).
   *
   * So if redisBitpos() returns the first bit outside the range,
   * we return -1 to the caller, to mean, in the specified range there
   * is not a single "0" bit. */
  if (stop_given && bit == 0) {
    *pos = -1;
    return rocksdb::Status::OK();
  }
  *pos = bit ? -1 : static_cast<int64_t>(metadata.size * 8);
  return rocksdb::Status::OK();
}