in torchtext/csrc/vocab.cpp [249:292]
// Builds a Vocab from a newline-delimited token file by counting token
// frequencies in parallel, then keeping tokens that occur at least
// `min_freq` times.
//
// file_path: path to the vocab file (one entry per line).
// min_freq:  minimum frequency for a token to be kept (applied in
//            _concat_tokens).
// num_cpus:  upper bound on the number of parallel chunks; actual chunk
//            count may be lower because of the GRAIN_SIZE floor below.
Vocab _load_vocab_from_file(const std::string &file_path,
const int64_t min_freq, const int64_t num_cpus) {
// Count lines up front so the file can be split into near-equal chunks.
int64_t num_lines = _infer_lines(file_path);
int64_t chunk_size = impl::divup(num_lines, num_cpus);
// Launching a thread on less lines than this likely has too much overhead.
// TODO: Add explicit test beyond grain size to cover multithreading
chunk_size = std::max(chunk_size, GRAIN_SIZE);
// Byte offset of each chunk's first line, so each worker can seek straight
// to its chunk instead of re-scanning the file.
std::vector<size_t> offsets;
impl::infer_offsets(file_path, num_lines, chunk_size, offsets);
// One counter per chunk; merged after all workers finish. shared_ptr keeps
// each counter alive for both the worker lambda and this function.
std::vector<std::shared_ptr<IndexDict>> chunk_counters;
// m + cv + thread_count form a hand-rolled countdown latch: each worker
// decrements the counter under the mutex and notifies; the main thread
// waits below until the counter reaches zero.
std::mutex m;
std::condition_variable cv;
std::atomic<int> thread_count(0);
// create threads
// j indexes into `offsets` (chunk number); i is the chunk's first line.
int64_t j = 0;
for (int64_t i = 0; i < num_lines; i += chunk_size) {
auto counter_ptr = std::make_shared<IndexDict>();
// Increment BEFORE launch so the wait predicate can never observe zero
// while a launched task is still pending.
thread_count++;
// NOTE: offsets, m, cv, and thread_count are captured by reference; this
// is safe only because cv.wait below blocks until every task has run.
// file_path/i/j/counter_ptr are copied into the task by value.
at::launch([&, file_path, num_lines, chunk_size, j, i, counter_ptr]() {
parse_vocab_file_chunk(file_path, offsets[j], i,
std::min(num_lines, i + chunk_size), counter_ptr);
// Decrement under the mutex so the update cannot race with the wait
// predicate check, avoiding a lost wakeup.
std::lock_guard<std::mutex> lk(m);
thread_count--;
cv.notify_all();
});
chunk_counters.push_back(counter_ptr);
j++;
}
// block until all threads finish execution
std::unique_lock<std::mutex> lock(m);
cv.wait(lock, [&thread_count] { return thread_count == 0; });
// Merge per-chunk counts and apply the min_freq filter; `false` presumably
// means "no special/unk handling" here — confirm against _concat_tokens.
StringList tokens =
_concat_tokens(chunk_counters, min_freq, num_lines, false);
return Vocab(std::move(tokens));
}