fn parse()

in arrow-csv/src/reader/mod.rs [618:811]


fn parse(
    rows: &StringRecords<'_>,
    fields: &Fields,
    metadata: Option<std::collections::HashMap<String, String>>,
    projection: Option<&Vec<usize>>,
    line_number: usize,
) -> Result<RecordBatch, ArrowError> {
    let projection: Vec<usize> = match projection {
        Some(v) => v.clone(),
        None => fields.iter().enumerate().map(|(i, _)| i).collect(),
    };

    let arrays: Result<Vec<ArrayRef>, _> = projection
        .iter()
        .map(|i| {
            let i = *i;
            let field = &fields[i];
            match field.data_type() {
                DataType::Boolean => build_boolean_array(line_number, rows, i),
                DataType::Decimal128(precision, scale) => {
                    build_decimal_array::<Decimal128Type>(
                        line_number,
                        rows,
                        i,
                        *precision,
                        *scale,
                    )
                }
                DataType::Decimal256(precision, scale) => {
                    build_decimal_array::<Decimal256Type>(
                        line_number,
                        rows,
                        i,
                        *precision,
                        *scale,
                    )
                }
                DataType::Int8 => build_primitive_array::<Int8Type>(line_number, rows, i),
                DataType::Int16 => {
                    build_primitive_array::<Int16Type>(line_number, rows, i)
                }
                DataType::Int32 => {
                    build_primitive_array::<Int32Type>(line_number, rows, i)
                }
                DataType::Int64 => {
                    build_primitive_array::<Int64Type>(line_number, rows, i)
                }
                DataType::UInt8 => {
                    build_primitive_array::<UInt8Type>(line_number, rows, i)
                }
                DataType::UInt16 => {
                    build_primitive_array::<UInt16Type>(line_number, rows, i)
                }
                DataType::UInt32 => {
                    build_primitive_array::<UInt32Type>(line_number, rows, i)
                }
                DataType::UInt64 => {
                    build_primitive_array::<UInt64Type>(line_number, rows, i)
                }
                DataType::Float32 => {
                    build_primitive_array::<Float32Type>(line_number, rows, i)
                }
                DataType::Float64 => {
                    build_primitive_array::<Float64Type>(line_number, rows, i)
                }
                DataType::Date32 => {
                    build_primitive_array::<Date32Type>(line_number, rows, i)
                }
                DataType::Date64 => {
                    build_primitive_array::<Date64Type>(line_number, rows, i)
                }
                DataType::Time32(TimeUnit::Second) => {
                    build_primitive_array::<Time32SecondType>(line_number, rows, i)
                }
                DataType::Time32(TimeUnit::Millisecond) => {
                    build_primitive_array::<Time32MillisecondType>(line_number, rows, i)
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    build_primitive_array::<Time64MicrosecondType>(line_number, rows, i)
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    build_primitive_array::<Time64NanosecondType>(line_number, rows, i)
                }
                DataType::Timestamp(TimeUnit::Second, tz) => {
                    build_timestamp_array::<TimestampSecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
                DataType::Timestamp(TimeUnit::Millisecond, tz) => {
                    build_timestamp_array::<TimestampMillisecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
                DataType::Timestamp(TimeUnit::Microsecond, tz) => {
                    build_timestamp_array::<TimestampMicrosecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
                DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
                    build_timestamp_array::<TimestampNanosecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
                DataType::Utf8 => Ok(Arc::new(
                    rows.iter()
                        .map(|row| Some(row.get(i)))
                        .collect::<StringArray>(),
                ) as ArrayRef),
                DataType::Dictionary(key_type, value_type)
                    if value_type.as_ref() == &DataType::Utf8 =>
                {
                    match key_type.as_ref() {
                        DataType::Int8 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int8Type>>(),
                        ) as ArrayRef),
                        DataType::Int16 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int16Type>>(),
                        ) as ArrayRef),
                        DataType::Int32 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int32Type>>(),
                        ) as ArrayRef),
                        DataType::Int64 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int64Type>>(),
                        ) as ArrayRef),
                        DataType::UInt8 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt8Type>>(),
                        ) as ArrayRef),
                        DataType::UInt16 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt16Type>>(),
                        ) as ArrayRef),
                        DataType::UInt32 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt32Type>>(),
                        ) as ArrayRef),
                        DataType::UInt64 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt64Type>>(),
                        ) as ArrayRef),
                        _ => Err(ArrowError::ParseError(format!(
                            "Unsupported dictionary key type {key_type:?}"
                        ))),
                    }
                }
                other => Err(ArrowError::ParseError(format!(
                    "Unsupported data type {other:?}"
                ))),
            }
        })
        .collect();

    let projected_fields: Fields =
        projection.iter().map(|i| fields[*i].clone()).collect();

    let projected_schema = Arc::new(match metadata {
        None => Schema::new(projected_fields),
        Some(metadata) => Schema::new_with_metadata(projected_fields, metadata),
    });

    arrays.and_then(|arr| {
        RecordBatch::try_new_with_options(
            projected_schema,
            arr,
            &RecordBatchOptions::new()
                .with_match_field_names(true)
                .with_row_count(Some(rows.len())),
        )
    })
}