def convert()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog.scala [303:342]


  /**
   * Converts the legacy, option-based table definition into the JSON catalog form.
   *
   * When `parameters` holds no value under `TABLE_KEY`, the map is returned untouched on the
   * assumption that the catalog was already supplied as JSON. Otherwise the table name and the
   * schema mapping string (read from `SCHEMA_COLUMNS_MAPPING_KEY`, defaulting to the empty
   * string) are rendered into a JSON catalog, which is added to the returned map under
   * `HBaseTableCatalog.tableCatalog`.
   *
   * The table name is either `<name>` (the namespace then defaults to "default") or
   * `<namespace>:<name>`. Columns whose column family is "rowkey" make up the catalog's
   * row key, joined with ':'.
   *
   * @param parameters the options to convert
   * @return `parameters` unchanged when no table name is set; otherwise `parameters` plus the
   *         generated JSON catalog entry
   * @throws IllegalArgumentException if the table name is not `<namespace>:<name>` or `<name>`,
   *                                  including when the namespace or the name is empty
   */
  def convert(parameters: Map[String, String]): Map[String, String] = {
    // A missing key and a key mapped to null are both treated as "table not defined".
    parameters.get(TABLE_KEY).flatMap(Option(_)) match {
      case None =>
        // If the table is not defined, we assume the catalog is in json format already.
        parameters

      case Some(nsTableName) =>
        // limit = -1 keeps trailing empty parts, so inputs such as "ns:" or "ns:tbl:" are
        // rejected instead of being silently reinterpreted as "ns" / "ns:tbl".
        val (tableNamespace, tableName) = nsTableName.trim.split(":", -1) match {
          case Array(name) if name.nonEmpty =>
            ("default", name)
          case Array(namespace, name) if namespace.nonEmpty && name.nonEmpty =>
            (namespace, name)
          case _ =>
            throw new IllegalArgumentException(
              s"Invalid table name '$nsTableName' should be '<namespace>:<name>' or '<name>' ")
        }

        val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
        import scala.collection.JavaConverters._
        val schemaMap = generateSchemaMappingMap(schemaMappingString).asScala.map {
          case (_, definition) => definition.asInstanceOf[SchemaQualifierDefinition]
        }

        // Columns mapped to the pseudo column family "rowkey" form the row key.
        val rowkey = schemaMap.collect {
          case definition if definition.columnFamily == "rowkey" => definition.columnName
        }
        val cols = schemaMap.map { x =>
          s""""${x.columnName}":{"cf":"${x.columnFamily}", "col":"${x.qualifier}", "type":"${x.colType}"}"""
        }

        // The last margin line pins the trailing whitespace of the generated text, so the
        // output does not depend on how deeply this literal is indented in the source.
        val jsonCatalog =
          s"""{
             |"table":{"namespace":"${tableNamespace}", "name":"${tableName}"},
             |"rowkey":"${rowkey.mkString(":")}",
             |"columns":{
             |${cols.mkString(",")}
             |}
             |}
             |       """.stripMargin
        parameters + (HBaseTableCatalog.tableCatalog -> jsonCatalog)
    }
  }