void attributionIdSpineFileCombiner()

in fbpcs/data_processing/attribution_id_combiner/AttributionIdSpineFileCombiner.cpp [34:97]


// Runs the attribution id/spine combining pipeline over `dataFile` and
// `spineIdFile`, writing the final result to `outFile`.
//
// The first line of `dataFile` is treated as a comma-separated header and is
// used to decide whether the input is the publisher dataset or the partner
// dataset; the stream is rewound afterwards so later stages read it from the
// start. A header that matches both column sets, or neither, is reported via
// XLOG(FATAL).
//
// Stages, in order: idSwap -> idInsert -> groupBy on "id_" (followed by
// sortIds when --sort_strategy is "sort") -> addPaddingToCols ->
// headerColumnsToPlural. Any --sort_strategy other than "sort" or
// "keep_original" is reported via XLOG(FATAL).
void attributionIdSpineFileCombiner(
    std::istream& dataFile,
    std::istream& spineIdFile,
    std::ostream& outFile) {
  XLOG(INFO) << "Started.";
  const int32_t paddingWidth = FLAGS_padding_size;

  // Column sets that identify each kind of dataset.
  const std::vector<std::string> publisherCols = {
      "ad_id", "timestamp", "is_click", "campaign_metadata"};
  const std::vector<std::string> partnerCols = {
      "conversion_timestamp", "conversion_value", "conversion_metadata"};

  // Peek at the header row, then reset the stream so the pipeline below sees
  // the file from its beginning again.
  std::string firstLine;
  std::getline(dataFile, firstLine);
  boost::algorithm::trim_if(firstLine, boost::is_any_of("\r"));
  std::vector<std::string> headerFields;
  folly::split(",", firstLine, headerFields);
  dataFile.clear();
  dataFile.seekg(0);

  // Exactly one of the two column sets must be present in the header.
  const bool isPublisherDataset =
      verifyHeaderContainsCols(headerFields, publisherCols);
  const bool isPartnerDataset =
      verifyHeaderContainsCols(headerFields, partnerCols);
  if (isPublisherDataset == isPartnerDataset) {
    XLOG(FATAL) << "Invalid headers for dataset. Header: <"
                << vectorToString(headerFields)
                << ">. Both headers have status of: <" << isPublisherDataset
                << ">";
  }

  const std::vector<std::string>& aggregatedCols =
      isPublisherDataset ? publisherCols : partnerCols;
  // One padding size per aggregated column, all set to the flag value.
  std::vector<int32_t> colPaddingSize(aggregatedCols.size(), paddingWidth);

  // Stage 1: idSwap over the data and the spine file.
  std::stringstream swappedStream;
  idSwap(dataFile, spineIdFile, swappedStream);

  // Stage 2: the spine file was consumed above, so rewind it before idInsert.
  spineIdFile.clear();
  spineIdFile.seekg(0);
  std::stringstream insertedStream;
  idInsert(swappedStream, spineIdFile, insertedStream);

  // Stage 3: group on "id_", optionally followed by a sort pass.
  std::stringstream groupedStream;
  const bool wantsSort = FLAGS_sort_strategy == "sort";
  const bool wantsOriginalOrder = FLAGS_sort_strategy == "keep_original";
  if (!wantsSort && !wantsOriginalOrder) {
    XLOG(FATAL) << "Invalid sort strategy '" << FLAGS_sort_strategy
                << "'. Expected 'sort' or 'keep_original'.";
  } else if (wantsSort) {
    std::stringstream unsortedStream;
    groupBy(insertedStream, "id_", aggregatedCols, unsortedStream);
    sortIds(unsortedStream, groupedStream);
  } else {
    groupBy(insertedStream, "id_", aggregatedCols, groupedStream);
  }

  // Stage 4: pad the aggregated columns.
  std::stringstream paddedStream;
  addPaddingToCols(
      groupedStream, aggregatedCols, colPaddingSize, true, paddedStream);

  // Stage 5: pass the dataset-specific column names to headerColumnsToPlural
  // and emit the final output.
  std::vector<std::string> columnsToConvert = isPublisherDataset
      ? std::vector<std::string>{"ad_id", "timestamp"}
      : std::vector<std::string>{"conversion_timestamp", "conversion_value"};
  headerColumnsToPlural(paddedStream, columnsToConvert, outFile);

  XLOG(INFO) << "Finished.";
}