in kyuubi-server/src/main/scala/org/apache/kyuubi/server/api/v1/BatchesResource.scala [235:332]
/**
 * Validates a batch request and submits it, returning the resulting [[Batch]].
 *
 * The request is processed as follows:
 *  1. The batch type must be supported, a resource must be set and, for SPARK batches, a class
 *     name must be set. The batch type is then normalized to upper case.
 *  1. If the request conf carries `KYUUBI_BATCH_ID_KEY`, its value must parse as a UUID. When a
 *     batch with that id is already present in the metadata store, that batch is returned
 *     through `markDuplicated` and nothing new is submitted.
 *  1. Otherwise a batch id is chosen (the user-provided one, or a random UUID), uploaded files
 *     are handled when `isResourceFromUpload` is set, and the request conf is extended with the
 *     batch id, the upload flag, client/server addresses, the connection URL and the real user.
 *  1. The batch is submitted through `sessionManager.initializeBatchState` when Batch API v2 is
 *     enabled for the resulting conf, and through `sessionManager.openBatchSession` otherwise.
 *
 * Note that `request` is mutated in place (batch type and conf).
 *
 * @param request the batch request to validate and submit
 * @param isResourceFromUpload whether the batch resource was uploaded with the request
 * @param resourceFileInputStream content of the uploaded resource; required when
 *                                `isResourceFromUpload` is true
 * @param resourceFileMetadata metadata of the uploaded resource (its file name is used);
 *                             required when `isResourceFromUpload` is true
 * @param formDataMultiPartOpt multipart form forwarded as-is to `handleUploadingFiles`
 * @return the submitted batch, or the already existing batch marked as duplicated
 * @throws IllegalArgumentException if the request fails validation, the provided batch id is
 *                                  not a UUID, or an upload is flagged but a file part is missing
 * @throws IllegalStateException if submission fails, or the batch cannot be looked up afterwards
 */
private def openBatchSessionInternal(
    request: BatchRequest,
    isResourceFromUpload: Boolean = false,
    resourceFileInputStream: Option[InputStream] = None,
    resourceFileMetadata: Option[FormDataContentDisposition] = None,
    formDataMultiPartOpt: Option[FormDataMultiPart] = None): Batch = {
  require(
    supportedBatchType(request.getBatchType),
    s"${request.getBatchType} is not in the supported list: $SUPPORTED_BATCH_TYPES")
  require(request.getResource != null, "resource is a required parameter")
  if (request.getBatchType.equalsIgnoreCase("SPARK")) {
    require(request.getClassName != null, "classname is a required parameter for SPARK")
  }
  request.setBatchType(request.getBatchType.toUpperCase(Locale.ROOT))

  val userName = fe.getSessionUser(request.getConf.asScala.toMap)
  val ipAddress = fe.getIpAddress

  // A client may pin the batch id through the conf; it has to be a well-formed UUID.
  val userProvidedBatchId = request.getConf.asScala.get(KYUUBI_BATCH_ID_KEY)
  userProvidedBatchId.foreach { batchId =>
    try UUID.fromString(batchId)
    catch {
      case NonFatal(e) =>
        throw new IllegalArgumentException(s"$KYUUBI_BATCH_ID_KEY=$batchId must be an UUID", e)
    }
  }

  userProvidedBatchId.flatMap { batchId =>
    sessionManager.getBatchFromMetadataStore(batchId)
  } match {
    case Some(batch) =>
      // The id is already known to the metadata store: report it instead of resubmitting.
      markDuplicated(batch)

    case None =>
      val batchId = userProvidedBatchId.getOrElse(UUID.randomUUID().toString)

      // Shared failure handling for both submission paths: a duplicated-key error means the
      // batch id was stored in the meantime, so the stored batch is returned as a duplicate;
      // any other failure is wrapped and rethrown.
      def recoverFromSubmitFailure(cause: Throwable): Batch = {
        if (JdbcUtils.isDuplicatedKeyDBErr(cause)) {
          sessionManager.getBatchFromMetadataStore(batchId) match {
            case Some(batch) => markDuplicated(batch)
            case None => throw new IllegalStateException(
                s"can not find duplicated batch $batchId from metadata store")
          }
        } else {
          throw new IllegalStateException(cause)
        }
      }

      if (isResourceFromUpload) {
        // Fail with a descriptive message rather than an opaque `None.get`.
        val uploadedStream = resourceFileInputStream.getOrElse(
          throw new IllegalArgumentException(
            "resource file input stream is required when the resource is uploaded"))
        val uploadedMetadata = resourceFileMetadata.getOrElse(
          throw new IllegalArgumentException(
            "resource file metadata is required when the resource is uploaded"))
        handleUploadingFiles(
          batchId,
          request,
          uploadedStream,
          uploadedMetadata.getFileName,
          formDataMultiPartOpt)
      }

      request.setConf(
        (request.getConf.asScala ++ Map(
          KYUUBI_BATCH_ID_KEY -> batchId,
          KYUUBI_BATCH_RESOURCE_UPLOADED_KEY -> isResourceFromUpload.toString,
          KYUUBI_CLIENT_IP_KEY -> ipAddress,
          KYUUBI_SERVER_IP_KEY -> fe.host,
          KYUUBI_SESSION_CONNECTION_URL_KEY -> fe.connectionUrl,
          KYUUBI_SESSION_REAL_USER_KEY -> fe.getRealUser())).asJava)

      // Snapshot of the conf that was just stored on the request, reused below.
      val submitConf = request.getConf.asScala.toMap

      if (batchV2Enabled(submitConf)) {
        logger.info(s"Submit batch job $batchId using Batch API v2")
        Try {
          sessionManager.initializeBatchState(
            userName,
            ipAddress,
            submitConf,
            request)
        } match {
          case Success(initializedBatchId) =>
            // Look the batch up by the id returned from the session manager.
            sessionManager.getBatchFromMetadataStore(initializedBatchId) match {
              case Some(batch) => batch
              case None => throw new IllegalStateException(
                  s"can not find batch $initializedBatchId from metadata store")
            }
          case Failure(cause) => recoverFromSubmitFailure(cause)
        }
      } else {
        Try {
          // NOTE(review): "anonymous" is presumably a placeholder password — confirm against
          // the signature of openBatchSession.
          sessionManager.openBatchSession(
            userName,
            "anonymous",
            ipAddress,
            request)
        } match {
          case Success(sessionHandle) =>
            sessionManager.getBatchSession(sessionHandle) match {
              case Some(batchSession) => buildBatch(batchSession)
              // NOTE(review): this lookup goes through getBatchSession while the message
              // mentions the metadata store; wording kept unchanged — confirm intent.
              case None => throw new IllegalStateException(
                  s"can not find batch $batchId from metadata store")
            }
          case Failure(cause) => recoverFromSubmitFailure(cause)
        }
      }
  }
}