override def generateCode()

in linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/scala/org/apache/linkis/metadata/ddl/ImportDDLCreator.scala [166:346]


  override def generateCode(mdqTableBO: MdqTableBO, user: String): String = {
    logger.info(
      s"begin to generate code for ${mdqTableBO.getTableBaseInfo.getBase.getName} using Hive way"
    )
    val executeCode = new StringBuilder
    val args = mdqTableBO.getImportInfo.getArgs
    val destinationDatabase = mdqTableBO.getTableBaseInfo.getBase.getDatabase
    val destinationTable = mdqTableBO.getTableBaseInfo.getBase.getName
    if (StringUtils.isEmpty(destinationDatabase) || StringUtils.isEmpty(destinationTable)) {
      logger.error("Hive create table destination database or tablename is null")
      throw MdqIllegalParamException(HIVE_CREATE_IS_NULL.getErrorDesc)
    }
    val sourceDatabase =
      if (StringUtils.isEmpty(args.get(DATABASE))) {
        throw MdqIllegalParamException(HIVE_CREATE__TABLE_IS_NULL.getErrorDesc)
      } else {
        args.get(DATABASE)
      }
    val sourceTableName =
      if (StringUtils.isEmpty(args.get(TABLE))) {
        throw MdqIllegalParamException(HIVE_CREATE__TABLE_IS_NULL.getErrorDesc)
      } else {
        args.get(TABLE)
      }
    // 判断目标表是否是分区表,如果是分区表,先建表
    val isPartitionTable = mdqTableBO.getTableBaseInfo.getBase.getPartitionTable
    if (isPartitionTable != null && isPartitionTable == true) {
      // 是分区表,先建表,再插入数据
      executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
      executeCode
        .append(CREATE_TABLE)
        .append(destinationDatabase)
        .append(".")
        .append(destinationTable)
        .append(SPACE)
      executeCode.append(LEFT_PARENTHESES)
      val fields = mdqTableBO.getTableFieldsInfo
      val createFieldsArray = new ArrayBuffer[String]()
      val insertFieldsArray = new ArrayBuffer[String]()
      var dsCount = 0
      var partitionValue: String = null
      // 建表
      fields.asScala foreach { field =>
        val name = field.getName
        val _type = field.getType
        val desc = field.getComment
        if (!DEFAULT_PARTITION_NAME.getValue.equals(name)) {
          if (StringUtils.isNotEmpty(desc)) {
            createFieldsArray += (name + SPACE + _type + SPACE + COMMENT + SPACE + SINGLE_MARK + desc + SINGLE_MARK)
            insertFieldsArray += name
          } else {
            createFieldsArray += (name + SPACE + _type)
            insertFieldsArray += name
          }
        } else {
          dsCount += 1
          if (StringUtils.isNotBlank(field.getPartitionsValue)) {
            partitionValue = field.getPartitionsValue
          }
        }
      }
      executeCode
        .append(createFieldsArray.mkString(COMMA + SPACE))
        .append(RIGHT_PARENTHESES)
        .append(SPACE)
      executeCode.append(DEFAULT_PARTITION_DESC).append(SPACE)
      executeCode.append(STORED_AS_ORC)
      executeCode.append(MARKS).append(RIGHT_PARENTHESES).append(LINE_BREAK)
      // 判断源表是否是分区表
      val isSourceTablePartition: Boolean = checkPartitionTable(fields)
      val standardDs =
        if (StringUtils.isNotBlank(partitionValue)) {
          partitionValue
        } else {
          new SimpleDateFormat("yyyyMMdd").format(new java.util.Date(System.currentTimeMillis()))
        }
      if (!isSourceTablePartition) {
        // 插入数据
        executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
        executeCode
          .append(INSERT_OVERWRITE + destinationDatabase + "." + destinationTable)
          .append(SPACE)
        executeCode
          .append("partition")
          .append(LEFT_PARENTHESES)
          .append("ds=")
          .append("\"")
          .append(standardDs)
          .append("\"")
          .append(RIGHT_PARENTHESES)
          .append(SPACE)
        executeCode
          .append(AS_SELECT)
          .append(SPACE)
          .append(insertFieldsArray.mkString(COMMA))
          .append(SPACE)
        executeCode.append(FROM).append(sourceDatabase).append(".").append(sourceTableName)
        executeCode.append(MARKS).append(RIGHT_PARENTHESES)
      } else {
        // 如果源表有ds字段,那么doubleDs为true,如果为true,需要设置动态分区插入
        val doubleDs: Boolean = dsCount >= 2
        if (doubleDs) {
          // 动态分区插入
          executeCode.append(DYNAMIC_PAR).append(LINE_BREAK)
          executeCode.append(DYNAMIC_MODE).append(LINE_BREAK)
          executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
          executeCode
            .append(INSERT_OVERWRITE + destinationDatabase + "." + destinationTable)
            .append(SPACE)
          executeCode
            .append("partition")
            .append(LEFT_PARENTHESES)
            .append("ds")
            .append(RIGHT_PARENTHESES)
            .append(SPACE)
          executeCode
            .append(AS_SELECT)
            .append(SPACE)
            .append(insertFieldsArray.mkString(COMMA))
            .append(COMMA)
            .append("ds")
            .append(SPACE)
          executeCode.append(FROM).append(sourceDatabase).append(".").append(sourceTableName)
          executeCode.append(MARKS).append(RIGHT_PARENTHESES)
        } else {
          // 直接插入
          executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
          executeCode
            .append(INSERT_OVERWRITE + destinationDatabase + "." + destinationTable)
            .append(SPACE)
          executeCode
            .append("partition")
            .append(LEFT_PARENTHESES)
            .append("ds=")
            .append("\"")
            .append(standardDs)
            .append("\"")
            .append(RIGHT_PARENTHESES)
            .append(SPACE)
          executeCode
            .append(AS_SELECT)
            .append(SPACE)
            .append(insertFieldsArray.mkString(COMMA))
            .append(SPACE)
          executeCode.append(FROM).append(sourceDatabase).append(".").append(sourceTableName)
          executeCode.append(MARKS).append(RIGHT_PARENTHESES)
        }
      }
    } else {
      // 如果目标表不是分区表,直接将create table as select * from ...
      executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
      executeCode
        .append(CREATE_TABLE)
        .append(destinationDatabase)
        .append(".")
        .append(destinationTable)
        .append(SPACE)
      executeCode.append(AS_SELECT).append(SPACE)
      val fields = mdqTableBO.getTableFieldsInfo
      if (fields.isEmpty) {
        // 如果是空 默认用 *
        executeCode.append("*").append(SPACE)
      } else {
        val fieldArr = new ArrayBuffer[String]()
        fields.asScala filter (_ != null) foreach (fieldArr += _.getName)
        executeCode.append(fieldArr.mkString(", ")).append(SPACE)
      }
      executeCode
        .append(FROM)
        .append(SPACE)
        .append(sourceDatabase)
        .append(".")
        .append(sourceTableName)
      executeCode.append(MARKS).append(RIGHT_PARENTHESES)
    }
    val resultCode = executeCode.toString()
    logger.info(
      s"end to generate code for ${mdqTableBO.getTableBaseInfo.getBase.getName} code is $resultCode"
    )
    resultCode
  }