in arrow-csv/src/reader/mod.rs [618:811]
fn parse(
    rows: &StringRecords<'_>,
    fields: &Fields,
    metadata: Option<std::collections::HashMap<String, String>>,
    projection: Option<&Vec<usize>>,
    line_number: usize,
) -> Result<RecordBatch, ArrowError> {
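    // Default to projecting every column when no explicit projection is given.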
    let projection: Vec<usize> = match projection {
        Some(v) => v.clone(),
        None => fields.iter().enumerate().map(|(i, _)| i).collect(),
    };
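    // Parse each projected column into an Arrow array, dispatching on the
    // field's declared data type.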
    let arrays: Result<Vec<ArrayRef>, _> = projection
        .iter()
        .map(|i| {
            let i = *i;
            let field = &fields[i];
            match field.data_type() {
                DataType::Boolean => build_boolean_array(line_number, rows, i),
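                // Decimals are parsed with the field's declared precision and scale.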
                DataType::Decimal128(precision, scale) => {
                    build_decimal_array::<Decimal128Type>(
                        line_number,
                        rows,
                        i,
                        *precision,
                        *scale,
                    )
                }
                DataType::Decimal256(precision, scale) => {
                    build_decimal_array::<Decimal256Type>(
                        line_number,
                        rows,
                        i,
                        *precision,
                        *scale,
                    )
                }
                DataType::Int8 => build_primitive_array::<Int8Type>(line_number, rows, i),
                DataType::Int16 => {
                    build_primitive_array::<Int16Type>(line_number, rows, i)
                }
                DataType::Int32 => {
                    build_primitive_array::<Int32Type>(line_number, rows, i)
                }
                DataType::Int64 => {
                    build_primitive_array::<Int64Type>(line_number, rows, i)
                }
                DataType::UInt8 => {
                    build_primitive_array::<UInt8Type>(line_number, rows, i)
                }
                DataType::UInt16 => {
                    build_primitive_array::<UInt16Type>(line_number, rows, i)
                }
                DataType::UInt32 => {
                    build_primitive_array::<UInt32Type>(line_number, rows, i)
                }
                DataType::UInt64 => {
                    build_primitive_array::<UInt64Type>(line_number, rows, i)
                }
                DataType::Float32 => {
                    build_primitive_array::<Float32Type>(line_number, rows, i)
                }
                DataType::Float64 => {
                    build_primitive_array::<Float64Type>(line_number, rows, i)
                }
                DataType::Date32 => {
                    build_primitive_array::<Date32Type>(line_number, rows, i)
                }
                DataType::Date64 => {
                    build_primitive_array::<Date64Type>(line_number, rows, i)
                }
                DataType::Time32(TimeUnit::Second) => {
                    build_primitive_array::<Time32SecondType>(line_number, rows, i)
                }
                DataType::Time32(TimeUnit::Millisecond) => {
                    build_primitive_array::<Time32MillisecondType>(line_number, rows, i)
                }
                DataType::Time64(TimeUnit::Microsecond) => {
                    build_primitive_array::<Time64MicrosecondType>(line_number, rows, i)
                }
                DataType::Time64(TimeUnit::Nanosecond) => {
                    build_primitive_array::<Time64NanosecondType>(line_number, rows, i)
                }
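                // Timestamp columns also thread the field's time zone through to the parser.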
                DataType::Timestamp(TimeUnit::Second, tz) => {
                    build_timestamp_array::<TimestampSecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
                DataType::Timestamp(TimeUnit::Millisecond, tz) => {
                    build_timestamp_array::<TimestampMillisecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
                DataType::Timestamp(TimeUnit::Microsecond, tz) => {
                    build_timestamp_array::<TimestampMicrosecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
                DataType::Timestamp(TimeUnit::Nanosecond, tz) => {
                    build_timestamp_array::<TimestampNanosecondType>(
                        line_number,
                        rows,
                        i,
                        tz.as_deref(),
                    )
                }
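                // Utf8 columns need no parsing: the raw string values are
                // collected directly into a StringArray.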
                DataType::Utf8 => Ok(Arc::new(
                    rows.iter()
                        .map(|row| Some(row.get(i)))
                        .collect::<StringArray>(),
                ) as ArrayRef),
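                // Dictionary-encoded Utf8 columns: dispatch on the integer key type.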
                DataType::Dictionary(key_type, value_type)
                    if value_type.as_ref() == &DataType::Utf8 =>
                {
                    match key_type.as_ref() {
                        DataType::Int8 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int8Type>>(),
                        ) as ArrayRef),
                        DataType::Int16 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int16Type>>(),
                        ) as ArrayRef),
                        DataType::Int32 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int32Type>>(),
                        ) as ArrayRef),
                        DataType::Int64 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<Int64Type>>(),
                        ) as ArrayRef),
                        DataType::UInt8 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt8Type>>(),
                        ) as ArrayRef),
                        DataType::UInt16 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt16Type>>(),
                        ) as ArrayRef),
                        DataType::UInt32 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt32Type>>(),
                        ) as ArrayRef),
                        DataType::UInt64 => Ok(Arc::new(
                            rows.iter()
                                .map(|row| row.get(i))
                                .collect::<DictionaryArray<UInt64Type>>(),
                        ) as ArrayRef),
                        _ => Err(ArrowError::ParseError(format!(
                            "Unsupported dictionary key type {key_type:?}"
                        ))),
                    }
                }
                other => Err(ArrowError::ParseError(format!(
                    "Unsupported data type {other:?}"
                ))),
            }
        })
        .collect();
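    // Assemble the schema for the projected columns, carrying over any provided metadata.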
    let projected_fields: Fields =
        projection.iter().map(|i| fields[*i].clone()).collect();
    let projected_schema = Arc::new(match metadata {
        None => Schema::new(projected_fields),
        Some(metadata) => Schema::new_with_metadata(projected_fields, metadata),
    });
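    // Construct the RecordBatch, passing the row count explicitly so batches
    // whose projection selects no columns still report the right length.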
    arrays.and_then(|arr| {
        RecordBatch::try_new_with_options(
            projected_schema,
            arr,
            &RecordBatchOptions::new()
                .with_match_field_names(true)
                .with_row_count(Some(rows.len())),
        )
    })
}
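
For orientation, here is a minimal sketch of how this code path is typically reached through the public `arrow_csv::ReaderBuilder` API. The schema, file name, and projection below are illustrative only, and the exact builder method names (e.g. `with_header` vs. the older `has_header`) depend on the arrow-rs version in use:

use std::fs::File;
use std::sync::Arc;

use arrow_csv::ReaderBuilder;
use arrow_schema::{DataType, Field, Schema, TimeUnit};

fn read_example() -> Result<(), Box<dyn std::error::Error>> {
    // The declared schema drives the per-column dispatch in `parse` above.
    let schema = Arc::new(Schema::new(vec![
        Field::new("id", DataType::Int64, false),
        Field::new("name", DataType::Utf8, true),
        Field::new("ts", DataType::Timestamp(TimeUnit::Millisecond, None), true),
    ]));

    let reader = ReaderBuilder::new(schema)
        .with_header(true)
        .with_projection(vec![0, 2]) // keep only `id` and `ts`
        .build(File::open("data.csv")?)?;

    // Each decoded chunk of string records is turned into a RecordBatch.
    for batch in reader {
        let batch = batch?;
        println!("{} rows x {} columns", batch.num_rows(), batch.num_columns());
    }
    Ok(())
}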