in streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala [62:214]
// Builds and publishes a Docker image for a Spark-on-Kubernetes job in seven
// sequential steps: (1) init workspace, (2) export pod templates, (3) stage the
// main jar, (4) generate the Dockerfile, (5) pull the base image, (6) build the
// job image, (7) push it to the registry.
// Step protocol: each execStep(n) returns an Option; None signals failure and
// the error recorded by the step machinery is rethrown via getError.exception,
// aborting the whole pipeline. skipStep(n) marks a step as intentionally skipped.
// Returns a DockerImageBuildResponse describing the workspace, pushed tag,
// exported pod-template paths and the jar path inside the image.
override protected def buildProcess(): DockerImageBuildResponse = {
// Step-1: init build workspace of spark job
// the sub workspace dir like: APP_WORKSPACE/k8s-clusterId@k8s-namespace/
val buildWorkspace =
execStep(1) {
val buildWorkspace =
s"${request.workspace}/${request.k8sNamespace}"
// wipes any previous content so stale artifacts never leak into this build
LfsOperator.mkCleanDirs(buildWorkspace)
logInfo(s"Recreate building workspace: $buildWorkspace")
buildWorkspace
}.getOrElse(throw getError.exception)
// Step-2: export k8s pod template files
// No templates configured -> mark the step skipped and carry an empty map;
// otherwise write the template files into the workspace and collect their paths.
val podTemplatePaths = request.sparkPodTemplate match {
case podTemplate if podTemplate.isEmpty =>
skipStep(2)
Map[String, String]()
case podTemplate =>
execStep(2) {
val podTemplateFiles =
PodTemplateTool
.preparePodTemplateFiles(buildWorkspace, podTemplate)
.tmplFiles
logInfo(s"Export spark podTemplates: ${podTemplateFiles.values.mkString(",")}")
podTemplateFiles
}.getOrElse(throw getError.exception)
}
// Step-3: prepare spark job jar
// Copies the user's main jar into the workspace so the Docker build context
// is self-contained. extJarLibs is currently always empty here — external
// jar support appears unimplemented for Spark (TODO confirm against the
// Flink pipeline counterpart, which populates it).
val (mainJarPath, extJarLibs) =
execStep(3) {
val mainJarName = Paths.get(request.mainJar).getFileName
val mainJarPath = s"$buildWorkspace/$mainJarName"
LfsOperator.copy(request.mainJar, mainJarPath)
logInfo(s"Prepared spark job jar: $mainJarPath")
mainJarPath -> Set[String]()
}.getOrElse(throw getError.exception)
// Step-4: generate and Export spark image dockerfiles
// Chooses the Hadoop-integrated template (bakes in the host's Hadoop conf)
// or the plain one, then writes the Dockerfile into the workspace.
val (dockerfile, dockerFileTemplate) =
execStep(4) {
val dockerFileTemplate = {
if (request.integrateWithHadoop) {
SparkHadoopDockerfileTemplate.fromSystemHadoopConf(
buildWorkspace,
request.sparkBaseImage,
mainJarPath,
extJarLibs)
} else {
SparkDockerfileTemplate(
buildWorkspace,
request.sparkBaseImage,
mainJarPath,
extJarLibs)
}
}
val dockerFile = dockerFileTemplate.writeDockerfile
logInfo(
s"Output spark dockerfile: ${dockerFile.getAbsolutePath}, content: \n${dockerFileTemplate.offerDockerfileContent}")
dockerFile -> dockerFileTemplate
}.getOrElse(throw getError.exception)
val dockerConf = request.dockerConfig
val baseImageTag = request.sparkBaseImage.trim
// Derive the tag the built image will be pushed under; namespace and app name
// are both required because they form the tag. compileTag prepends the
// registry address / image namespace as needed.
val pushImageTag = {
if (request.k8sNamespace.isEmpty || request.appName.isEmpty) {
throw new IllegalArgumentException("k8sNamespace or appName cannot be empty")
}
val expectedImageTag =
s"streampark-sparkjob-${request.k8sNamespace}-${request.appName}"
compileTag(expectedImageTag, dockerConf.registerAddress, dockerConf.imageNamespace)
}
// Step-5: pull spark base image
execStep(5) {
usingDockerClient {
dockerClient =>
val pullImageCmd = {
// when the register address prefix is explicitly identified on base image tag,
// the user's pre-saved docker register auth info would be used.
// NOTE(review): the comment above looks inverted relative to the code —
// pullImageCmdState is true when the tag does NOT start with the
// configured register address, and that branch pulls WITHOUT the
// explicit AuthConfig (falling back to docker's default credentials),
// while tags on the configured registry get dockerConf.toAuthConf.
// Verify intent against the Flink pipeline before changing either.
val pullImageCmdState =
dockerConf.registerAddress != null && !baseImageTag.startsWith(
dockerConf.registerAddress)
if (pullImageCmdState) {
dockerClient.pullImageCmd(baseImageTag)
} else {
dockerClient
.pullImageCmd(baseImageTag)
.withAuthConfig(dockerConf.toAuthConf)
}
}
// HackPullImageCmd exposes a start(...) hook so pull progress can be
// streamed into dockerProcess.pull and forwarded to the watcher; the
// Future makes the watcher notification asynchronous (fire-and-forget).
val pullCmdCallback = pullImageCmd
.asInstanceOf[HackPullImageCmd]
.start(watchDockerPullProcess {
pullRsp =>
dockerProcess.pull.update(pullRsp)
Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
})
// block until the pull finishes (or fails into the error handler below)
pullCmdCallback.awaitCompletion
logInfo(s"Already pulled docker image from remote register, imageTag=$baseImageTag")
}(err => throw new Exception(s"Pull docker image failed, imageTag=$baseImageTag", err))
}.getOrElse(throw getError.exception)
// Step-6: build spark image
// Builds from the workspace (build context) + generated Dockerfile, tagged
// with the push tag computed above; progress is streamed like the pull step.
execStep(6) {
usingDockerClient {
dockerClient =>
val buildImageCmd = dockerClient
.buildImageCmd()
.withBaseDirectory(new File(buildWorkspace))
.withDockerfile(dockerfile)
.withTags(Sets.newHashSet(pushImageTag))
val buildCmdCallback = buildImageCmd
.asInstanceOf[HackBuildImageCmd]
.start(watchDockerBuildStep {
buildStep =>
dockerProcess.build.update(buildStep)
Future(
dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
})
// blocks until the image is built and yields its id
val imageId = buildCmdCallback.awaitImageId
logInfo(s"Built docker image, imageId=$imageId, imageTag=$pushImageTag")
}(err => throw new Exception(s"Build docker image failed. tag=$pushImageTag", err))
}.getOrElse(throw getError.exception)
// Step-7: push spark image
// Always pushes with the configured registry credentials (unlike the pull
// step, there is no unauthenticated branch here).
execStep(7) {
usingDockerClient {
dockerClient =>
val pushCmd: PushImageCmd = dockerClient
.pushImageCmd(pushImageTag)
.withAuthConfig(dockerConf.toAuthConf)
val pushCmdCallback = pushCmd
.asInstanceOf[HackPushImageCmd]
.start(watchDockerPushProcess {
pushRsp =>
dockerProcess.push.update(pushRsp)
Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
})
pushCmdCallback.awaitCompletion
logInfo(s"Already pushed docker image, imageTag=$pushImageTag")
}(err => throw new Exception(s"Push docker image failed. tag=$pushImageTag", err))
}.getOrElse(throw getError.exception)
// All steps succeeded — report the artifacts produced by this pipeline run.
DockerImageBuildResponse(
buildWorkspace,
pushImageTag,
podTemplatePaths,
dockerFileTemplate.innerMainJarPath)
}