in extensions/spark/kyuubi-spark-authz/src/main/scala/org/apache/kyuubi/plugin/spark/authz/PrivilegesBuilder.scala [141:269]
/**
 * Collects the privilege objects required by a command plan.
 *
 * The command class name is looked up in the DB / TABLE / FUNCTION command
 * spec registries; the matching spec drives extraction of database, table,
 * URI and function privilege objects, which are appended to `inputObjs` or
 * `outputObjs` according to each descriptor's direction. Extraction failures
 * are logged at debug level and skipped (best-effort, never fail the build).
 *
 * @param plan       the command plan to analyze
 * @param inputObjs  buffer receiving input-side privilege objects (mutated)
 * @param outputObjs buffer receiving output-side privilege objects (mutated)
 * @param spark      active session, used for extraction and query analysis
 * @return the operation type declared by the matching spec, or QUERY if the
 *         command class is unknown
 */
private def buildCommand(
    plan: LogicalPlan,
    inputObjs: ArrayBuffer[PrivilegeObject],
    outputObjs: ArrayBuffer[PrivilegeObject],
    spark: SparkSession): OperationType = {

  // Best-effort: resolve a table descriptor into zero or one privilege
  // objects; any extraction error is logged and treated as "no privilege".
  def getTablePriv(tableDesc: TableDesc): Seq[PrivilegeObject] = {
    try {
      tableDesc.extract(plan, spark) match {
        case Some(table) =>
          // Fill in the current database first (may be needed even to
          // evaluate the skip predicate consistently with legacy behavior).
          val resolvedTable =
            if (tableDesc.setCurrentDatabaseIfMissing) {
              setCurrentDBIfNecessary(table, spark)
            } else {
              table
            }
          if (tableDesc.tableTypeDesc.exists(_.skip(plan))) {
            Nil
          } else {
            val action = tableDesc.actionTypeDesc.map(_.extract(plan)).getOrElse(OTHER)
            val columns = tableDesc.columnDesc.map(_.extract(plan)).getOrElse(Nil)
            Seq(PrivilegeObject(resolvedTable, columns, action))
          }
        case None => Nil
      }
    } catch {
      case e: Exception =>
        LOG.debug(tableDesc.error(plan, e))
        Nil
    }
  }

  plan.getClass.getName match {
    case cmd if DB_COMMAND_SPECS.contains(cmd) =>
      val spec = DB_COMMAND_SPECS(cmd)
      spec.databaseDescs.foreach { dd =>
        try {
          val obj = PrivilegeObject(dd.extract(plan))
          (if (dd.isInput) inputObjs else outputObjs) += obj
        } catch {
          case e: Exception => LOG.debug(dd.error(plan, e))
        }
      }
      spec.uriDescs.foreach { ud =>
        try {
          val objs = ud.extract(plan, spark).map(PrivilegeObject(_))
          (if (ud.isInput) inputObjs else outputObjs) ++= objs
        } catch {
          case e: Exception => LOG.debug(ud.error(plan, e))
        }
      }
      spec.operationType

    case cmd if TABLE_COMMAND_SPECS.contains(cmd) =>
      val spec = TABLE_COMMAND_SPECS(cmd)
      spec.tableDescs.foreach { td =>
        (if (td.isInput) inputObjs else outputObjs) ++= getTablePriv(td)
      }
      spec.uriDescs.foreach { ud =>
        try {
          val objs = ud.extract(plan, spark).map(PrivilegeObject(_))
          (if (ud.isInput) inputObjs else outputObjs) ++= objs
        } catch {
          case e: Exception => LOG.debug(ud.error(plan, e))
        }
      }
      // Queries embedded in the command (e.g. CTAS) contribute input
      // privileges; unresolved queries (Spark 3.1) are analyzed on demand.
      spec.queries(plan).foreach {
        case q if q.resolved =>
          buildQuery(Project(q.output, q), inputObjs, spark = spark)
        case q =>
          try {
            // For spark 3.1, Some command such as CreateTableASSelect, its query was unresolved,
            // Before this pr, we just ignore it, now we support this.
            val analyzed = spark.sessionState.analyzer.execute(q)
            buildQuery(Project(analyzed.output, analyzed), inputObjs, spark = spark)
          } catch {
            case e: Exception =>
              LOG.debug(
                s"""
                   |Failed to analyze unresolved
                   |$q
                   |due to ${e.getMessage}""".stripMargin,
                e)
          }
      }
      spec.operationType

    case cmd if FUNCTION_COMMAND_SPECS.contains(cmd) =>
      val spec = FUNCTION_COMMAND_SPECS(cmd)
      spec.functionDescs.foreach { fd =>
        try {
          // Extract first, then apply the skip predicate — matches the
          // original evaluation order (extraction failures are still logged).
          val function = fd.extract(plan)
          if (!fd.functionTypeDesc.exists(_.skip(plan, spark))) {
            (if (fd.isInput) inputObjs else outputObjs) += PrivilegeObject(function)
          }
        } catch {
          case e: Exception => LOG.debug(fd.error(plan, e))
        }
      }
      spec.operationType

    // Unknown command class: treat as a plain query.
    case _ => OperationType.QUERY
  }
}