def convert()

in spark/hbase-spark/src/main/scala/org/apache/hadoop/hbase/spark/datasources/HBaseTableCatalog.scala [291:325]


  /**
   * Translates the table-name / schema-mapping style options into a JSON catalog entry.
   *
   * When `parameters` has no (non-null) value for `TABLE_KEY`, the options are assumed to
   * already be in JSON catalog form and are returned untouched. Otherwise the table name is
   * split into namespace and name (namespace `default` when none is given), the value of
   * `SCHEMA_COLUMNS_MAPPING_KEY` (empty string when absent) is parsed with
   * `generateSchemaMappingMap`, and a JSON catalog string is built from the result:
   *  - `rowkey` is the `:`-joined column names of every column whose family is `"rowkey"`;
   *  - `columns` holds one `cf` / `col` / `type` entry per schema column.
   *
   * Values are interpolated into the JSON as-is; no escaping is applied.
   *
   * @param parameters the options to convert
   * @return `parameters` unchanged when `TABLE_KEY` is absent, otherwise `parameters` with the
   *         generated catalog stored under `HBaseTableCatalog.tableCatalog`
   * @throws IllegalArgumentException if the table name is not of the form
   *                                  `<namespace>:<name>` or `<name>` with non-empty parts
   */
  def convert(parameters: Map[String, String]): Map[String, String] = {
    // A key that is present but mapped to null is treated the same as a missing key.
    parameters.get(TABLE_KEY).flatMap(value => Option(value)) match {
      // if the hbase.table is not defined, we assume it is json format already.
      case None => parameters

      case Some(nsTableName) =>
        // Limit -1 keeps trailing empty parts, so malformed names such as "ns:" or "a:b:" are
        // rejected here instead of silently being read as "ns" / "a:b". Empty namespaces and
        // empty table names are rejected for the same reason.
        val (tableNamespace, tableName) = nsTableName.trim.split(":", -1) match {
          case Array(name) if name.nonEmpty =>
            ("default", name)
          case Array(namespace, name) if namespace.nonEmpty && name.nonEmpty =>
            (namespace, name)
          case _ =>
            throw new IllegalArgumentException(
              s"Invalid table name '$nsTableName' should be '<namespace>:<name>' or '<name>' ")
        }

        val schemaMappingString = parameters.getOrElse(SCHEMA_COLUMNS_MAPPING_KEY, "")
        import scala.collection.JavaConverters._
        val schemaMap = generateSchemaMappingMap(schemaMappingString).asScala
          .map(_._2.asInstanceOf[SchemaQualifierDefinition])

        // Row key fields are the columns declared under the pseudo column family "rowkey".
        val rowkey = schemaMap.filter(_.columnFamily == "rowkey").map(_.columnName)
        val cols = schemaMap.map { x =>
          s""""${x.columnName}":{"cf":"${x.columnFamily}", "col":"${x.qualifier}", "type":"${x.colType}"}""".stripMargin
        }

        // The last margin line reproduces the trailing newline + 7 spaces that this string has
        // always ended with, so the generated catalog text is unchanged.
        val jsonCatalog =
          s"""{
             |"table":{"namespace":"${tableNamespace}", "name":"${tableName}"},
             |"rowkey":"${rowkey.mkString(":")}",
             |"columns":{
             |${cols.mkString(",")}
             |}
             |}
             |       """.stripMargin

        parameters + (HBaseTableCatalog.tableCatalog -> jsonCatalog)
    }
  }