in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog.scala [291:325]
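/**
 * Converts shorthand options (the hbase.table name plus a schema-columns
 * mapping string) into the JSON catalog stored under
 * HBaseTableCatalog.tableCatalog, leaving all other parameters intact.
 */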
def convert(parameters: Map[String, String]): Map[String, String] = {
val nsTableName = parameters.getOrElse(TABLE_KEY, null)
// If hbase.table is not defined, assume the parameters already carry a
// JSON-format catalog and pass them through unchanged.
if (nsTableName == null) return parameters
val tableParts = nsTableName.trim.split(':')
// Accept "<name>" (default namespace) or "<namespace>:<name>"; anything
// else is malformed.
val tableNamespace = if (tableParts.length == 1) {
  "default"
} else if (tableParts.length == 2) {
  tableParts(0)
} else {
  throw new IllegalArgumentException(
    s"Invalid table name '$nsTableName': should be '<namespace>:<name>' or '<name>'")
}
val tableName = tableParts(tableParts.length - 1)
val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
import scala.collection.JavaConverters._
// generateSchemaMappingMap returns a java.util.Map keyed by column name;
// only the SchemaQualifierDefinition values are needed here.
val schemaMap = generateSchemaMappingMap(schemaMappingString).asScala.values
  .map(_.asInstanceOf[SchemaQualifierDefinition])
// Row-key columns are flagged with the pseudo column family "rowkey".
val rowkey = schemaMap.filter(_.columnFamily == "rowkey").map(_.columnName)
// Render each column as a JSON fragment keyed by its Spark column name.
val cols = schemaMap.map { x =>
  s""""${x.columnName}":{"cf":"${x.columnFamily}", "col":"${x.qualifier}", "type":"${x.colType}"}"""
}
// Assemble the full JSON catalog; composite row keys are joined with ':'.
val jsonCatalog =
s"""{
|"table":{"namespace":"${tableNamespace}", "name":"${tableName}"},
|"rowkey":"${rowkey.mkString(":")}",
|"columns":{
|${cols.mkString(",")}
|}
|}
""".stripMargin
// Keep the original parameters and add the generated JSON catalog.
parameters ++ Map(HBaseTableCatalog.tableCatalog -> jsonCatalog)
}
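
// A minimal usage sketch (illustrative, not part of the original file). It
// assumes the schema-mapping DSL parsed by generateSchemaMappingMap:
// comma-separated "columnName columnType family:qualifier" entries, where a
// family starting with ':' (e.g. ":key") marks a row-key column.
//
//   val options = Map(
//     TABLE_KEY -> "ns1:t1",
//     SCHEMA_COLUMNS_MAPPING_KEY -> "id STRING :key, name STRING cf1:n")
//   val converted = convert(options)
//   // converted(HBaseTableCatalog.tableCatalog) would then hold roughly:
//   // {"table":{"namespace":"ns1", "name":"t1"},
//   //  "rowkey":"id",
//   //  "columns":{
//   //    "id":{"cf":"rowkey", "col":"id", "type":"STRING"},
//   //    "name":{"cf":"cf1", "col":"n", "type":"STRING"}}}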