// Excerpt: `private def newWriterRenaming(...)`
// from hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala (lines 258-417)


  /**
   * Recursively builds a [[RowFieldUpdater]] that writes a value of `prevDataType` into a target
   * field of `newDataType`, descending into nested struct/array/map types and applying the
   * permitted primitive conversions (byte/short/int/long/float/double widenings, decimal,
   * string, binary and date conversions).
   *
   * @param prevDataType      source (pre-schema-evolution) data type of the field
   * @param newDataType       target (post-schema-evolution) data type of the field
   * @param renamedColumnsMap mapping of renamed columns, threaded through to [[modifiableRowWriter]]
   *                          when descending into struct fields
   * @param fieldNameStack    mutable stack tracking the path of the field currently being
   *                          processed (pushed/popped around recursive calls; also used to build
   *                          error messages)
   * @throws IllegalArgumentException if `prevDataType` cannot be converted into `newDataType`
   */
  private def newWriterRenaming(prevDataType: DataType,
                                newDataType: DataType,
                                renamedColumnsMap: JMap[String, String],
                                fieldNameStack: JDeque[String]): RowFieldUpdater = {
    (newDataType, prevDataType) match {
      // Types render to the same SQL type: pass the value through unchanged
      case (newType, prevType) if prevType.sql == newType.sql =>
        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)

      // Nested struct: delegate to the row-level writer, which handles field renames/reordering
      case (newStructType: StructType, prevStructType: StructType) =>
        modifiableRowWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)

      // Array: derive a writer for the element types and rewrite every element
      case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) =>
        // Push the synthetic "element" path segment so nested lookups/errors resolve correctly
        fieldNameStack.push("element")
        val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack)
        fieldNameStack.pop()

        (fieldUpdater, ordinal, value) => {
          val prevArrayData = value.asInstanceOf[ArrayData]
          val prevArray = prevArrayData.toObjectArray(prevElementType)

          val newArrayData = createArrayData(newElementType, prevArrayData.numElements())
          val elementUpdater = new ArrayDataUpdater(newArrayData)

          var i = 0
          while (i < prevArray.length) {
            val element = prevArray(i)
            if (element == null) {
              // NOTE(review): nullability is validated against the *source* array type's
              // `containsNull` (the target's flag is discarded in the pattern) — confirm intended
              if (!containsNull) {
                throw new HoodieException(
                  s"Array value at path '${fieldNameStack.asScala.mkString(".")}' is not allowed to be null")
              } else {
                elementUpdater.setNullAt(i)
              }
            } else {
              elementWriter(elementUpdater, i, element)
            }
            i += 1
          }

          fieldUpdater.set(ordinal, newArrayData)
        }

      // Map: only values are rewritten; keys are carried over untouched (see NOTE below)
      case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) =>
        // Push the synthetic "value" path segment so nested lookups/errors resolve correctly
        fieldNameStack.push("value")
        val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack)
        fieldNameStack.pop()

        (updater, ordinal, value) =>
          val mapData = value.asInstanceOf[MapData]
          val prevKeyArrayData = mapData.keyArray
          val prevValueArrayData = mapData.valueArray
          val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)

          val newValueArray = createArrayData(newValueType, mapData.numElements())
          val valueUpdater = new ArrayDataUpdater(newValueArray)
          var i = 0
          while (i < prevValueArray.length) {
            val value = prevValueArray(i)
            if (value == null) {
              // NOTE(review): like the array case, nullability is validated against the *source*
              // map type's `valueContainsNull` — confirm intended
              if (!valueContainsNull) {
                throw new HoodieException(s"Map value at path ${fieldNameStack.asScala.mkString(".")} is not allowed to be null")
              } else {
                valueUpdater.setNullAt(i)
              }
            } else {
              valueWriter(valueUpdater, i, value)
            }
            i += 1
          }

          // NOTE: Keys can't be transformed and are always expected to be of [[StringType]],
          //       hence the original key array is reused as-is
          updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray))

      // Decimal target: numeric/string sources are parsed; decimal sources are re-scaled
      case (newDecimal: DecimalType, _) =>
        prevDataType match {
          case IntegerType | LongType | FloatType | DoubleType | StringType =>
            (fieldUpdater, ordinal, value) =>
              val scale = newDecimal.scale
              // TODO this has to be revisited to avoid loss of precision (for fps)
              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_UP)))

          case _: DecimalType =>
            // NOTE(review): `setScale` without a rounding mode throws ArithmeticException when the
            // scale reduction would require rounding — unlike the branch above, which uses
            // ROUND_HALF_UP; confirm this asymmetry is intended
            (fieldUpdater, ordinal, value) =>
              fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))

          case _ =>
            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }

      // Short target: only byte can be widened
      case (_: ShortType, _) =>
        prevDataType match {
          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
          case _ =>
            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }

      // Int target: short/byte can be widened
      case (_: IntegerType, _) =>
        prevDataType match {
          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
          case _ =>
            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }

      // Long target: int/short/byte can be widened
      case (_: LongType, _) =>
        prevDataType match {
          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
          case _ =>
            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }

      // Float target: long/int/short/byte can be converted (long -> float may lose precision)
      case (_: FloatType, _) =>
        prevDataType match {
          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
          case _ =>
            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }

      // Double target: float/long/int/short/byte can be converted
      case (_: DoubleType, _) =>
        prevDataType match {
          case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
          case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
          case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
          case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
          case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
          case _ =>
            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }

      // Binary target: a string source is stored as its UTF-8 bytes
      case (_: BinaryType, _: StringType) =>
        (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes)

      // TODO revisit this (we need to align permitted casting w/ Spark)
      // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]]
      case (_: StringType, _) =>
        prevDataType match {
          // Binary bytes are assumed to already be valid UTF-8
          case BinaryType => (fieldUpdater, ordinal, value) =>
            fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
          // Dates are stored as days-since-epoch ints; render via java.sql.Date ("yyyy-mm-dd")
          case DateType => (fieldUpdater, ordinal, value) =>
            fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
          // Numeric types are stringified via their natural toString rendering
          case IntegerType | LongType | FloatType | DoubleType | _: DecimalType =>
            (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString))

          case _ =>
            throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
        }

      // Date target from string: parsed by java.sql.Date.valueOf, which requires "yyyy-[m]m-[d]d"
      // and throws IllegalArgumentException on malformed input
      case (DateType, StringType) =>
        (fieldUpdater, ordinal, value) =>
          fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))

      // Any other combination is not a supported schema evolution
      case (_, _) =>
        throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")
    }
  }