private def structFieldFor()

in connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/utils/SchemaConverters.scala [66:235]


  /**
   * Converts a single protobuf field descriptor into a Spark SQL [[StructField]].
   *
   * Scalar protobuf types map directly to Catalyst types; `uint32`/`uint64` (and their
   * `UInt32Value`/`UInt64Value` wrappers) are widened when `upcastUnsignedInts` is set.
   * Message-typed fields are tried in this order: messages shaped like Duration / Timestamp,
   * `google.protobuf.Any` (as a JSON string when `convertAnyFieldsToJson` is set), well-known
   * wrapper types (when `unwrapWellKnownTypes` is set), map entries, and finally general,
   * possibly recursive, structs.
   *
   * Repeated fields become non-null-element arrays, map fields become maps with non-null
   * values, and all other fields are nullable unless the protobuf field is `required`.
   *
   * Throws the error from `QueryCompilationErrors.foundRecursionInProtobufSchema` when a
   * recursive message is found while `recursiveFieldMaxDepth` is `<= 0` or `> 10`, and the
   * error from `QueryCompilationErrors.protobufTypeUnsupportedYetError` for any other
   * unsupported Java type.
   *
   * @param fd                  the protobuf field to convert
   * @param existingRecordNames full names of message types already entered on the current
   *                            path, mapped to their current recursion depth
   * @param protobufOptions     conversion options
   * @return the converted field, or `None` when the field is dropped (recursion depth limit
   *         reached, or no fields left and `retainEmptyMessage` is not set)
   */
  private def structFieldFor(
      fd: FieldDescriptor,
      existingRecordNames: Map[String, Int],
      protobufOptions: ProtobufOptions): Option[StructField] = {
    import com.google.protobuf.Descriptors.FieldDescriptor.JavaType._

    // True when the field's message type has the given simple name and exactly the two fields
    // `seconds` and `nanos`, in that order. This is the shape shared by Duration and Timestamp.
    // Declared as a `def` so that `getMessageType` is only evaluated for MESSAGE fields.
    def isSecondsNanosMessage(simpleName: String): Boolean = {
      val messageType = fd.getMessageType
      val messageFields = messageType.getFields
      messageType.getName == simpleName &&
        messageFields.size() == 2 &&
        messageFields.get(0).getName.equals("seconds") &&
        messageFields.get(1).getName.equals("nanos")
    }

    // True when the field's message type is the given well-known wrapper type and the
    // `unwrapWellKnownTypes` option is enabled.
    def isUnwrappedWrapper(wrapperFullName: String): Boolean =
      fd.getMessageType.getFullName == wrapperFullName && protobufOptions.unwrapWellKnownTypes

    val dataType = fd.getJavaType match {
      // When the protobuf type is unsigned and upcastUnsignedInts has been set,
      // use a larger type (LongType and Decimal(20,0) for uint32 and uint64).
      case INT =>
        if (fd.getLiteType == WireFormat.FieldType.UINT32 && protobufOptions.upcastUnsignedInts) {
          Some(LongType)
        } else {
          Some(IntegerType)
        }
      case LONG =>
        if (fd.getLiteType == WireFormat.FieldType.UINT64 && protobufOptions.upcastUnsignedInts) {
          Some(DecimalType.LongDecimal)
        } else {
          Some(LongType)
        }
      case FLOAT => Some(FloatType)
      case DOUBLE => Some(DoubleType)
      case BOOLEAN => Some(BooleanType)
      case STRING => Some(StringType)
      case BYTE_STRING => Some(BinaryType)
      case ENUM => if (protobufOptions.enumsAsInts) Some(IntegerType) else Some(StringType)
      case MESSAGE if isSecondsNanosMessage("Duration") =>
        Some(DayTimeIntervalType.defaultConcreteType)
      case MESSAGE if isSecondsNanosMessage("Timestamp") =>
        Some(TimestampType)
      case MESSAGE if protobufOptions.convertAnyFieldsToJson &&
        fd.getMessageType.getFullName == "google.protobuf.Any" =>
        Some(StringType) // Any protobuf will be parsed and converted to json string.

      // Unwrap well known primitive wrapper types if the option has been set.
      case MESSAGE if isUnwrappedWrapper(BoolValue.getDescriptor.getFullName) =>
        Some(BooleanType)
      case MESSAGE if isUnwrappedWrapper(Int32Value.getDescriptor.getFullName) =>
        Some(IntegerType)
      case MESSAGE if isUnwrappedWrapper(UInt32Value.getDescriptor.getFullName) =>
        if (protobufOptions.upcastUnsignedInts) Some(LongType) else Some(IntegerType)
      case MESSAGE if isUnwrappedWrapper(Int64Value.getDescriptor.getFullName) =>
        Some(LongType)
      case MESSAGE if isUnwrappedWrapper(UInt64Value.getDescriptor.getFullName) =>
        if (protobufOptions.upcastUnsignedInts) Some(DecimalType.LongDecimal) else Some(LongType)
      case MESSAGE if isUnwrappedWrapper(StringValue.getDescriptor.getFullName) =>
        Some(StringType)
      case MESSAGE if isUnwrappedWrapper(BytesValue.getDescriptor.getFullName) =>
        Some(BinaryType)
      case MESSAGE if isUnwrappedWrapper(FloatValue.getDescriptor.getFullName) =>
        Some(FloatType)
      case MESSAGE if isUnwrappedWrapper(DoubleValue.getDescriptor.getFullName) =>
        Some(DoubleType)

      case MESSAGE if fd.isRepeated && fd.getMessageType.getOptions.hasMapEntry =>
        // A map field is a repeated entry message whose fields are named `key` and `value`.
        // Key is converted before value, matching the entry's declared field order.
        val entryFields = fd.getMessageType.getFields.asScala
        def entryFieldType(name: String): Option[DataType] =
          entryFields.find(_.getName == name).flatMap { field =>
            structFieldFor(field, existingRecordNames, protobufOptions).map(_.dataType)
          }
        val keyType = entryFieldType("key")
        val valueType = entryFieldType("value")
        (keyType, valueType) match {
          case (None, _) =>
            // This is probably never expected. Protobuf does not allow complex types for keys.
            log.info(s"Dropping map field ${fd.getFullName}. Key reached max recursive depth.")
            None
          case (_, None) =>
            log.info(s"Dropping map field ${fd.getFullName}. Value reached max recursive depth.")
            None
          case (Some(kt), Some(vt)) => Some(MapType(kt, vt, valueContainsNull = false))
        }
      case MESSAGE =>
        // If the `recursive.fields.max.depth` value is not specified, it will default to -1,
        // and recursive fields are not permitted. Setting it to 1 drops all recursive fields,
        // 2 allows it to be recursed once, and 3 allows it to be recursed twice and so on.
        // A value less than or equal to 0 or greater than 10 is not allowed, and if a protobuf
        // record has more depth for recursive fields than the allowed value, it will be truncated
        // and some fields may be discarded.
        // SQL Schema for protobuf `message Person { string name = 1; Person bff = 2;}`
        // will vary based on the value of "recursive.fields.max.depth".
        // 1: struct<name: string>
        // 2: struct<name: string, bff: struct<name: string>>
        // 3: struct<name: string, bff: struct<name: string, bff: struct<name: string>>>
        // and so on.
        // TODO(rangadi): A better way to terminate would be replace the remaining recursive struct
        //      with the byte array of corresponding protobuf. This way no information is lost.
        //      i.e. with max depth 2, the above looks like this:
        //      struct<name: string, bff: struct<name: string, _serialized_bff: bytes>>
        val recordName = fd.getMessageType.getFullName
        val recursiveDepth = existingRecordNames.getOrElse(recordName, 0)
        val recursiveFieldMaxDepth = protobufOptions.recursiveFieldMaxDepth
        // The message type is already on the current path, i.e. this field recurses.
        val isRecursive = existingRecordNames.contains(recordName)
        if (isRecursive && (recursiveFieldMaxDepth <= 0 || recursiveFieldMaxDepth > 10)) {
          throw QueryCompilationErrors.foundRecursionInProtobufSchema(fd.toString())
        } else if (isRecursive && recursiveDepth >= recursiveFieldMaxDepth) {
          // Recursive depth limit is reached. This field is dropped.
          // If it is inside a container like map or array, the containing field is dropped.
          log.info(
            s"The field ${fd.getFullName} of type $recordName is dropped " +
              s"at recursive depth $recursiveDepth"
          )
          None
        } else {
          val newRecordNames = existingRecordNames + (recordName -> (recursiveDepth + 1))
          val fields = fd.getMessageType.getFields.asScala.flatMap(
            structFieldFor(_, newRecordNames, protobufOptions)
          ).toSeq
          fields match {
            case Nil =>
              if (protobufOptions.retainEmptyMessage) {
                Some(convertEmptyProtoToStructWithDummyField(fd.getFullName))
              } else {
                log.info(
                  s"Dropping ${fd.getFullName} as it does not have any fields left " +
                    "likely due to recursive depth limit."
                )
                None
              }
            case fds => Some(StructType(fds))
          }
        }
      case other =>
        throw QueryCompilationErrors.protobufTypeUnsupportedYetError(other.toString)
    }
    dataType.map {
      case dt: MapType => StructField(fd.getName, dt)
      case dt if fd.isRepeated =>
        StructField(fd.getName, ArrayType(dt, containsNull = false))
      case dt => StructField(fd.getName, dt, nullable = !fd.isRequired)
    }
  }