def toDataFrame()

in phoenix5-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala [114:147]
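Converts this PhoenixRDD into a Spark DataFrame: it reads the select-column metadata from the Phoenix configuration, derives a Catalyst StructType from the Phoenix schema, and maps each record to a Row, promoting Date and Time values to java.sql.Timestamp where the column's SQL type (and the dateAsTimestamp flag) calls for it.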


  def toDataFrame(sqlContext: SQLContext): DataFrame = {
    val columnInfoList = PhoenixConfigurationUtil
      .getSelectColumnMetadataList(new Configuration(phoenixConf))
      .asScala

    // Keep track of the column names and their SQL types.
    val columns: Seq[(String, Int)] = columnInfoList.map(ci => {
      (ci.getDisplayName, ci.getSqlType)
    })

    // Look up the Spark Catalyst types from the Phoenix schema
    val structType = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoList, dateAsTimestamp)

    // Create the DataFrame from the converted Spark schema; `map` here is this
    // RDD's own map, since toDataFrame is defined on PhoenixRDD itself
    sqlContext.createDataFrame(map(pr => {

      // Create a sequence of column data
      val rowSeq = columns.map { case (name, sqlType) =>
        val res = pr.resultMap(name)
        // Special handling for date/time types: 91 is java.sql.Types.DATE and
        // 19 is Phoenix's UNSIGNED_DATE; 92 is java.sql.Types.TIME and 18 is
        // Phoenix's UNSIGNED_TIME
        if (dateAsTimestamp && (sqlType == 91 || sqlType == 19) && res != null) {
          new java.sql.Timestamp(res.asInstanceOf[java.sql.Date].getTime)
        } else if ((sqlType == 92 || sqlType == 18) && res != null) {
          new java.sql.Timestamp(res.asInstanceOf[java.sql.Time].getTime)
        } else {
          res
        }
      }

      // Create a Spark Row from the sequence
      Row.fromSeq(rowSeq)
    }), structType)
  }
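
A minimal usage sketch (not from the source file): the connector's implicit SparkSqlContextFunctions helper phoenixTableAsDataFrame constructs a PhoenixRDD and calls toDataFrame on it. The table name, column list, and ZooKeeper URL below are placeholders, and the helper's exact parameter list may differ between connector versions.

  import org.apache.hadoop.conf.Configuration
  import org.apache.spark.sql.SQLContext
  import org.apache.phoenix.spark._  // brings phoenixTableAsDataFrame into scope

  def readExample(sqlContext: SQLContext): Unit = {
    // Builds a PhoenixRDD for TABLE1 and converts it via toDataFrame(sqlContext)
    val df = sqlContext.phoenixTableAsDataFrame(
      "TABLE1",                            // Phoenix table (placeholder)
      Seq("ID", "COL1"),                   // columns to select (placeholders)
      zkUrl = Some("phoenix-server:2181")  // ZooKeeper quorum (placeholder)
    )
    df.printSchema()  // Catalyst schema derived from the Phoenix column metadata
    df.show()
  }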