def apply[T : TypeTag : ColumnMapper]()

in driver/src/main/scala/com/datastax/spark/connector/mapper/MappedToGettableDataConverter.scala [39:235]


  def apply[T : TypeTag : ColumnMapper](
    struct: StructDef,
    columnSelection: IndexedSeq[ColumnRef],
    forceClassLoader: Option[ClassLoader] = None): TypeConverter[struct.ValueRepr] =

    new TypeConverter[struct.ValueRepr] {

      /** Tells whether `T` is mapped by a `JavaBeanColumnMapper`.
        * The flag decides which default column mapper is used for UDT fields:
        * when this class is a Java bean, its UDT fields are mapped with
        * the `JavaBeanColumnMapper` as well. */
      private val isJavaBean = implicitly[ColumnMapper[T]] match {
        case _: JavaBeanColumnMapper[_] => true
        case _ => false
      }

      /** Column map produced by the implicit `ColumnMapper[T]` for writing
        * the selected columns of `struct`. */
      private val columnMap =
        implicitly[ColumnMapper[T]]
          .columnMapForWriting(struct, columnSelection)

      /** Picks the column mapper for the given type; used to find column mappers for UDT columns.
        *  - Scala tuples are mapped by
        *    [[com.datastax.spark.connector.mapper.TupleColumnMapper TupleColumnMapper]],
        *  - when the enclosing class is mapped as a Java bean, the type is mapped by
        *    [[com.datastax.spark.connector.mapper.JavaBeanColumnMapper JavaBeanColumnMapper]],
        *  - every other type is mapped by
        *    [[com.datastax.spark.connector.mapper.DefaultColumnMapper DefaultColumnMapper]]. */
      private def columnMapper[U: TypeTag]: ColumnMapper[U] = {
        val tag = typeTag[U]
        logDebug(s"Finding a UDT ColumnMapper for typeTag $tag")
        tag.tpe match {
          case tupleType if ReflectionUtil.isScalaTuple(tupleType) =>
            new TupleColumnMapper[U]
          case _ if isJavaBean =>
            new JavaBeanColumnMapper[U]()(ReflectionUtil.classTag[U])
          case _ =>
            new DefaultColumnMapper[U]
        }
      }

      /** Returns true for tuple types that provide full type information of their components:
        * anything `ReflectionUtil.isScalaTuple` accepts, plus the pair and triple symbols. */
      private def isTypedTuple(sym: Symbol): Boolean = {
        // Kept as a def so it is only evaluated when the first check fails.
        def isPairOrTriple: Boolean =
          sym == Symbols.PairSymbol || sym == Symbols.TripleSymbol

        ReflectionUtil.isScalaTuple(sym) || isPairOrTriple
      }

      /** Returns a converter turning values of Scala type `U` into the form savable to
        * a Cassandra column of type `columnType`.
        *
        * Lists, sets, maps, vectors and typed pairs/triples recurse into their component types.
        * Struct definitions are delegated to `apply` with a suitable column mapper, unless the
        * Scala type already is a `GettableByIndexData`. Everything else uses the converter
        * supplied by the column type itself. The order of the cases below is significant.
        *
        * @param columnType Cassandra type of the target column
        * @tparam U Scala type of the values to be converted
        * @throws IllegalArgumentException if a typed Scala tuple has a different number of
        *                                  components than the Cassandra tuple type */
      private def converter[U : TypeTag](columnType: ColumnType[_]): TypeConverter[_ <: AnyRef] = {
        val scalaType = typeTag[U].tpe

        logDebug(s"Getting converter for $columnType to $scalaType")

        (columnType, scalaType) match {
          // Collection elements may be nested UDT values or tuples, so their converters
          // are resolved recursively.
          case (ListType(elemColumnType, _), TypeRef(_, _, List(elemScalaType))) =>
            val elemConverter = converter(elemColumnType, elemScalaType)
            TypeConverter.javaArrayListConverter(elemConverter)

          case (SetType(elemColumnType, _), TypeRef(_, _, List(elemScalaType))) =>
            val elemConverter = converter(elemColumnType, elemScalaType)
            TypeConverter.javaHashSetConverter(elemConverter)

          case (MapType(keyColumnType, valueColumnType, _),
                TypeRef(_, _, List(keyScalaType, valueScalaType))) =>
            val keyConverter = converter(keyColumnType, keyScalaType)
            val valueConverter = converter(valueColumnType, valueScalaType)
            TypeConverter.javaHashMapConverter(keyConverter, valueConverter)

          case (VectorType(elemColumnType, dimension), TypeRef(_, _, List(elemScalaType))) =>
            val elemConverter = converter(elemColumnType, elemScalaType)
            TypeConverter.cqlVectorConverter(dimension)(
              elemConverter.asInstanceOf[TypeConverter[Number]])

          // Pairs: each component gets its own converter.
          case (tupleType @ TupleType(firstColumn, secondColumn),
                TypeRef(_, Symbols.PairSymbol, List(firstScalaType, secondScalaType))) =>
            val firstConverter = converter(firstColumn.columnType, firstScalaType)
            val secondConverter = converter(secondColumn.columnType, secondScalaType)
            tupleType.converterToCassandra(IndexedSeq(firstConverter, secondConverter))

          // Triples: same as pairs, with three components.
          case (tupleType @ TupleType(firstColumn, secondColumn, thirdColumn),
                TypeRef(_, Symbols.TripleSymbol,
                  List(firstScalaType, secondScalaType, thirdScalaType))) =>
            val firstConverter = converter(firstColumn.columnType, firstScalaType)
            val secondConverter = converter(secondColumn.columnType, secondScalaType)
            val thirdConverter = converter(thirdColumn.columnType, thirdScalaType)
            tupleType.converterToCassandra(
              IndexedSeq(firstConverter, secondConverter, thirdConverter))

          // A typed tuple with a mismatched number of components cannot be mapped, so stop here:
          case (tupleType: TupleType, TypeRef(_, symbol, components))
              if isTypedTuple(symbol) && tupleType.columns.length != components.length =>
            throw new IllegalArgumentException(
              s"Expected ${tupleType.columns.length} components in the tuple, " +
                s"instead of ${components.length} in $scalaType")

          // UDTValue, TupleValue: already in a generic representation.
          case (_: StructDef, _) if scalaType <:< typeOf[GettableByIndexData] =>
            columnType.converterToCassandra

          // Options: map the wrapped type; the type argument is only known reflectively,
          // hence the existential alias and the reflectively obtained type tag.
          case (structDef: StructDef, TypeRef(_, _, List(wrappedScalaType)))
              if scalaType <:< typeOf[Option[Any]] =>
            type Wrapped = w forSome {type w}
            implicit val wrappedTag = ReflectionUtil.typeToTypeTag[Wrapped](wrappedScalaType)
            implicit val wrappedMapper: ColumnMapper[Wrapped] = columnMapper[Wrapped]
            apply[Wrapped](structDef, structDef.columnRefs, Some(childClassloader))

          // UDTs mapped to case classes and tuples mapped to Scala tuples.
          // ColumnMappers support mapping Scala tuples, so no special case is needed for them.
          case (structDef: StructDef, _) =>
            implicit val nestedMapper: ColumnMapper[U] = columnMapper[U]
            apply[U](structDef, structDef.columnRefs, Some(childClassloader))

          // Primitive types
          case _ =>
            columnType.converterToCassandra
        }
      }

      /** Returns a converter that can convert a Scala type, known only as a reflective `Type`,
        * to a Cassandra value of the given `ColumnType`.
        * A type tag for `tpe` is obtained from `ReflectionUtil.typeToTypeTag` and the call is
        * forwarded to the `TypeTag`-based overload. */
      private def converter(columnType: ColumnType[_], tpe: Type): TypeConverter[_ <: AnyRef] = {
        // The static type is unknown here, so an existential alias stands in for it.
        type Reflected = r forSome {type r}
        implicit val reflectedTag = ReflectionUtil.typeToTypeTag[Reflected](tpe)
        converter[Reflected](columnType)
      }

      /** Reflective type of `T`. */
      @transient
      private val tpe = typeTag[T].tpe

      /** Mirror used to look up the runtime class of `T`: created for `forceClassLoader`
        * when one is given, otherwise the mirror of the type tag of `T`. */
      @transient
      private val mirror =
        forceClassLoader
          .map(loader => runtimeMirror(loader))
          .getOrElse(typeTag[T].mirror)

      /**
        * Class loader passed on to all converters descended from this converter, so that they
        * use the same class loader even if this class (MappedToGettableDataConverter) happens
        * to be on a different one.
        */
      @transient
      private val childClassloader = mirror.classLoader

      logDebug(s"Finding a class for $tpe in ${mirror.classLoader}")

      /** Runtime class of `T`, resolved through `mirror`. */
      private val cls = mirror.runtimeClass(tpe).asInstanceOf[Class[T]]

      /** Printable name of `T`, used in error messages. */
      private val typeName = tpe.toString

      /** Names of the selected columns, in the order of `columnSelection`. */
      val columnNames =
        columnSelection.map(ref => ref.columnName)

      /** Lookup from column name to the name of the getter mapped to that column,
        * i.e. `columnMap.getters` inverted. */
      private val getterByColumnName = columnMap.getters.map {
        case (getterName, ref) => ref.columnName -> getterName
      }

      /** Getter names aligned index by index with `columnNames`. */
      private val getters =
        columnNames.map(column => getterByColumnName(column))

      /** Return type of every getter, as reported by `ReflectionUtil.returnType`. */
      @transient
      private val scalaTypes: IndexedSeq[Type] =
        getters.map(getter => ReflectionUtil.returnType(tpe, getter))

      /** Cassandra type of every selected column, looked up by name in `struct`. */
      private val columnTypes: IndexedSeq[ColumnType[_]] =
        columnNames.map(column => struct.columnByName(column).columnType)

      /** Property extractor built for the runtime class of `T` and the mapped getters. */
      private val extractor =
        new PropertyExtractor(cls, getters)

      /** One converter per selected column, aligned index by index with `columnNames`.
        * A non-fatal failure while resolving a converter is rethrown as an
        * `IllegalArgumentException` that names the field, its type and the target column. */
      private val converters =
        columnNames.indices.map { i =>
          try {
            converter(columnTypes(i), scalaTypes(i))
          } catch {
            case NonFatal(cause) =>
              // Newlines of the multi-line template are flattened to spaces.
              val message =
                s"""Failed to get converter for field "${getters(i)}"
                   |of type ${scalaTypes(i)} in $typeName
                   |mapped to column "${columnNames(i)}" of "${struct.name}"
                   |""".stripMargin.replaceAll("\n", " ")
              throw new IllegalArgumentException(message, cause)
          }
        }

      /** Type tag of the value representation that this converter produces for `struct`. */
      override def targetTypeTag: TypeTag[struct.ValueRepr] =
        typeTag[struct.ValueRepr]


      /** Converts instances of the mapped class, either bare or wrapped in `Some`,
        * into a new value of `struct`; `null` and `None` are both converted to `null`.
        * Any other input is not handled by this partial function. */
      override def convertPF = {
        case obj if cls.isInstance(obj) =>
          toStructValue(obj)
        case null =>
          null.asInstanceOf[struct.ValueRepr]
        case Some(obj) if cls.isInstance(obj) =>
          toStructValue(obj)
        case None =>
          null.asInstanceOf[struct.ValueRepr]
      }

      /** Extracts the mapped property values of `obj`, converts each of them in place with
        * the converter of its column and creates a new `struct` value from the results. */
      private def toStructValue(obj: Any): struct.ValueRepr = {
        val columnValues = extractor.extract(obj.asInstanceOf[T])
        columnValues.indices.foreach { i =>
          columnValues(i) = converters(i).convert(columnValues(i))
        }
        struct.newInstance(columnValues: _*)
      }
    }