private def newConverter()

in sql/core/src/main/scala/org/apache/spark/sql/execution/datasources/parquet/ParquetRowConverter.scala [299:561]


  private def newConverter(
      parquetType: Type,
      catalystType: DataType,
      updater: ParentContainerUpdater): Converter with HasParentContainerUpdater = {

    def isUnsignedIntTypeMatched(bitWidth: Int): Boolean = {
      parquetType.getLogicalTypeAnnotation match {
        case i: IntLogicalTypeAnnotation if !i.isSigned => i.getBitWidth == bitWidth
        case _ => false
      }
    }

    catalystType match {
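      // For unsigned int32: widen to LongType, since the unsigned 32-bit range does not
      // fit in a signed Int.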
      case LongType if isUnsignedIntTypeMatched(32) =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setLong(Integer.toUnsignedLong(value))
        }
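      // Allow upcasting INT32 to LongType.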
      case LongType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setLong(value)
        }
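      // Allow upcasting INT32 to DoubleType.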
      case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setDouble(value)
        }
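      // Allow upcasting FLOAT to DoubleType.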
      case DoubleType if parquetType.asPrimitiveType().getPrimitiveTypeName == FLOAT =>
        new ParquetPrimitiveConverter(updater) {
          override def addFloat(value: Float): Unit =
            this.updater.setDouble(value)
        }
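      // Catalyst types whose Parquet representation needs no value conversion.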
      case BooleanType | IntegerType | LongType | FloatType | DoubleType | BinaryType |
        _: AnsiIntervalType =>
        new ParquetPrimitiveConverter(updater)

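      // Parquet has no 8-bit physical type, so bytes are stored as INT32 and narrowed here.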
      case ByteType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setByte(value.asInstanceOf[PhysicalByteType#InternalType])
        }

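      // Parquet has no 16-bit physical type, so shorts are stored as INT32 and narrowed here.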
      case ShortType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit =
            this.updater.setShort(value.asInstanceOf[PhysicalShortType#InternalType])
        }

      // For INT32 backed decimals
      case _: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetIntDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            // If the column is a plain INT32, we should pick the precision that can host the
            // largest INT32 value.
            new ParquetIntDictionaryAwareDecimalConverter(
              DecimalType.IntDecimal.precision, 0, updater)
        }

      // For unsigned int64
      case _: DecimalType if isUnsignedIntTypeMatched(64) =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            this.updater.set(Decimal(java.lang.Long.toUnsignedString(value)))
          }
        }

      // For INT64 backed decimals
      case t: DecimalType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT64 =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetLongDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            // If the column is a plain INT64, we should pick the precision that can host the
            // largest INT64 value.
            new ParquetLongDictionaryAwareDecimalConverter(
              DecimalType.LongDecimal.precision, 0, updater)
        }

      // For BINARY and FIXED_LEN_BYTE_ARRAY backed decimals
      case t: DecimalType
        if parquetType.asPrimitiveType().getPrimitiveTypeName == FIXED_LEN_BYTE_ARRAY ||
           parquetType.asPrimitiveType().getPrimitiveTypeName == BINARY =>
        parquetType.asPrimitiveType().getLogicalTypeAnnotation match {
          case decimalType: DecimalLogicalTypeAnnotation =>
            new ParquetBinaryDictionaryAwareDecimalConverter(
              decimalType.getPrecision, decimalType.getScale, updater)
          case _ =>
            throw QueryExecutionErrors.cannotCreateParquetConverterForTypeError(
              t, parquetType.toString)
        }

      case t: DecimalType =>
        throw QueryExecutionErrors.cannotCreateParquetConverterForDecimalTypeError(
          t, parquetType.toString)

      case _: StringType =>
        new ParquetStringConverter(updater)

      // As long as the parquet type is an INT64 timestamp, it is read as Spark's TimestampLTZ
      // type regardless of whether the logical annotation's `isAdjustedToUTC` flag is false
      // or true.
      case TimestampType
        if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
           parquetType.getLogicalTypeAnnotation
             .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            this.updater.setLong(timestampRebaseFunc(value))
          }
        }

      // As long as the parquet type is an INT64 timestamp, it is read as Spark's TimestampLTZ
      // type regardless of whether the logical annotation's `isAdjustedToUTC` flag is false
      // or true.
      case TimestampType
        if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimestampLogicalTypeAnnotation] &&
          parquetType.getLogicalTypeAnnotation
            .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)
            this.updater.setLong(timestampRebaseFunc(micros))
          }
        }

      // INT96 timestamps don't have a logical type, so we check the physical type instead.
      case TimestampType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
        new ParquetPrimitiveConverter(updater) {
          // Converts nanosecond timestamps stored as INT96
          override def addBinary(value: Binary): Unit = {
            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
            val gregorianMicros = int96RebaseFunc(julianMicros)
            val adjTime = convertTz.map(DateTimeUtils.convertTz(gregorianMicros, _, ZoneOffset.UTC))
              .getOrElse(gregorianMicros)
            this.updater.setLong(adjTime)
          }
        }

      // INT96 timestamps don't have a logical type, so we check the physical type instead.
      case TimestampNTZType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT96 =>
        new ParquetPrimitiveConverter(updater) {
          // Converts nanosecond timestamps stored as INT96.
          // TimestampNTZ type does not require rebasing due to its lack of time zone context.
          override def addBinary(value: Binary): Unit = {
            val julianMicros = ParquetRowConverter.binaryToSQLTimestamp(value)
            this.updater.setLong(julianMicros)
          }
        }

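      // INT64 timestamps readable as TimestampNTZ with microsecond precision are passed
      // through unchanged.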
      case TimestampNTZType
        if canReadAsTimestampNTZ(parquetType) &&
          parquetType.getLogicalTypeAnnotation
            .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater)

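      // INT64 timestamps readable as TimestampNTZ with millisecond precision are scaled
      // up to the microseconds Spark stores.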
      case TimestampNTZType
        if canReadAsTimestampNTZ(parquetType) &&
          parquetType.getLogicalTypeAnnotation
            .asInstanceOf[TimestampLogicalTypeAnnotation].getUnit == TimeUnit.MILLIS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            val micros = DateTimeUtils.millisToMicros(value)
            this.updater.setLong(micros)
          }
        }

      // Allow upcasting INT32 date to timestampNTZ.
      case TimestampNTZType if parquetType.asPrimitiveType().getPrimitiveTypeName == INT32 &&
          parquetType.getLogicalTypeAnnotation.isInstanceOf[DateLogicalTypeAnnotation] =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit = {
            this.updater.set(DateTimeUtils.daysToMicros(dateRebaseFunc(value), ZoneOffset.UTC))
          }
        }

      case DateType =>
        new ParquetPrimitiveConverter(updater) {
          override def addInt(value: Int): Unit = {
            this.updater.set(dateRebaseFunc(value))
          }
        }

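      // Parquet TIME(MICROS) values are passed through as longs.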
      case _: TimeType
        if parquetType.getLogicalTypeAnnotation.isInstanceOf[TimeLogicalTypeAnnotation] &&
          parquetType.getLogicalTypeAnnotation
            .asInstanceOf[TimeLogicalTypeAnnotation].getUnit == TimeUnit.MICROS =>
        new ParquetPrimitiveConverter(updater) {
          override def addLong(value: Long): Unit = {
            this.updater.setLong(value)
          }
        }

      // A repeated field that is neither contained by a `LIST`- or `MAP`-annotated group nor
      // annotated by `LIST` or `MAP` should be interpreted as a required list of required
      // elements where the element type is the type of the field.
      case t: ArrayType
        if !parquetType.getLogicalTypeAnnotation.isInstanceOf[ListLogicalTypeAnnotation] =>
        if (parquetType.isPrimitive) {
          new RepeatedPrimitiveConverter(parquetType, t.elementType, updater)
        } else {
          new RepeatedGroupConverter(parquetType, t.elementType, updater)
        }

      case t: ArrayType =>
        new ParquetArrayConverter(parquetType.asGroupType(), t, updater)

      case t: MapType =>
        new ParquetMapConverter(parquetType.asGroupType(), t, updater)

      case t: StructType if VariantMetadata.isVariantStruct(t) =>
        new ParquetVariantConverter(t, parquetType.asGroupType(), updater)

      case t: StructType =>
        val wrappedUpdater = {
          // SPARK-30338: avoid unnecessary InternalRow copying for nested structs:
          // There are two cases to handle here:
          //
          //  1. Parent container is a map or array: we must make a deep copy of the mutable row
          //     because this converter may be invoked multiple times per Parquet input record
          //     (if the map or array contains multiple elements).
          //
          //  2. Parent container is a struct: we don't need to copy the row here because either:
          //
          //     (a) all ancestors are structs and therefore no copying is required because this
          //         converter will only be invoked once per Parquet input record, or
          //     (b) some ancestor is a struct nested in a map or array and that ancestor's
          //         converter will perform deep-copying (which will recursively copy this row).
          if (updater.isInstanceOf[RowUpdater]) {
            // `updater` is a RowUpdater, implying that the parent container is a struct.
            updater
          } else {
            // `updater` is NOT a RowUpdater, implying the parent container is a map or array.
            new ParentContainerUpdater {
              override def set(value: Any): Unit = {
                updater.set(value.asInstanceOf[SpecificInternalRow].copy())  // deep copy
              }
            }
          }
        }
        new ParquetRowConverter(
          schemaConverter,
          parquetType.asGroupType(),
          t,
          convertTz,
          datetimeRebaseSpec,
          int96RebaseSpec,
          wrappedUpdater)

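      // Variants: shredded layouts are only readable when the corresponding conf is enabled;
      // otherwise only the unshredded layout is accepted.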
      case t: VariantType =>
        if (SQLConf.get.getConf(SQLConf.VARIANT_ALLOW_READING_SHREDDED)) {
          new ParquetVariantConverter(t, parquetType.asGroupType(), updater)
        } else {
          new ParquetUnshreddedVariantConverter(parquetType.asGroupType(), updater)
        }

      case t =>
        throw QueryExecutionErrors.cannotCreateParquetConverterForDataTypeError(
          t, parquetType.toString)
    }
  }
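
For illustration only (not part of the Spark source): a minimal, self-contained Scala sketch of two value conversions the converters above perform, the unsigned INT32 widening and the MILLIS-to-MICROS timestamp scaling. The object name `ConversionSketch` is made up for this example.

object ConversionSketch extends App {
  // Integer.toUnsignedLong reinterprets the 32-bit pattern as a non-negative value,
  // e.g. the bits of -1 (0xFFFFFFFF) become 4294967295L, the UINT32 maximum.
  val unsignedMax: Long = Integer.toUnsignedLong(-1)
  assert(unsignedMax == 4294967295L)

  // A MILLIS-annotated INT64 timestamp is scaled to the microseconds Spark stores;
  // DateTimeUtils.millisToMicros performs the same scaling inside Spark.
  val millis: Long = 1700000000123L
  val micros: Long = Math.multiplyExact(millis, 1000L)
  assert(micros == 1700000000123000L)

  println(s"UINT32 max = $unsignedMax, micros = $micros")
}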