private def openBatchSessionInternal()

in kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala [235:332]


  /**
   * Validates a batch request and either submits it or returns the batch that already
   * exists for the requested batch id.
   *
   * Flow:
   *  1. Validate the batch type, resource and (for SPARK) class name, then normalise the
   *     batch type to upper case.
   *  2. If the request conf carries `KYUUBI_BATCH_ID_KEY`, it must parse as a UUID. When the
   *     metadata store already holds a batch for that id, it is returned through
   *     `markDuplicated` and nothing is submitted.
   *  3. Otherwise a batch id is chosen (user provided or random), uploaded files are handled
   *     when `isResourceFromUpload` is set, and server-side entries are merged into the
   *     request conf.
   *  4. The batch is submitted through `initializeBatchState` when `batchV2Enabled` holds for
   *     the request conf, otherwise through `openBatchSession`.
   *
   * @param request the batch request; its batch type and conf are mutated in place
   * @param isResourceFromUpload whether the resource was uploaded with the request
   * @param resourceFileInputStream stream of the uploaded resource; required when
   *                                `isResourceFromUpload` is true
   * @param resourceFileMetadata metadata of the uploaded resource (its file name is used);
   *                             required when `isResourceFromUpload` is true
   * @param formDataMultiPartOpt optional multipart form data forwarded to
   *                             `handleUploadingFiles`
   * @return the newly submitted batch, or the already existing one marked as duplicated
   * @throws IllegalArgumentException if validation fails, the provided batch id is not a
   *                                  UUID, or upload parts are missing for an uploaded resource
   * @throws IllegalStateException if submission fails, or the batch cannot be found after
   *                               submission
   */
  private def openBatchSessionInternal(
      request: BatchRequest,
      isResourceFromUpload: Boolean = false,
      resourceFileInputStream: Option[InputStream] = None,
      resourceFileMetadata: Option[FormDataContentDisposition] = None,
      formDataMultiPartOpt: Option[FormDataMultiPart] = None): Batch = {
    require(
      supportedBatchType(request.getBatchType),
      s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES")
    require(request.getResource != null, "resource is a required parameter")
    if (request.getBatchType.equalsIgnoreCase("SPARK")) {
      require(request.getClassName != null, "classname is a required parameter for SPARK")
    }
    request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))

    val userName = fe.getSessionUser(request.getConf.asScala.toMap)
    val ipAddress = fe.getIpAddress
    val userProvidedBatchId = request.getConf.asScala.get(KYUUBI_BATCH_ID_KEY)
    // Reject a malformed user supplied id before it is used for any lookup.
    userProvidedBatchId.foreach { providedId =>
      try UUID.fromString(providedId)
      catch {
        case NonFatal(e) =>
          throw new IllegalArgumentException(
            s"$KYUUBI_BATCH_ID_KEY=$providedId must be an UUID",
            e)
      }
    }

    userProvidedBatchId.flatMap { providedId =>
      sessionManager.getBatchFromMetadataStore(providedId)
    } match {
      case Some(existingBatch) =>
        // A batch with the user provided id already exists: return it instead of resubmitting.
        markDuplicated(existingBatch)
      case None =>
        val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)
        if (isResourceFromUpload) {
          // Fail with a descriptive error instead of a bare NoSuchElementException when the
          // upload flag is set but the upload parts were not supplied.
          val uploadedStream = resourceFileInputStream.getOrElse(
            throw new IllegalArgumentException(
              "resource file input stream is required when the resource is uploaded"))
          val uploadedMetadata = resourceFileMetadata.getOrElse(
            throw new IllegalArgumentException(
              "resource file metadata is required when the resource is uploaded"))
          handleUploadingFiles(
            batchId,
            request,
            uploadedStream,
            uploadedMetadata.getFileName,
            formDataMultiPartOpt)
        }
        // Server-side entries are appended last, so they override same-named client entries.
        request.setConf(
          (request.getConf.asScala ++ Map(
            KYUUBI_BATCH_ID_KEY -> batchId,
            KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
            KYUUBI_CLIENT_IP_KEY -> ipAddress,
            KYUUBI_SERVER_IP_KEY -> fe.host,
            KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
            KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)

        // Shared failure handling for both submission paths: a duplicated-key error means a
        // batch with this id was stored in the meantime, so it is returned as a duplicate;
        // any other failure is wrapped in an IllegalStateException.
        def recoverFromFailure(cause: Throwable): Batch = {
          if (JdbcUtils.isDuplicatedKeyDBErr(cause)) {
            sessionManager.getBatchFromMetadataStore(batchId) match {
              case Some(duplicated) => markDuplicated(duplicated)
              case None => throw new IllegalStateException(
                  s"can not find duplicated batch $batchId from metadata store")
            }
          } else {
            throw new IllegalStateException(cause)
          }
        }

        if (batchV2Enabled(request.getConf.asScala.toMap)) {
          logger.info(s"Submit batch job $batchId using Batch API v2")
          Try {
            sessionManager.initializeBatchState(
              userName,
              ipAddress,
              request.getConf.asScala.toMap,
              request)
          } match {
            case Success(initializedBatchId) =>
              // Look the batch up by the id returned from initializeBatchState.
              sessionManager.getBatchFromMetadataStore(initializedBatchId) match {
                case Some(batch) => batch
                case None => throw new IllegalStateException(
                    s"can not find batch $initializedBatchId from metadata store")
              }
            case Failure(cause) => recoverFromFailure(cause)
          }
        } else {
          Try {
            sessionManager.openBatchSession(
              userName,
              "anonymous",
              ipAddress,
              request)
          } match {
            case Success(sessionHandle) =>
              sessionManager.getBatchSession(sessionHandle) match {
                case Some(batchSession) => buildBatch(batchSession)
                // NOTE(review): the message mentions the metadata store although the lookup
                // goes through getBatchSession - confirm whether the wording is intended.
                case None => throw new IllegalStateException(
                    s"can not find batch $batchId from metadata store")
              }
            case Failure(cause) => recoverFromFailure(cause)
          }
        }
    }
  }