in linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala [55:189]
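/**
 * Loads a single source file (csv / xls / xlsx, local or on HDFS) into a Hive table.
 *
 * `src` describes the file (path, pathType, header flag, sheet and per-column date formats);
 * `dest` describes the target (database, table, partition and column definitions). .xls input
 * is first converted to CSV, the data is read into a temporary view, and it is then either
 * inserted into an existing table (importData = true) or used to (re)create an ORC table.
 */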
def create_table_from_a_file(spark: SparkSession, src: JValue, dest: JValue): Unit = {
  val source = src.extract[Map[String, Any]]
  val destination = dest.extract[Map[String, Any]]
  var path = getMapValue[String](source, "path")
  val pathType = getMapValue[String](source, "pathType", "share")
  var hasHeader = getMapValue[Boolean](source, "hasHeader", false)
  val sheetName = getMapValue[String](source, "sheet", "Sheet1")
  val dateFormat = getMapValue[String](source, "dateFormat", "yyyy-MM-dd")
  val suffix = path.substring(path.lastIndexOf("."))
  val sheetNames = sheetName.split(",").toBuffer.asJava
  var fs: FileSystem = null
  val database = getMapValue[String](destination, "database")
  val tableName = getMapValue[String](destination, "tableName")
  val importData = getMapValue[Boolean](destination, "importData", true)
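  // "isPartition" may arrive either as a Boolean or as a numeric 0/1 flag, so fall back to the numeric form.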
  val isPartition = Utils.tryCatch {
    getMapValue[Boolean](destination, "isPartition", true)
  } { case e: Exception =>
    val flag = getMapValue[BigInt](destination, "isPartition", 0)
    flag == 1
  }
  val isOverwrite = getMapValue[Boolean](destination, "isOverwrite", false)
  val partition = getMapValue[String](destination, "partition", "ds")
  val partitionValue = getMapValue[String](destination, "partitionValue", "1993-01-02")
  val columns = (dest \ "columns").extract[List[Map[String, Any]]]
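  // One date format per column (empty entries default to yyyy-MM-dd), joined with ';' for the Excel reader,
  // plus the comma-separated column indexes to read.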
  val dateFormats =
    columns.map(_.getOrElse("dateFormat", "").toString).map(f => if (f.isEmpty) "yyyy-MM-dd" else f)
  val dateFormatsJson = dateFormats.mkString(";")
  val indexesStr = columns.map(_.getOrElse("index", 0).toString).mkString(",")
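  // Normalize the source path: .xls files are converted to CSV first (only .xlsx is read directly),
  // HDFS paths get an explicit scheme, and local non-Excel files are copied up to HDFS.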
if ("hdfs".equalsIgnoreCase(pathType)) {
if (".xls".equalsIgnoreCase(suffix)) {
val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue)
config.setBoolean("fs.hdfs.impl.disable.cache", true)
fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config)
path = XlsUtils.excelToCsv(fs.open(new Path(path)), fs, hasHeader, sheetNames)
hasHeader = false
} else {
path = if (SparkConfiguration.IS_VIEWFS_ENV.getValue) path else "hdfs://" + path
}
} else {
if (".xlsx".equalsIgnoreCase(suffix)) {
path = "file://" + path
} else if (".xls".equalsIgnoreCase(suffix)) {
val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue)
config.setBoolean("fs.hdfs.impl.disable.cache", true)
fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config)
path = XlsUtils.excelToCsv(new FileInputStream(path), fs, hasHeader, sheetNames)
hasHeader = false
} else {
val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue)
config.setBoolean("fs.hdfs.impl.disable.cache", true)
fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config)
path = copyFileToHdfs(path, fs)
}
}
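  // Read the source into a DataFrame: .xlsx through the spark-excel data source, everything else
  // (plain CSV or the CSV produced from .xls above) through CsvRelation.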
val df = if (".xlsx".equalsIgnoreCase(suffix)) {
// info(dateFormatsJson.toString()+ "----------")
spark.read
.format("com.webank.wedatasphere.spark.excel")
.option("useHeader", hasHeader)
.option("maxRowsInMemory", 100)
.option("sheetName", sheetName)
// .option("dateFormat", dateFormat)
.option("indexes", indexesStr)
.option("dateFormats", dateFormatsJson.toString())
.schema(StructType(getFields(columns)))
.load(path)
} else {
CsvRelation.csvToDF(spark, StructType(getFields(columns)), hasHeader, path, source, columns)
}
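  // Expose the DataFrame as a temp view so the statements below can load it with plain SQL.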
// warn(s"Fetched ${df.columns.length} col(s) : ${df.count()} row(s).")
df.createOrReplaceTempView("tempTable")
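  // importData = true loads into an existing table; otherwise the table is (re)created as
  // SNAPPY-compressed ORC (optionally partitioned) before the data is written.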
  try {
    if (importData) {
      if (isPartition) {
        if (isOverwrite) {
          spark.sql(
            s"INSERT OVERWRITE TABLE $database.$tableName partition($partition='$partitionValue') select * from tempTable"
          )
        } else {
          spark.sql(
            s"INSERT INTO $database.$tableName partition($partition='$partitionValue') select * from tempTable"
          )
        }
      } else {
        if (isOverwrite) {
          spark.sql(s"INSERT OVERWRITE TABLE $database.$tableName select * from tempTable")
        } else {
          spark.sql(s"INSERT INTO $database.$tableName select * from tempTable")
        }
      }
    } else {
      if (spark.catalog.tableExists(database, tableName)) {
        spark.sql(s"drop table if exists $database.$tableName")
      }
      if (isPartition) {
        val columnSql = getColumnSql(columns)
        val sql =
          s"create table $database.$tableName($columnSql) PARTITIONED BY (`$partition` string) stored as orc tblproperties ('orc.compress'='SNAPPY')"
        spark.sql(sql)
        spark.sql(
          s"INSERT OVERWRITE TABLE $database.$tableName partition($partition='$partitionValue') select * from tempTable"
        )
      } else {
        val columnSql = getColumnSql(columns)
        val sql =
          s"create table $database.$tableName($columnSql) stored as orc tblproperties ('orc.compress'='SNAPPY')"
        spark.sql(sql)
        spark.sql(s"INSERT OVERWRITE TABLE $database.$tableName select * from tempTable")
      }
    }
  } catch {
    case t: Throwable =>
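      // If this run created the table (importData = false), drop it again so a half-loaded
      // table is not left behind, then rethrow.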
      if (!importData) {
        ImExportUtils.tryAndIngoreError(spark.sql(s"drop table $database.$tableName"))
      }
      throw t
  } finally {
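    // Clean up the converted/copied source file on HDFS whenever a FileSystem was opened.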
    if (fs != null) {
      fs.delete(new Path(path), true)
      // fs.close()
    }
  }
// warn(s"create table $database $tableName Success")
}