private def newConverter()

in connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufSerializer.scala [79:298]


  /**
   * Builds a `Converter` that reads the Catalyst value of type `catalystType` at a given
   * ordinal and returns the Java object protobuf expects for `fieldDescriptor`.
   *
   * The (Catalyst type, protobuf Java type) pair is resolved once, when the converter is
   * built; the returned closure only performs the per-value work. Unsupported combinations
   * fail eagerly via `QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError`.
   *
   * @param catalystType    Catalyst type of the value being serialized.
   * @param fieldDescriptor protobuf field the value is written into.
   * @param catalystPath    path of the value in the Catalyst schema (used in error messages).
   * @param protoPath       path of the field in the protobuf message (used in error messages).
   * @return a converter producing the protobuf-side value for one Catalyst value.
   */
  private def newConverter(
      catalystType: DataType,
      fieldDescriptor: FieldDescriptor,
      catalystPath: Seq[String],
      protoPath: Seq[String]): Converter = {
    (catalystType, fieldDescriptor.getJavaType) match {
      case (NullType, _) =>
        (getter, ordinal) => null
      case (BooleanType, BOOLEAN) =>
        (getter, ordinal) => getter.getBoolean(ordinal)
      case (ByteType, INT) =>
        (getter, ordinal) => getter.getByte(ordinal).toInt
      case (ShortType, INT) =>
        (getter, ordinal) => getter.getShort(ordinal).toInt
      case (IntegerType, INT) =>
        (getter, ordinal) => getter.getInt(ordinal)
      // A uint32 field may be backed by a LongType column; `toInt` keeps the low 32 bits,
      // which is the bit pattern of the unsigned value.
      case (LongType, INT) if fieldDescriptor.getLiteType == WireFormat.FieldType.UINT32 =>
        (getter, ordinal) => getter.getLong(ordinal).toInt
      case (LongType, LONG) =>
        (getter, ordinal) => getter.getLong(ordinal)
      case (dt: DecimalType, LONG)
        if fieldDescriptor.getLiteType == WireFormat.FieldType.UINT64 =>
        // Read with the column's own precision/scale: a row's decimal encoding depends on the
        // precision it was written with, so a mismatched precision decodes the wrong bytes.
        // `longValue` keeps the low 64 bits, i.e. the two's-complement bit pattern protobuf
        // writes for uint64 values above Long.MaxValue. Fractional digits, if any, are
        // truncated toward zero.
        (getter, ordinal) =>
          getter
            .getDecimal(ordinal, dt.precision, dt.scale)
            .toJavaBigDecimal
            .toBigInteger
            .longValue()
      case (FloatType, FLOAT) =>
        (getter, ordinal) => getter.getFloat(ordinal)
      case (DoubleType, DOUBLE) =>
        (getter, ordinal) => getter.getDouble(ordinal)
      case (StringType, ENUM) =>
        val enumType = fieldDescriptor.getEnumType
        val enumSymbols: Set[String] = enumType.getValues.asScala.map(e => e.toString).toSet
        (getter, ordinal) =>
          val data = getter.getUTF8String(ordinal).toString
          if (!enumSymbols.contains(data)) {
            throw QueryExecutionErrors.cannotConvertCatalystValueToProtobufEnumTypeError(
              catalystPath,
              toFieldStr(protoPath),
              data,
              enumSymbols.mkString("\"", "\", \"", "\""))
          }
          enumType.findValueByName(data)
      case (IntegerType, ENUM) =>
        val enumType = fieldDescriptor.getEnumType
        val enumValues: Set[Int] = enumType.getValues.asScala.map(e => e.getNumber).toSet
        (getter, ordinal) =>
          val data = getter.getInt(ordinal)
          if (!enumValues.contains(data)) {
            throw QueryExecutionErrors.cannotConvertCatalystValueToProtobufEnumTypeError(
              catalystPath,
              toFieldStr(protoPath),
              data.toString,
              enumValues.mkString(", "))
          }
          enumType.findValueByNumber(data)
      case (StringType, STRING) =>
        (getter, ordinal) => String.valueOf(getter.getUTF8String(ordinal))

      case (BinaryType, BYTE_STRING) =>
        (getter, ordinal) => getter.getBinary(ordinal)

      case (DateType, INT) =>
        (getter, ordinal) => getter.getInt(ordinal)

      case (TimestampType, MESSAGE) =>
        (getter, ordinal) =>
          // Instant.getNano is always within [0, 999999999], which is the range
          // google.protobuf.Timestamp requires (also for pre-epoch values), and the
          // conversion keeps the full microsecond precision of the Catalyst value.
          val instant = DateTimeUtils.microsToInstant(getter.getLong(ordinal))
          Timestamp
            .newBuilder()
            .setSeconds(instant.getEpochSecond)
            .setNanos(instant.getNano)
            .build()

      case (ArrayType(et, containsNull), _) =>
        val elementConverter =
          newConverter(et, fieldDescriptor, catalystPath :+ "element", protoPath :+ "element")
        (getter, ordinal) => {
          val arrayData = getter.getArray(ordinal)
          val len = arrayData.numElements()
          val result = new Array[Any](len)
          var i = 0
          while (i < len) {
            if (containsNull && arrayData.isNullAt(i)) {
              result(i) = null
            } else {
              result(i) = elementConverter(arrayData, i)
            }
            i += 1
          }
          // Protobuf writer is expecting a Java Collection, so we convert it into
          // `ArrayList` backed by the specified array without data copying.
          java.util.Arrays.asList(result: _*)
        }

      // Handle serializing primitives back into well known wrapper types.
      case (BooleanType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == BoolValue.getDescriptor.getFullName =>
        (getter, ordinal) =>
          BoolValue.of(getter.getBoolean(ordinal))

      case (IntegerType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == Int32Value.getDescriptor.getFullName =>
        (getter, ordinal) =>
          Int32Value.of(getter.getInt(ordinal))

      case (IntegerType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == UInt32Value.getDescriptor.getFullName =>
        (getter, ordinal) =>
          UInt32Value.of(getter.getInt(ordinal))

      case (LongType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == Int64Value.getDescriptor.getFullName =>
        (getter, ordinal) =>
          Int64Value.of(getter.getLong(ordinal))

      case (LongType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == UInt64Value.getDescriptor.getFullName =>
        (getter, ordinal) =>
          UInt64Value.of(getter.getLong(ordinal))

      case (StringType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == StringValue.getDescriptor.getFullName =>
        (getter, ordinal) =>
          StringValue.of(getter.getUTF8String(ordinal).toString)

      case (BinaryType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == BytesValue.getDescriptor.getFullName =>
        (getter, ordinal) =>
          BytesValue.of(ByteString.copyFrom(getter.getBinary(ordinal)))

      case (FloatType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == FloatValue.getDescriptor.getFullName =>
        (getter, ordinal) =>
          FloatValue.of(getter.getFloat(ordinal))

      case (DoubleType, MESSAGE)
        if fieldDescriptor.getMessageType.getFullName == DoubleValue.getDescriptor.getFullName =>
        (getter, ordinal) =>
          DoubleValue.of(getter.getDouble(ordinal))

      case (st: StructType, MESSAGE) =>
        val structConverter =
          newStructConverter(st, fieldDescriptor.getMessageType, catalystPath, protoPath)
        val numFields = st.length
        (getter, ordinal) => structConverter(getter.getStruct(ordinal, numFields))

      // Only genuine protobuf map fields qualify; any other message field paired with a
      // MapType falls through to the type-mismatch error below instead of failing with a
      // MatchError / NullPointerException while looking up "key" and "value".
      case (MapType(kt, vt, valueContainsNull), MESSAGE) if fieldDescriptor.isMapField =>
        val entryType = fieldDescriptor.getMessageType
        val keyField = entryType.findFieldByName("key")
        val valueField = entryType.findFieldByName("value")

        val keyConverter = newConverter(kt, keyField, catalystPath :+ "key", protoPath :+ "key")
        val valueConverter =
          newConverter(vt, valueField, catalystPath :+ "value", protoPath :+ "value")

        // Value stored for a null Catalyst map value. FieldDescriptor.getDefaultValue throws
        // for message-typed fields, so those get the empty message instead.
        val nullValue: AnyRef =
          if (valueField.getJavaType == MESSAGE) {
            DynamicMessage.getDefaultInstance(valueField.getMessageType)
          } else {
            valueField.getDefaultValue
          }

        (getter, ordinal) =>
          val mapData = getter.getMap(ordinal)
          val len = mapData.numElements()
          val keyArray = mapData.keyArray()
          val valueArray = mapData.valueArray()
          val entries = new java.util.ArrayList[DynamicMessage](len)
          var i = 0
          while (i < len) {
            val key = keyConverter(keyArray, i)
            val value =
              if (valueContainsNull && valueArray.isNullAt(i)) nullValue
              else valueConverter(valueArray, i)
            entries.add(
              DynamicMessage
                .newBuilder(entryType)
                .setField(keyField, key)
                .setField(valueField, value)
                .build())
            i += 1
          }
          entries

      case (_: DayTimeIntervalType, MESSAGE) =>
        (getter, ordinal) =>
          // The Catalyst value is a count of microseconds. Truncating division gives
          // seconds and nanos with the same sign, as google.protobuf.Duration requires,
          // and keeps full microsecond precision.
          val micros = getter.getLong(ordinal)
          Duration
            .newBuilder()
            .setSeconds(micros / 1000000L)
            .setNanos(((micros % 1000000L) * 1000L).toInt)
            .build()

      case _ =>
        throw QueryCompilationErrors.cannotConvertCatalystTypeToProtobufTypeError(
          catalystPath,
          toFieldStr(protoPath),
          catalystType,
          s"${fieldDescriptor} ${fieldDescriptor.toProto.getLabel} ${fieldDescriptor.getJavaType}" +
            s" ${fieldDescriptor.getType}")
    }
  }