Intrusive_ptr<const Schema> Csv_reader::init_parsers_and_make_schema()

in src/mlio/csv_reader.cc [325:395]


// Populates the per-column bookkeeping (column_ignores_ / column_parsers_)
// and builds the schema for the columns that are kept.
//
// For every (index, name, data type) triple:
//   - if should_skip() accepts it, the column is flagged with 1 in
//     column_ignores_, gets a null parser, and contributes no attribute;
//   - otherwise it is flagged with 0, gets a parser created by
//     make_parser(), and contributes an attribute of shape {batch_size, 1}.
//     When params_.dedupe_column_names is set, its name is first made
//     unique by appending "_<count>" suffixes.
//
// Returns the schema constructed from the collected attributes. If the
// Schema constructor throws std::invalid_argument and two attributes share
// a name, a Schema_error naming that column is thrown instead; otherwise
// the original exception is rethrown.
Intrusive_ptr<const Schema> Csv_reader::init_parsers_and_make_schema()
{
    const std::size_t num_columns = column_names_.size();

    column_ignores_.reserve(num_columns);
    column_parsers_.reserve(num_columns);

    // Zip the running column index with the name and type sequences so
    // that all three advance together.
    auto first_col = tbb::make_zip_iterator(tbb::counting_iterator<std::size_t>(0),
                                            column_names_.begin(),
                                            column_types_.begin());
    auto last_col = tbb::make_zip_iterator(tbb::counting_iterator<std::size_t>(num_columns),
                                           column_names_.end(),
                                           column_types_.end());

    const std::size_t rows_per_batch = params().batch_size;

    std::vector<Attribute> schema_attrs{};

    // Number of times each (possibly suffixed) name has been handed out;
    // only consulted when name deduplication is enabled.
    std::unordered_map<std::string, std::size_t> seen_names{};

    for (auto col = first_col; col < last_col; ++col) {
        auto &&column = *col;

        // Take a private copy: the name may be rewritten by the dedupe
        // step and is finally moved into the attribute.
        std::string col_name = std::get<1>(column);

        if (should_skip(std::get<0>(column), col_name)) {
            column_ignores_.emplace_back(1);
            column_parsers_.emplace_back(nullptr);

            continue;
        }

        Data_type col_type = std::get<2>(column);

        column_ignores_.emplace_back(0);
        column_parsers_.emplace_back(make_parser(col_type, params_.parser_options));

        if (params_.dedupe_column_names) {
            // Try to claim the name. Each time it turns out to be taken,
            // extend it with an underscore plus the current count of the
            // clashing entry (bumping that count), then try again; the
            // extended name may itself be taken, hence the loop.
            auto slot = seen_names.try_emplace(col_name, 0);
            while (!slot.second) {
                const std::size_t suffix = slot.first->second++;

                col_name.append("_");
                col_name.append(fmt::to_string(suffix));

                slot = seen_names.try_emplace(col_name, 0);
            }

            // Mark the freshly claimed name as used once.
            ++slot.first->second;
        }

        schema_attrs.emplace_back(std::move(col_name), col_type, Size_vector{rows_per_batch, 1});
    }

    try {
        return make_intrusive<Schema>(schema_attrs);
    }
    catch (const std::invalid_argument &) {
        // Look for a repeated attribute name so that a more specific error
        // can be reported; if none is found the original exception is
        // propagated unchanged.
        std::unordered_set<std::string_view> unique_names{};
        for (auto &attr : schema_attrs) {
            auto [where, is_new] = unique_names.emplace(attr.name());
            if (!is_new) {
                throw Schema_error{fmt::format(
                    "The dataset contains more than one column with the name '{0}'.", *where)};
            }
        }

        throw;
    }
}