in datafusion/proto-common/src/from_proto/mod.rs [347:604]
fn try_from(
scalar: &protobuf::ScalarValue,
) -> datafusion_common::Result<Self, Self::Error> {
use protobuf::scalar_value::Value;
let value = scalar
.value
.as_ref()
.ok_or_else(|| Error::required("value"))?;
Ok(match value {
Value::BoolValue(v) => Self::Boolean(Some(*v)),
Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
Value::Int32Value(v) => Self::Int32(Some(*v)),
Value::Int64Value(v) => Self::Int64(Some(*v)),
Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
Value::Uint32Value(v) => Self::UInt32(Some(*v)),
Value::Uint64Value(v) => Self::UInt64(Some(*v)),
Value::Float32Value(v) => Self::Float32(Some(*v)),
Value::Float64Value(v) => Self::Float64(Some(*v)),
Value::Date32Value(v) => Self::Date32(Some(*v)),
// ScalarValue::List is serialized using arrow IPC format
Value::ListValue(v)
| Value::FixedSizeListValue(v)
| Value::LargeListValue(v)
| Value::StructValue(v)
| Value::MapValue(v) => {
let protobuf::ScalarNestedValue {
ipc_message,
arrow_data,
dictionaries,
schema,
} = &v;
let schema: Schema = if let Some(schema_ref) = schema {
schema_ref.try_into()?
} else {
return Err(Error::General(
"Invalid schema while deserializing ScalarValue::List"
.to_string(),
));
};
let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
Error::General(format!(
"Error IPC message while deserializing ScalarValue::List: {e}"
))
})?;
let buffer = Buffer::from(arrow_data.as_slice());
let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
Error::General(
"Unexpected message type deserializing ScalarValue::List"
.to_string(),
)
})?;
let dict_by_id: HashMap<i64,ArrayRef> = dictionaries.iter().map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
Error::General(format!(
"Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
))
})?;
let buffer = Buffer::from(arrow_data.as_slice());
let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
Error::General(
"Unexpected message type deserializing ScalarValue::List dictionary message"
.to_string(),
)
})?;
let id = dict_batch.id();
let record_batch = read_record_batch(
&buffer,
dict_batch.data().unwrap(),
Arc::new(schema.clone()),
&Default::default(),
None,
&message.version(),
)?;
let values: ArrayRef = Arc::clone(record_batch.column(0));
Ok((id, values))
}).collect::<datafusion_common::Result<HashMap<_, _>>>()?;
let record_batch = read_record_batch(
&buffer,
ipc_batch,
Arc::new(schema),
&dict_by_id,
None,
&message.version(),
)
.map_err(|e| arrow_datafusion_err!(e))
.map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
let arr = record_batch.column(0);
match value {
Value::ListValue(_) => {
Self::List(arr.as_list::<i32>().to_owned().into())
}
Value::LargeListValue(_) => {
Self::LargeList(arr.as_list::<i64>().to_owned().into())
}
Value::FixedSizeListValue(_) => {
Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
}
Value::StructValue(_) => {
Self::Struct(arr.as_struct().to_owned().into())
}
Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
_ => unreachable!(),
}
}
Value::NullValue(v) => {
let null_type: DataType = v.try_into()?;
null_type.try_into().map_err(Error::DataFusionError)?
}
Value::Decimal128Value(val) => {
let array = vec_to_array(val.value.clone());
Self::Decimal128(
Some(i128::from_be_bytes(array)),
val.p as u8,
val.s as i8,
)
}
Value::Decimal256Value(val) => {
let array = vec_to_array(val.value.clone());
Self::Decimal256(
Some(i256::from_be_bytes(array)),
val.p as u8,
val.s as i8,
)
}
Value::Date64Value(v) => Self::Date64(Some(*v)),
Value::Time32Value(v) => {
let time_value =
v.value.as_ref().ok_or_else(|| Error::required("value"))?;
match time_value {
protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
Self::Time32Second(Some(*t))
}
protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
Self::Time32Millisecond(Some(*t))
}
}
}
Value::Time64Value(v) => {
let time_value =
v.value.as_ref().ok_or_else(|| Error::required("value"))?;
match time_value {
protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
Self::Time64Microsecond(Some(*t))
}
protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
Self::Time64Nanosecond(Some(*t))
}
}
}
Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
Value::TimestampValue(v) => {
let timezone = if v.timezone.is_empty() {
None
} else {
Some(v.timezone.as_str().into())
};
let ts_value =
v.value.as_ref().ok_or_else(|| Error::required("value"))?;
match ts_value {
protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
Self::TimestampMicrosecond(Some(*t), timezone)
}
protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
Self::TimestampNanosecond(Some(*t), timezone)
}
protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
Self::TimestampSecond(Some(*t), timezone)
}
protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
Self::TimestampMillisecond(Some(*t), timezone)
}
}
}
Value::DictionaryValue(v) => {
let index_type: DataType = v
.index_type
.as_ref()
.ok_or_else(|| Error::required("index_type"))?
.try_into()?;
let value: Self = v
.value
.as_ref()
.ok_or_else(|| Error::required("value"))?
.as_ref()
.try_into()?;
Self::Dictionary(Box::new(index_type), Box::new(value))
}
Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
IntervalDayTimeType::make_value(v.days, v.milliseconds),
)),
Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
)),
Value::UnionValue(val) => {
let mode = match val.mode {
0 => UnionMode::Sparse,
1 => UnionMode::Dense,
id => Err(Error::unknown("UnionMode", id))?,
};
let ids = val
.fields
.iter()
.map(|f| f.field_id as i8)
.collect::<Vec<_>>();
let fields = val
.fields
.iter()
.map(|f| f.field.clone())
.collect::<Option<Vec<_>>>();
let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
let fields = parse_proto_fields_to_fields(&fields)?;
let fields = UnionFields::new(ids, fields);
let v_id = val.value_id as i8;
let val = match &val.value {
None => None,
Some(val) => {
let val: ScalarValue = val
.as_ref()
.try_into()
.map_err(|_| Error::General("Invalid Scalar".to_string()))?;
Some((v_id, Box::new(val)))
}
};
Self::Union(val, fields, mode)
}
Value::FixedSizeBinaryValue(v) => {
Self::FixedSizeBinary(v.length, Some(v.clone().values))
}
})
}