in linkis-public-enhancements/linkis-datasource/linkis-metadata/src/main/scala/org/apache/linkis/metadata/ddl/ImportDDLCreator.scala [166:346]
override def generateCode(mdqTableBO: MdqTableBO, user: String): String = {
logger.info(
s"begin to generate code for ${mdqTableBO.getTableBaseInfo.getBase.getName} using Hive way"
)
val executeCode = new StringBuilder
val args = mdqTableBO.getImportInfo.getArgs
val destinationDatabase = mdqTableBO.getTableBaseInfo.getBase.getDatabase
val destinationTable = mdqTableBO.getTableBaseInfo.getBase.getName
if (StringUtils.isEmpty(destinationDatabase) || StringUtils.isEmpty(destinationTable)) {
logger.error("Hive create table destination database or tablename is null")
throw MdqIllegalParamException(HIVE_CREATE_IS_NULL.getErrorDesc)
}
val sourceDatabase =
if (StringUtils.isEmpty(args.get(DATABASE))) {
throw MdqIllegalParamException(HIVE_CREATE__TABLE_IS_NULL.getErrorDesc)
} else {
args.get(DATABASE)
}
val sourceTableName =
if (StringUtils.isEmpty(args.get(TABLE))) {
throw MdqIllegalParamException(HIVE_CREATE__TABLE_IS_NULL.getErrorDesc)
} else {
args.get(TABLE)
}
// Check whether the destination table is partitioned; if it is, create the table first
val isPartitionTable = mdqTableBO.getTableBaseInfo.getBase.getPartitionTable
if (isPartitionTable != null && isPartitionTable.booleanValue()) {
// Partitioned table: create it first, then insert the data
executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
executeCode
.append(CREATE_TABLE)
.append(destinationDatabase)
.append(".")
.append(destinationTable)
.append(SPACE)
executeCode.append(LEFT_PARENTHESES)
val fields = mdqTableBO.getTableFieldsInfo
val createFieldsArray = new ArrayBuffer[String]()
val insertFieldsArray = new ArrayBuffer[String]()
var dsCount = 0
var partitionValue: String = null
// Collect the create/insert field lists (ds partition fields are counted, not copied)
fields.asScala foreach { field =>
val name = field.getName
val _type = field.getType
val desc = field.getComment
if (!DEFAULT_PARTITION_NAME.getValue.equals(name)) {
if (StringUtils.isNotEmpty(desc)) {
createFieldsArray += (name + SPACE + _type + SPACE + COMMENT + SPACE + SINGLE_MARK + desc + SINGLE_MARK)
insertFieldsArray += name
} else {
createFieldsArray += (name + SPACE + _type)
insertFieldsArray += name
}
} else {
dsCount += 1
if (StringUtils.isNotBlank(field.getPartitionsValue)) {
partitionValue = field.getPartitionsValue
}
}
}
executeCode
.append(createFieldsArray.mkString(COMMA + SPACE))
.append(RIGHT_PARENTHESES)
.append(SPACE)
executeCode.append(DEFAULT_PARTITION_DESC).append(SPACE)
executeCode.append(STORED_AS_ORC)
executeCode.append(MARKS).append(RIGHT_PARENTHESES).append(LINE_BREAK)
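// Illustrative only (table/column names hypothetical; assuming SPARK_SQL expands to
// `spark.sql`, MARKS to a triple-quote delimiter, and DEFAULT_PARTITION_DESC to a
// `partitioned by (ds string)` clause), the code assembled so far looks roughly like:
//   spark.sql("""create table db.tbl (col1 string comment 'c1', col2 int) partitioned by (ds string) stored as orc""")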
// Check whether the source table is partitioned
val isSourceTablePartition: Boolean = checkPartitionTable(fields)
val standardDs =
if (StringUtils.isNotBlank(partitionValue)) {
partitionValue
} else {
new SimpleDateFormat("yyyyMMdd").format(new java.util.Date(System.currentTimeMillis()))
}
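// standardDs falls back to the current date formatted as yyyyMMdd, e.g. "20240101", when no explicit partition value is supplied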
if (!isSourceTablePartition) {
// Source table is not partitioned: insert into a single static partition
executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
executeCode
.append(INSERT_OVERWRITE + destinationDatabase + "." + destinationTable)
.append(SPACE)
executeCode
.append("partition")
.append(LEFT_PARENTHESES)
.append("ds=")
.append("\"")
.append(standardDs)
.append("\"")
.append(RIGHT_PARENTHESES)
.append(SPACE)
executeCode
.append(AS_SELECT)
.append(SPACE)
.append(insertFieldsArray.mkString(COMMA))
.append(SPACE)
executeCode.append(FROM).append(sourceDatabase).append(".").append(sourceTableName)
executeCode.append(MARKS).append(RIGHT_PARENTHESES)
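// Illustrative only (same assumptions as above; AS_SELECT taken to supply the select
// keyword): the generated insert looks roughly like
//   spark.sql("""insert overwrite table db.tbl partition(ds="20240101") select col1,col2 from src_db.src_tbl""")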
} else {
// If the source table also carries a ds field, doubleDs is true; in that case dynamic partition insert must be enabled
val doubleDs: Boolean = dsCount >= 2
if (doubleDs) {
// Dynamic partition insert
executeCode.append(DYNAMIC_PAR).append(LINE_BREAK)
executeCode.append(DYNAMIC_MODE).append(LINE_BREAK)
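// DYNAMIC_PAR and DYNAMIC_MODE presumably emit the standard Hive session settings, e.g.
//   spark.sql("set hive.exec.dynamic.partition=true")
//   spark.sql("set hive.exec.dynamic.partition.mode=nonstrict")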
executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
executeCode
.append(INSERT_OVERWRITE + destinationDatabase + "." + destinationTable)
.append(SPACE)
executeCode
.append("partition")
.append(LEFT_PARENTHESES)
.append("ds")
.append(RIGHT_PARENTHESES)
.append(SPACE)
executeCode
.append(AS_SELECT)
.append(SPACE)
.append(insertFieldsArray.mkString(COMMA))
.append(COMMA)
.append("ds")
.append(SPACE)
executeCode.append(FROM).append(sourceDatabase).append(".").append(sourceTableName)
executeCode.append(MARKS).append(RIGHT_PARENTHESES)
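// Illustrative only (same assumptions): ds is selected through so partitions are assigned
// dynamically, roughly
//   spark.sql("""insert overwrite table db.tbl partition(ds) select col1,col2,ds from src_db.src_tbl""")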
} else {
// Direct insert into a static partition
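// Same shape as the static-partition insert shown above, with ds pinned to standardDs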
executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
executeCode
.append(INSERT_OVERWRITE + destinationDatabase + "." + destinationTable)
.append(SPACE)
executeCode
.append("partition")
.append(LEFT_PARENTHESES)
.append("ds=")
.append("\"")
.append(standardDs)
.append("\"")
.append(RIGHT_PARENTHESES)
.append(SPACE)
executeCode
.append(AS_SELECT)
.append(SPACE)
.append(insertFieldsArray.mkString(COMMA))
.append(SPACE)
executeCode.append(FROM).append(sourceDatabase).append(".").append(sourceTableName)
executeCode.append(MARKS).append(RIGHT_PARENTHESES)
}
}
} else {
// Destination table is not partitioned: generate create table ... as select ... from ... directly
executeCode.append(SPARK_SQL).append(LEFT_PARENTHESES).append(MARKS)
executeCode
.append(CREATE_TABLE)
.append(destinationDatabase)
.append(".")
.append(destinationTable)
.append(SPACE)
executeCode.append(AS_SELECT).append(SPACE)
val fields = mdqTableBO.getTableFieldsInfo
if (fields.isEmpty) {
// If the field list is empty, default to *
executeCode.append("*").append(SPACE)
} else {
val fieldArr = new ArrayBuffer[String]()
fields.asScala.filter(_ != null).foreach(f => fieldArr += f.getName)
executeCode.append(fieldArr.mkString(", ")).append(SPACE)
}
executeCode
.append(FROM)
.append(SPACE)
.append(sourceDatabase)
.append(".")
.append(sourceTableName)
executeCode.append(MARKS).append(RIGHT_PARENTHESES)
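// Illustrative only (same assumptions): a plain CTAS, roughly
//   spark.sql("""create table db.tbl as select col1, col2 from src_db.src_tbl""")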
}
val resultCode = executeCode.toString()
logger.info(
s"end to generate code for ${mdqTableBO.getTableBaseInfo.getBase.getName} code is $resultCode"
)
resultCode
}