def create_table_from_a_file()

in linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/imexport/LoadData.scala [55:189]


  def create_table_from_a_file(spark: SparkSession, src: JValue, dest: JValue): Unit = {
    val source = src.extract[Map[String, Any]]
    val destination = dest.extract[Map[String, Any]]

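    // Read options from the "source" JSON: the file path, its type (shared/local vs. hdfs),
    // whether the first row is a header, the Excel sheet name(s) and the default date format.
    // The file suffix drives the branching below (.xls, .xlsx or plain text/CSV).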
    var path = getMapValue[String](source, "path")
    val pathType = getMapValue[String](source, "pathType", "share")
    var hasHeader = getMapValue[Boolean](source, "hasHeader", false)
    val sheetName = getMapValue[String](source, "sheet", "Sheet1")
    val dateFormat = getMapValue[String](source, "dateFormat", "yyyy-MM-dd")
    val suffix = path.substring(path.lastIndexOf("."))
    val sheetNames = sheetName.split(",").toBuffer.asJava
    var fs: FileSystem = null

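    // Read options from the "destination" JSON: target database and table, whether to load into
    // an existing table (importData) or create a new one, plus partition and overwrite settings.
    // isPartition may arrive either as a boolean or as 0/1, hence the tryCatch fallback below.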
    val database = getMapValue[String](destination, "database")
    val tableName = getMapValue[String](destination, "tableName")

    val importData = getMapValue[Boolean](destination, "importData", true)
    val isPartition = Utils.tryCatch {
      getMapValue[Boolean](destination, "isPartition", true)
    } { case e: Exception =>
      val flag = getMapValue[BigInt](destination, "isPartition", 0)
      if (flag == 1) true else false
    }
    val isOverwrite = getMapValue[Boolean](destination, "isOverwrite", false)
    val partition = getMapValue[String](destination, "partition", "ds")
    val partitionValue = getMapValue[String](destination, "partitionValue", "1993-01-02")

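    // Per-column metadata: the per-column date formats are joined with ";" and the column
    // indexes with "," so they can be passed as single options to the Excel reader. Every
    // column entry is expected to carry a "dateFormat" key; empty values fall back to "yyyy-MM-dd".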
    val columns = (dest \ "columns").extract[List[Map[String, Any]]]
    val dateFormats =
      columns.map(_.get("dateFormat").get.toString).map(f => if (f isEmpty) "yyyy-MM-dd" else f)
    var isFirst = true
    val dateFormatsJson = new StringBuilder()
    dateFormats.foreach(f => {
      if (isFirst) isFirst = false else dateFormatsJson.append(";")
      dateFormatsJson.append(f)
    })
    val indexesStr = String.join(",", columns.map(_.getOrElse("index", 0).toString).asJava)

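    // Normalize the input path: .xls files are converted to CSV on HDFS via XlsUtils.excelToCsv
    // (the header row is consumed during conversion, so hasHeader is reset to false), local
    // .xlsx files are read in place with a "file://" prefix, other local files are copied to
    // HDFS, and HDFS paths get the "hdfs://" scheme unless the cluster runs on viewfs.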
    if ("hdfs".equalsIgnoreCase(pathType)) {
      if (".xls".equalsIgnoreCase(suffix)) {
        val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue)
        config.setBoolean("fs.hdfs.impl.disable.cache", true)
        fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config)
        path = XlsUtils.excelToCsv(fs.open(new Path(path)), fs, hasHeader, sheetNames)
        hasHeader = false
      } else {
        path = if (SparkConfiguration.IS_VIEWFS_ENV.getValue) path else "hdfs://" + path
      }
    } else {
      if (".xlsx".equalsIgnoreCase(suffix)) {
        path = "file://" + path
      } else if (".xls".equalsIgnoreCase(suffix)) {
        val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue)
        config.setBoolean("fs.hdfs.impl.disable.cache", true)
        fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config)
        path = XlsUtils.excelToCsv(new FileInputStream(path), fs, hasHeader, sheetNames)
        hasHeader = false
      } else {
        val config = HDFSUtils.getConfiguration(HadoopConf.HADOOP_ROOT_USER.getValue)
        config.setBoolean("fs.hdfs.impl.disable.cache", true)
        fs = HDFSUtils.getHDFSUserFileSystem(System.getProperty("user.name"), null, config)
        path = copyFileToHdfs(path, fs)
      }
    }

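    // Load the data into a DataFrame: .xlsx goes through the spark-excel data source with the
    // sheet name, column indexes and per-column date formats; everything else (including the
    // CSVs produced from .xls above) goes through CsvRelation.csvToDF.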
    val df = if (".xlsx".equalsIgnoreCase(suffix)) {
      // info(dateFormatsJson.toString()+ "----------")
      spark.read
        .format("com.webank.wedatasphere.spark.excel")
        .option("useHeader", hasHeader)
        .option("maxRowsInMemory", 100)
        .option("sheetName", sheetName)
        // .option("dateFormat", dateFormat)
        .option("indexes", indexesStr)
        .option("dateFormats", dateFormatsJson.toString())
        .schema(StructType(getFields(columns)))
        .load(path)
    } else {
      CsvRelation.csvToDF(spark, StructType(getFields(columns)), hasHeader, path, source, columns)
    }
    // warn(s"Fetched ${df.columns.length} col(s) : ${df.count()} row(s).")
    df.createOrReplaceTempView("tempTable")
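    // importData = true: insert the rows into the existing table (optionally into a single
    // partition, via INSERT OVERWRITE or INSERT INTO). importData = false: drop and recreate the
    // table as SNAPPY-compressed ORC, then load it. If the create-and-load path fails, the freshly
    // created table is dropped again; the intermediate HDFS file is always cleaned up in finally.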
    try {
      if (importData) {
        if (isPartition) {
          if (isOverwrite) {
            spark.sql(
              s"INSERT OVERWRITE TABLE  $database.$tableName partition($partition='$partitionValue') select * from tempTable"
            )
          } else {
            spark.sql(
              s"INSERT INTO  $database.$tableName partition($partition='$partitionValue') select * from tempTable"
            )
          }
        } else {
          if (isOverwrite) {
            spark.sql(s"INSERT OVERWRITE TABLE  $database.$tableName select * from tempTable")
          } else {
            spark.sql(s"INSERT INTO   $database.$tableName select * from tempTable")
          }
        }
      } else {
        if (spark.catalog.tableExists(database, tableName)) {
          spark.sql(s"drop table if exists $database.$tableName")
        }
        if (isPartition) {
          val columnSql = getColumnSql(columns)
          val sql =
            s"create table $database.$tableName($columnSql) PARTITIONED BY (`$partition` string) stored as orc tblproperties ('orc.compress'='SNAPPY')"
          spark.sql(sql)
          spark.sql(
            s"INSERT OVERWRITE TABLE  $database.$tableName partition($partition='$partitionValue') select * from tempTable"
          )
        } else {
          val columnSql = getColumnSql(columns)
          val sql =
            s"create table $database.$tableName($columnSql) stored as orc tblproperties ('orc.compress'='SNAPPY')"
          spark.sql(sql)
          spark.sql(s"INSERT OVERWRITE TABLE  $database.$tableName select * from tempTable")
        }
      }
    } catch {
      case t: Throwable =>
        if (!importData) {
          ImExportUtils.tryAndIngoreError(spark.sql(s"drop table $database.$tableName"))
        }
        throw t
    } finally {
      if (fs != null) {
        fs.delete(new Path(path), true)
        // fs.close()
      }
    }
    // warn(s"create table $database $tableName Success")
  }
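
A minimal caller sketch (not part of the listing above). The path, database/table names and column
definitions are made-up examples; it assumes the method is reachable as
LoadData.create_table_from_a_file, that json4s' DefaultFormats are in scope inside LoadData as the
extract calls require, and that the column keys shown ("name", "type", "index", "dateFormat") match
what getFields/getColumnSql expect.

// Hypothetical invocation; all values are illustrative only.
import org.apache.spark.sql.SparkSession
import org.json4s.jackson.JsonMethods.parse

val spark = SparkSession.builder().appName("load-data-example").enableHiveSupport().getOrCreate()

// Hypothetical source: a shared CSV file with a header row.
val src = parse("""{"path": "/tmp/demo.csv", "pathType": "share", "hasHeader": true}""")

// Hypothetical destination: create a new, non-partitioned ORC table from the file.
val dest = parse(
  """{"database": "demo_db", "tableName": "demo_table", "importData": false, "isPartition": false,
    | "columns": [{"name": "id",   "type": "bigint", "index": 0, "dateFormat": ""},
    |             {"name": "name", "type": "string", "index": 1, "dateFormat": ""}]}""".stripMargin
)

LoadData.create_table_from_a_file(spark, src, dest)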