in connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala [173:395]
/**
 * Creates a writer that stores one Protobuf field value into a Catalyst slot.
 *
 * The conversion is chosen once from the pair (`protoType.getJavaType`, `catalystType`); the
 * returned closure is then invoked for every value. Schema-only work (descriptor lookups and
 * construction of nested array/map/struct writers) is done here, not inside the closure.
 * Throws the error built by `QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError`
 * when no conversion exists for the pair.
 *
 * @param protoType    the Protobuf field being read
 * @param catalystType the Catalyst type the value is written as
 * @param protoPath    the field's path in the Protobuf schema; passed to nested writers and
 *                     used in the error message
 * @param catalystPath the field's path in the Catalyst schema; passed to nested writers and
 *                     used in the error message
 * @return a function `(updater, ordinal, value)` that writes `value` at `ordinal`
 */
private def newWriter(
    protoType: FieldDescriptor,
    catalystType: DataType,
    protoPath: Seq[String],
    catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {

  // True when this field's message type has one of the given full names. Only used in
  // guards of MESSAGE cases, so getMessageType is valid whenever this is evaluated.
  def isMessageOf(fullNames: String*): Boolean =
    fullNames.contains(protoType.getMessageType.getFullName)

  // Converts a 64-bit value, read as unsigned, into a Catalyst Decimal via its decimal string.
  def unsignedLongToDecimal(v: Long): Decimal =
    Decimal.fromString(UTF8String.fromString(java.lang.Long.toUnsignedString(v)))

  (protoType.getJavaType, catalystType) match {
    case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

    // TODO: we can avoid boxing if future version of Protobuf provide primitive accessors.
    case (BOOLEAN, BooleanType) =>
      (updater, ordinal, value) => updater.setBoolean(ordinal, value.asInstanceOf[Boolean])

    case (INT, IntegerType) =>
      (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int])

    // INT values arrive boxed as java.lang.Integer (see the IntegerType case above). Unboxing
    // such a value directly as Byte/Short throws ClassCastException, so unbox to Int and then
    // narrow explicitly.
    case (INT, ByteType) =>
      (updater, ordinal, value) => updater.setByte(ordinal, value.asInstanceOf[Int].toByte)

    case (INT, ShortType) =>
      (updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Int].toShort)

    // The 32 bits are reinterpreted as an unsigned value (zero-extended).
    // NOTE(review): a signed int32 field mapped to LongType would also be zero-extended here;
    // presumably the schema only yields this pair for unsigned 32-bit fields — confirm.
    case (INT, LongType) =>
      (updater, ordinal, value) =>
        updater.setLong(ordinal, Integer.toUnsignedLong(value.asInstanceOf[Int]))

    case (
          MESSAGE | BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
          ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
      newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)

    case (LONG, LongType) =>
      (updater, ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[Long])

    case (LONG, DecimalType.LongDecimal) =>
      (updater, ordinal, value) =>
        updater.setDecimal(ordinal, unsignedLongToDecimal(value.asInstanceOf[Long]))

    case (FLOAT, FloatType) =>
      (updater, ordinal, value) => updater.setFloat(ordinal, value.asInstanceOf[Float])

    case (DOUBLE, DoubleType) =>
      (updater, ordinal, value) => updater.setDouble(ordinal, value.asInstanceOf[Double])

    case (STRING, StringType) =>
      (updater, ordinal, value) =>
        val str = value match {
          case s: String => UTF8String.fromString(s)
        }
        updater.set(ordinal, str)

    case (BYTE_STRING, BinaryType) =>
      (updater, ordinal, value) =>
        val bytes = value match {
          case s: ByteString => s.toByteArray
          case unsupported =>
            throw QueryCompilationErrors.invalidByteStringFormatError(unsupported)
        }
        updater.set(ordinal, bytes)

    case (MESSAGE, MapType(keyType, valueType, valueContainsNull)) =>
      newMapWriter(protoType, protoPath, catalystPath, keyType, valueType, valueContainsNull)

    // Both targets are read from the message's (seconds, nanos) fields and stored as a Long
    // count of microseconds.
    case (MESSAGE, TimestampType | _: DayTimeIntervalType) =>
      newSecondsAndNanosWriter(protoType)

    case (MESSAGE, StringType) if isMessageOf("google.protobuf.Any") =>
      (updater, ordinal, value) =>
        // Convert 'Any' protobuf message to JSON string.
        val jsonStr = jsonPrinter.print(value.asInstanceOf[DynamicMessage])
        updater.set(ordinal, UTF8String.fromString(jsonStr))

    // Handle well known wrapper types. We unpack the value field when the desired
    // output type is a primitive (determined by the option in [[ProtobufOptions]])
    case (MESSAGE, BooleanType) if isMessageOf(BoolValue.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.setBoolean(ordinal, v.asInstanceOf[Boolean])
      }

    case (MESSAGE, IntegerType)
        if isMessageOf(
          Int32Value.getDescriptor.getFullName,
          UInt32Value.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.setInt(ordinal, v.asInstanceOf[Int])
      }

    case (MESSAGE, LongType) if isMessageOf(UInt32Value.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.setLong(ordinal, Integer.toUnsignedLong(v.asInstanceOf[Int]))
      }

    case (MESSAGE, LongType)
        if isMessageOf(
          Int64Value.getDescriptor.getFullName,
          UInt64Value.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.setLong(ordinal, v.asInstanceOf[Long])
      }

    case (MESSAGE, DecimalType.LongDecimal)
        if isMessageOf(UInt64Value.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.setDecimal(ordinal, unsignedLongToDecimal(v.asInstanceOf[Long]))
      }

    case (MESSAGE, StringType) if isMessageOf(StringValue.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.set(ordinal, UTF8String.fromString(v.asInstanceOf[String]))
      }

    case (MESSAGE, BinaryType) if isMessageOf(BytesValue.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.set(ordinal, v.asInstanceOf[ByteString].toByteArray)
      }

    case (MESSAGE, FloatType) if isMessageOf(FloatValue.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.setFloat(ordinal, v.asInstanceOf[Float])
      }

    case (MESSAGE, DoubleType) if isMessageOf(DoubleValue.getDescriptor.getFullName) =>
      newWrapperWriter { (updater, ordinal, v) =>
        updater.setDouble(ordinal, v.asInstanceOf[Double])
      }

    case (MESSAGE, st: StructType) =>
      val writeRecord = getRecordWriter(
        protoType.getMessageType,
        st,
        protoPath,
        catalystPath,
        applyFilters = _ => false)
      (updater, ordinal, value) =>
        // A fresh row per value: the row object itself is handed to the updater.
        val row = new SpecificInternalRow(st)
        writeRecord(new RowUpdater(row), value.asInstanceOf[DynamicMessage])
        updater.set(ordinal, row)

    case (ENUM, StringType) =>
      (updater, ordinal, value) =>
        updater.set(
          ordinal,
          UTF8String.fromString(value.asInstanceOf[EnumValueDescriptor].getName))

    case (ENUM, IntegerType) =>
      (updater, ordinal, value) =>
        updater.setInt(ordinal, value.asInstanceOf[EnumValueDescriptor].getNumber)

    case _ =>
      throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
        toFieldStr(protoPath),
        catalystPath,
        s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
          s" ${protoType.getType}",
        catalystType)
  }
}

