rocksdb::Status Bitmap::BitPos()

in src/types/redis_bitmap.cc [309:457]


// Finds the first bit equal to `bit` inside [start, stop] of the bitmap stored under `user_key`.
//
// Parameters:
//   ctx          - engine context used for metadata and segment reads.
//   user_key     - key of the bitmap (the namespace prefix is added here).
//   bit          - the bit value to look for (true = 1, false = 0).
//   start, stop  - inclusive range; interpreted as byte offsets, or as bit offsets when
//                  `is_bit_index` is true. They are normalized against the bitmap size by
//                  BitmapString::NormalizeRange before use.
//   stop_given   - whether the caller supplied an explicit `stop`. Required when `is_bit_index` is set.
//   pos          - output: absolute bit position of the first match, or -1 when there is none.
//   is_bit_index - true when `start`/`stop` are bit offsets rather than byte offsets.
//
// Results written to `*pos`:
//   - key not found: -1 when searching for 1, 0 when searching for 0.
//   - empty range after normalization: -1.
//   - no 1 bit in range: -1.
//   - no 0 bit in range: -1 when `stop_given`, otherwise `metadata.size * 8` (the bitmap is treated
//     as zero padded on the right).
//
// Returns a non-OK status only when a metadata or segment read fails with something other than NotFound.
// Values whose metadata type is kRedisString are delegated to BitmapString::BitPos.
rocksdb::Status Bitmap::BitPos(engine::Context &ctx, const Slice &user_key, bool bit, int64_t start, int64_t stop,
                               bool stop_given, int64_t *pos, bool is_bit_index) {
  if (is_bit_index) CHECK(stop_given);

  std::string raw_value;
  std::string ns_key = AppendNamespacePrefix(user_key);

  BitmapMetadata metadata(false);
  rocksdb::Status s = GetMetadata(ctx, ns_key, &metadata, &raw_value);
  if (!s.ok() && !s.IsNotFound()) return s;
  if (s.IsNotFound()) {
    *pos = bit ? -1 : 0;
    return rocksdb::Status::OK();
  }

  if (metadata.Type() == kRedisString) {
    redis::BitmapString bitmap_string_db(storage_, namespace_);
    return bitmap_string_db.BitPos(raw_value, bit, start, stop, stop_given, pos, is_bit_index);
  }

  uint32_t to_bit_factor = is_bit_index ? 8 : 1;
  auto size = static_cast<int64_t>(metadata.size) * static_cast<int64_t>(to_bit_factor);

  std::tie(start, stop) = BitmapString::NormalizeRange(start, stop, size);
  auto u_start = static_cast<uint64_t>(start);
  auto u_stop = static_cast<uint64_t>(stop);
  if (u_start > u_stop) {
    *pos = -1;
    return rocksdb::Status::OK();
  }

  // Returns the lowest bit index in [first, last] of `byte` whose value equals `bit`, or -1.
  // Bit i of a byte is tested with the mask (1 << i).
  auto first_match_in_byte = [](char byte, bool bit, uint32_t first, uint32_t last) -> int {
    for (uint32_t i = first; i <= last; i++) {
      if (((byte & (1 << i)) != 0) == bit) return static_cast<int>(i);
    }
    return -1;
  };

  rocksdb::ReadOptions read_options = ctx.GetReadOptions();

  // Express the range in bytes plus a bit offset inside the first and the last byte.
  // E.g. with a bit index range start = 1, stop = 35:
  //   start_byte = 1 / 8 = 0 (first bit 1), stop_byte = 35 / 8 = 4 (last bit 3).
  // With a byte index range the whole first and last bytes are searched (bits 0..7).
  const uint64_t start_byte = u_start / to_bit_factor;
  const uint64_t stop_byte = u_stop / to_bit_factor;
  const uint32_t start_segment_index = start_byte / kBitmapSegmentBytes;
  const uint32_t stop_segment_index = stop_byte / kBitmapSegmentBytes;
  const uint32_t first_bit_in_start_byte = is_bit_index ? static_cast<uint32_t>(u_start % 8) : 0;
  const uint32_t last_bit_in_stop_byte = is_bit_index ? static_cast<uint32_t>(u_stop % 8) : 7;
  // Absolute bit position at which the search range begins.
  const uint64_t range_first_bit = start_byte * 8 + first_bit_in_start_byte;

  // Don't use multi get to prevent large range query, and take too much memory
  // Searching bits in segments [start_segment_index, stop_segment_index].
  for (uint32_t i = start_segment_index; i <= stop_segment_index; i++) {
    const bool is_start_segment = (i == start_segment_index);
    const bool is_stop_segment = (i == stop_segment_index);
    const uint64_t segment_first_bit = static_cast<uint64_t>(i) * kBitmapSegmentBits;

    rocksdb::PinnableSlice pin_value;
    std::string sub_key =
        InternalKey(ns_key, std::to_string(i * kBitmapSegmentBytes), metadata.version, storage_->IsSlotIdEncoded())
            .Encode();
    s = storage_->Get(ctx, read_options, sub_key, &pin_value);
    if (!s.ok() && !s.IsNotFound()) return s;
    if (s.IsNotFound()) {
      if (!bit) {
        // A missing segment is all zeros, so the first searched bit of this segment is a match.
        // In the first segment that is the range start, not the segment start. It never exceeds
        // `stop` because i <= stop_segment_index and u_start <= u_stop.
        *pos = static_cast<int64_t>(is_start_segment ? range_first_bit : segment_first_bit);
        return rocksdb::Status::OK();
      }
      continue;
    }

    // Inclusive byte range of this segment that belongs to the search range.
    const size_t first_byte = is_start_segment ? static_cast<size_t>(start_byte % kBitmapSegmentBytes) : 0;
    const size_t last_byte = is_stop_segment ? static_cast<size_t>(stop_byte % kBitmapSegmentBytes)
                                             : static_cast<size_t>(kBitmapSegmentBytes) - 1;
    // Only stored bytes can be read; a segment value may be shorter than the range it has to cover.
    const size_t stored_bytes = pin_value.size();
    const size_t scan_end = (last_byte + 1 < stored_bytes) ? last_byte + 1 : stored_bytes;

    for (size_t byte_pos = first_byte; byte_pos < scan_end; byte_pos++) {
      // Restrict the bits searched in the first and in the last byte of the whole range.
      const uint32_t lo = (is_start_segment && byte_pos == first_byte) ? first_bit_in_start_byte : 0;
      const uint32_t hi = (is_stop_segment && byte_pos == last_byte) ? last_bit_in_stop_byte : 7;
      const int bit_in_byte = first_match_in_byte(pin_value[byte_pos], bit, lo, hi);
      if (bit_in_byte != -1) {
        *pos = static_cast<int64_t>(segment_first_bit + byte_pos * 8 + static_cast<uint64_t>(bit_in_byte));
        return rocksdb::Status::OK();
      }
    }
    if (bit) {
      continue;
    }
    // Looking for a 0 bit: bytes of the range that are not stored in this segment are implicitly
    // zero, so the first such byte holds the answer. It is either the first unstored byte or, when
    // the range starts beyond the stored bytes, the first byte of the range.
    if (stored_bytes <= last_byte) {
      const size_t zero_byte = (first_byte > stored_bytes) ? first_byte : stored_bytes;
      const uint32_t zero_bit = (is_start_segment && zero_byte == first_byte) ? first_bit_in_start_byte : 0;
      *pos = static_cast<int64_t>(segment_first_bit + zero_byte * 8 + zero_bit);
      return rocksdb::Status::OK();
    }
  }
  // bit was not found
  /* If we are looking for clear bits, and the user specified an exact
   * range with start-end, we can't consider the right of the range as
   * zero padded (as we do when no explicit end is given).
   *
   * So if redisBitpos() returns the first bit outside the range,
   * we return -1 to the caller, to mean, in the specified range there
   * is not a single "0" bit. */
  if (stop_given && bit == 0) {
    *pos = -1;
    return rocksdb::Status::OK();
  }
  *pos = bit ? -1 : static_cast<int64_t>(metadata.size * 8);
  return rocksdb::Status::OK();
}