private def makeGetter()

in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/jdbc/JdbcUtils.scala [406:619]
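
makeGetter builds one JDBCValueGetter per Spark SQL data type: a small closure that reads JDBC column `pos + 1` (JDBC columns are 1-based) from the current ResultSet row and writes the converted value, or null, into position `pos` of an InternalRow. As a rough orientation only, the getter shape assumed by every case below is a three-argument function along these lines (the actual alias is declared elsewhere in JdbcUtils):

  // Sketch of the getter function type; not a new definition, just the shape
  // that each case of the match below produces.
  private type JDBCValueGetter = (ResultSet, InternalRow, Int) => Unit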


  private def makeGetter(
      dt: DataType,
      dialect: JdbcDialect,
      metadata: Metadata): JDBCValueGetter = dt match {
    case BooleanType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setBoolean(pos, rs.getBoolean(pos + 1))

    case DateType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        // DateTimeUtils.fromJavaDate does not handle null value, so we need to check it.
        val dateVal = rs.getDate(pos + 1)
        if (dateVal != null) {
          row.setInt(pos, fromJavaDate(dialect.convertJavaDateToDate(dateVal)))
        } else {
          row.update(pos, null)
        }

    // When connecting to an Oracle database through JDBC, the precision and scale of the
    // BigDecimal object returned by ResultSet.getBigDecimal do not always match the table
    // schema reported by ResultSetMetaData.getPrecision and ResultSetMetaData.getScale.
    // For example, if a value like 19999 is inserted into a column of type NUMBER(12, 2),
    // getBigDecimal returns a BigDecimal with scale 0, while the DataFrame schema correctly
    // has DecimalType(12, 2). Saving that DataFrame to a Parquet file and reading it back
    // would then yield the wrong result 199.99.
    // So the precision and scale of the Decimal must be set from the JDBC metadata
    // (see the worked example after this listing).
    case DecimalType.Fixed(p, s) =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val decimal =
          nullSafeConvert[JBigDecimal](rs.getBigDecimal(pos + 1), d => Decimal(d, p, s))
        row.update(pos, decimal)

    case DoubleType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setDouble(pos, rs.getDouble(pos + 1))

    case FloatType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setFloat(pos, rs.getFloat(pos + 1))

    case IntegerType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setInt(pos, rs.getInt(pos + 1))

    case LongType if metadata.contains("binarylong") =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val l = nullSafeConvert[Array[Byte]](rs.getBytes(pos + 1), bytes => {
          var ans = 0L
          var j = 0
          while (j < bytes.length) {
            ans = 256 * ans + (255 & bytes(j))
            j = j + 1
          }
          ans
        })
        row.update(pos, l)

    case LongType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setLong(pos, rs.getLong(pos + 1))

    case ShortType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setShort(pos, rs.getShort(pos + 1))

    case ByteType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.setByte(pos, rs.getByte(pos + 1))

    case StringType if metadata.contains("rowid") =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val rawRowId = rs.getRowId(pos + 1)
        if (rawRowId == null) {
          row.update(pos, null)
        } else {
          row.update(pos, UTF8String.fromString(rawRowId.toString))
        }

    case StringType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        // TODO(davies): use getBytes for better performance, if the encoding is UTF-8
        row.update(pos, UTF8String.fromString(rs.getString(pos + 1)))

    // SPARK-34357 - the SQL TIME type is represented as a timestamp at the zero epoch.
    // It is mapped to Spark's TimestampType with the date fixed at 1970-01-01; the time
    // portion is the time of day, with no reference to a particular calendar, time zone
    // or date, and with microsecond precision. The java.sql.Time value holds the number
    // of milliseconds after midnight (00:00:00.000000), which is converted to microseconds
    // below (a small conversion example follows the listing).
    case TimestampType if metadata.contains("logical_time_type") =>
      (rs: ResultSet, row: InternalRow, pos: Int) => {
        row.update(pos, nullSafeConvert[Time](
          rs.getTime(pos + 1), t => Math.multiplyExact(t.getTime, MICROS_PER_MILLIS)))
      }

    case TimestampType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val t = rs.getTimestamp(pos + 1)
        if (t != null) {
          row.setLong(pos, fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t)))
        } else {
          row.update(pos, null)
        }

    case TimestampNTZType if metadata.contains("logical_time_type") =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val micros = nullSafeConvert[Time](rs.getTime(pos + 1), t => {
          val time = dialect.convertJavaTimestampToTimestampNTZ(new Timestamp(t.getTime))
          localDateTimeToMicros(time)
        })
        row.update(pos, micros)

    case TimestampNTZType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val t = rs.getTimestamp(pos + 1)
        if (t != null) {
          row.setLong(pos, localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t)))
        } else {
          row.update(pos, null)
        }

    case BinaryType if metadata.contains("binarylong") =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val bytes = rs.getBytes(pos + 1)
        if (bytes != null) {
          val binary = bytes.flatMap(Integer.toBinaryString(_).getBytes(StandardCharsets.US_ASCII))
          row.update(pos, binary)
        } else {
          row.update(pos, null)
        }

    case BinaryType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.update(pos, rs.getBytes(pos + 1))

    case _: YearMonthIntervalType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.update(pos,
          nullSafeConvert(rs.getString(pos + 1), dialect.getYearMonthIntervalAsMonths))

    case _: DayTimeIntervalType =>
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        row.update(pos,
          nullSafeConvert(rs.getString(pos + 1), dialect.getDayTimeIntervalAsMicros))

    case _: ArrayType if metadata.contains("pg_bit_array_type") =>
      // SPARK-47628: Handle the PostgreSQL bit(n>1) array type ahead of the generic array case.
      // In the pgjdbc driver, bit(n>1)[] is not distinguishable from bit(1)[]; both are
      // recognized as boolean[]. This is wrong for bit(n>1)[], so it is handled first here
      // as an array of byte arrays.
      (rs: ResultSet, row: InternalRow, pos: Int) =>
        val fieldString = rs.getString(pos + 1)
        if (fieldString != null) {
          val strArray = fieldString.substring(1, fieldString.length - 1).split(",")
          // Charset is picked from the pgjdbc driver for consistency.
          val bytesArray = strArray.map(_.getBytes(StandardCharsets.US_ASCII))
          row.update(pos, new GenericArrayData(bytesArray))
        } else {
          row.update(pos, null)
        }

    case ArrayType(et, _) =>
      def elementConversion(et: DataType): AnyRef => Any = et match {
        case TimestampType => arrayConverter[Timestamp] {
          (t: Timestamp) => fromJavaTimestamp(dialect.convertJavaTimestampToTimestamp(t))
        }

        case TimestampNTZType =>
          arrayConverter[Timestamp] {
            (t: Timestamp) => localDateTimeToMicros(dialect.convertJavaTimestampToTimestampNTZ(t))
          }

        case StringType =>
          arrayConverter[Object]((obj: Object) => UTF8String.fromString(obj.toString))

        case DateType => arrayConverter[Date] {
          (d: Date) => fromJavaDate(dialect.convertJavaDateToDate(d))
        }

        case dt: DecimalType =>
          arrayConverter[java.math.BigDecimal](d => Decimal(d, dt.precision, dt.scale))

        case LongType if metadata.contains("binarylong") =>
          throw QueryExecutionErrors.unsupportedArrayElementTypeBasedOnBinaryError(dt)

        case ArrayType(et0, _) =>
          arrayConverter[Array[Any]] {
            arr => new GenericArrayData(elementConversion(et0)(arr))
          }

        case IntegerType => arrayConverter[Int]((i: Int) => i)
        case FloatType => arrayConverter[Float]((f: Float) => f)
        case DoubleType => arrayConverter[Double]((d: Double) => d)
        case ShortType => arrayConverter[Short]((s: Short) => s)
        case BooleanType => arrayConverter[Boolean]((b: Boolean) => b)
        case LongType => arrayConverter[Long]((l: Long) => l)

        case _ => (array: Object) => array.asInstanceOf[Array[Any]]
      }

      (rs: ResultSet, row: InternalRow, pos: Int) =>
        try {
          val array = nullSafeConvert[java.sql.Array](
            input = rs.getArray(pos + 1),
            array => new GenericArrayData(elementConversion(et)(array.getArray())))
          row.update(pos, array)
        } catch {
          case _: java.lang.ClassCastException =>
            throw QueryExecutionErrors.wrongDatatypeInSomeRows(pos, dt)
        }

    case NullType =>
      (_: ResultSet, row: InternalRow, pos: Int) => row.update(pos, null)

    case _ => throw QueryExecutionErrors.unsupportedJdbcTypeError(dt.catalogString)
  }
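
As referenced from the Oracle decimal comment above, here is a minimal standalone sketch (not part of JdbcUtils) of why the precision and scale are forced from the JDBC metadata; the object name and values are illustrative:

  import java.math.{BigDecimal => JBigDecimal}
  import org.apache.spark.sql.types.Decimal

  object DecimalScaleExample {
    def main(args: Array[String]): Unit = {
      // Oracle's driver can hand back 19999 with scale 0 for a NUMBER(12, 2) column.
      val fromDriver = new JBigDecimal("19999")
      // Forcing precision 12 and scale 2, as the DecimalType.Fixed case does,
      // yields 19999.00 instead of the value later being misread as 199.99.
      val fixed = Decimal(fromDriver, 12, 2)
      println(fixed)  // 19999.00
    }
  }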
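
For the logical TIME case, a small worked example of the millisecond-to-microsecond conversion performed by Math.multiplyExact; the local constant stands in for the MICROS_PER_MILLIS used above, and the Time value is constructed from raw milliseconds purely for illustration:

  import java.sql.Time

  val MicrosPerMillis = 1000L  // stands in for MICROS_PER_MILLIS (assumption for this sketch)

  // 12:34:56.789 after midnight, expressed as raw milliseconds.
  val millisAfterMidnight = (12L * 3600 + 34 * 60 + 56) * 1000 + 789  // 45296789
  val asTime = new Time(millisAfterMidnight)
  val micros = Math.multiplyExact(asTime.getTime, MicrosPerMillis)    // 45296789000L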
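
Finally, a hedged sketch of how getters of this shape are typically applied when materializing a ResultSet into rows. This is illustrative only (makeGetter and its getters are private to JdbcUtils); the real Spark code drives the same per-column pattern from an iterator over the ResultSet:

  import java.sql.ResultSet
  import org.apache.spark.sql.catalyst.InternalRow
  import org.apache.spark.sql.catalyst.expressions.SpecificInternalRow
  import org.apache.spark.sql.types.StructType

  def readOneRow(
      rs: ResultSet,
      schema: StructType,
      getters: Array[(ResultSet, InternalRow, Int) => Unit]): InternalRow = {
    val row = new SpecificInternalRow(schema.fields.map(_.dataType).toSeq)
    var i = 0
    while (i < getters.length) {
      getters(i)(rs, row, i)              // read JDBC column i + 1 into row position i
      if (rs.wasNull()) row.setNullAt(i)  // primitive setters cannot encode SQL NULL themselves
      i += 1
    }
    row
  }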