in spark-doris-connector/spark-doris-connector-base/src/main/scala/org/apache/doris/spark/util/RowConvertors.scala [67:120]
private def asScalaValue(row: SpecializedGetters, dataType: DataType, ordinal: Int): Any = {
if (row.isNullAt(ordinal)) null
else {
dataType match {
case NullType => NULL_VALUE
case BooleanType => row.getBoolean(ordinal)
case ByteType => row.getByte(ordinal)
case ShortType => row.getShort(ordinal)
case IntegerType => row.getInt(ordinal)
case LongType => row.getLong(ordinal)
case FloatType => row.getFloat(ordinal)
case DoubleType => row.getDouble(ordinal)
case StringType => Option(row.getUTF8String(ordinal)).map(_.toString).getOrElse(NULL_VALUE)
case TimestampType =>
DateTimeUtils.toJavaTimestamp(row.getLong(ordinal)).toString
case DateType => DateTimeUtils.toJavaDate(row.getInt(ordinal)).toString
case BinaryType => row.getBinary(ordinal)
case dt: DecimalType => row.getDecimal(ordinal, dt.precision, dt.scale).toJavaBigDecimal
case at: ArrayType =>
val arrayData = row.getArray(ordinal)
if (arrayData == null) NULL_VALUE
else {
(0 until arrayData.numElements()).map(i => {
if (arrayData.isNullAt(i)) null else asScalaValue(arrayData, at.elementType, i)
}).mkString("[", ",", "]")
}
case mt: MapType =>
val mapData = row.getMap(ordinal)
if (mapData.numElements() == 0) "{}"
else {
val keys = mapData.keyArray()
val values = mapData.valueArray()
val map = mutable.HashMap[Any, Any]()
var i = 0
while (i < keys.numElements()) {
map += asScalaValue(keys, mt.keyType, i) -> asScalaValue(values, mt.valueType, i)
i += 1
}
MAPPER.writeValueAsString(map)
}
case st: StructType =>
val structData = row.getStruct(ordinal, st.length)
val map = new java.util.TreeMap[String, Any]()
var i = 0
while (i < structData.numFields) {
val field = st.fields(i)
map.put(field.name, asScalaValue(structData, field.dataType, i))
i += 1
}
MAPPER.writeValueAsString(map)
case _ => throw new Exception(s"Unsupported spark type: ${dataType.typeName}")
}
}
}