fn try_from()

in datafusion/proto-common/src/from_proto/mod.rs [347:604]


    /// Converts a protobuf `ScalarValue` message into the corresponding scalar
    /// (`Self`).
    ///
    /// Primitive, string, binary, temporal and interval variants are mapped
    /// field-by-field. Nested variants (list, large list, fixed-size list,
    /// struct and map) carry an Arrow IPC record batch plus optional dictionary
    /// batches; these are decoded with `read_record_batch` and the first column
    /// of the resulting batch becomes the scalar's backing array.
    ///
    /// # Errors
    ///
    /// Returns an error when:
    /// * a required field (`value`, `index_type`, a union field, ...) is absent,
    /// * a nested value has no schema, or its IPC message / dictionary message
    ///   cannot be parsed or has an unexpected header type,
    /// * a dictionary batch carries no record batch payload,
    /// * a union value has an unknown mode, or an embedded scalar fails to
    ///   convert.
    fn try_from(
        scalar: &protobuf::ScalarValue,
    ) -> datafusion_common::Result<Self, Self::Error> {
        use protobuf::scalar_value::Value;

        let value = scalar
            .value
            .as_ref()
            .ok_or_else(|| Error::required("value"))?;

        Ok(match value {
            Value::BoolValue(v) => Self::Boolean(Some(*v)),
            Value::Utf8Value(v) => Self::Utf8(Some(v.to_owned())),
            Value::Utf8ViewValue(v) => Self::Utf8View(Some(v.to_owned())),
            Value::LargeUtf8Value(v) => Self::LargeUtf8(Some(v.to_owned())),
            // NOTE(review): the narrowing `as` casts below wrap out-of-range
            // wire values instead of reporting an error.
            Value::Int8Value(v) => Self::Int8(Some(*v as i8)),
            Value::Int16Value(v) => Self::Int16(Some(*v as i16)),
            Value::Int32Value(v) => Self::Int32(Some(*v)),
            Value::Int64Value(v) => Self::Int64(Some(*v)),
            Value::Uint8Value(v) => Self::UInt8(Some(*v as u8)),
            Value::Uint16Value(v) => Self::UInt16(Some(*v as u16)),
            Value::Uint32Value(v) => Self::UInt32(Some(*v)),
            Value::Uint64Value(v) => Self::UInt64(Some(*v)),
            Value::Float32Value(v) => Self::Float32(Some(*v)),
            Value::Float64Value(v) => Self::Float64(Some(*v)),
            Value::Date32Value(v) => Self::Date32(Some(*v)),
            // Nested scalars (list-like, struct and map) are serialized using
            // the arrow IPC format.
            Value::ListValue(v)
            | Value::FixedSizeListValue(v)
            | Value::LargeListValue(v)
            | Value::StructValue(v)
            | Value::MapValue(v) => {
                let protobuf::ScalarNestedValue {
                    ipc_message,
                    arrow_data,
                    dictionaries,
                    schema,
                } = &v;

                let schema: Schema = if let Some(schema_ref) = schema {
                    schema_ref.try_into()?
                } else {
                    return Err(Error::General(
                        "Invalid schema while deserializing ScalarValue::List"
                            .to_string(),
                    ));
                };
                // Wrap once so every `read_record_batch` call below shares the
                // same schema instead of deep-cloning it per dictionary.
                let schema = Arc::new(schema);

                let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                    Error::General(format!(
                        "Error IPC message while deserializing ScalarValue::List: {e}"
                    ))
                })?;
                let buffer = Buffer::from(arrow_data.as_slice());

                let ipc_batch = message.header_as_record_batch().ok_or_else(|| {
                    Error::General(
                        "Unexpected message type deserializing ScalarValue::List"
                            .to_string(),
                    )
                })?;

                // Decode every dictionary batch up front, keyed by dictionary id,
                // so the main record batch can resolve its dictionary columns.
                let dict_by_id: HashMap<i64, ArrayRef> = dictionaries
                    .iter()
                    .map(|protobuf::scalar_nested_value::Dictionary { ipc_message, arrow_data }| {
                        let message = root_as_message(ipc_message.as_slice()).map_err(|e| {
                            Error::General(format!(
                                "Error IPC message while deserializing ScalarValue::List dictionary message: {e}"
                            ))
                        })?;
                        let buffer = Buffer::from(arrow_data.as_slice());

                        let dict_batch = message.header_as_dictionary_batch().ok_or_else(|| {
                            Error::General(
                                "Unexpected message type deserializing ScalarValue::List dictionary message"
                                    .to_string(),
                            )
                        })?;

                        let id = dict_batch.id();

                        // The payload is optional in the IPC encoding; a
                        // malformed message must surface as an error rather
                        // than panic.
                        let dict_data = dict_batch.data().ok_or_else(|| {
                            Error::General(
                                "Missing record batch data deserializing ScalarValue::List dictionary message"
                                    .to_string(),
                            )
                        })?;

                        let record_batch = read_record_batch(
                            &buffer,
                            dict_data,
                            Arc::clone(&schema),
                            &Default::default(),
                            None,
                            &message.version(),
                        )?;

                        let values: ArrayRef = Arc::clone(record_batch.column(0));

                        Ok((id, values))
                    })
                    .collect::<datafusion_common::Result<HashMap<_, _>>>()?;

                let record_batch = read_record_batch(
                    &buffer,
                    ipc_batch,
                    schema,
                    &dict_by_id,
                    None,
                    &message.version(),
                )
                .map_err(|e| arrow_datafusion_err!(e))
                .map_err(|e| e.context("Decoding ScalarValue::List Value"))?;
                // The scalar's backing array is the first column of the batch.
                let arr = record_batch.column(0);
                match value {
                    Value::ListValue(_) => {
                        Self::List(arr.as_list::<i32>().to_owned().into())
                    }
                    Value::LargeListValue(_) => {
                        Self::LargeList(arr.as_list::<i64>().to_owned().into())
                    }
                    Value::FixedSizeListValue(_) => {
                        Self::FixedSizeList(arr.as_fixed_size_list().to_owned().into())
                    }
                    Value::StructValue(_) => {
                        Self::Struct(arr.as_struct().to_owned().into())
                    }
                    Value::MapValue(_) => Self::Map(arr.as_map().to_owned().into()),
                    // The outer arm only admits the five nested variants above.
                    _ => unreachable!(),
                }
            }
            // A typed null: the payload is the data type of the null scalar.
            Value::NullValue(v) => {
                let null_type: DataType = v.try_into()?;
                null_type.try_into().map_err(Error::DataFusionError)?
            }
            // Decimal payloads are big-endian byte strings.
            // NOTE(review): `vec_to_array` is defined elsewhere; confirm how it
            // handles a byte vector of unexpected length.
            Value::Decimal128Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal128(
                    Some(i128::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Decimal256Value(val) => {
                let array = vec_to_array(val.value.clone());
                Self::Decimal256(
                    Some(i256::from_be_bytes(array)),
                    val.p as u8,
                    val.s as i8,
                )
            }
            Value::Date64Value(v) => Self::Date64(Some(*v)),
            Value::Time32Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time32_value::Value::Time32SecondValue(t) => {
                        Self::Time32Second(Some(*t))
                    }
                    protobuf::scalar_time32_value::Value::Time32MillisecondValue(t) => {
                        Self::Time32Millisecond(Some(*t))
                    }
                }
            }
            Value::Time64Value(v) => {
                let time_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;
                match time_value {
                    protobuf::scalar_time64_value::Value::Time64MicrosecondValue(t) => {
                        Self::Time64Microsecond(Some(*t))
                    }
                    protobuf::scalar_time64_value::Value::Time64NanosecondValue(t) => {
                        Self::Time64Nanosecond(Some(*t))
                    }
                }
            }
            Value::IntervalYearmonthValue(v) => Self::IntervalYearMonth(Some(*v)),
            Value::DurationSecondValue(v) => Self::DurationSecond(Some(*v)),
            Value::DurationMillisecondValue(v) => Self::DurationMillisecond(Some(*v)),
            Value::DurationMicrosecondValue(v) => Self::DurationMicrosecond(Some(*v)),
            Value::DurationNanosecondValue(v) => Self::DurationNanosecond(Some(*v)),
            Value::TimestampValue(v) => {
                // An empty timezone string on the wire means "no timezone".
                let timezone = if v.timezone.is_empty() {
                    None
                } else {
                    Some(v.timezone.as_str().into())
                };

                let ts_value =
                    v.value.as_ref().ok_or_else(|| Error::required("value"))?;

                match ts_value {
                    protobuf::scalar_timestamp_value::Value::TimeMicrosecondValue(t) => {
                        Self::TimestampMicrosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeNanosecondValue(t) => {
                        Self::TimestampNanosecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeSecondValue(t) => {
                        Self::TimestampSecond(Some(*t), timezone)
                    }
                    protobuf::scalar_timestamp_value::Value::TimeMillisecondValue(t) => {
                        Self::TimestampMillisecond(Some(*t), timezone)
                    }
                }
            }
            Value::DictionaryValue(v) => {
                let index_type: DataType = v
                    .index_type
                    .as_ref()
                    .ok_or_else(|| Error::required("index_type"))?
                    .try_into()?;

                // The dictionary's value is itself a protobuf scalar; convert
                // it recursively.
                let value: Self = v
                    .value
                    .as_ref()
                    .ok_or_else(|| Error::required("value"))?
                    .as_ref()
                    .try_into()?;

                Self::Dictionary(Box::new(index_type), Box::new(value))
            }
            Value::BinaryValue(v) => Self::Binary(Some(v.clone())),
            Value::BinaryViewValue(v) => Self::BinaryView(Some(v.clone())),
            Value::LargeBinaryValue(v) => Self::LargeBinary(Some(v.clone())),
            Value::IntervalDaytimeValue(v) => Self::IntervalDayTime(Some(
                IntervalDayTimeType::make_value(v.days, v.milliseconds),
            )),
            Value::IntervalMonthDayNano(v) => Self::IntervalMonthDayNano(Some(
                IntervalMonthDayNanoType::make_value(v.months, v.days, v.nanos),
            )),
            Value::UnionValue(val) => {
                let mode = match val.mode {
                    0 => UnionMode::Sparse,
                    1 => UnionMode::Dense,
                    id => Err(Error::unknown("UnionMode", id))?,
                };
                let ids = val
                    .fields
                    .iter()
                    .map(|f| f.field_id as i8)
                    .collect::<Vec<_>>();
                // Every union entry must carry a field definition; a single
                // missing one makes the whole collection `None`.
                let fields = val
                    .fields
                    .iter()
                    .map(|f| f.field.clone())
                    .collect::<Option<Vec<_>>>();
                let fields = fields.ok_or_else(|| Error::required("UnionField"))?;
                let fields = parse_proto_fields_to_fields(&fields)?;
                let fields = UnionFields::new(ids, fields);
                let v_id = val.value_id as i8;
                // An absent value yields a union scalar with no selected value.
                let val = match &val.value {
                    None => None,
                    Some(val) => {
                        let val: ScalarValue = val
                            .as_ref()
                            .try_into()
                            .map_err(|_| Error::General("Invalid Scalar".to_string()))?;
                        Some((v_id, Box::new(val)))
                    }
                };
                Self::Union(val, fields, mode)
            }
            Value::FixedSizeBinaryValue(v) => {
                // Clone only the byte payload, not the whole message.
                Self::FixedSizeBinary(v.length, Some(v.values.clone()))
            }
        })
    }