in fbpcs/data_processing/pid_preparer/UnionPIDDataPreparer.cpp [74:160]
UnionPIDDataPreparerResults UnionPIDDataPreparer::prepare() const {
UnionPIDDataPreparerResults res;
auto inStreamPtr = fbpcf::io::getInputStream(inputPath_);
auto& inStream = inStreamPtr->get();
// Get a random ID to avoid potential name collisions if multiple
// runs at the same time point to the same input file
auto randomId = std::to_string(folly::Random::secureRand64());
std::string tmpFilename = randomId + "_" +
private_lift::filepath_helpers::getBaseFilename(inputPath_) + "_prepared";
auto tmpFilepath = (tmpDirectory_ / tmpFilename).string();
std::cout << "\t\tCreated temporary filepath --> " << tmpFilepath << '\n';
auto tmpFile = std::make_unique<std::ofstream>(tmpFilename);
std::string line;
getline(inStream, line);
auto header = split(line, kCommaSplitRegex);
auto idIter = std::find(header.begin(), header.end(), kIdColumnName);
if (idIter == header.end()) {
// note: it's not *essential* to clean up tmpfile here, but it will
// pollute our test directory otherwise, which is just somewhat annoying.
std::remove(tmpFilename.c_str());
XLOG(FATAL) << kIdColumnName << " column missing from input header\n"
<< "Header: " << vectorToString(header);
}
auto idColumnIdx = std::distance(header.begin(), idIter);
std::unordered_set<std::string> seenIds;
while (getline(inStream, line)) {
auto cols = split(line, kCommaSplitRegex);
auto rowSize = cols.size();
auto headerSize = header.size();
if (rowSize != headerSize) {
// note: it's not *essential* to clean up tmpfile here, but it will
// pollute our test directory otherwise, which is just somewhat annoying.
std::remove(tmpFilename.c_str());
XLOG(FATAL) << "Mismatch between header and row at index "
<< res.linesProcessed << '\n'
<< "Header has size " << headerSize << " while row has size "
<< rowSize << '\n'
<< "Header: " << vectorToString(header) << '\n'
<< "Row : " << vectorToString(cols);
}
auto id = cols.at(idColumnIdx);
if (seenIds.find(id) == seenIds.end()) {
*tmpFile << id << '\n';
seenIds.insert(id);
} else {
++res.duplicateIdCount;
}
++res.linesProcessed;
if (res.linesProcessed % logEveryN_ == 0) {
XLOG(INFO) << "Processed "
<< private_lift::logging::formatNumber(res.linesProcessed)
<< " lines.";
}
}
XLOG(INFO) << "Processed with "
<< private_lift::logging::formatNumber(res.duplicateIdCount)
<< " duplicate ids.";
XLOG(INFO) << "Now copying prepared data to final output path";
// Reset underlying unique_ptr to ensure buffer gets flushed
tmpFile.reset();
XLOG(INFO) << "Writing " << tmpFilename << " -> " << outputPath_;
auto outputType = fbpcf::io::getFileType(outputPath_);
if (outputType == fbpcf::io::FileType::S3) {
private_lift::s3_utils::uploadToS3(tmpFilename, outputPath_);
} else if (outputType == fbpcf::io::FileType::Local) {
std::filesystem::copy(
tmpFilename,
outputPath_,
std::filesystem::copy_options::overwrite_existing);
} else {
throw std::runtime_error{"Unsupported output destination"};
}
// We need to make sure we clean up the tmpfiles now
std::remove(tmpFilename.c_str());
XLOG(INFO) << "File write successful.";
return res;
}