in connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala [66:235]
/**
 * Converts one protobuf field into the Spark SQL [[StructField]] that represents it.
 *
 * Scalar fields map directly to a Spark type. Message fields are checked, in this order, for
 * Duration/Timestamp-shaped messages, `google.protobuf.Any`, the primitive wrapper messages and
 * map entries, and otherwise become a nested struct built by recursing into their fields.
 *
 * @param fd the protobuf field descriptor to convert
 * @param existingRecordNames full names of the message types already entered on the current
 *                            path, mapped to how many times each has been entered; used to
 *                            bound recursion
 * @param protobufOptions options that select the type mapping (unsigned upcasting, enums as
 *                        ints, Any-to-JSON, wrapper unwrapping, empty message retention,
 *                        max recursive depth)
 * @return the converted field, or `None` when the field is dropped: the recursive depth limit
 *         was reached, a map key/value was dropped, or a message ended up with no fields and
 *         `retainEmptyMessage` is not set
 * @note throws the error built by `QueryCompilationErrors.foundRecursionInProtobufSchema` when
 *       a recursive message is found and the max depth is outside 1..10, and the error built by
 *       `QueryCompilationErrors.protobufTypeUnsupportedYetError` for an unhandled Java type
 */
private def structFieldFor(
    fd: FieldDescriptor,
    existingRecordNames: Map[String, Int],
    protobufOptions: ProtobufOptions): Option[StructField] = {
  import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._

  // When upcastUnsignedInts has been set, unsigned values use a larger type
  // (LongType and Decimal(20,0) for uint32 and uint64). Shared by the scalar fields and by
  // the UInt32Value / UInt64Value wrappers so that both follow the same rule.
  val uint32Type: DataType =
    if (protobufOptions.upcastUnsignedInts) LongType else IntegerType
  val uint64Type: DataType =
    if (protobufOptions.upcastUnsignedInts) DecimalType.LongDecimal else LongType

  // The helpers below call fd.getMessageType, so they are only invoked from MESSAGE cases.

  // True when the message has the given simple name and exactly the fields (seconds, nanos).
  // The check is structural: it looks at the simple name, not the full name.
  def isSecondsNanosMessage(simpleName: String): Boolean = {
    val messageFields = fd.getMessageType.getFields
    fd.getMessageType.getName == simpleName &&
      messageFields.size() == 2 &&
      messageFields.get(0).getName.equals("seconds") &&
      messageFields.get(1).getName.equals("nanos")
  }

  // Unwraps well known primitive wrapper types if the option has been set.
  def unwrappedWellKnownType: Option[DataType] = {
    if (!protobufOptions.unwrapWellKnownTypes) {
      None
    } else {
      val wrapperTypes: Seq[(String, DataType)] = Seq(
        BoolValue.getDescriptor.getFullName -> BooleanType,
        Int32Value.getDescriptor.getFullName -> IntegerType,
        UInt32Value.getDescriptor.getFullName -> uint32Type,
        Int64Value.getDescriptor.getFullName -> LongType,
        UInt64Value.getDescriptor.getFullName -> uint64Type,
        StringValue.getDescriptor.getFullName -> StringType,
        BytesValue.getDescriptor.getFullName -> BinaryType,
        FloatValue.getDescriptor.getFullName -> FloatType,
        DoubleValue.getDescriptor.getFullName -> DoubleType)
      val messageName = fd.getMessageType.getFullName
      wrapperTypes.collectFirst { case (wrapperName, dt) if wrapperName == messageName => dt }
    }
  }

  // Builds the MapType for a map entry message from its "key" and "value" fields.
  def mapEntryType: Option[DataType] = {
    val entryFields = fd.getMessageType.getFields.asScala
    // Looks the entry field up by name; any other field of the entry message is ignored
    // rather than failing with a MatchError.
    def entryType(name: String): Option[DataType] =
      entryFields
        .find(_.getName == name)
        .flatMap(structFieldFor(_, existingRecordNames, protobufOptions))
        .map(_.dataType)

    (entryType("key"), entryType("value")) match {
      case (None, _) =>
        // This is probably never expected. Protobuf does not allow complex types for keys.
        log.info(s"Dropping map field ${fd.getFullName}. Key reached max recursive depth.")
        None
      case (_, None) =>
        log.info(s"Dropping map field ${fd.getFullName}. Value reached max recursive depth.")
        None
      case (Some(kt), Some(vt)) => Some(MapType(kt, vt, valueContainsNull = false))
    }
  }

  // Builds the StructType for a generic message, honouring the recursive depth limit.
  def recordType: Option[DataType] = {
    // If the `recursive.fields.max.depth` value is not specified, it will default to -1,
    // and recursive fields are not permitted. Setting it to 1 drops all recursive fields,
    // 2 allows it to be recursed once, and 3 allows it to be recursed twice and so on.
    // A value less than or equal to 0 or greater than 10 is not allowed, and if a protobuf
    // record has more depth for recursive fields than the allowed value, it will be truncated
    // and some fields may be discarded.
    // SQL Schema for protobuf `message Person { string name = 1; Person bff = 2;}`
    // will vary based on the value of "recursive.fields.max.depth".
    // 1: struct<name: string>
    // 2: struct<name: string, bff: struct<name: string>>
    // 3: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
    // and so on.
    // TODO(rangadi): A better way to terminate would be replace the remaining recursive struct
    // with the byte array of corresponding protobuf. This way no information is lost.
    // i.e. with max depth 2, the above looks like this:
    // struct<name: string, bff: struct<name: string, _serialized_bff: bytes>>
    val recordName = fd.getMessageType.getFullName
    val recursiveDepth = existingRecordNames.getOrElse(recordName, 0)
    val recursiveFieldMaxDepth = protobufOptions.recursiveFieldMaxDepth
    val isRecursive = existingRecordNames.contains(recordName)

    if (isRecursive && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 10)) {
      throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
    } else if (isRecursive && recursiveDepth >= recursiveFieldMaxDepth) {
      // Recursive depth limit is reached. This field is dropped.
      // If it is inside a container like map or array, the containing field is dropped.
      log.info(
        s"The field ${fd.getFullName} of type $recordName is dropped " +
          s"at recursive depth $recursiveDepth"
      )
      None
    } else {
      val newRecordNames = existingRecordNames + (recordName -> (recursiveDepth + 1))
      val fields = fd.getMessageType.getFields.asScala.flatMap(
        structFieldFor(_, newRecordNames, protobufOptions)
      ).toSeq
      if (fields.nonEmpty) {
        Some(StructType(fields))
      } else if (protobufOptions.retainEmptyMessage) {
        Some(convertEmptyProtoToStructWithDummyField(fd.getFullName))
      } else {
        log.info(
          s"Dropping ${fd.getFullName} as it does not have any fields left " +
            "likely due to recursive depth limit."
        )
        None
      }
    }
  }

  val dataType: Option[DataType] = fd.getJavaType match {
    case INT =>
      Some(if (fd.getLiteType == WireFormat.FieldType.UINT32) uint32Type else IntegerType)
    case LONG =>
      Some(if (fd.getLiteType == WireFormat.FieldType.UINT64) uint64Type else LongType)
    case FLOAT => Some(FloatType)
    case DOUBLE => Some(DoubleType)
    case BOOLEAN => Some(BooleanType)
    case STRING => Some(StringType)
    case BYTE_STRING => Some(BinaryType)
    case ENUM => Some(if (protobufOptions.enumsAsInts) IntegerType else StringType)
    case MESSAGE if isSecondsNanosMessage("Duration") =>
      Some(DayTimeIntervalType.defaultConcreteType)
    case MESSAGE if isSecondsNanosMessage("Timestamp") =>
      Some(TimestampType)
    case MESSAGE if protobufOptions.convertAnyFieldsToJson &&
        fd.getMessageType.getFullName == "google.protobuf.Any" =>
      Some(StringType) // Any protobuf will be parsed and converted to json string.
    case MESSAGE =>
      // Wrapper unwrapping takes precedence over map entries and generic records.
      unwrappedWellKnownType.orElse {
        if (fd.isRepeated && fd.getMessageType.getOptions.hasMapEntry) {
          mapEntryType
        } else {
          recordType
        }
      }
    case other =>
      throw QueryCompilationErrors.protobufTypeUnsupportedYetError(other.toString)
  }

  // A map field is already a container; every other repeated field becomes an array, and
  // singular fields are nullable unless the protobuf field is required.
  dataType.map {
    case dt: MapType => StructField(fd.getName, dt)
    case dt if fd.isRepeated =>
      StructField(fd.getName, ArrayType(dt, containsNull = false))
    case dt => StructField(fd.getName, dt, nullable = !fd.isRequired)
  }
}