in externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala [58:137]
override def newExecuteStatementOperation(
    session: Session,
    statement: String,
    confOverlay: Map[String, String],
    runAsync: Boolean,
    queryTimeout: Long): Operation = {
  val spark = session.asInstanceOf[SparkSessionImpl].spark
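  // When enabled (session conf overrides the engine default), try to convert the statement
  // into a dedicated catalog/database operation and short-circuit the normal execution path.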
  if (spark.conf.getOption(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key)
      .map(_.toBoolean).getOrElse(operationConvertCatalogDatabaseDefault)) {
    val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
    if (catalogDatabaseOperation != null) {
      return catalogDatabaseOperation
    }
  }
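  // Resolve the operation language: the per-operation confOverlay wins over the
  // session-level Spark conf, which in turn falls back to the engine default.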
  val lang = OperationLanguages(confOverlay.getOrElse(
    OPERATION_LANGUAGE.key,
    spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault)))
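  // Reuse the operation handle supplied by the client, if any; otherwise generate a fresh one.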
  val opHandle = confOverlay.get(KYUUBI_OPERATION_HANDLE_KEY).map(
    OperationHandle.apply).getOrElse(OperationHandle())
  val operation =
    lang match {
      case OperationLanguages.SQL =>
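        // Resolve the plan-only mode and write the normalized name back to the session conf.
        // NoneMode means the statement is actually executed; other modes only return a plan.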
        val mode = PlanOnlyMode.fromString(spark.conf.get(
          OPERATION_PLAN_ONLY_MODE.key,
          planOnlyModeDefault))
        spark.conf.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
        mode match {
          case NoneMode =>
            val incrementalCollect =
              spark.conf.getOption(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT.key)
                .orElse(spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key))
                .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
            // TODO: respect the config of the operation ExecuteStatement, if it was set.
            val resultFormat = spark.conf.get(OPERATION_RESULT_FORMAT.key, "thrift")
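            // "arrow" serializes result sets in Arrow format; any other value falls back to
            // the default thrift row-based result set.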
            resultFormat.toLowerCase match {
              case "arrow" =>
                new ArrowBasedExecuteStatement(
                  session,
                  statement,
                  runAsync,
                  queryTimeout,
                  incrementalCollect,
                  opHandle)
              case _ =>
                new ExecuteStatement(
                  session,
                  statement,
                  runAsync,
                  queryTimeout,
                  incrementalCollect,
                  opHandle)
            }
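          // Any other plan-only mode returns the plan of the requested phase instead of
          // executing the statement.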
          case mode =>
            new PlanOnlyStatement(session, statement, mode, opHandle)
        }
      case OperationLanguages.SCALA =>
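        // Lazily create and cache one Scala REPL per session, then run the snippet in it.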
        val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
        new ExecuteScala(session, repl, statement, runAsync, queryTimeout, opHandle)
      case OperationLanguages.PYTHON =>
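        // Lazily create and cache one Python worker process per session to run the snippet.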
        try {
          ExecutePython.init()
          val worker = sessionToPythonProcess.getOrElseUpdate(
            session.handle,
            ExecutePython.createSessionPythonWorker(spark, session))
          new ExecutePython(session, statement, runAsync, queryTimeout, worker, opHandle)
        } catch {
          case e: Throwable =>
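            // Reset the language to SQL so the session is not stuck with a broken Python setup.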
            spark.conf.set(OPERATION_LANGUAGE.key, OperationLanguages.SQL.toString)
            throw KyuubiSQLException(
              s"Failed to init python environment, fall back to SQL mode: ${e.getMessage}",
              e)
        }
      case OperationLanguages.UNKNOWN =>
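        // Clear the unrecognized language setting before failing the operation.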
        spark.conf.unset(OPERATION_LANGUAGE.key)
        throw KyuubiSQLException(s"The operation language $lang" +
          " is not supported by the Spark SQL engine.")
    }
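  // Register the newly created operation with the manager and return it.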
  addOperation(operation)
}