void LiftIdSpineFileCombiner::combineFile()

in fbpcs/data_processing/lift_id_combiner/LiftIdSpineFileCombiner.cpp [41:208]


void LiftIdSpineFileCombiner::combineFile() {
  auto dataInStreamPtr = fbpcf::io::getInputStream(dataPath_);
  auto spineInStreamPtr = fbpcf::io::getInputStream(spinePath_);
  auto& dataInStream = dataInStreamPtr->get();
  auto& spineInStream = spineInStreamPtr->get();

  // Get a random ID to avoid potential name collisions if multiple
  // runs at the same time point to the same input file
  auto randomId = std::to_string(folly::Random::secureRand64());
  std::string tmpFilename = randomId + "_" +
      private_lift::filepath_helpers::getBaseFilename(outputPath_);
  auto tmpFilepath = (tmpDirectory_ / tmpFilename).string();
  XLOG(INFO) << "Writing temporary file to " << tmpFilepath;
  auto tmpFile = std::make_unique<std::ofstream>(tmpFilepath);

  XLOG(INFO) << "Combining " << dataPath_ << " and " << spinePath_ << " into "
             << outputPath_;
  const std::vector<std::string> requiredPublisherCols = {
      "opportunity_timestamp", "test_flag"};
  const std::vector<std::string> requiredPartnerCols = {"event_timestamp"};

  // TODO T86923630: Uncomment this once data validation supports hashed ids
  // Temporary workaround because it breaks on non-int id_ column
  // pid::combiner::validateCsvData(dataInStream);
  dataInStream.clear();
  dataInStream.seekg(0);

  // Inspect the headers and verify if this is the publisher or partner dataset
  std::string headerLine;
  getline(dataInStream, headerLine);
  std::vector<std::string> header;
  folly::split(",", headerLine, header);
  dataInStream.clear();
  dataInStream.seekg(0);

  bool isPublisherDataset =
      combiner::verifyHeaderContainsCols(header, requiredPublisherCols);
  bool isPartnerDataset =
      combiner::verifyHeaderContainsCols(header, requiredPartnerCols);
  if (isPartnerDataset == isPublisherDataset) {
    XLOG(FATAL) << "Invalid headers for dataset.";
  }

  // run idSwap followed by idInsert
  // TODO: Switch from stringstreams to a real random filename
  std::stringstream idSwapOutFile;
  std::stringstream idMappedOutFile;
  pid::combiner::idSwap(dataInStream, spineInStream, idMappedOutFile);
  spineInStream.clear();
  spineInStream.seekg(0);
  pid::combiner::idInsert(idMappedOutFile, spineInStream, idSwapOutFile);

  std::string line;

  // if partner data, we want to aggregate over remaining columns,
  // add padding, and rename the aggregated columns

  // if its publisher, we want to add the opportunity column based on
  // opportunity_timestamp
  if (isPartnerDataset) {
    // get all columns that are not id_, these are the columns we want to
    // aggregate
    std::vector<std::string> aggregatedCols = header;
    aggregatedCols.erase(
        std::find(aggregatedCols.begin(), aggregatedCols.end(), "id_"));

    std::stringstream groupByOutFile;
    std::stringstream groupByUnsortedOutFile;
    if (FLAGS_sort_strategy == "sort") {
      pid::combiner::groupBy(
          idSwapOutFile, "id_", aggregatedCols, groupByUnsortedOutFile);
      pid::combiner::sortIds(groupByUnsortedOutFile, groupByOutFile);
    } else if (FLAGS_sort_strategy == "keep_original") {
      pid::combiner::groupBy(
          idSwapOutFile, "id_", aggregatedCols, groupByOutFile);
    } else {
      XLOG(FATAL) << "Invalid sort strategy '" << FLAGS_sort_strategy
                  << "'. Expected 'sort' or 'keep_original'.";
    }

    // add "s" to all aggregated column headers
    std::stringstream renamedColsFile;
    std::vector<std::string> renamedColsVec = header;
    for (auto& colName : aggregatedCols) {
      auto it = find(renamedColsVec.begin(), renamedColsVec.end(), colName);
      colName.append("s");
      *it = colName;
    }
    renamedColsFile << combiner::vectorToString(renamedColsVec) << "\n";
    getline(groupByOutFile, line);
    renamedColsFile << groupByOutFile.rdbuf();

    // define padding size and add padding to aggregated columns
    std::vector<int32_t> colPaddingSize(
        aggregatedCols.size(), FLAGS_multi_conversion_limit);
    std::stringstream paddingOutFile;

    pid::combiner::addPaddingToCols(
        renamedColsFile, aggregatedCols, colPaddingSize, true, paddingOutFile);

    // ensure conversions are sorted by timestamp
    std::stringstream sortingOutFile;
    std::string sortBy = "event_timestamps";
    std::vector<std::string> listColumns = {"event_timestamps"};
    // It's possible that this is a "valueless" run
    // Also remember that we need to search for the *original* header name
    // since we have pluralized it in a previous step
    if (std::find(header.begin(), header.end(), "value") != header.end()) {
      listColumns.push_back("values");
    }
    pid::combiner::sortIntegralValues(
        paddingOutFile, sortingOutFile, sortBy, listColumns);

    *tmpFile << sortingOutFile.rdbuf();
  } else if (isPublisherDataset) {
    // We need to get the timestamp index *before* we add the new column
    // Otherwise, we'll get a std::out_of_range exception
    auto timestampIndex =
        combiner::headerIndex(header, "opportunity_timestamp");
    // add opportunity to header
    header.insert(header.end() - 1, "opportunity");
    *tmpFile << combiner::vectorToString(header) << "\n";

    // add opportunity value.
    // if timestamp is 0, opportunity is 0
    // if timestamp is not 0, opportunity is 1
    getline(idSwapOutFile, line); // skip header
    while (getline(idSwapOutFile, line)) {
      std::vector<std::string> row;
      folly::split(",", line, row);
      if (row.at(timestampIndex) == "0") {
        row.insert(row.end() - 1, "0");
      } else {
        row.insert(row.end() - 1, "1");
      }
      *tmpFile << combiner::vectorToString(row) << "\n";
    }
  }

  XLOG(INFO) << "Now copying combined data to final output path";
  // Reset underlying unique_ptr to ensure buffer gets flushed
  tmpFile.reset();
  if (outputPath_ != tmpFilepath) {
    // The only time this wouldn't be the case is if tmpFilepath is somehow
    // the final output location (which is possible if the final output is in
    // the same location as our tmpDirectory)
    // TODO: This should never happen if we actually use a tmp filename
    XLOG(INFO) << "Writing " << tmpFilepath << " -> " << outputPath_;

    auto outputType = fbpcf::io::getFileType(outputPath_);
    if (outputType == fbpcf::io::FileType::S3) {
      private_lift::s3_utils::uploadToS3(tmpFilepath, outputPath_);
    } else if (outputType == fbpcf::io::FileType::Local) {
      if (outputPath_.has_parent_path()) {
        std::filesystem::create_directories(outputPath_.parent_path());
      }
      std::filesystem::copy(
          tmpFilepath,
          outputPath_,
          std::filesystem::copy_options::overwrite_existing);
    } else {
      throw std::runtime_error{"Unsupported output destination"};
    }
    // We need to make sure we clean up the tmpfiles now
    std::remove(tmpFilepath.c_str());
  }
  XLOG(INFO) << "Finished combiner.";
}