/**
 * Builds a writer for a well-known wrapper message (BoolValue, Int32Value, StringValue, ...).
 *
 * Each value is read as a DynamicMessage, and its first field is fetched with `getFieldValue`.
 * When that yields null, the slot is set to null; otherwise the unwrapped value is passed to
 * `setUnwrapped`.
 *
 * @param setUnwrapped writes a non-null unwrapped value at the given ordinal
 */
private def newWrapperWriter(
    setUnwrapped: (CatalystDataUpdater, Int, Any) => Unit): (CatalystDataUpdater, Int, Any) => Unit = {
  (updater, ordinal, value) =>
    val wrapper = value.asInstanceOf[DynamicMessage]
    val unwrapped = getFieldValue(wrapper, wrapper.getDescriptorForType.getFields.get(0))
    if (unwrapped == null) {
      updater.setNullAt(ordinal)
    } else {
      setUnwrapped(updater, ordinal, unwrapped)
    }
}

/**
 * Builds a writer for a message whose first two fields are (seconds: Long, nanos: Int).
 *
 * The writer stores `seconds` and `nanos` combined as a Long count of microseconds; the nanos
 * part is truncated to whole microseconds by `TimeUnit.NANOSECONDS.toMicros`. The two field
 * descriptors are resolved once here instead of once per value.
 *
 * @param protoType a message-typed field with at least two fields (seconds, then nanos)
 */
private def newSecondsAndNanosWriter(
    protoType: FieldDescriptor): (CatalystDataUpdater, Int, Any) => Unit = {
  val fields = protoType.getMessageType.getFields
  val secondsField = fields.get(0)
  val nanoSecondsField = fields.get(1)
  (updater, ordinal, value) =>
    val message = value.asInstanceOf[DynamicMessage]
    val seconds = message.getField(secondsField).asInstanceOf[Long]
    val nanoSeconds = message.getField(nanoSecondsField).asInstanceOf[Int]
    val micros = DateTimeUtils.millisToMicros(seconds * 1000)
    updater.setLong(ordinal, micros + TimeUnit.NANOSECONDS.toMicros(nanoSeconds))
}