def makeConverter()

in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala [228:465]


  /**
   * Creates a converter that turns the current token(s) of a `JsonParser` into a value
   * of the given Catalyst `dataType`.
   *
   * Primitive types map a single JSON token to a boxed JVM value; complex types
   * (struct/array/map) recursively build converters for their children and then
   * delegate to `convertObject`/`convertArray`/`convertMap`. Unexpected tokens are
   * handled by `parseJsonToken` (not visible here), which receives the partial
   * function defined per case.
   *
   * @param dataType the Catalyst type to convert parsed JSON values into
   * @return a `ValueConverter` closed over `dataType` (and any child converters)
   */
  def makeConverter(dataType: DataType): ValueConverter = dataType match {
    case BooleanType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Boolean](parser, dataType) {
        case VALUE_TRUE => true
        case VALUE_FALSE => false
      }

    case ByteType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Byte](parser, dataType) {
        case VALUE_NUMBER_INT => parser.getByteValue
      }

    case ShortType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Short](parser, dataType) {
        case VALUE_NUMBER_INT => parser.getShortValue
      }

    case IntegerType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
        case VALUE_NUMBER_INT => parser.getIntValue
      }

    case LongType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
        case VALUE_NUMBER_INT => parser.getLongValue
      }

    case FloatType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Float](parser, dataType) {
        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
          parser.getFloatValue

        // Quoted non-numeric values ("NaN", "+INF", ...) are only accepted when the
        // `allowNonNumericNumbers` option is enabled; any other string raises
        // StringAsDataTypeException so the caller can decide how to handle it.
        case VALUE_STRING if parser.getTextLength >= 1 =>
          // Special case handling for NaN and Infinity.
          parser.getText match {
            case "NaN" if options.allowNonNumericNumbers =>
              Float.NaN
            case "+INF" | "+Infinity" | "Infinity" if options.allowNonNumericNumbers =>
              Float.PositiveInfinity
            case "-INF" | "-Infinity" if options.allowNonNumericNumbers =>
              Float.NegativeInfinity
            case _ => throw StringAsDataTypeException(parser.currentName, parser.getText,
              FloatType)
          }
      }

    case DoubleType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Double](parser, dataType) {
        case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
          parser.getDoubleValue

        // Same NaN/Infinity handling as FloatType above, gated on the same option.
        case VALUE_STRING if parser.getTextLength >= 1 =>
          // Special case handling for NaN and Infinity.
          parser.getText match {
            case "NaN" if options.allowNonNumericNumbers =>
              Double.NaN
            case "+INF" | "+Infinity" | "Infinity" if options.allowNonNumericNumbers =>
              Double.PositiveInfinity
            case "-INF" | "-Infinity" if options.allowNonNumericNumbers =>
              Double.NegativeInfinity
            case _ => throw StringAsDataTypeException(parser.currentName, parser.getText,
              DoubleType)
          }
      }

    // StringType accepts ANY token: plain strings are taken as-is, while any other
    // JSON value (object, array, number, ...) is captured verbatim as its source text.
    case _: StringType => (parser: JsonParser) => {
      // This must be enabled if we will retrieve the bytes directly from the raw content:
      // INCLUDE_SOURCE_IN_LOCATION makes the token locations below carry usable byte
      // offsets into the underlying input. Save the old mask so it can be restored.
      val oldFeature = parser.getFeatureMask
      val featureToAdd = JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION.getMask
      parser.overrideStdFeatures(oldFeature | featureToAdd, featureToAdd)
      val result = parseJsonToken[UTF8String](parser, dataType) {
        case VALUE_STRING =>
          UTF8String.fromString(parser.getText)

        case other =>
          // Note that it always tries to convert the data as string without the case of failure.
          val startLocation = parser.currentTokenLocation()
          // Advance the parser past the current value so currentLocation points at
          // its end. Scalar tokens are already fully consumed, so only containers
          // need skipping.
          def skipAhead(): Unit = {
            other match {
              case START_OBJECT =>
                parser.skipChildren()
              case START_ARRAY =>
                parser.skipChildren()
              case _ =>
              // Do nothing in this case; we've already read the token
            }
          }

          // PositionedReadable
          // Fast paths (when `exactStringParsing` is set): slice the raw source bytes
          // between the start and end offsets of the value, preserving it exactly.
          // Fallback: re-serialize the structure through a fresh Jackson generator.
          startLocation.contentReference().getRawContent match {
            case byteArray: Array[Byte] if exactStringParsing =>
              skipAhead()
              val endLocation = parser.currentLocation.getByteOffset

              UTF8String.fromBytes(
                byteArray,
                startLocation.getByteOffset.toInt,
                endLocation.toInt - (startLocation.getByteOffset.toInt))
            case positionedReadable: PositionedReadable if exactStringParsing =>
              skipAhead()
              val endLocation = parser.currentLocation.getByteOffset

              // Re-read the value's byte range directly from the positioned source.
              val size = endLocation.toInt - (startLocation.getByteOffset.toInt)
              val buffer = new Array[Byte](size)
              positionedReadable.read(startLocation.getByteOffset, buffer, 0, size)
              UTF8String.fromBytes(buffer, 0, size)
            case _ =>
              // Copy the current JSON structure into an in-memory buffer; this is a
              // re-serialization, so formatting may differ from the original input.
              val writer = new ByteArrayOutputStream()
              Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
                generator => generator.copyCurrentStructure(parser)
              }
              UTF8String.fromBytes(writer.toByteArray)
          }
        }
      // Reset back to the original configuration using `~0` as the mask,
      // which is a bitmask with all bits set, effectively allowing all features
      // to be reset. This ensures that every feature is restored to its previous
      // state as defined by `oldFeature`.
      parser.overrideStdFeatures(oldFeature, ~0)
      result
    }

    case TimestampType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
        case VALUE_STRING if parser.getTextLength >= 1 =>
          try {
            timestampFormatter.parse(parser.getText)
          } catch {
            case NonFatal(e) =>
              // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
              // compatibility if enabled.
              if (!enableParsingFallbackForTimestampType) {
                throw e
              }
              val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
              DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
          }

        case VALUE_NUMBER_INT =>
          // Scale the integral value by 1,000,000 — presumably epoch seconds to
          // microseconds (Catalyst's internal timestamp unit); confirm against
          // TimestampType's internal representation.
          parser.getLongValue * 1000000L
      }

    case TimestampNTZType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
        case VALUE_STRING if parser.getTextLength >= 1 =>
          // `false` disables time-zone-aware parsing — NTZ values carry no zone.
          timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false)
      }

    case DateType =>
      (parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
        case VALUE_STRING if parser.getTextLength >= 1 =>
          try {
            dateFormatter.parse(parser.getText)
          } catch {
            case NonFatal(e) =>
              // If fails to parse, then tries the way used in 2.0 and 1.x for backwards
              // compatibility if enabled.
              if (!enableParsingFallbackForDateType) {
                throw e
              }
              val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
              DateTimeUtils.stringToDate(str).getOrElse {
                // In Spark 1.5.0, we store the data as number of days since epoch in string.
                // So, we just convert it to Int.
                try {
                  RebaseDateTime.rebaseJulianToGregorianDays(parser.getText.toInt)
                } catch {
                  // Not a day-count either: surface the original formatter error.
                  case _: NumberFormatException => throw e
                }
              }.asInstanceOf[Integer]
          }
      }

    case BinaryType =>
      (parser: JsonParser) => parseJsonToken[Array[Byte]](parser, dataType) {
        // Jackson decodes the string as base64 via getBinaryValue.
        case VALUE_STRING => parser.getBinaryValue
      }

    case dt: DecimalType =>
      (parser: JsonParser) => parseJsonToken[Decimal](parser, dataType) {
        case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
          Decimal(parser.getDecimalValue, dt.precision, dt.scale)
        // Quoted decimals go through the configurable `decimalParser` (e.g. to honor
        // locale-specific separators — TODO confirm against its definition).
        case VALUE_STRING if parser.getTextLength >= 1 =>
          val bigDecimal = decimalParser(parser.getText)
          Decimal(bigDecimal, dt.precision, dt.scale)
      }

    case CalendarIntervalType => (parser: JsonParser) =>
      parseJsonToken[CalendarInterval](parser, dataType) {
        case VALUE_STRING =>
          // `safeStringToInterval` returns null on malformed input rather than throwing.
          IntervalUtils.safeStringToInterval(UTF8String.fromString(parser.getText))
      }

    case ym: YearMonthIntervalType => (parser: JsonParser) =>
      parseJsonToken[Integer](parser, dataType) {
        case VALUE_STRING =>
          // Reuse Cast's string-to-interval logic instead of duplicating it here.
          val expr = Cast(Literal(parser.getText), ym)
          Integer.valueOf(expr.eval(EmptyRow).asInstanceOf[Int])
      }

    case dt: DayTimeIntervalType => (parser: JsonParser) =>
      parseJsonToken[java.lang.Long](parser, dataType) {
        case VALUE_STRING =>
          // Same Cast-based approach as YearMonthIntervalType, but yielding a Long.
          val expr = Cast(Literal(parser.getText), dt)
          java.lang.Long.valueOf(expr.eval(EmptyRow).asInstanceOf[Long])
      }

    case st: StructType =>
      // Build one converter per field up front so the per-record lambda does no
      // type dispatch.
      val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
      (parser: JsonParser) => parseJsonToken[InternalRow](parser, dataType) {
        case START_OBJECT => convertObject(parser, st, fieldConverters).get
      }

    case at: ArrayType =>
      val elementConverter = makeConverter(at.elementType)
      (parser: JsonParser) => parseJsonToken[ArrayData](parser, dataType) {
        case START_ARRAY => convertArray(parser, elementConverter)
      }

    case mt: MapType =>
      // JSON object keys are always strings, so only the value type needs a converter.
      val valueConverter = makeConverter(mt.valueType)
      (parser: JsonParser) => parseJsonToken[MapData](parser, dataType) {
        case START_OBJECT => convertMap(parser, valueConverter)
      }

    case udt: UserDefinedType[_] =>
      // Parse using the UDT's underlying SQL type representation.
      makeConverter(udt.sqlType)

    case _: NullType =>
      // Every token converts to null; the Long type parameter is just a placeholder
      // to satisfy parseJsonToken's signature.
      (parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
        case _ => null
      }

    case _: VariantType => parseVariant

    // We don't actually hit this exception though, we keep it for understandability
    case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
  }