phoenix5-spark/src/main/scala/org/apache/phoenix/spark/PhoenixRDD.scala [114:147]
def toDataFrame(sqlContext: SQLContext): DataFrame = {
  val columnInfoList = PhoenixConfigurationUtil
    .getSelectColumnMetadataList(new Configuration(phoenixConf))
    .asScala

  // Keep track of the column names and their SQL types.
  val columns: Seq[(String, Int)] = columnInfoList.map(ci => {
    (ci.getDisplayName, ci.getSqlType)
  })

  // Look up the Spark Catalyst types from the Phoenix schema
  val structType = SparkSchemaUtil.phoenixSchemaToCatalystSchema(columnInfoList, dateAsTimestamp)
  // Create the DataFrame from the converted Spark schema, mapping each
  // record produced by this RDD to a Spark SQL Row
  sqlContext.createDataFrame(map(pr => {
    // Create a sequence of column data
    val rowSeq = columns.map { case (name, sqlType) =>
      val res = pr.resultMap(name)
      // Special handling for temporal types: 91 is java.sql.Types.DATE and
      // 19 is Phoenix's UNSIGNED_DATE; 92 is java.sql.Types.TIME and 18 is
      // Phoenix's UNSIGNED_TIME
      if (dateAsTimestamp && (sqlType == 91 || sqlType == 19) && res != null) {
        new java.sql.Timestamp(res.asInstanceOf[java.sql.Date].getTime)
      } else if ((sqlType == 92 || sqlType == 18) && res != null) {
        new java.sql.Timestamp(res.asInstanceOf[java.sql.Time].getTime)
      } else {
        res
      }
    }
    // Create a Spark Row from the sequence
    Row.fromSeq(rowSeq)
  }), structType)
}
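
For reference, the temporal branch above can be exercised in isolation. The following is a minimal, hypothetical sketch (not part of PhoenixRDD) that restates the same rule using the java.sql.Types constants behind the magic numbers: Types.DATE == 91 and Types.TIME == 92, while 19 and 18 are Phoenix-specific codes for UNSIGNED_DATE and UNSIGNED_TIME with no JDBC constant. TIME values are always widened to Timestamp, while DATE values are widened only when the dateAsTimestamp flag is set, presumably because Spark SQL offers Date and Timestamp types but no standalone Time type.

import java.sql.{Date, Time, Timestamp, Types}

object TemporalConversionSketch {
  // Standalone restatement of the per-column conversion in toDataFrame
  def convert(sqlType: Int, res: AnyRef, dateAsTimestamp: Boolean): AnyRef =
    if (dateAsTimestamp && (sqlType == Types.DATE || sqlType == 19) && res != null)
      new Timestamp(res.asInstanceOf[Date].getTime)  // DATE/UNSIGNED_DATE -> Timestamp
    else if ((sqlType == Types.TIME || sqlType == 18) && res != null)
      new Timestamp(res.asInstanceOf[Time].getTime)  // TIME/UNSIGNED_TIME -> Timestamp
    else
      res                                            // everything else passes through

  def main(args: Array[String]): Unit = {
    // With dateAsTimestamp enabled, a DATE value surfaces as a Timestamp.
    println(convert(Types.DATE, Date.valueOf("2024-01-15"), dateAsTimestamp = true))  // 2024-01-15 00:00:00.0
    // With the flag off, the Date passes through unchanged.
    println(convert(Types.DATE, Date.valueOf("2024-01-15"), dateAsTimestamp = false)) // 2024-01-15
  }
}

Note that the caller controls the DATE behaviour via dateAsTimestamp, and the same flag is also passed to SparkSchemaUtil.phoenixSchemaToCatalystSchema a few lines earlier, so the declared schema and the row values stay in agreement.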