void addOwnership()

in glean/rocksdb/rocksdb.cpp [1070:1124]


  void addOwnership(const std::vector<OwnershipSet>& ownership) override {
    container_.requireOpen();

    if (ownership.empty()) {
      return;
    }

    size_t new_count = 0;
    std::vector<size_t> touched;
    rocksdb::WriteBatch batch;

    for (const auto& set : ownership) {
      uint32_t unit_id;
      auto res = getUnitId(set.unit);
      if (res.hasValue()) {
        unit_id = *res;
        if (unit_id >= ownership_unit_counters.size()) {
          rts::error("inconsistent unit id {}", unit_id);
        }
        touched.push_back(unit_id);
      } else {
        unit_id = ownership_unit_counters.size() + new_count;
        check(batch.Put(
          container_.family(Family::ownershipUnits),
          slice(set.unit),
          toSlice(unit_id)));
        check(batch.Put(
          container_.family(Family::ownershipUnitIds),
          slice(EncodedNat(unit_id).byteRange()),
          slice(set.unit)));
        ++new_count;
      }

      binary::Output key;
      key.nat(unit_id);
      key.nat(unit_id < ownership_unit_counters.size()
        ? ownership_unit_counters[unit_id]
        : 0);
      check(batch.Put(
        container_.family(Family::ownershipRaw),
        slice(key),
        rocksdb::Slice(
          reinterpret_cast<const char *>(set.ids.data()),
          set.ids.size() * sizeof(int64_t))));
    }

    check(container_.db->Write(container_.writeOptions, &batch));

    for (auto i : touched) {
      assert(i < ownership_unit_counters.size());
      ++ownership_unit_counters[i];
    }
    ownership_unit_counters.insert(
      ownership_unit_counters.end(),  new_count, 1);
  }