in src/utils/utils.h [106:156]
void foreach_line(const String& fname,
                  Lambda f,
                  int numThreads = 1) {
  // Apply f(line) to every line of the file named `fname`, spreading the
  // work over `numThreads` worker threads.
  //
  //   fname      - path of the file to read; it is opened once here to
  //                compute partition boundaries and once more per worker.
  //   f          - callable invoked as f(line) with a std::string. Each
  //                worker gets its own copy of f and the copies run
  //                concurrently, so any state f shares must be synchronised
  //                by the caller. Lines are delivered in no particular
  //                global order. An exception escaping f inside a worker
  //                escapes a std::thread body and terminates the program.
  //   numThreads - number of workers; values below 1 are treated as 1.
  //
  // Throws std::runtime_error if the file cannot be opened.
  using namespace std;

  // A count of 0 used to process nothing and a negative count indexed an
  // empty (or absurdly large) partition table; fall back to one worker.
  const int nThreads = numThreads < 1 ? 1 : numThreads;

  ifstream ifs(fname);
  if (!ifs.good()) {
    throw runtime_error(string("error opening ") + fname);
  }

  // File length in bytes: seek to the end and ask for the position.
  ifs.seekg(0, ios_base::end);
  auto len = tellg(ifs);

  // partitions[i],partitions[i+1] will be the bytewise boundaries for the i'th
  // thread.
  std::vector<off_t> partitions(nThreads + 1);
  partitions[0] = 0;
  partitions[nThreads] = len;

  // Seek to bytewise partition boundaries, and read one line forward so that
  // every boundary sits at the start of a line.
  string unused;
  for (int i = 1; i < nThreads; i++) {
    reset(ifs, (len / nThreads) * i);
    getline(ifs, unused);
    off_t boundary = tellg(ifs);
    // If the probe landed inside a final line that has no trailing newline,
    // getline runs into end-of-file and the position query reports -1
    // (assuming the tellg helper forwards the stream's result - confirm).
    // Storing -1 would leave the previous worker with an empty range and
    // silently drop the tail of the file, so use the end of the file instead.
    if (boundary < 0) {
      boundary = partitions[nThreads];
    }
    partitions[i] = boundary;
  }

  // Neighbouring ranges may coincide or be empty; consider, e.g., a machine
  // with 100 hardware threads and only 99 lines in the file. The original
  // authors ask that f() be idempotent in case some work is repeated.
  vector<thread> threads;
  threads.reserve(nThreads);
  for (int i = 0; i < nThreads; i++) {
    threads.emplace_back([i, f, &fname, &partitions] {
      // Publish this worker's index through the thread id slot in detail.
      detail::id = i;
      // Get our own seek pointer. If this open fails the loop below simply
      // reads nothing; the failure is not reported.
      ifstream ifs2(fname);
      ifs2.seekg(partitions[i]);
      string line;
      while (tellg(ifs2) < partitions[i + 1] && getline(ifs2, line)) {
        // We don't know the line number. Super-bummer.
        f(line);
      }
    });
  }
  // fname and partitions are captured by reference, so every worker must be
  // joined before this function returns.
  for (auto& t : threads) {
    t.join();
  }
}