def getSQLString()

in spark-connector/datasource/src/main/scala/org/apache/spark/sql/execution/datasources/v2/odps/OdpsTableCatalog.scala [576:676]


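  /**
   * Builds the MaxCompute (ODPS) `CREATE TABLE` DDL string for a Spark
   * catalog table: column list, partition columns, optional bucketing,
   * storage format and location, and table properties.
   */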
  def getSQLString(
      projectName: String,
      odpsSchema: String,
      tableName: String,
      schema: TableSchema,
      ifNotExists: Boolean,
      tableDefinition: CatalogTable): String = {
    val sb = new StringBuilder()
    if (tableDefinition.tableType == CatalogTableType.EXTERNAL) {
      sb.append("CREATE EXTERNAL TABLE ")
    } else {
      sb.append("CREATE TABLE ")
    }
    if (ifNotExists) {
      sb.append("IF NOT EXISTS ")
    }
    sb.append(projectName)
    if (!StringUtils.isNullOrEmpty(odpsSchema)) sb.append(".").append(odpsSchema)
    sb.append(".`").append(tableName).append("` (")
    val columns = schema.getColumns
    for (i <- 0 until columns.size) {
      val column = columns.get(i)
      sb.append("`").append(column.getName).append("` ").append(column.getTypeInfo.getTypeName)
      if (column.getComment != null) {
        sb.append(" COMMENT '").append(column.getComment).append("'")
      }
      if (i + 1 < columns.size) sb.append(',')
    }
    sb.append(')')
    tableDefinition.comment.foreach(comment => sb.append(" COMMENT '" + comment + "' "))
    val partCols = schema.getPartitionColumns

    // partitioned by
    if (partCols.size > 0) {
      sb.append(" PARTITIONED BY (")
      for (index <- 0 until partCols.size) {
        val c = partCols.get(index)
        sb.append("`").append(c.getName).append("` ").append(c.getTypeInfo.getTypeName)
        if (c.getComment != null) {
          sb.append(" COMMENT '").append(c.getComment).append("'")
        }
        if (index + 1 < partCols.size) {
          sb.append(',')
        }
      }
      sb.append(')')
    }

    // clustered by
    tableDefinition.bucketSpec.foreach { bucketSpec =>
      sb.append(" CLUSTERED BY ")
      sb.append(bucketSpec.bucketColumnNames.mkString("(", ",", ")"))
      // emit SORTED BY only when sort columns are actually present
      if (bucketSpec.sortColumnNames.nonEmpty) {
        sb.append(" SORTED BY ").append(bucketSpec.sortColumnNames.mkString("(", ",", ")"))
      }
      sb.append(" INTO ").append(bucketSpec.numBuckets).append(" BUCKETS")
    }

    // storage
    if (tableDefinition.tableType == CatalogTableType.EXTERNAL) {
      // external table: format, optional serde properties, and a required location
      require(tableDefinition.storage.locationUri.isDefined)
      require(tableDefinition.storage.outputFormat.isDefined)
      val outputFormat = tableDefinition.storage.outputFormat.get
      val formatList = Set("PARQUET", "TEXTFILE", "ORC", "RCFILE", "AVRO", "SEQUENCEFILE")

      // built-in file formats use STORED AS; anything else is treated as a storage handler
      val outputFormatClause = if (formatList.contains(outputFormat.toUpperCase)) {
        s" STORED AS $outputFormat"
      } else {
        s" STORED BY '$outputFormat'"
      }
      sb.append(outputFormatClause)
      if (tableDefinition.storage.properties.nonEmpty) {
        val properties = tableDefinition.storage.properties
          .map { case (k, v) => s"'$k'='$v'" }
          .mkString(" WITH SERDEPROPERTIES (", ",", ")")
        sb.append(properties)
      }
      sb.append(s" LOCATION '${tableDefinition.storage.locationUri.get.toString}'")
    } else {
      // non-external table
      tableDefinition.storage.outputFormat.foreach(format => sb.append(s" STORED AS $format"))
    }

    // table properties
    if (tableDefinition.properties.nonEmpty) {
      val props = tableDefinition.properties
        .map { case (k, v) => s"'$k'='$v'" }
        .mkString("(", ",", ")")
      sb.append(" TBLPROPERTIES ").append(props)
    }

    sb.append(';')
    sb.toString
  }
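
For illustration, the shape of the single-line DDL string this method emits for a hypothetical managed, partitioned table (project myproject, no ODPS schema, ifNotExists = true; the table, column, and property names below are made up):

  CREATE TABLE IF NOT EXISTS myproject.`sales` (`id` BIGINT,`amount` DOUBLE COMMENT 'order amount') PARTITIONED BY (`ds` STRING) TBLPROPERTIES ('owner'='etl');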