override def newExecuteStatementOperation()

in externals/kyuubi-spark-sql-engine/src/main/scala/org/apache/kyuubi/engine/spark/operation/SparkSQLOperationManager.scala [58:137]


  override def newExecuteStatementOperation(
      session: Session,
      statement: String,
      confOverlay: Map[String, String],
      runAsync: Boolean,
      queryTimeout: Long): Operation = {
    val spark = session.asInstanceOf[SparkSessionImpl].spark
    if (spark.conf.getOption(ENGINE_OPERATION_CONVERT_CATALOG_DATABASE_ENABLED.key)
        .map(_.toBoolean).getOrElse(operationConvertCatalogDatabaseDefault)) {
      val catalogDatabaseOperation = processCatalogDatabase(session, statement, confOverlay)
      if (catalogDatabaseOperation != null) {
        return catalogDatabaseOperation
      }
    }
    val lang = OperationLanguages(confOverlay.getOrElse(
      OPERATION_LANGUAGE.key,
      spark.conf.get(OPERATION_LANGUAGE.key, operationLanguageDefault)))
    val opHandle = confOverlay.get(KYUUBI_OPERATION_HANDLE_KEY).map(
      OperationHandle.apply).getOrElse(OperationHandle())
    val operation =
      lang match {
        case OperationLanguages.SQL =>
          val mode = PlanOnlyMode.fromString(spark.conf.get(
            OPERATION_PLAN_ONLY_MODE.key,
            planOnlyModeDefault))

          spark.conf.set(OPERATION_PLAN_ONLY_MODE.key, mode.name)
          mode match {
            case NoneMode =>
              val incrementalCollect =
                spark.conf.getOption(ENGINE_SPARK_OPERATION_INCREMENTAL_COLLECT.key)
                  .orElse(spark.conf.getOption(OPERATION_INCREMENTAL_COLLECT.key))
                  .map(_.toBoolean).getOrElse(operationIncrementalCollectDefault)
              // TODO: respect the config of the operation ExecuteStatement, if it was set.
              val resultFormat = spark.conf.get(OPERATION_RESULT_FORMAT.key, "thrift")
              resultFormat.toLowerCase match {
                case "arrow" =>
                  new ArrowBasedExecuteStatement(
                    session,
                    statement,
                    runAsync,
                    queryTimeout,
                    incrementalCollect,
                    opHandle)
                case _ =>
                  new ExecuteStatement(
                    session,
                    statement,
                    runAsync,
                    queryTimeout,
                    incrementalCollect,
                    opHandle)
              }
            case mode =>
              new PlanOnlyStatement(session, statement, mode, opHandle)
          }
        case OperationLanguages.SCALA =>
          val repl = sessionToRepl.getOrElseUpdate(session.handle, KyuubiSparkILoop(spark))
          new ExecuteScala(session, repl, statement, runAsync, queryTimeout, opHandle)
        case OperationLanguages.PYTHON =>
          try {
            ExecutePython.init()
            val worker = sessionToPythonProcess.getOrElseUpdate(
              session.handle,
              ExecutePython.createSessionPythonWorker(spark, session))
            new ExecutePython(session, statement, runAsync, queryTimeout, worker, opHandle)
          } catch {
            case e: Throwable =>
              spark.conf.set(OPERATION_LANGUAGE.key, OperationLanguages.SQL.toString)
              throw KyuubiSQLException(
                s"Failed to init python environment, fall back to SQL mode: ${e.getMessage}",
                e)
          }
        case OperationLanguages.UNKNOWN =>
          spark.conf.unset(OPERATION_LANGUAGE.key)
          throw KyuubiSQLException(s"The operation language $lang" +
            " doesn't support in Spark SQL engine.")
      }
    addOperation(operation)
  }