in hudi-client/hudi-spark-client/src/main/scala/org/apache/spark/sql/HoodieInternalRowUtils.scala [258:417]
/**
 * Builds a [[RowFieldUpdater]] that converts a single value from `prevDataType` into
 * `newDataType` and stores the result through the supplied updater at the given ordinal.
 *
 * The (new, previous) type pair is resolved once, when the writer is built:
 *  - types whose SQL representation is identical are copied through unchanged;
 *  - structs are delegated to `modifiableRowWriter`;
 *  - arrays and map values are rewritten element by element using a nested writer;
 *  - a fixed set of primitive conversions (numeric widening, conversions to decimal,
 *    string <-> binary, several types -> string, string -> date) is supported.
 *
 * @param prevDataType      type of the values the returned writer will receive
 * @param newDataType       type the values have to be converted to
 * @param renamedColumnsMap column-rename mapping; only forwarded to nested writers here
 * @param fieldNameStack    mutable stack of field names leading to the current field; used to
 *                          build the path reported in error messages
 * @return writer converting values of `prevDataType` to `newDataType`
 * @throws IllegalArgumentException while building the writer, if the type pair is not supported
 * @throws HoodieException          while writing, if a null array element / map value is met
 *                                  and the previous type declares it as non-nullable
 */
private def newWriterRenaming(prevDataType: DataType,
                              newDataType: DataType,
                              renamedColumnsMap: JMap[String, String],
                              fieldNameStack: JDeque[String]): RowFieldUpdater = {
  // Every unsupported type combination is rejected the same way, while the writer is built
  def incompatible(): Nothing =
    throw new IllegalArgumentException(s"$prevDataType and $newDataType are incompatible")

  // Renders the stack root-first: segments are added with `push` (head insertion), so plain
  // head-to-tail iteration would list the innermost segment first.
  // NOTE: This must be called while the writer is being built. The stack is shared and mutated
  //       by callers, so by the time a returned closure runs it no longer describes this field.
  def currentPath(): String = fieldNameStack.asScala.toList.reverse.mkString(".")

  (newDataType, prevDataType) match {
    case (newType, prevType) if prevType.sql == newType.sql =>
      (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value)

    case (newStructType: StructType, prevStructType: StructType) =>
      modifiableRowWriter(prevStructType, newStructType, renamedColumnsMap, fieldNameStack)

    case (ArrayType(newElementType, _), ArrayType(prevElementType, containsNull)) =>
      fieldNameStack.push("element")
      val elementWriter = newWriterRenaming(prevElementType, newElementType, renamedColumnsMap, fieldNameStack)
      // Captured eagerly, while "element" is still on the stack (see `currentPath`)
      val elementPath = currentPath()
      fieldNameStack.pop()

      (fieldUpdater, ordinal, value) => {
        val prevArrayData = value.asInstanceOf[ArrayData]
        val prevArray = prevArrayData.toObjectArray(prevElementType)

        val newArrayData = createArrayData(newElementType, prevArrayData.numElements())
        val elementUpdater = new ArrayDataUpdater(newArrayData)

        var i = 0
        while (i < prevArray.length) {
          val element = prevArray(i)
          if (element == null) {
            // NOTE(review): nullability is taken from the previous type, not the new one --
            //               confirm this is intended
            if (!containsNull) {
              throw new HoodieException(
                s"Array value at path '$elementPath' is not allowed to be null")
            } else {
              elementUpdater.setNullAt(i)
            }
          } else {
            elementWriter(elementUpdater, i, element)
          }
          i += 1
        }

        fieldUpdater.set(ordinal, newArrayData)
      }

    case (MapType(_, newValueType, _), MapType(_, prevValueType, valueContainsNull)) =>
      fieldNameStack.push("value")
      val valueWriter = newWriterRenaming(prevValueType, newValueType, renamedColumnsMap, fieldNameStack)
      // Captured eagerly, while "value" is still on the stack (see `currentPath`)
      val valuePath = currentPath()
      fieldNameStack.pop()

      (updater, ordinal, value) => {
        val mapData = value.asInstanceOf[MapData]
        val prevKeyArrayData = mapData.keyArray
        val prevValueArrayData = mapData.valueArray
        val prevValueArray = prevValueArrayData.toObjectArray(prevValueType)

        val newValueArray = createArrayData(newValueType, mapData.numElements())
        val valueUpdater = new ArrayDataUpdater(newValueArray)

        var i = 0
        while (i < prevValueArray.length) {
          val entryValue = prevValueArray(i)
          if (entryValue == null) {
            // NOTE(review): nullability is taken from the previous type, not the new one --
            //               confirm this is intended
            if (!valueContainsNull) {
              throw new HoodieException(s"Map value at path $valuePath is not allowed to be null")
            } else {
              valueUpdater.setNullAt(i)
            }
          } else {
            valueWriter(valueUpdater, i, entryValue)
          }
          i += 1
        }

        // NOTE: Keys can't be transformed and always have to be of [[StringType]], hence the
        //       previous key array is reused as is
        updater.set(ordinal, new ArrayBasedMapData(prevKeyArrayData, newValueArray))
      }

    case (newDecimal: DecimalType, _) =>
      prevDataType match {
        case IntegerType | LongType | FloatType | DoubleType | StringType =>
          (fieldUpdater, ordinal, value) =>
            val scale = newDecimal.scale
            // TODO this has to be revisited to avoid loss of precision (for fps)
            fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(BigDecimal(value.toString).setScale(scale, ROUND_HALF_UP)))

        case _: DecimalType =>
          // NOTE(review): unlike the branch above no rounding mode is passed when re-scaling --
          //               confirm that narrowing the scale is never expected here
          (fieldUpdater, ordinal, value) =>
            fieldUpdater.setDecimal(ordinal, Decimal.fromDecimal(value.asInstanceOf[Decimal].toBigDecimal.setScale(newDecimal.scale)))

        case _ => incompatible()
      }

    case (_: ShortType, _) =>
      prevDataType match {
        case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setShort(ordinal, value.asInstanceOf[Byte].toShort)
        case _ => incompatible()
      }

    case (_: IntegerType, _) =>
      prevDataType match {
        case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Short].toInt)
        case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setInt(ordinal, value.asInstanceOf[Byte].toInt)
        case _ => incompatible()
      }

    case (_: LongType, _) =>
      prevDataType match {
        case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Int].toLong)
        case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Short].toLong)
        case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setLong(ordinal, value.asInstanceOf[Byte].toLong)
        case _ => incompatible()
      }

    case (_: FloatType, _) =>
      prevDataType match {
        case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Long].toFloat)
        case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Int].toFloat)
        case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Short].toFloat)
        case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setFloat(ordinal, value.asInstanceOf[Byte].toFloat)
        case _ => incompatible()
      }

    case (_: DoubleType, _) =>
      prevDataType match {
        case _: FloatType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Float].toDouble)
        case _: LongType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Long].toDouble)
        case _: IntegerType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Int].toDouble)
        case _: ShortType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Short].toDouble)
        case _: ByteType => (fieldUpdater, ordinal, value) => fieldUpdater.setDouble(ordinal, value.asInstanceOf[Byte].toDouble)
        case _ => incompatible()
      }

    case (_: BinaryType, _: StringType) =>
      (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, value.asInstanceOf[UTF8String].getBytes)

    // TODO revisit this (we need to align permitted casting w/ Spark)
    // NOTE: This is supported to stay compatible w/ [[HoodieAvroUtils.rewriteRecordWithNewSchema]]
    case (_: StringType, _) =>
      prevDataType match {
        case BinaryType => (fieldUpdater, ordinal, value) =>
          fieldUpdater.set(ordinal, UTF8String.fromBytes(value.asInstanceOf[Array[Byte]]))
        case DateType => (fieldUpdater, ordinal, value) =>
          fieldUpdater.set(ordinal, UTF8String.fromString(toJavaDate(value.asInstanceOf[Integer]).toString))
        case IntegerType | LongType | FloatType | DoubleType | _: DecimalType =>
          (fieldUpdater, ordinal, value) => fieldUpdater.set(ordinal, UTF8String.fromString(value.toString))
        case _ => incompatible()
      }

    // New type is a date, previous one is a string: the string is parsed by java.sql.Date.valueOf
    case (DateType, StringType) =>
      (fieldUpdater, ordinal, value) =>
        fieldUpdater.set(ordinal, CatalystTypeConverters.convertToCatalyst(java.sql.Date.valueOf(value.toString)))

    case (_, _) => incompatible()
  }
}