in sql/catalyst/src/main/scala/org/apache/spark/sql/catalyst/json/JacksonParser.scala [228:465]
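  /**
   * Create a converter which converts the JSON documents held by the `JsonParser`
   * to a value according to a desired schema.
   */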
def makeConverter(dataType: DataType): ValueConverter = dataType match {
case BooleanType =>
(parser: JsonParser) => parseJsonToken[java.lang.Boolean](parser, dataType) {
case VALUE_TRUE => true
case VALUE_FALSE => false
}
case ByteType =>
(parser: JsonParser) => parseJsonToken[java.lang.Byte](parser, dataType) {
case VALUE_NUMBER_INT => parser.getByteValue
}
case ShortType =>
(parser: JsonParser) => parseJsonToken[java.lang.Short](parser, dataType) {
case VALUE_NUMBER_INT => parser.getShortValue
}
case IntegerType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_NUMBER_INT => parser.getIntValue
}
case LongType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_NUMBER_INT => parser.getLongValue
}
case FloatType =>
(parser: JsonParser) => parseJsonToken[java.lang.Float](parser, dataType) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getFloatValue
case VALUE_STRING if parser.getTextLength >= 1 =>
// Special case handling for NaN and Infinity.
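          // e.g. with allowNonNumericNumbers set, the document {"v": "NaN"} parses
          // to Float.NaN for a FloatType field.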
parser.getText match {
case "NaN" if options.allowNonNumericNumbers =>
Float.NaN
case "+INF" | "+Infinity" | "Infinity" if options.allowNonNumericNumbers =>
Float.PositiveInfinity
case "-INF" | "-Infinity" if options.allowNonNumericNumbers =>
Float.NegativeInfinity
case _ => throw StringAsDataTypeException(parser.currentName, parser.getText,
FloatType)
}
}
case DoubleType =>
(parser: JsonParser) => parseJsonToken[java.lang.Double](parser, dataType) {
case VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT =>
parser.getDoubleValue
case VALUE_STRING if parser.getTextLength >= 1 =>
// Special case handling for NaN and Infinity.
parser.getText match {
case "NaN" if options.allowNonNumericNumbers =>
Double.NaN
case "+INF" | "+Infinity" | "Infinity" if options.allowNonNumericNumbers =>
Double.PositiveInfinity
case "-INF" | "-Infinity" if options.allowNonNumericNumbers =>
Double.NegativeInfinity
case _ => throw StringAsDataTypeException(parser.currentName, parser.getText,
DoubleType)
}
}
case _: StringType => (parser: JsonParser) => {
      // This feature must be enabled if we are to retrieve the bytes directly
      // from the raw content:
val oldFeature = parser.getFeatureMask
val featureToAdd = JsonParser.Feature.INCLUDE_SOURCE_IN_LOCATION.getMask
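      // overrideStdFeatures(values, mask) changes only the feature bits present in
      // the mask, so this enables INCLUDE_SOURCE_IN_LOCATION and leaves every other
      // feature as it was.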
parser.overrideStdFeatures(oldFeature | featureToAdd, featureToAdd)
val result = parseJsonToken[UTF8String](parser, dataType) {
case VALUE_STRING =>
UTF8String.fromString(parser.getText)
case other =>
          // Note that any token can be rendered as a string, so this conversion
          // cannot fail.
val startLocation = parser.currentTokenLocation()
def skipAhead(): Unit = {
other match {
case START_OBJECT =>
parser.skipChildren()
case START_ARRAY =>
parser.skipChildren()
case _ =>
// Do nothing in this case; we've already read the token
}
}
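          // After skipAhead() the parser sits at the end of the value that began at
          // startLocation, so the raw bytes in between are exactly the value's JSON text.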
          // Fast paths: slice the value's bytes directly when the raw content is a
          // byte array or a PositionedReadable; otherwise re-serialize the structure.
startLocation.contentReference().getRawContent match {
case byteArray: Array[Byte] if exactStringParsing =>
skipAhead()
val endLocation = parser.currentLocation.getByteOffset
UTF8String.fromBytes(
byteArray,
startLocation.getByteOffset.toInt,
endLocation.toInt - (startLocation.getByteOffset.toInt))
case positionedReadable: PositionedReadable if exactStringParsing =>
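              // Read the value's bytes straight from the underlying positioned stream.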
skipAhead()
val endLocation = parser.currentLocation.getByteOffset
val size = endLocation.toInt - (startLocation.getByteOffset.toInt)
val buffer = new Array[Byte](size)
positionedReadable.read(startLocation.getByteOffset, buffer, 0, size)
UTF8String.fromBytes(buffer, 0, size)
case _ =>
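              // Fallback: copy the current structure through a JsonGenerator and use
              // the re-serialized UTF-8 bytes as the string value.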
val writer = new ByteArrayOutputStream()
Utils.tryWithResource(factory.createGenerator(writer, JsonEncoding.UTF8)) {
generator => generator.copyCurrentStructure(parser)
}
UTF8String.fromBytes(writer.toByteArray)
}
}
      // Reset back to the original configuration. The `~0` mask has all bits set,
      // so every standard feature is restored to its previous state as defined
      // by `oldFeature`.
parser.overrideStdFeatures(oldFeature, ~0)
result
}
case TimestampType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
try {
timestampFormatter.parse(parser.getText)
} catch {
case NonFatal(e) =>
              // If parsing fails, fall back to the formats used by Spark 2.0 and 1.x
              // for backwards compatibility, when the fallback is enabled.
if (!enableParsingFallbackForTimestampType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
DateTimeUtils.stringToTimestamp(str, options.zoneId).getOrElse(throw e)
}
case VALUE_NUMBER_INT =>
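          // The number is interpreted as seconds since the epoch and scaled to
          // microseconds, the internal representation of TimestampType.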
parser.getLongValue * 1000000L
}
case TimestampNTZType =>
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
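          // The `false` argument forbids a time-zone component in the input, since
          // TimestampNTZ carries no time zone information.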
timestampNTZFormatter.parseWithoutTimeZone(parser.getText, false)
}
case DateType =>
(parser: JsonParser) => parseJsonToken[java.lang.Integer](parser, dataType) {
case VALUE_STRING if parser.getTextLength >= 1 =>
try {
dateFormatter.parse(parser.getText)
} catch {
case NonFatal(e) =>
              // If parsing fails, fall back to the formats used by Spark 2.0 and 1.x
              // for backwards compatibility, when the fallback is enabled.
if (!enableParsingFallbackForDateType) {
throw e
}
val str = DateTimeUtils.cleanLegacyTimestampStr(UTF8String.fromString(parser.getText))
DateTimeUtils.stringToDate(str).getOrElse {
                // In Spark 1.5.0, dates were stored as the number of days since the
                // epoch, encoded as a string. So we just convert it to Int.
try {
RebaseDateTime.rebaseJulianToGregorianDays(parser.getText.toInt)
} catch {
case _: NumberFormatException => throw e
}
}.asInstanceOf[Integer]
}
}
case BinaryType =>
(parser: JsonParser) => parseJsonToken[Array[Byte]](parser, dataType) {
case VALUE_STRING => parser.getBinaryValue
}
case dt: DecimalType =>
(parser: JsonParser) => parseJsonToken[Decimal](parser, dataType) {
case (VALUE_NUMBER_INT | VALUE_NUMBER_FLOAT) =>
Decimal(parser.getDecimalValue, dt.precision, dt.scale)
case VALUE_STRING if parser.getTextLength >= 1 =>
val bigDecimal = decimalParser(parser.getText)
Decimal(bigDecimal, dt.precision, dt.scale)
}
case CalendarIntervalType => (parser: JsonParser) =>
parseJsonToken[CalendarInterval](parser, dataType) {
case VALUE_STRING =>
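          // safeStringToInterval returns null instead of throwing on malformed input.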
IntervalUtils.safeStringToInterval(UTF8String.fromString(parser.getText))
}
case ym: YearMonthIntervalType => (parser: JsonParser) =>
parseJsonToken[Integer](parser, dataType) {
case VALUE_STRING =>
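          // Reuse Cast's parsing of ANSI interval strings by evaluating a cast of the
          // literal text; the same trick is used for DayTimeIntervalType below.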
val expr = Cast(Literal(parser.getText), ym)
Integer.valueOf(expr.eval(EmptyRow).asInstanceOf[Int])
}
case dt: DayTimeIntervalType => (parser: JsonParser) =>
parseJsonToken[java.lang.Long](parser, dataType) {
case VALUE_STRING =>
val expr = Cast(Literal(parser.getText), dt)
java.lang.Long.valueOf(expr.eval(EmptyRow).asInstanceOf[Long])
}
case st: StructType =>
val fieldConverters = st.map(_.dataType).map(makeConverter).toArray
(parser: JsonParser) => parseJsonToken[InternalRow](parser, dataType) {
case START_OBJECT => convertObject(parser, st, fieldConverters).get
}
case at: ArrayType =>
val elementConverter = makeConverter(at.elementType)
(parser: JsonParser) => parseJsonToken[ArrayData](parser, dataType) {
case START_ARRAY => convertArray(parser, elementConverter)
}
case mt: MapType =>
val valueConverter = makeConverter(mt.valueType)
(parser: JsonParser) => parseJsonToken[MapData](parser, dataType) {
case START_OBJECT => convertMap(parser, valueConverter)
}
case udt: UserDefinedType[_] =>
makeConverter(udt.sqlType)
case _: NullType =>
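      // For NullType every token, whatever it is, is read as null.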
(parser: JsonParser) => parseJsonToken[java.lang.Long](parser, dataType) {
case _ => null
}
case _: VariantType => parseVariant
    // We never actually hit this exception; it is kept for understandability.
case _ => throw ExecutionErrors.unsupportedDataTypeError(dataType)
}
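  // A minimal usage sketch (hypothetical, not part of the original file): a converter
  // produced by makeConverter is applied to a parser positioned at the value to read.
  //
  //   val convert = makeConverter(LongType)
  //   Utils.tryWithResource(factory.createParser("""{"a": 1}""")) { p =>
  //     p.nextToken() // START_OBJECT
  //     p.nextToken() // FIELD_NAME "a"
  //     p.nextToken() // VALUE_NUMBER_INT
  //     convert(p)    // => java.lang.Long valued 1
  //   }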