in connector/src/main/scala/org/apache/spark/sql/cassandra/DataTypeConverter.scala [63:89]
/**
 * Converts a Cassandra column type to the corresponding Catalyst (Spark SQL) data type.
 *
 * Collection types (set, list, vector) map to `ArrayType`, maps to `MapType`, and
 * UDTs/tuples to `StructType` — tuple members, which are unnamed in Cassandra, get their
 * positional index as the field name. All remaining (primitive) types are delegated to
 * `primitiveCatalystDataType`.
 *
 * @param cassandraType the connector-side column type to convert
 * @param nullable      nullability to propagate into collection/map element types
 * @return the equivalent Catalyst `DataType`
 */
def catalystDataType(cassandraType: connector.types.ColumnType[_], nullable: Boolean): catalystTypes.DataType = {
  // Fields originating from UDT or tuple members are always considered nullable.
  def structField(name: String, colType: connector.types.ColumnType[_]): catalystTypes.StructField =
    catalystTypes.StructField(name, catalystDataType(colType, nullable = true), nullable = true)

  cassandraType match {
    case connector.types.SetType(elemType, _) =>
      catalystTypes.ArrayType(catalystDataType(elemType, nullable), nullable)
    case connector.types.ListType(elemType, _) =>
      catalystTypes.ArrayType(catalystDataType(elemType, nullable), nullable)
    case connector.types.VectorType(elemType, _) =>
      catalystTypes.ArrayType(catalystDataType(elemType, nullable), nullable)
    case connector.types.MapType(keyType, valueType, _) =>
      // Third argument is Catalyst's valueContainsNull; it intentionally tracks `nullable`.
      catalystTypes.MapType(catalystDataType(keyType, nullable), catalystDataType(valueType, nullable), nullable)
    case connector.types.UserDefinedType(_, udtFields, _) =>
      catalystTypes.StructType(udtFields.map(f => structField(f.columnName, f.columnType)))
    case connector.types.TupleType(tupleFields @ _*) =>
      catalystTypes.StructType(tupleFields.map(f => structField(f.index.toString, f.columnType)))
    case connector.types.VarIntType =>
      // Same primitive mapping as the default case; the warning flags the unbounded precision.
      logWarning("VarIntType is mapped to catalystTypes.DecimalType with unlimited values.")
      primitiveCatalystDataType(cassandraType)
    case _ =>
      primitiveCatalystDataType(cassandraType)
  }
}