in linkis-engineconn-plugins/spark/src/main/scala/org/apache/linkis/engineplugin/spark/metadata/SparkSQLHistoryParser.scala [208:337]
/**
 * Records the tables and views referenced by a runnable command plan.
 *
 * The plan is matched against the command types handled below. For each match the
 * affected table/view identifiers are registered through `addTableOrViewLevelObjs`
 * into `inputObjects` and/or `outputObjects`, and any query carried by the command
 * is handed to `ParseQuery` together with `inputObjects`. Plans that match none of
 * the cases are ignored.
 *
 * @param plan          the command plan to inspect
 * @param inputObjects  list that receives the objects the command reads
 * @param outputObjects list that receives the objects the command creates or modifies
 */
private[this] def parseRunnableCommand(
    plan: LogicalPlan,
    inputObjects: JList[HPO],
    outputObjects: JList[HPO]
): Unit = {
  plan match {
    case c: CreateDataSourceTableAsSelectCommand =>
      addTableOrViewLevelObjs(
        c.table.identifier,
        outputObjects,
        columns = toCSColumns(c.table.schema),
        actionType = TableOperationType.CREATE
      )
      ParseQuery(c.query, inputObjects)

    case c: CreateDataSourceTableCommand =>
      addTableOrViewLevelObjs(
        c.table.identifier,
        outputObjects,
        columns = toCSColumns(c.table.schema),
        actionType = TableOperationType.CREATE
      )

    case c: CreateHiveTableAsSelectCommand =>
      addTableOrViewLevelObjs(
        c.tableDesc.identifier,
        outputObjects,
        columns = toCSColumns(c.tableDesc.schema),
        actionType = TableOperationType.CREATE
      )
      ParseQuery(c.query, inputObjects)

    case c: CreateTableCommand =>
      addTableOrViewLevelObjs(
        c.table.identifier,
        outputObjects,
        columns = toCSColumns(c.table.schema),
        actionType = TableOperationType.CREATE
      )

    case c: CreateTableLikeCommand =>
      addTableOrViewLevelObjs(
        c.targetTable,
        outputObjects,
        actionType = TableOperationType.CREATE
      )
      addTableOrViewLevelObjs(c.sourceTable, inputObjects)

    case c: CreateViewCommand =>
      addTableOrViewLevelObjs(
        c.name,
        outputObjects,
        columns = toCSColumnsByNamed(c.output),
        actionType = TableOperationType.CREATE
      )
      // Compare the version numerically: a plain String comparison is lexicographic
      // and would place e.g. "3.10.0" or "10.0.0" before "3.2".
      val MajorMinor = """(\d+)\.(\d+).*""".r
      val isBeforeSpark32 = SPARK_VERSION match {
        case MajorMinor(major, minor) =>
          major.toInt < 3 || (major.toInt == 3 && minor.toInt < 2)
        // Unrecognised version layout: keep the legacy comparison.
        case _ => SPARK_VERSION < "3.2"
      }
      // From Spark 3.2.0 on, the `child` field of CreateViewCommand is named `plan`.
      val viewQueryField = if (isBeforeSpark32) "child" else "plan"
      ParseQuery(getFieldVal(c, viewQueryField).asInstanceOf[LogicalPlan], inputObjects)

    case l: LoadDataCommand =>
      addTableOrViewLevelObjs(l.table, outputObjects)

    // Matched by node name and read reflectively rather than by static type.
    case i if i.nodeName == "InsertIntoHiveTable" =>
      val table = getFieldVal(i, "table").asInstanceOf[CatalogTable]
      addTableOrViewLevelObjs(
        table.identifier,
        outputObjects,
        columns = toCSColumns(table.schema),
        actionType = TableOperationType.CREATE
      )
      ParseQuery(getFieldVal(i, "query").asInstanceOf[LogicalPlan], inputObjects)

    case d: DropTableCommand =>
      addTableOrViewLevelObjs(d.tableName, outputObjects, actionType = TableOperationType.DROP)

    case s: TruncateTableCommand =>
      addTableOrViewLevelObjs(s.tableName, outputObjects, actionType = TableOperationType.DROP)

    case a: AlterTableAddPartitionCommand =>
      addTableOrViewLevelObjs(a.tableName, outputObjects, actionType = TableOperationType.ALTER)

    case a: AlterTableDropPartitionCommand =>
      // NOTE(review): unlike the add-partition case above, no actionType is passed, so
      // the default of addTableOrViewLevelObjs applies - confirm this is intended.
      addTableOrViewLevelObjs(a.tableName, outputObjects)

    // Only tables, or views whose old name carries a database, are recorded.
    case a: AlterTableRenameCommand if !a.isView || a.oldName.database.nonEmpty =>
      addTableOrViewLevelObjs(a.oldName, inputObjects)
      addTableOrViewLevelObjs(a.newName, outputObjects)

    case a: AlterTableRenamePartitionCommand =>
      addTableOrViewLevelObjs(a.tableName, inputObjects)
      addTableOrViewLevelObjs(a.tableName, outputObjects)

    case a: AlterViewAsCommand =>
      if (a.name.database.nonEmpty) {
        // it's a permanent view
        addTableOrViewLevelObjs(a.name, outputObjects, actionType = TableOperationType.ALTER)
      }
      ParseQuery(a.query, inputObjects)

    case a if a.nodeName == "AlterTableAddColumnsCommand" =>
      // Read the reflective fields once; the table is recorded as both input and output.
      val table = getFieldVal(a, "table").asInstanceOf[TableIdentifier]
      val addedFields = getFieldVal(a, "colsToAdd").asInstanceOf[Seq[StructField]]
      // toCSColumns is invoked per call so each recorded object gets its own column list.
      addTableOrViewLevelObjs(table, inputObjects, columns = toCSColumns(addedFields))
      addTableOrViewLevelObjs(
        table,
        outputObjects,
        columns = toCSColumns(addedFields),
        actionType = TableOperationType.ALTER
      )

    case a if a.nodeName == "AlterTableChangeColumnCommand" =>
      addTableOrViewLevelObjs(
        getFieldVal(a, "tableName").asInstanceOf[TableIdentifier],
        inputObjects,
        columns = toCSColumns(Seq(getFieldVal(a, "newColumn").asInstanceOf[StructField])),
        actionType = TableOperationType.ALTER
      )

    // Any other command contributes no objects.
    case _ =>
  }
}