void validate()

in glean/rts/validate.cpp [22:159]


void validate(const Inventory& inventory, const Validate& val, Lookup& facts) {
  const auto starting_id = facts.startingId();
  const auto first_free = facts.firstFreeId();

  size_t elems = distance(starting_id, first_free);
  size_t max_shards = 10000; // 0.4mb of mutexes
  size_t lock_shard_size = std::min(elems, max_shards);
  std::vector<std::mutex> locks(elems / lock_shard_size);
  std::vector<Pid> types(elems, Pid::invalid());

  std::mutex err_lock;
  std::string err;
  std::atomic<bool> has_err = false;
  auto fail([&](std::string msg) {
    const std::lock_guard<std::mutex> lock(err_lock);
    has_err.store(true);
    err = msg;
  });

  auto has_failure([&]() {
    return has_err.load();
  });

  // thread-safe type cache check
  auto expect_type([&](Id id, Pid type) {
    auto ix = distance(starting_id, id);
    size_t lock_ix = ix / lock_shard_size;
    const std::lock_guard<std::mutex> lock(locks.at(lock_ix));

    if (types.at(ix) == Pid::invalid()) {
      types[ix] = type;
    }

    auto real_type = types.at(ix);
    if (type != real_type) {
      fail("fact type mismatch");
    }

    return id;
  });

  Renamer checker(expect_type);

  std::atomic<size_t> count = 0;
  auto validate_section([&](Id from, Id to) {
    auto allowed_id = from;

    for (auto i = facts.enumerate(from, to); auto fact = i->get(); i->next()) {
      if (count.load() >= val.limit || has_failure()) {
        break;
      }
      ++count;

      if (fact.id < allowed_id) {
        fail("enumeration out of order");
      }
      if (fact.id >= first_free) {
        fail("fact id out of bounds");
      }

      if (val.typecheck) {
        const auto *predicate = inventory.lookupPredicate(fact.type);
        if (predicate == nullptr) {
          fail("invalid predicate");
        }

        binary::Output out;
        uint64_t key_size;

        predicate->typecheck(checker, fact.clause, out, key_size);

        if (fact.clause.bytes() != out.bytes()) {
          fail("invalid fact");
        }
        if (fact.clause.key_size != key_size) {
          fail("key size mismatch");
        }
      }

      if (val.keys && facts.idByKey(fact.type, fact.key()) != fact.id) {
        fail("idByKey mismatch");
      }

      allowed_id = fact.id + 1;

      expect_type(fact.id, fact.type);
    }
  });

  const auto max_parallel = 10;
  size_t min_ids_per_section = 100000;
  auto ids_per_section = std::min(min_ids_per_section, elems / max_parallel);
  std::atomic<int> section_no = 0;
  auto get_next_section_starting_id([&]() {
    int section = section_no++;
    return starting_id + section * ids_per_section;
  });

  auto last_percent = -1;
  std::mutex percent_lock;
  auto report_progress([&]() {
    const std::lock_guard<std::mutex> lock(percent_lock);
    size_t percent = (100 * count.load()) / elems;
    if (percent != last_percent) {
      last_percent = percent;
      VLOG(2) << percent << "%";
    }
  });

  // validate multiple sections
  auto worker([&]() {
    while (true) {
      Id from = get_next_section_starting_id();
      Id to = std::min(from + ids_per_section, first_free);

      if (from >= first_free || count.load() >= val.limit || has_failure()) {
        break;
      }

      validate_section(from, to);
      report_progress();
    }
  });

  std::vector<std::future<void>> workers(max_parallel);
  // start workers
  for (auto i = 0; i < max_parallel; i++) {
    workers[i] = std::async(worker);
  }

  for (auto& w : workers) {
    w.wait();
  }

  if (has_failure()) {
    rts::error(err);
  }
}