private def newWriter()

in connector/protobuf/src/main/scala/org/apache/spark/sql/protobuf/ProtobufDeserializer.scala [173:395]


  private def newWriter(
      protoType: FieldDescriptor,
      catalystType: DataType,
      protoPath: Seq[String],
      catalystPath: Seq[String]): (CatalystDataUpdater, Int, Any) => Unit = {

    (protoType.getJavaType, catalystType) match {

      case (null, NullType) => (updater, ordinal, _) => updater.setNullAt(ordinal)

      // TODO: we can avoid boxing if future version of Protobuf provide primitive accessors.
      case (BOOLEAN, BooleanType) =>
        (updater, ordinal, value) => updater.setBoolean(ordinal, value.asInstanceOf[Boolean])

      case (INT, IntegerType) =>
        (updater, ordinal, value) => updater.setInt(ordinal, value.asInstanceOf[Int])

      case (INT, ByteType) =>
        (updater, ordinal, value) => updater.setByte(ordinal, value.asInstanceOf[Byte])

      case (INT, ShortType) =>
        (updater, ordinal, value) => updater.setShort(ordinal, value.asInstanceOf[Short])

      case (INT, LongType) =>
        (updater, ordinal, value) =>
          updater.setLong(
            ordinal,
            Integer.toUnsignedLong(value.asInstanceOf[Int]))
      case  (
        MESSAGE | BOOLEAN | INT | FLOAT | DOUBLE | LONG | STRING | ENUM | BYTE_STRING,
        ArrayType(dataType: DataType, containsNull)) if protoType.isRepeated =>
        newArrayWriter(protoType, protoPath, catalystPath, dataType, containsNull)

      case (LONG, LongType) =>
        (updater, ordinal, value) => updater.setLong(ordinal, value.asInstanceOf[Long])

      case (LONG, DecimalType.LongDecimal) =>
        (updater, ordinal, value) =>
          updater.setDecimal(
            ordinal,
            Decimal.fromString(
              UTF8String.fromString(java.lang.Long.toUnsignedString(value.asInstanceOf[Long]))))

      case (FLOAT, FloatType) =>
        (updater, ordinal, value) => updater.setFloat(ordinal, value.asInstanceOf[Float])

      case (DOUBLE, DoubleType) =>
        (updater, ordinal, value) => updater.setDouble(ordinal, value.asInstanceOf[Double])

      case (STRING, StringType) =>
        (updater, ordinal, value) =>
          val str = value match {
            case s: String => UTF8String.fromString(s)
          }
          updater.set(ordinal, str)

      case (BYTE_STRING, BinaryType) =>
        (updater, ordinal, value) =>
          val byte_array = value match {
            case s: ByteString => s.toByteArray
            case unsupported =>
              throw QueryCompilationErrors.invalidByteStringFormatError(unsupported)
          }
          updater.set(ordinal, byte_array)

      case (MESSAGE, MapType(keyType, valueType, valueContainsNull)) =>
        newMapWriter(protoType, protoPath, catalystPath, keyType, valueType, valueContainsNull)

      case (MESSAGE, TimestampType) =>
        (updater, ordinal, value) =>
          val secondsField = protoType.getMessageType.getFields.get(0)
          val nanoSecondsField = protoType.getMessageType.getFields.get(1)
          val message = value.asInstanceOf[DynamicMessage]
          val seconds = message.getField(secondsField).asInstanceOf[Long]
          val nanoSeconds = message.getField(nanoSecondsField).asInstanceOf[Int]
          val micros = DateTimeUtils.millisToMicros(seconds * 1000)
          updater.setLong(ordinal, micros + TimeUnit.NANOSECONDS.toMicros(nanoSeconds))

      case (MESSAGE, DayTimeIntervalType(startField, endField)) =>
        (updater, ordinal, value) =>
          val secondsField = protoType.getMessageType.getFields.get(0)
          val nanoSecondsField = protoType.getMessageType.getFields.get(1)
          val message = value.asInstanceOf[DynamicMessage]
          val seconds = message.getField(secondsField).asInstanceOf[Long]
          val nanoSeconds = message.getField(nanoSecondsField).asInstanceOf[Int]
          val micros = DateTimeUtils.millisToMicros(seconds * 1000)
          updater.setLong(ordinal, micros + TimeUnit.NANOSECONDS.toMicros(nanoSeconds))

      case (MESSAGE, StringType)
        if protoType.getMessageType.getFullName == "google.protobuf.Any" =>
        (updater, ordinal, value) =>
          // Convert 'Any' protobuf message to JSON string.
          val jsonStr = jsonPrinter.print(value.asInstanceOf[DynamicMessage])
          updater.set(ordinal, UTF8String.fromString(jsonStr))

      // Handle well known wrapper types. We unpack the value field when the desired
      // output type is a primitive (determined by the option in [[ProtobufOptions]])
      case (MESSAGE, BooleanType)
        if protoType.getMessageType.getFullName == BoolValue.getDescriptor.getFullName =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.setBoolean(ordinal, unwrapped.asInstanceOf[Boolean])
          }
      case (MESSAGE, IntegerType)
        if (protoType.getMessageType.getFullName == Int32Value.getDescriptor.getFullName
          || protoType.getMessageType.getFullName == UInt32Value.getDescriptor.getFullName) =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.setInt(ordinal, unwrapped.asInstanceOf[Int])
          }
      case (MESSAGE, LongType)
        if (protoType.getMessageType.getFullName == UInt32Value.getDescriptor.getFullName) =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.setLong(ordinal, Integer.toUnsignedLong(unwrapped.asInstanceOf[Int]))
          }
      case (MESSAGE, LongType)
        if (protoType.getMessageType.getFullName == Int64Value.getDescriptor.getFullName
          || protoType.getMessageType.getFullName == UInt64Value.getDescriptor.getFullName) =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.setLong(ordinal, unwrapped.asInstanceOf[Long])
          }
      case (MESSAGE, DecimalType.LongDecimal)
        if (protoType.getMessageType.getFullName == UInt64Value.getDescriptor.getFullName) =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            val dec = Decimal.fromString(
              UTF8String.fromString(java.lang.Long.toUnsignedString(unwrapped.asInstanceOf[Long])))
            updater.setDecimal(ordinal, dec)
          }
      case (MESSAGE, StringType)
        if protoType.getMessageType.getFullName == StringValue.getDescriptor.getFullName =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.set(ordinal, UTF8String.fromString(unwrapped.asInstanceOf[String]))
          }
      case (MESSAGE, BinaryType)
        if protoType.getMessageType.getFullName == BytesValue.getDescriptor.getFullName =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.set(ordinal, unwrapped.asInstanceOf[ByteString].toByteArray)
          }
      case (MESSAGE, FloatType)
        if protoType.getMessageType.getFullName == FloatValue.getDescriptor.getFullName =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.setFloat(ordinal, unwrapped.asInstanceOf[Float])
          }
      case (MESSAGE, DoubleType)
        if protoType.getMessageType.getFullName == DoubleValue.getDescriptor.getFullName =>
        (updater, ordinal, value) =>
          val dm = value.asInstanceOf[DynamicMessage]
          val unwrapped = getFieldValue(dm, dm.getDescriptorForType.getFields.get(0))
          if (unwrapped == null) {
            updater.setNullAt(ordinal)
          } else {
            updater.setDouble(ordinal, unwrapped.asInstanceOf[Double])
          }

      case (MESSAGE, st: StructType) =>
        val writeRecord = getRecordWriter(
          protoType.getMessageType,
          st,
          protoPath,
          catalystPath,
          applyFilters = _ => false)
        (updater, ordinal, value) =>
          val row = new SpecificInternalRow(st)
          writeRecord(new RowUpdater(row), value.asInstanceOf[DynamicMessage])
          updater.set(ordinal, row)

      case (ENUM, StringType) =>
        (updater, ordinal, value) =>
          updater.set(
            ordinal,
            UTF8String.fromString(value.asInstanceOf[EnumValueDescriptor].getName))

      case (ENUM, IntegerType) =>
        (updater, ordinal, value) =>
          updater.setInt(ordinal, value.asInstanceOf[EnumValueDescriptor].getNumber)

      case _ =>
        throw QueryCompilationErrors.cannotConvertProtobufTypeToSqlTypeError(
          toFieldStr(protoPath),
          catalystPath,
          s"${protoType} ${protoType.toProto.getLabel} ${protoType.getJavaType}" +
            s" ${protoType.getType}",
          catalystType)
    }
  }