public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType sparkType)

in sql/core/src/main/java/org/apache/spark/sql/execution/datasources/parquet/ParquetVectorUpdaterFactory.java [71:229]
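
Given the column's physical Parquet type and the Spark type requested by the reader, this
method picks the `ParquetVectorUpdater` that decodes that column's values into a Spark
column vector. Each `case` handles one Parquet primitive type; any combination that falls
through every branch is rejected by the exception at the end of the method.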


  public ParquetVectorUpdater getUpdater(ColumnDescriptor descriptor, DataType sparkType) {
    PrimitiveType.PrimitiveTypeName typeName = descriptor.getPrimitiveType().getPrimitiveTypeName();

    switch (typeName) {
      case BOOLEAN -> {
        if (sparkType == DataTypes.BooleanType) {
          return new BooleanUpdater();
        }
      }
      case INT32 -> {
        if (sparkType == DataTypes.IntegerType || canReadAsIntDecimal(descriptor, sparkType)) {
          return new IntegerUpdater();
        } else if (sparkType == DataTypes.LongType && isUnsignedIntTypeMatched(32)) {
          // In `ParquetToSparkSchemaConverter`, we map Parquet UINT32 to Spark's LongType.
          // Unsigned int32 values are stored as plain signed int32 in Parquet (e.g. when
          // dictionary encoding falls back to plain), so we read them back as long values.
          return new UnsignedIntegerUpdater();
        } else if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
          return new IntegerToLongUpdater();
        } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
          return new IntegerToBinaryUpdater();
        } else if (sparkType == DataTypes.ByteType) {
          return new ByteUpdater();
        } else if (sparkType == DataTypes.ShortType) {
          return new ShortUpdater();
        } else if (sparkType == DataTypes.DoubleType) {
          return new IntegerToDoubleUpdater();
        } else if (sparkType == DataTypes.DateType) {
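          // Spark's datetime rebase modes: "CORRECTED" means the file already uses the
          // Proleptic Gregorian calendar, "LEGACY" rebases values from the hybrid
          // Julian/Gregorian calendar on read, and "EXCEPTION" fails on values that
          // would change under rebasing.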
          if ("CORRECTED".equals(datetimeRebaseMode)) {
            return new IntegerUpdater();
          } else {
            boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
            return new IntegerWithRebaseUpdater(failIfRebase);
          }
        } else if (sparkType == DataTypes.TimestampNTZType && isDateTypeMatched(descriptor)) {
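          // A Parquet DATE column can be widened to TIMESTAMP_NTZ (midnight of that day);
          // the rebase handling mirrors the DateType branch above.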
          if ("CORRECTED".equals(datetimeRebaseMode)) {
            return new DateToTimestampNTZUpdater();
          } else {
            boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
            return new DateToTimestampNTZWithRebaseUpdater(failIfRebase);
          }
        } else if (sparkType instanceof YearMonthIntervalType) {
          return new IntegerUpdater();
        } else if (canReadAsDecimal(descriptor, sparkType)) {
          return new IntegerToDecimalUpdater(descriptor, (DecimalType) sparkType);
        }
      }
      case INT64 -> {
        // This is where we implement support for the valid INT64 type conversions.
        if (sparkType == DataTypes.LongType || canReadAsLongDecimal(descriptor, sparkType)) {
          if (DecimalType.is32BitDecimalType(sparkType)) {
            return new DowncastLongUpdater();
          } else {
            return new LongUpdater();
          }
        } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
          return new LongToBinaryUpdater();
        } else if (isLongDecimal(sparkType) && isUnsignedIntTypeMatched(64)) {
          // In `ParquetToSparkSchemaConverter`, we map Parquet UINT64 to Spark's
          // Decimal(20, 0). Unsigned int64 values are stored as plain signed int64 in
          // Parquet (e.g. when dictionary encoding falls back to plain), so we read them
          // back as decimal values.
          return new UnsignedLongUpdater();
        } else if (sparkType == DataTypes.TimestampType &&
          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
          if ("CORRECTED".equals(datetimeRebaseMode)) {
            return new LongUpdater();
          } else {
            boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
            return new LongWithRebaseUpdater(failIfRebase, datetimeRebaseTz);
          }
        } else if (sparkType == DataTypes.TimestampType &&
          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
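          // Parquet stores these timestamps as milliseconds; Spark's internal timestamp
          // unit is microseconds, so the updater widens each value while reading.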
          if ("CORRECTED".equals(datetimeRebaseMode)) {
            return new LongAsMicrosUpdater();
          } else {
            final boolean failIfRebase = "EXCEPTION".equals(datetimeRebaseMode);
            return new LongAsMicrosRebaseUpdater(failIfRebase, datetimeRebaseTz);
          }
        } else if (sparkType == DataTypes.TimestampNTZType &&
          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MICROS)) {
          // TIMESTAMP_NTZ is a new data type, so there are no legacy files that need rebasing.
          return new LongUpdater();
        } else if (sparkType == DataTypes.TimestampNTZType &&
          isTimestampTypeMatched(LogicalTypeAnnotation.TimeUnit.MILLIS)) {
          // TIMESTAMP_NTZ is a new data type, so there are no legacy files that need rebasing.
          return new LongAsMicrosUpdater();
        } else if (sparkType instanceof DayTimeIntervalType) {
          return new LongUpdater();
        } else if (canReadAsDecimal(descriptor, sparkType)) {
          return new LongToDecimalUpdater(descriptor, (DecimalType) sparkType);
        } else if (sparkType instanceof TimeType) {
          return new LongUpdater();
        }
      }
      case FLOAT -> {
        if (sparkType == DataTypes.FloatType) {
          return new FloatUpdater();
        } else if (sparkType == DataTypes.DoubleType) {
          return new FloatToDoubleUpdater();
        }
      }
      case DOUBLE -> {
        if (sparkType == DataTypes.DoubleType) {
          return new DoubleUpdater();
        }
      }
      case INT96 -> {
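        // INT96 is the legacy 12-byte timestamp layout (nanoseconds of day plus Julian
        // day) written by older Hive and Impala; it is only ever read as a timestamp.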
        if (sparkType == DataTypes.TimestampNTZType) {
          // TimestampNTZ type does not require rebasing due to its lack of time zone context.
          return new BinaryToSQLTimestampUpdater();
        } else if (sparkType == DataTypes.TimestampType) {
          final boolean failIfRebase = "EXCEPTION".equals(int96RebaseMode);
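          // shouldConvertTimestamps() is true when the values must additionally be
          // adjusted between time zones, which Spark applies to Impala-written files
          // when spark.sql.parquet.int96TimestampConversion is enabled.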
          if (!shouldConvertTimestamps()) {
            if ("CORRECTED".equals(int96RebaseMode)) {
              return new BinaryToSQLTimestampUpdater();
            } else {
              return new BinaryToSQLTimestampRebaseUpdater(failIfRebase, int96RebaseTz);
            }
          } else {
            if ("CORRECTED".equals(int96RebaseMode)) {
              return new BinaryToSQLTimestampConvertTzUpdater(convertTz);
            } else {
              return new BinaryToSQLTimestampConvertTzRebaseUpdater(
                failIfRebase,
                convertTz,
                int96RebaseTz);
            }
          }
        }
      }
      case BINARY -> {
        if (sparkType instanceof StringType || sparkType == DataTypes.BinaryType ||
          canReadAsBinaryDecimal(descriptor, sparkType)) {
          return new BinaryUpdater();
        } else if (canReadAsDecimal(descriptor, sparkType)) {
          return new BinaryToDecimalUpdater(descriptor, (DecimalType) sparkType);
        }
      }
      case FIXED_LEN_BYTE_ARRAY -> {
        int arrayLen = descriptor.getPrimitiveType().getTypeLength();
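        // FIXED_LEN_BYTE_ARRAY decimals are stored as big-endian byte arrays; the target
        // Spark decimal precision decides whether values are unpacked into ints, longs,
        // or kept as binary.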
        if (canReadAsIntDecimal(descriptor, sparkType)) {
          return new FixedLenByteArrayAsIntUpdater(arrayLen);
        } else if (canReadAsLongDecimal(descriptor, sparkType)) {
          return new FixedLenByteArrayAsLongUpdater(arrayLen);
        } else if (canReadAsBinaryDecimal(descriptor, sparkType)) {
          return new FixedLenByteArrayUpdater(arrayLen);
        } else if (sparkType == DataTypes.BinaryType) {
          return new FixedLenByteArrayUpdater(arrayLen);
        } else if (canReadAsDecimal(descriptor, sparkType)) {
          return new FixedLenByteArrayToDecimalUpdater(descriptor, (DecimalType) sparkType);
        }
      }
      default -> {}
    }

    // If we get here, it means the combination of Spark and Parquet type is invalid or not
    // supported.
    throw constructConvertNotSupportedException(descriptor, sparkType);
  }
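
For context, here is a minimal sketch of how a caller drives the returned updater. Only
the `ParquetVectorUpdater` method calls are real Spark API; the variable names
(`updaterFactory`, `columnVector`, `valuesReader`, `dictionaryIds`, `dictionary`) are
illustrative stand-ins for state that `VectorizedColumnReader` manages:

  ParquetVectorUpdater updater = updaterFactory.getUpdater(descriptor, sparkType);

  // Batch path: decode `total` values starting at `rowId` into the Spark vector.
  updater.readValues(total, rowId, columnVector, valuesReader);

  // Dictionary-encoded path: materialize one value from its dictionary id.
  updater.decodeSingleDictionaryId(rowId, columnVector, dictionaryIds, dictionary);

  // Values that fall outside the requested row range are skipped, not decoded.
  updater.skipValues(total, valuesReader);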