void groupBy()

in fbpcs/data_processing/id_combiner/GroupBy.cpp [26:121]


/**
 * Groups the comma-separated rows read from `inFile` by the value in
 * `groupByColumn` and writes one output row per distinct value to `outFile`.
 *
 * The first line of `inFile` is the header. It is split on commas and written
 * to `outFile` through vectorToString. Every following line must split into
 * the same number of fields as the header; otherwise the run stops with
 * XLOG(FATAL).
 *
 * Output rows appear in the order in which each group-by value was first seen.
 * For a column whose header name is listed in `columnsToAggregate`, the output
 * field is "[" + vectorToString(values) + "]", where `values` are that
 * column's values for every row of the group, in input order. Every other
 * column is written as the value from the group's first row.
 *
 * @param inFile             input stream; header line first, then data rows.
 * @param groupByColumn      header name of the column whose values define the
 *                           groups (resolved with headerIndex).
 * @param columnsToAggregate header names of the columns emitted as lists.
 * @param outFile            destination for the grouped rows.
 */
void groupBy(
    std::istream& inFile,
    std::string groupByColumn,
    std::vector<std::string> columnsToAggregate,
    std::ostream& outFile) {
  // Literal field separator handed to folly::split (not a regular expression).
  const std::string kDelimiter = ",";

  XLOG(INFO) << "[C++ GroupBy] Starting GroupBy run to aggregate columns: "
             << vectorToString(columnsToAggregate)
             << " by column: " << groupByColumn << " \n";

  std::string line;
  std::string row;

  getline(inFile, line);
  std::vector<std::string> header;
  folly::split(kDelimiter, line, header);

  auto groupByColumnIndex = headerIndex(header, groupByColumn);

  const auto headerSize = header.size();

  // Decide once per column whether it is aggregated, rather than searching
  // columnsToAggregate again for every cell of every output row.
  // std::vector<char> is used instead of std::vector<bool> to avoid the
  // packed-bit proxy type.
  std::vector<char> isAggregated(headerSize, 0);
  for (std::size_t i = 0; i < headerSize; ++i) {
    isAggregated[i] = std::find(
                          columnsToAggregate.begin(),
                          columnsToAggregate.end(),
                          header[i]) != columnsToAggregate.end();
  }

  // Echo the parsed header as the output header.
  outFile << vectorToString(header) << "\n";

  // Maps each group-by value to its columns: groupColumns[j] holds the values
  // seen in column j for that group, in input order. Non-aggregated columns
  // keep only their first value, because that is all that gets written.
  std::unordered_map<std::string, std::vector<std::vector<std::string>>>
      idToColumns;

  // First-appearance order of the group-by values; determines output order.
  std::vector<std::string> traversedOrder;

  while (getline(inFile, row)) {
    std::vector<std::string> cols;
    folly::split(kDelimiter, row, cols);
    auto rowSize = cols.size();
    if (rowSize != headerSize) {
      XLOG(FATAL) << "Mismatch between header and row" << '\n'
                  << "Header has size " << headerSize << " while row has size "
                  << rowSize << '\n'
                  << "Header: " << line << '\n'
                  << "Row   : " << row << '\n';
    }

    // try_emplace copies the key (cols is still intact afterwards) and tells
    // us whether this is the first row seen for this group-by value.
    auto [groupIt, isNewGroup] =
        idToColumns.try_emplace(cols.at(groupByColumnIndex));
    auto& groupColumns = groupIt->second;
    if (isNewGroup) {
      traversedOrder.push_back(groupIt->first);
      groupColumns.resize(headerSize);
    }

    // Move the fields into the group instead of copying them; `cols` is
    // discarded at the end of this iteration.
    for (std::size_t j = 0; j < headerSize; ++j) {
      auto& columnValues = groupColumns.at(j);
      if (isAggregated[j] || columnValues.empty()) {
        columnValues.push_back(std::move(cols.at(j)));
      }
    }
  }

  // Write one row per group, in first-appearance order. Aggregated columns are
  // written as a bracketed list; the rest as the group's first value.
  for (const auto& id : traversedOrder) {
    auto& groupColumns = idToColumns.at(id);
    for (std::size_t i = 0; i < headerSize; ++i) {
      if (i > 0) {
        outFile << ",";
      }
      if (isAggregated[i]) {
        outFile << "[" << vectorToString(groupColumns[i]) << "]";
      } else {
        outFile << groupColumns[i].front();
      }
    }
    outFile << "\n";
  }
  XLOG(INFO) << "[C++ GroupBy] Finished.\n";
}