def catalystDataType()

in connector/src/main/scala/com/datastax/spark/connector/datasource/CassandraSourceUtil.scala [159:194]


  /**
   * Converts a Cassandra driver [[DataType]] into the corresponding Spark Catalyst type.
   *
   * Collections (set/list/map/vector) recurse on their element types and propagate the
   * caller's `nullable` flag to both the element type and the container. UDTs and tuples
   * become [[StructType]]s whose fields are always nullable, regardless of `nullable`.
   * Everything else is delegated to `primitiveCatalystDataType`.
   *
   * @param cassandraType the Cassandra type to convert
   * @param nullable      nullability to apply to collection containers and their elements
   * @return the equivalent Spark SQL data type
   */
  def catalystDataType(cassandraType: DataType, nullable: Boolean): SparkSqlDataType = {

    // A UDT maps to a StructType: one field per UDT field, each recursively
    // converted and always marked nullable.
    def structForUdt(udt: UserDefinedType): StructType = {
      val names = udt.getFieldNames.asScala
      val types = udt.getFieldTypes.asScala
      val fields = names.zip(types).map { case (name, fieldType) =>
        StructField(name.asInternal(), catalystDataType(fieldType, nullable = true), nullable = true)
      }
      StructType(fields.asJava)
    }

    // A tuple maps to a StructType whose fields are named by position ("0", "1", ...),
    // each recursively converted and always marked nullable.
    def structForTuple(tuple: TupleType): StructType = {
      val fields = tuple.getComponentTypes.asScala.zipWithIndex.map { case (componentType, i) =>
        StructField(i.toString, catalystDataType(componentType, nullable = true), nullable = true)
      }
      StructType(fields.asJava)
    }

    cassandraType match {
      case set: SetType =>
        ArrayType(catalystDataType(set.getElementType, nullable), nullable)
      case list: ListType =>
        ArrayType(catalystDataType(list.getElementType, nullable), nullable)
      case map: MapType =>
        SparkSqlMapType(
          catalystDataType(map.getKeyType, nullable),
          catalystDataType(map.getValueType, nullable),
          nullable)
      case udt: UserDefinedType =>
        structForUdt(udt)
      case tuple: TupleType =>
        structForTuple(tuple)
      case vector: VectorType =>
        ArrayType(catalystDataType(vector.getElementType, nullable), nullable)
      case VARINT =>
        // Cassandra's varint is unbounded; Catalyst DecimalType cannot represent that
        // exactly, so surface the lossy mapping to the user.
        logWarning("VarIntType is mapped to catalystTypes.DecimalType with unlimited values.")
        primitiveCatalystDataType(cassandraType)
      case _ =>
        primitiveCatalystDataType(cassandraType)
    }
  }