std::unique_ptr executeQuery()

in glean/rts/query.cpp [504:694]


// Runs the compiled query subroutine `sub`, or resumes it from the saved
// continuation `restart`, and returns whatever QueryExecutor::finish()
// produces.
//
// inventory, facts, ownership, pid, traverse, depth, expandPids and wantStats
// are stored in the QueryExecutor that services the subroutine's callbacks.
//
// maxResults / maxBytes are handed to the subroutine as plain argument words;
//   UINT64_MAX is passed when a limit is absent.
// maxTime is a duration in milliseconds added to the start time to form the
//   deadline; when absent, check_timeout is set to UINT64_MAX.
// restart, when present, supplies the saved iterators, the saved contents of
//   each output buffer, the saved local registers and the entry point at
//   which the subroutine is resumed.
//
// Calls error() if a saved iterator cannot be repositioned on its key, or if
// the continuation carries fewer outputs than `sub` declares.
std::unique_ptr<QueryResults> executeQuery (
    Inventory& inventory,
    Define& facts,
    DefineOwnership* ownership,
    Subroutine& sub,
    Pid pid,
    std::shared_ptr<Subroutine> traverse,
    folly::Optional<uint64_t> maxResults,
    folly::Optional<uint64_t> maxBytes,
    folly::Optional<uint64_t> maxTime,
    Depth depth,
    std::unordered_set<Pid, folly::hasher<Pid>>& expandPids,
    bool wantStats,
    folly::Optional<thrift::internal::QueryCont> restart) {

  QueryExecutor q {
    .inventory = inventory,
    .facts = facts,
    .ownership = ownership,
    .sub = sub,
    .pid = pid,
    .traverse = traverse,
    .depth = depth,
    .expandPids = expandPids,
    .wantStats = wantStats
  };
  // coarse_steady_clock is around 1ms granularity which is enough for us.
  // Sample the clock once so the deadline is exactly start_time + maxTime.
  const auto now = Clock::now();
  q.start_time = now;
  q.timeout = now;
  if (maxTime) {
    q.timeout += std::chrono::milliseconds{*maxTime};
    q.check_timeout = CHECK_TIMEOUT_INTERVAL;
  } else {
    q.check_timeout = UINT64_MAX;
  }

  q.outputs.resize(sub.outputs);

  if (restart) {
    // Set up all the iterators as before if we're restarting
    for (auto& savedIter : *restart->iters()) {
      std::unique_ptr<FactIterator> iter;
      Id id;
      if (const auto type = Pid::fromThrift(*savedIter.type())) {
        // Seek back to the saved key; it must still be there.
        auto key = binary::byteRange(*savedIter.key());
        iter = facts.seek(type, key, savedIter.get_prefix_size());
        auto res = iter->get(FactIterator::KeyOnly);
        if (!res || res.key() != key) {
          error("restart iter didn't find a key");
        }
        id = res.id;
      } else {
        // We serialized a finished iterator
        iter = std::make_unique<EmptyIterator>();
        id = Id::invalid();
      }
      q.iters.emplace_back(QueryExecutor::Iter{std::move(iter),
          Pid::fromWord(*savedIter.type()),
          id,
          static_cast<size_t>(savedIter.get_prefix_size()),
          *savedIter.first()});
    }

    // The continuation is deserialized data: make sure it really holds one
    // saved buffer per output before indexing into it. error() is assumed
    // not to return, as at the key check above.
    if (restart->outputs()->size() < sub.outputs) {
      error("restart has too few outputs");
    }
    // Refill each output buffer with the bytes it held when saved.
    for (size_t i = 0; i < sub.outputs; ++i) {
      q.outputs[i].bytes(
          restart->outputs()[i].data(), restart->outputs()[i].size());
    }
  }

  const uint64_t max_results = maxResults ? *maxResults : UINT64_MAX;
  const uint64_t max_bytes = maxBytes ? *maxBytes : UINT64_MAX;

  // IF YOU BREAK BACKWARD COMPATIBILITY HERE, BUMP version IN
  // Glean.Bytecode.Generate.Instruction
  //
  // IF YOU ALSO BREAK FORWARD COMPATIBILITY, BUMP latestSupportedVersion AS
  // WELL

  // The callbacks below are passed to the subroutine by address (see the
  // args vector further down). They capture `q` by reference, so they and
  // `q` must stay alive until sub.execute()/sub.restart() has returned.

  // `prefix` and `end` are addresses, passed as words, delimiting the byte
  // range to seek with.
  const std::function<uint64_t(uint64_t, uint64_t, uint64_t)> seek_ =
      [&](uint64_t type, uint64_t prefix, uint64_t end) {
        return q.seek(
            Pid::fromWord(type),
            folly::ByteRange(
                reinterpret_cast<uint8_t *>(prefix),
                reinterpret_cast<uint8_t *>(end)));
      };


  const std::function<uint64_t()> currentSeek_ = [&]() -> uint64_t {
    return q.currentSeek();
  };

  const std::function<void(uint64_t)> endSeek_ = [&](uint64_t token) {
    q.endSeek(token);
  };

  // Returns 2 when the time limit has expired or the query was interrupted,
  // 0 when q.next() yields no fact, and 1 after storing the fact's id and
  // the addresses of its clause start, key end and clause end in the
  // out-parameters. A non-zero `demand` requests the value as well as the
  // key.
  const std::function<uint64_t(uint64_t, uint64_t *, uint64_t *,
                               uint64_t *, uint64_t *)>
      next_ = [&](uint64_t token, uint64_t demand, uint64_t *clause_begin,
                  uint64_t *key_end, uint64_t *clause_end,
                  uint64_t *id) -> uint64_t {
        if (q.timeExpired()) {
          return 2;
        }
        if (q.interrupted()) {
          return 2;
        }
        auto res = q.next(token, demand != 0 ? FactIterator::KeyValue
                                             : FactIterator::KeyOnly);
        if (!res) {
          return 0;
        }
        *id = res.id.toWord();
        *clause_begin = reinterpret_cast<uint64_t>(res.clause.bytes().data());
        *key_end = reinterpret_cast<uint64_t>(res.clause.key().end());
        *clause_end = reinterpret_cast<uint64_t>(res.clause.bytes().end());
        return 1;
      };

  // `kout` / `vout` arrive as word pointers but are used as binary::Output
  // buffers.
  const std::function<uint64_t(uint64_t, uint64_t *, uint64_t *)>
      lookupKeyValue_ = [&](uint64_t fid, uint64_t *kout, uint64_t *vout) {
        return q.lookupKeyValue(
            Id::fromWord(fid),
            reinterpret_cast<binary::Output *>(kout),
            reinterpret_cast<binary::Output *>(vout)).toWord();
      };

  // `key` arrives as a word pointer but is used as a binary::Output buffer.
  const std::function<uint64_t(uint64_t, uint64_t *, uint64_t)>
      newDerivedFact_ = [&](uint64_t type, uint64_t *key, uint64_t size) {
        return q.newDerivedFact(
            Pid::fromWord(type),
            reinterpret_cast<binary::Output *>(key),
            size).toWord();
      };

  const std::function<void(uint64_t *, uint64_t *)> saveState_ =
      [&](uint64_t *pc, uint64_t *frame) { q.saveState(pc, frame); };

  const std::function<void(uint64_t, binary::Output *, binary::Output *,
                           uint64_t, uint64_t)>
      resultWithPid_ = [&](uint64_t id, binary::Output *key,
                           binary::Output *val, uint64_t pid, uint64_t rec) {
        q.resultWithPid(
            Id::fromWord(id),
            key,
            val,
            Pid::fromWord(pid),
            rec);
      };

  const std::function<uint64_t(uint64_t, binary::Output *, binary::Output *)>
      result_ = [&](uint64_t id, binary::Output *key, binary::Output *val) {
        return q.result(Id::fromWord(id), key, val);
      };

  // Argument words for the subroutine. The order of the entries is part of
  // the contract with the generated bytecode (see the compatibility note
  // above), so do not reorder them.
  std::vector<uint64_t> args;

  if (restart) {
    args.reserve(sub.inputs + sub.locals);
  } else {
    args.reserve(sub.inputs);
  }

  args.push_back(reinterpret_cast<uint64_t>(&seek_));
  args.push_back(reinterpret_cast<uint64_t>(&currentSeek_));
  args.push_back(reinterpret_cast<uint64_t>(&endSeek_));
  args.push_back(reinterpret_cast<uint64_t>(&next_));
  args.push_back(reinterpret_cast<uint64_t>(&lookupKeyValue_));
  args.push_back(reinterpret_cast<uint64_t>(&result_));
  args.push_back(reinterpret_cast<uint64_t>(&resultWithPid_));
  args.push_back(reinterpret_cast<uint64_t>(&newDerivedFact_));
  args.push_back(reinterpret_cast<uint64_t>(&saveState_));
  // The limits are already words; no cast is needed.
  args.push_back(max_results);
  args.push_back(max_bytes);
  // Addresses of the output buffers are handed out here, so q.outputs must
  // not be resized from this point on.
  for (size_t i = 0; i < sub.outputs; ++i) {
    args.push_back(reinterpret_cast<uint64_t>(&q.outputs[i]));
  }

  if (restart) {
    // Append the saved local registers after the inputs and resume at the
    // saved entry point.
    std::copy(
        restart->sub()->locals()->begin(),
        restart->sub()->locals()->end(),
        std::back_inserter(args));
    sub.restart(args.data(), *restart->sub()->entry());
  } else {
    sub.execute(args.data());
  }

  return q.finish();
}