in extensions/spark/kyuubi-spark-lineage/src/main/scala/org/apache/kyuubi/plugin/lineage/helper/SparkSQLLineageParseHelper.scala [188:482]
private def extractColumnsLineage(
plan: LogicalPlan,
parentColumnsLineage: AttributeMap[AttributeSet]): AttributeMap[AttributeSet] = {
plan match {
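// Recursively walks the plan top-down. parentColumnsLineage maps each (eventual)
// output attribute to the set of source attributes it derives from; every node
// rewrites those sets through its own expressions, until leaf relations qualify
// the surviving attributes with real table names.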
// For command
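// CommandResult wraps the logical plan of a command that has already been
// executed; unwrap it and recurse.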
case p if p.nodeName == "CommandResult" =>
val commandPlan = getField[LogicalPlan](plan, "commandLogicalPlan")
extractColumnsLineage(commandPlan, parentColumnsLineage)
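// ALTER VIEW ... AS: re-qualify each output column with the view name. On Spark 3.1
// and earlier the command still carries an unresolved query, so analyze it first.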
case p if p.nodeName == "AlterViewAsCommand" =>
val query =
if (SPARK_RUNTIME_VERSION <= "3.1") {
sparkSession.sessionState.analyzer.execute(getQuery(plan))
} else {
getQuery(plan)
}
val view = getV1TableName(getField[TableIdentifier](plan, "name").unquotedString)
extractColumnsLineage(query, parentColumnsLineage).map { case (k, v) =>
k.withName(s"$view.${k.name}") -> v
}
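// CREATE VIEW, persisted views only (temp views are expanded via the View node
// below). User-specified column names, if any, rename the query output positionally.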
case p
if p.nodeName == "CreateViewCommand"
&& getField[ViewType](plan, "viewType") == PersistedView =>
val view = getV1TableName(getField[TableIdentifier](plan, "name").unquotedString)
val outputCols =
getField[Seq[(String, Option[String])]](plan, "userSpecifiedColumns").map(_._1)
val query =
if (SPARK_RUNTIME_VERSION <= "3.1") {
sparkSession.sessionState.analyzer.execute(getField[LogicalPlan](plan, "child"))
} else {
getField[LogicalPlan](plan, "plan")
}
val lineages = extractColumnsLineage(query, parentColumnsLineage).zipWithIndex.map {
case ((k, v), i) if outputCols.nonEmpty => k.withName(s"$view.${outputCols(i)}") -> v
case ((k, v), _) => k.withName(s"$view.${k.name}") -> v
}.toSeq
ListMap[Attribute, AttributeSet](lineages: _*)
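// v1 CTAS commands: every column of the query becomes <table>.<column> of the
// newly created table.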
case p if p.nodeName == "CreateDataSourceTableAsSelectCommand" =>
val table = getV1TableName(getField[CatalogTable](plan, "table").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
case p
if p.nodeName == "CreateHiveTableAsSelectCommand" ||
p.nodeName == "OptimizedCreateHiveTableAsSelectCommand" =>
val table = getV1TableName(getField[CatalogTable](plan, "tableDesc").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
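// v2 CREATE/REPLACE TABLE AS SELECT: the identifier and catalog moved between Spark
// releases, so they are read via direct field access on Spark 3.2 and earlier, and
// via method invocation afterwards. Columns are qualified as catalog.namespace.table.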
case p
if p.nodeName == "CreateTableAsSelect" ||
p.nodeName == "ReplaceTableAsSelect" =>
val (table, namespace, catalog) =
if (SPARK_RUNTIME_VERSION <= "3.2") {
(
getField[Identifier](plan, "tableName").name,
getField[Identifier](plan, "tableName").namespace.mkString("."),
getField[TableCatalog](plan, "catalog").name())
} else {
(
invokeAs[Identifier](plan, "tableName").name(),
invokeAs[Identifier](plan, "tableName").namespace().mkString("."),
getField[CatalogPlugin](
invokeAs[LogicalPlan](plan, "name"),
"catalog").name())
}
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(Seq(catalog, namespace, table, k.name).filter(_.nonEmpty).mkString(".")) -> v
}
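// v1 INSERT commands: qualify columns with the resolved target table, or with the
// target location for INSERT OVERWRITE DIRECTORY variants.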
case p if p.nodeName == "InsertIntoDataSourceCommand" =>
val logicalRelation = getField[LogicalRelation](plan, "logicalRelation")
val table = logicalRelation
.catalogTable.map(t => getV1TableName(t.qualifiedName)).getOrElse("")
// use collect (not map) so entries are dropped, instead of throwing a
// MatchError, when the relation has no catalog table attached
extractColumnsLineage(getQuery(plan), parentColumnsLineage).collect {
case (k, v) if table.nonEmpty =>
k.withName(s"$table.${k.name}") -> v
}
case p if p.nodeName == "InsertIntoHadoopFsRelationCommand" =>
val table =
getField[Option[CatalogTable]](plan, "catalogTable")
.map(t => getV1TableName(t.qualifiedName))
.getOrElse("")
// collect, not map: drop entries rather than hit a MatchError when there is
// no catalog table (e.g. a plain path-based write)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).collect {
case (k, v) if table.nonEmpty =>
k.withName(s"$table.${k.name}") -> v
}
case p
if p.nodeName == "InsertIntoDataSourceDirCommand" ||
p.nodeName == "InsertIntoHiveDirCommand" =>
val dir =
getField[CatalogStorageFormat](plan, "storage").locationUri.map(_.toString)
.getOrElse("")
// collect, not map: drop entries rather than hit a MatchError when no
// location is set
extractColumnsLineage(getQuery(plan), parentColumnsLineage).collect {
case (k, v) if dir.nonEmpty =>
k.withName(s"`$dir`.${k.name}") -> v
}
case p if p.nodeName == "InsertIntoHiveTable" =>
val table = getV1TableName(getField[CatalogTable](plan, "table").qualifiedName)
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
case p if p.nodeName == "SaveIntoDataSourceCommand" =>
extractColumnsLineage(getQuery(plan), parentColumnsLineage)
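// v2 write nodes: qualify columns with the v2 target table's identifier.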
case p
if p.nodeName == "AppendData"
|| p.nodeName == "OverwriteByExpression"
|| p.nodeName == "OverwritePartitionsDynamic" =>
val table = getV2TableName(getField[NamedRelation](plan, "table"))
extractColumnsLineage(getQuery(plan), parentColumnsLineage).map { case (k, v) =>
k.withName(s"$table.${k.name}") -> v
}
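// MERGE INTO: lineage comes from the UPDATE/INSERT assignments, e.g. for
// "MERGE INTO t USING s ON t.id = s.id WHEN MATCHED THEN UPDATE SET t.a = s.x"
// the target column t.a depends on the source column s.x.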
case p if p.nodeName == "MergeIntoTable" =>
val matchedActions = getField[Seq[MergeAction]](plan, "matchedActions")
val notMatchedActions = getField[Seq[MergeAction]](plan, "notMatchedActions")
val allAssignments = (matchedActions ++ notMatchedActions).collect {
case UpdateAction(_, assignments) => assignments
case InsertAction(_, assignments) => assignments
}.flatten
val nextColumnsLineage = ListMap(allAssignments.map { assignment =>
assignment.key.asInstanceOf[Attribute] -> assignment.value.references
}: _*)
val targetTable = getField[LogicalPlan](plan, "targetTable")
val sourceTable = getField[LogicalPlan](plan, "sourceTable")
val targetColumnsLineage = extractColumnsLineage(
targetTable,
nextColumnsLineage.map { case (k, _) => (k, AttributeSet(k)) })
val sourceColumnsLineage = extractColumnsLineage(sourceTable, nextColumnsLineage)
val targetColumnsWithTargetTable = targetColumnsLineage.values.flatten.map { column =>
val unquotedQualifiedName = (column.qualifier :+ column.name).mkString(".")
column.withName(unquotedQualifiedName)
}
ListMap(targetColumnsWithTargetTable.zip(sourceColumnsLineage.values).toSeq: _*)
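// Run the optimizer (primarily to inline CTE definitions into the main plan) so
// lineage can be traced through to the underlying tables.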
case p if p.nodeName == "WithCTE" =>
val optimized = sparkSession.sessionState.optimizer.execute(p)
extractColumnsLineage(optimized, parentColumnsLineage)
// For query
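// Project/Aggregate: each output attribute depends on the references of the
// expression defining it; substitute those into the parent lineage and recurse.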
case p: Project =>
val nextColumnsLineage =
joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.projectList))
p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
case p: Aggregate =>
val nextColumnsLineage =
joinColumnsLineage(parentColumnsLineage, getSelectColumnLineage(p.aggregateExpressions))
p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
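// Expand is produced by GROUPING SETS / ROLLUP / CUBE and holds one projection per
// grouping set. Transposing the projections collects, for the i-th output column,
// the references of the i-th expression across all grouping sets.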
case p: Expand =>
val references =
p.projections.transpose.map(_.flatMap(x => x.references)).map(AttributeSet(_))
val childColumnsLineage = ListMap(p.output.zip(references): _*)
val nextColumnsLineage =
joinColumnsLineage(parentColumnsLineage, childColumnsLineage)
p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
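// Generate (e.g. LATERAL VIEW explode): map each generator output attribute back to
// the attributes the Generate node references, so downstream references to an
// exploded column resolve to the original collection column.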
case p: Generate =>
val generateColumnsLineageWithId =
ListMap(p.generatorOutput.map(attrRef => (attrRef.toAttribute.exprId, p.references)): _*)
val nextColumnsLineage = parentColumnsLineage.map {
case (key, attrRefs) =>
key -> AttributeSet(attrRefs.flatMap(attr =>
generateColumnsLineageWithId.getOrElse(
attr.exprId,
AttributeSet(attr))))
}
p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
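// Window expressions map to the attributes they reference; all other columns pass
// through unchanged. Without a parent lineage, seed it from the child's output.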
case p: Window =>
val windowColumnsLineage =
ListMap(p.windowExpressions.map(exp => (exp.toAttribute, exp.references)): _*)
val nextColumnsLineage = if (parentColumnsLineage.isEmpty) {
ListMap(p.child.output.map(attr => (attr, attr.references)): _*) ++ windowColumnsLineage
} else {
parentColumnsLineage.map {
case (k, _) if windowColumnsLineage.contains(k) =>
k -> windowColumnsLineage(k)
case (k, attrs) =>
k -> AttributeSet(attrs.flatMap(attr =>
windowColumnsLineage.getOrElse(attr, AttributeSet(attr))))
}
}
p.children.map(extractColumnsLineage(_, nextColumnsLineage)).reduce(mergeColumnsLineage)
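// LEFT SEMI / LEFT ANTI joins only emit the left side's columns, so the right
// child cannot contribute lineage and is skipped.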
case p: Join =>
p.joinType match {
case LeftSemi | LeftAnti =>
extractColumnsLineage(p.left, parentColumnsLineage)
case _ =>
p.children.map(extractColumnsLineage(_, parentColumnsLineage))
.reduce(mergeColumnsLineage)
}
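// Union pairs its children's output columns positionally: output column i depends
// on column i of every child. An empty output indicates a multi-insert statement,
// where each child is an independent insertion branch.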
case p: Union =>
val childrenColumnsLineage =
// support for the multi-insert statement
if (p.output.isEmpty) {
p.children
.map(extractColumnsLineage(_, ListMap[Attribute, AttributeSet]()))
.reduce(mergeColumnsLineage)
} else {
// merge the lineage of all children into one, pairing output columns positionally
val childrenUnion =
p.children.map(extractColumnsLineage(_, ListMap[Attribute, AttributeSet]())).map(
_.values).reduce { (left, right) =>
left.zip(right).map { case (l, r) => l ++ r }
}
ListMap(p.output.zip(childrenUnion): _*)
}
joinColumnsLineage(parentColumnsLineage, childrenColumnsLineage)
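// Leaf relations: qualify the surviving source attributes with the resolved table
// identifier.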
case p: LogicalRelation if p.catalogTable.nonEmpty =>
val tableName = getV1TableName(p.catalogTable.get.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: HiveTableRelation =>
val tableName = getV1TableName(p.tableMeta.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: DataSourceV2ScanRelation =>
val tableName = getV2TableName(p)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
// When a view is created from a v2 table, the table's logical plan is a
// `DataSourceV2Relation` rather than a `DataSourceV2ScanRelation`,
// because creating the view does not actually scan the table.
case p: DataSourceV2Relation =>
val tableName = getV2TableName(p)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(tableName))
case p: LocalRelation =>
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(LOCAL_TABLE_IDENTIFIER))
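// OneRowRelation backs scalar subqueries: strip the synthetic subquery qualifier
// so each column resolves against its real relation.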
case _: OneRowRelation =>
parentColumnsLineage.map {
case (k, attrs) =>
k -> AttributeSet(attrs.map {
case attr
if attr.qualifier.nonEmpty && attr.qualifier.last.equalsIgnoreCase(
SUBQUERY_COLUMN_IDENTIFIER) =>
attr.withQualifier(attr.qualifier.init)
case attr => attr
})
}
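// Permanent views are either treated as lineage endpoints (when skipping is
// enabled) or expanded so lineage flows through to the view's underlying tables.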
case p: View =>
if (!p.isTempView && SparkContextHelper.getConf(
LineageConf.SKIP_PARSING_PERMANENT_VIEW_ENABLED)) {
val viewName = getV1TableName(p.desc.qualifiedName)
joinRelationColumnLineage(parentColumnsLineage, p.output, Seq(viewName))
} else {
val viewColumnsLineage =
extractColumnsLineage(p.child, ListMap[Attribute, AttributeSet]())
mergeRelationColumnLineage(parentColumnsLineage, p.output, viewColumnsLineage)
}
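// Cached relations: recover the logical plan behind the cached physical plan when
// possible; otherwise fall back to the cache's table name.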
case p: InMemoryRelation =>
// get logical plan from cachedPlan
val cachedTableLogical = findSparkPlanLogicalLink(Seq(p.cacheBuilder.cachedPlan))
cachedTableLogical match {
case Some(logicPlan) =>
val relationColumnLineage =
extractColumnsLineage(logicPlan, ListMap[Attribute, AttributeSet]())
mergeRelationColumnLineage(parentColumnsLineage, p.output, relationColumnLineage)
case _ =>
joinRelationColumnLineage(
parentColumnsLineage,
p.output,
p.cacheBuilder.tableName.toSeq)
}
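// Unknown leaves contribute no lineage; any other node recurses into its children
// and merges the results.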
case p if p.children.isEmpty => ListMap[Attribute, AttributeSet]()
case p =>
p.children.map(extractColumnsLineage(_, parentColumnsLineage)).reduce(mergeColumnsLineage)
}
}