in fbpcs/data_processing/attribution_id_combiner/AttributionIdSpineFileCombiner.cpp [34:97]
void attributionIdSpineFileCombiner(
std::istream& dataFile,
std::istream& spineIdFile,
std::ostream& outFile) {
XLOG(INFO) << "Started.";
const int32_t kPaddingSize = FLAGS_padding_size;
const std::vector<std::string> publisherCols = {
"ad_id", "timestamp", "is_click", "campaign_metadata"};
const std::vector<std::string> partnerCols = {
"conversion_timestamp", "conversion_value", "conversion_metadata"};
// Inspect the headers and verify if this is the publisher or partner dataset
std::string headerLine;
getline(dataFile, headerLine);
boost::algorithm::trim_if(headerLine, boost::is_any_of("\r"));
std::vector<std::string> header;
folly::split(",", headerLine, header);
dataFile.clear();
dataFile.seekg(0);
bool isPublisherDataset = verifyHeaderContainsCols(header, publisherCols);
bool isPartnerDataset = verifyHeaderContainsCols(header, partnerCols);
if (isPartnerDataset == isPublisherDataset) {
XLOG(FATAL) << "Invalid headers for dataset. Header: <"
<< vectorToString(header) << ">. Both headers have status of: <"
<< isPublisherDataset << ">";
}
auto& aggregatedCols = isPublisherDataset ? publisherCols : partnerCols;
std::vector<int32_t> colPaddingSize(aggregatedCols.size(), kPaddingSize);
std::stringstream idMappedOutFile;
std::stringstream idSwapOutFile;
idSwap(dataFile, spineIdFile, idMappedOutFile);
spineIdFile.clear();
spineIdFile.seekg(0);
idInsert(idMappedOutFile, spineIdFile, idSwapOutFile);
std::stringstream groupByOutFile;
std::stringstream groupByUnsortedOutFile;
if (FLAGS_sort_strategy == "sort") {
groupBy(idSwapOutFile, "id_", aggregatedCols, groupByUnsortedOutFile);
sortIds(groupByUnsortedOutFile, groupByOutFile);
} else if (FLAGS_sort_strategy == "keep_original") {
groupBy(idSwapOutFile, "id_", aggregatedCols, groupByOutFile);
} else {
XLOG(FATAL) << "Invalid sort strategy '" << FLAGS_sort_strategy
<< "'. Expected 'sort' or 'keep_original'.";
}
std::stringstream paddedOutFile;
addPaddingToCols(
groupByOutFile, aggregatedCols, colPaddingSize, true, paddedOutFile);
std::vector<std::string> partnerColsToConvert = {
"conversion_timestamp", "conversion_value"};
std::vector<std::string> publisherColsToConvert = {"ad_id", "timestamp"};
std::vector<std::string> columnsToConvert =
isPublisherDataset ? publisherColsToConvert : partnerColsToConvert;
headerColumnsToPlural(paddedOutFile, columnsToConvert, outFile);
XLOG(INFO) << "Finished.";
}