in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala [159:194]
/**
 * Recursively converts a Cassandra driver [[DataType]] into the corresponding
 * Spark SQL Catalyst data type.
 *
 * Collections (set/list/vector) map to [[ArrayType]], maps to the Spark SQL map
 * type, and UDTs/tuples to [[StructType]] (tuple fields are named by position:
 * "0", "1", ...). Anything else is delegated to `primitiveCatalystDataType`.
 *
 * @param cassandraType the Cassandra type to convert
 * @param nullable      nullability applied to the converted container/element types;
 *                      UDT and tuple struct fields are always nullable
 */
def catalystDataType(cassandraType: DataType, nullable: Boolean): SparkSqlDataType = {

  // One nullable StructField per UDT field, keeping the field's internal name.
  def udtToStruct(udt: UserDefinedType): StructType = {
    val names = udt.getFieldNames.asScala
    val types = udt.getFieldTypes.asScala
    val fields = names.zip(types).map { case (name, fieldType) =>
      StructField(name.asInternal(), catalystDataType(fieldType, nullable = true), nullable = true)
    }
    StructType(fields.asJava)
  }

  // Tuple components become positional, always-nullable struct fields.
  def tupleToStruct(tuple: TupleType): StructType = {
    val fields = tuple.getComponentTypes.asScala.zipWithIndex.map { case (componentType, i) =>
      StructField(i.toString, catalystDataType(componentType, nullable = true), nullable = true)
    }
    StructType(fields.asJava)
  }

  cassandraType match {
    case set: SetType =>
      ArrayType(catalystDataType(set.getElementType, nullable), nullable)
    case list: ListType =>
      ArrayType(catalystDataType(list.getElementType, nullable), nullable)
    case map: MapType =>
      SparkSqlMapType(
        catalystDataType(map.getKeyType, nullable),
        catalystDataType(map.getValueType, nullable),
        nullable)
    case udt: UserDefinedType =>
      udtToStruct(udt)
    case tuple: TupleType =>
      tupleToStruct(tuple)
    case vector: VectorType =>
      ArrayType(catalystDataType(vector.getElementType, nullable), nullable)
    case VARINT =>
      // Cassandra varint has unbounded precision; Spark's DecimalType is the closest fit.
      logWarning("VarIntType is mapped to catalystTypes.DecimalType with unlimited values.")
      primitiveCatalystDataType(cassandraType)
    case _ =>
      primitiveCatalystDataType(cassandraType)
  }
}