override protected def buildProcess()

in streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/SparkK8sApplicationBuildPipeline.scala [62:214]


  override protected def buildProcess(): DockerImageBuildResponse = {

    // Step-1: init build workspace of spark job
    // the sub workspace dir like: APP_WORKSPACE/k8s-clusterId@k8s-namespace/
    val buildWorkspace =
      execStep(1) {
        val buildWorkspace =
          s"${request.workspace}/${request.k8sNamespace}"
        LfsOperator.mkCleanDirs(buildWorkspace)
        logInfo(s"Recreate building workspace: $buildWorkspace")
        buildWorkspace
      }.getOrElse(throw getError.exception)

    // Step-2: export k8s pod template files
    val podTemplatePaths = request.sparkPodTemplate match {
      case podTemplate if podTemplate.isEmpty =>
        skipStep(2)
        Map[String, String]()
      case podTemplate =>
        execStep(2) {
          val podTemplateFiles =
            PodTemplateTool
              .preparePodTemplateFiles(buildWorkspace, podTemplate)
              .tmplFiles
          logInfo(s"Export spark podTemplates: ${podTemplateFiles.values.mkString(",")}")
          podTemplateFiles
        }.getOrElse(throw getError.exception)
    }

    // Step-3: prepare spark job jar
    val (mainJarPath, extJarLibs) =
      execStep(3) {
        val mainJarName = Paths.get(request.mainJar).getFileName
        val mainJarPath = s"$buildWorkspace/$mainJarName"
        LfsOperator.copy(request.mainJar, mainJarPath)
        logInfo(s"Prepared spark job jar: $mainJarPath")
        mainJarPath -> Set[String]()
      }.getOrElse(throw getError.exception)

    // Step-4: generate and Export spark image dockerfiles
    val (dockerfile, dockerFileTemplate) =
      execStep(4) {
        val dockerFileTemplate = {
          if (request.integrateWithHadoop) {
            SparkHadoopDockerfileTemplate.fromSystemHadoopConf(
              buildWorkspace,
              request.sparkBaseImage,
              mainJarPath,
              extJarLibs)
          } else {
            SparkDockerfileTemplate(
              buildWorkspace,
              request.sparkBaseImage,
              mainJarPath,
              extJarLibs)
          }
        }
        val dockerFile = dockerFileTemplate.writeDockerfile
        logInfo(
          s"Output spark dockerfile: ${dockerFile.getAbsolutePath}, content: \n${dockerFileTemplate.offerDockerfileContent}")
        dockerFile -> dockerFileTemplate
      }.getOrElse(throw getError.exception)

    val dockerConf = request.dockerConfig
    val baseImageTag = request.sparkBaseImage.trim
    val pushImageTag = {
      if (request.k8sNamespace.isEmpty || request.appName.isEmpty) {
        throw new IllegalArgumentException("k8sNamespace or appName cannot be empty")
      }
      val expectedImageTag =
        s"streampark-sparkjob-${request.k8sNamespace}-${request.appName}"
      compileTag(expectedImageTag, dockerConf.registerAddress, dockerConf.imageNamespace)
    }

    // Step-5: pull spark base image
    execStep(5) {
      usingDockerClient {
        dockerClient =>
          val pullImageCmd = {
            // when the register address prefix is explicitly identified on base image tag,
            // the user's pre-saved docker register auth info would be used.
            val pullImageCmdState =
              dockerConf.registerAddress != null && !baseImageTag.startsWith(
                dockerConf.registerAddress)
            if (pullImageCmdState) {
              dockerClient.pullImageCmd(baseImageTag)
            } else {
              dockerClient
                .pullImageCmd(baseImageTag)
                .withAuthConfig(dockerConf.toAuthConf)
            }
          }
          val pullCmdCallback = pullImageCmd
            .asInstanceOf[HackPullImageCmd]
            .start(watchDockerPullProcess {
              pullRsp =>
                dockerProcess.pull.update(pullRsp)
                Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
            })
          pullCmdCallback.awaitCompletion
          logInfo(s"Already pulled docker image from remote register, imageTag=$baseImageTag")
      }(err => throw new Exception(s"Pull docker image failed, imageTag=$baseImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-6: build spark image
    execStep(6) {
      usingDockerClient {
        dockerClient =>
          val buildImageCmd = dockerClient
            .buildImageCmd()
            .withBaseDirectory(new File(buildWorkspace))
            .withDockerfile(dockerfile)
            .withTags(Sets.newHashSet(pushImageTag))

          val buildCmdCallback = buildImageCmd
            .asInstanceOf[HackBuildImageCmd]
            .start(watchDockerBuildStep {
              buildStep =>
                dockerProcess.build.update(buildStep)
                Future(
                  dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
            })
          val imageId = buildCmdCallback.awaitImageId
          logInfo(s"Built docker image, imageId=$imageId, imageTag=$pushImageTag")
      }(err => throw new Exception(s"Build docker image failed. tag=$pushImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-7: push spark image
    execStep(7) {
      usingDockerClient {
        dockerClient =>
          val pushCmd: PushImageCmd = dockerClient
            .pushImageCmd(pushImageTag)
            .withAuthConfig(dockerConf.toAuthConf)

          val pushCmdCallback = pushCmd
            .asInstanceOf[HackPushImageCmd]
            .start(watchDockerPushProcess {
              pushRsp =>
                dockerProcess.push.update(pushRsp)
                Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
            })
          pushCmdCallback.awaitCompletion
          logInfo(s"Already pushed docker image, imageTag=$pushImageTag")
      }(err => throw new Exception(s"Push docker image failed. tag=$pushImageTag", err))
    }.getOrElse(throw getError.exception)

    DockerImageBuildResponse(
      buildWorkspace,
      pushImageTag,
      podTemplatePaths,
      dockerFileTemplate.innerMainJarPath)
  }