int main()

in fbpcs/data_processing/attribution_id_combiner/AttributionIdSpineCombiner.cpp [29:96]


int main(int argc, char** argv) {
  fbpcs::performance_tools::CostEstimation cost{"data_processing"};
  cost.start();

  folly::init(&argc, &argv);
  gflags::ParseCommandLineFlags(&argc, &argv, true);
  fbpcf::AwsSdk::aquire();

  std::filesystem::path outputPath{FLAGS_output_path};
  std::filesystem::path tmpDirectory{FLAGS_tmp_directory};

  XLOG(INFO) << "Starting data_processing run on: data_path:" << FLAGS_data_path
             << ", spine_path: " << FLAGS_spine_path
             << ", output_path: " << FLAGS_output_path
             << ", tmp_directory: " << FLAGS_tmp_directory
             << ", sorting_strategy: " << FLAGS_sort_strategy;

  auto dataInStreamPtr = fbpcf::io::getInputStream(FLAGS_data_path);
  auto spineInStreamPtr = fbpcf::io::getInputStream(FLAGS_spine_path);
  std::istream& dataStream = dataInStreamPtr->get();
  std::istream& spineStream = spineInStreamPtr->get();

  // Get a random ID to avoid potential name collisions if multiple
  // runs at the same time point to the same input file
  auto randomId = std::to_string(folly::Random::secureRand64());
  std::string tmpFilename = randomId + "_" +
      private_lift::filepath_helpers::getBaseFilename(outputPath);
  std::filesystem::path tmpFilepath = (tmpDirectory / tmpFilename);
  XLOG(INFO) << "Writing temporary file to " << tmpFilepath;
  std::ofstream tmpFile{tmpFilepath};

  pid::combiner::attributionIdSpineFileCombiner(
      dataStream, spineStream, tmpFile);
  tmpFile.close();

  auto outputType = fbpcf::io::getFileType(outputPath);
  if (outputPath != tmpFilepath) {
    if (outputType == fbpcf::io::FileType::S3) {
      private_lift::s3_utils::uploadToS3(tmpFilepath, outputPath);
    } else if (outputType == fbpcf::io::FileType::Local) {
      std::filesystem::create_directories(outputPath.parent_path());
      std::filesystem::copy(
          tmpFilepath,
          outputPath,
          std::filesystem::copy_options::overwrite_existing);
    } else {
      throw std::runtime_error{"Unsupported output destination"};
    }

    // We need to make sure we clean up the tmpfiles now
    std::remove(tmpFilepath.c_str());
  }

  cost.end();
  XLOG(INFO) << cost.getEstimatedCostString();
  if (FLAGS_run_name != "") {
    std::string runName = folly::to<std::string>(
        cost.getApplication(),
        "_",
        FLAGS_run_name,
        "_",
        measurement::private_attribution::getDateString());
    XLOG(INFO) << cost.writeToS3(
        runName, cost.getEstimatedCostDynamic(runName));
  }

  return 0;
}