override protected def buildProcess()

in streampark-flink/streampark-flink-packer/src/main/scala/org/apache/streampark/flink/packer/pipeline/impl/FlinkK8sApplicationBuildPipeline.scala [64:236]


  /**
   * Runs the eight-step build of the flink job docker image and returns the build result.
   *
   * Steps, in order:
   *  1. recreate the build workspace `workspace/clusterId@k8sNamespace`
   *  1. export the k8s pod template files (skipped when the pod template is empty)
   *  1. build the shaded flink job jar and collect the extra jar libs
   *  1. generate and write the flink image dockerfile
   *  1. pull the flink base image unless a matching image is already present locally
   *  1. build the flink image
   *  1. push the flink image
   *  1. export the ingress template files (skipped when the ingress template is blank)
   *
   * Every executed step goes through `execStep`; when a step yields no value the exception
   * provided by `getError` is thrown, which aborts the remaining steps.
   *
   * @return the response carrying the build workspace, the pushed image tag, the exported pod
   *         template paths and the main jar path inside the image
   * @throws IllegalArgumentException if `request.k8sNamespace` or `request.clusterId` is empty
   */
  override protected def buildProcess(): DockerImageBuildResponse = {

    // Step-1: init build workspace of flink job
    // the sub workspace dir like: APP_WORKSPACE/k8s-clusterId@k8s-namespace/
    val buildWorkspace =
      execStep(1) {
        val buildWorkspace =
          s"${request.workspace}/${request.clusterId}@${request.k8sNamespace}"
        LfsOperator.mkCleanDirs(buildWorkspace)
        logInfo(s"Recreate building workspace: $buildWorkspace")
        buildWorkspace
      }.getOrElse(throw getError.exception)

    // Step-2: export k8s pod template files
    val podTemplatePaths = request.flinkPodTemplate match {
      case podTemplate if podTemplate.isEmpty =>
        // nothing to export: mark the step as skipped and carry on with an empty mapping
        skipStep(2)
        Map[String, String]()
      case podTemplate =>
        execStep(2) {
          val podTemplateFiles =
            PodTemplateTool
              .preparePodTemplateFiles(buildWorkspace, podTemplate)
              .tmplFiles
          logInfo(s"Export flink podTemplates: ${podTemplateFiles.values.mkString(",")}")
          podTemplateFiles
        }.getOrElse(throw getError.exception)
    }

    // Step-3: build shaded flink job jar and handle extra jars
    // the output shaded jar file name like: streampark-flinkjob_myjob-test.jar
    val (shadedJar, extJarLibs) =
      execStep(3) {
        val shadedJarOutputPath = request.getShadedJarPath(buildWorkspace)
        val shadedJar =
          MavenTool.buildFatJar(request.mainClass, request.providedLibs, shadedJarOutputPath)
        logInfo(s"Output shaded flink job jar: ${shadedJar.getAbsolutePath}")
        shadedJar -> request.dependencyInfo.extJarLibs
      }.getOrElse(throw getError.exception)

    // Step-4: generate and Export flink image dockerfiles
    val (dockerfile, dockerFileTemplate) =
      execStep(4) {
        // the hadoop-integrated template is used only when the request asks for it
        val dockerFileTemplate = {
          if (request.integrateWithHadoop) {
            FlinkHadoopDockerfileTemplate.fromSystemHadoopConf(
              buildWorkspace,
              request.flinkBaseImage,
              shadedJar.getAbsolutePath,
              extJarLibs)
          } else {
            FlinkDockerfileTemplate(
              buildWorkspace,
              request.flinkBaseImage,
              shadedJar.getAbsolutePath,
              extJarLibs)
          }
        }
        val dockerFile = dockerFileTemplate.writeDockerfile
        logInfo(
          s"Output flink dockerfile: ${dockerFile.getAbsolutePath}, content: \n${dockerFileTemplate.offerDockerfileContent}")
        dockerFile -> dockerFileTemplate
      }.getOrElse(throw getError.exception)

    val dockerConf = request.dockerConfig
    val baseImageTag = request.flinkBaseImage.trim
    // tag of the image to build and push, derived from the k8s namespace and cluster id
    val pushImageTag = {
      if (request.k8sNamespace.isEmpty || request.clusterId.isEmpty) {
        throw new IllegalArgumentException("k8sNamespace or clusterId cannot be empty")
      }
      val expectedImageTag =
        s"streampark-flinkjob-${request.k8sNamespace}-${request.clusterId}"
      compileTag(expectedImageTag, dockerConf.registerAddress, dockerConf.imageNamespace)
    }

    // Step-5: pull flink base image
    execStep(5) {
      usingDockerClient {
        dockerClient =>
          // RepoTags may be null for untagged (dangling) images, so wrap it in Option instead of
          // dereferencing it directly; otherwise a single dangling image fails the whole step.
          // NOTE(review): this is a substring match, so e.g. `flink:1.1` is also satisfied by a
          // local `flink:1.13` -- confirm whether exact tag matching is intended.
          val imgExists = dockerClient
            .listImagesCmd()
            .exec()
            .exists(image => Option(image.getRepoTags).exists(_.exists(_.contains(baseImageTag))))
          if (imgExists) {
            logInfo(s"found local docker image $baseImageTag, no need to pull from remote.")
          } else {
            val pullImageCmd = {
              // when the register address prefix is explicitly identified on base image tag,
              // the user's pre-saved docker register auth info would be used.
              val pullImageCmdState =
                dockerConf.registerAddress != null && !baseImageTag.startsWith(
                  dockerConf.registerAddress)
              if (pullImageCmdState) {
                dockerClient.pullImageCmd(baseImageTag)
              } else {
                dockerClient
                  .pullImageCmd(baseImageTag)
                  .withAuthConfig(dockerConf.toAuthConf)
              }
            }
            val pullCmdCallback = pullImageCmd
              .asInstanceOf[HackPullImageCmd]
              .start(watchDockerPullProcess {
                pullRsp =>
                  // record the progress, then notify the watcher asynchronously
                  dockerProcess.pull.update(pullRsp)
                  Future(dockerProcessWatcher.onDockerPullProgressChange(dockerProcess.pull.snapshot))
              })
            pullCmdCallback.awaitCompletion
            logInfo(s"Already pulled docker image from remote register, imageTag=$baseImageTag")
          }
      }(err => throw new Exception(s"Pull docker image failed, imageTag=$baseImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-6: build flink image
    execStep(6) {
      usingDockerClient {
        dockerClient =>
          val buildImageCmd = dockerClient
            .buildImageCmd()
            .withBaseDirectory(new File(buildWorkspace))
            .withDockerfile(dockerfile)
            .withTags(Sets.newHashSet(pushImageTag))

          val buildCmdCallback = buildImageCmd
            .asInstanceOf[HackBuildImageCmd]
            .start(watchDockerBuildStep {
              buildStep =>
                // record the progress, then notify the watcher asynchronously
                dockerProcess.build.update(buildStep)
                Future(
                  dockerProcessWatcher.onDockerBuildProgressChange(dockerProcess.build.snapshot))
            })
          val imageId = buildCmdCallback.awaitImageId
          logInfo(s"Built docker image, imageId=$imageId, imageTag=$pushImageTag")
      }(err => throw new Exception(s"Build docker image failed. tag=$pushImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-7: push flink image
    execStep(7) {
      usingDockerClient {
        dockerClient =>
          val pushCmd: PushImageCmd = dockerClient
            .pushImageCmd(pushImageTag)
            .withAuthConfig(dockerConf.toAuthConf)

          val pushCmdCallback = pushCmd
            .asInstanceOf[HackPushImageCmd]
            .start(watchDockerPushProcess {
              pushRsp =>
                // record the progress, then notify the watcher asynchronously
                dockerProcess.push.update(pushRsp)
                Future(dockerProcessWatcher.onDockerPushProgressChange(dockerProcess.push.snapshot))
            })
          pushCmdCallback.awaitCompletion
          logInfo(s"Already pushed docker image, imageTag=$pushImageTag")
      }(err => throw new Exception(s"Push docker image failed. tag=$pushImageTag", err))
    }.getOrElse(throw getError.exception)

    // Step-8:  init build workspace of ingress
    // NOTE(review): the resulting path is only logged; it is not part of the returned response.
    val ingressOutputPath = request.ingressTemplate match {
      case ingress if StringUtils.isBlank(ingress) =>
        skipStep(8)
        ""
      case _ =>
        execStep(8) {
          val ingressOutputPath =
            IngressController.prepareIngressTemplateFiles(buildWorkspace, request.ingressTemplate)
          logInfo(s"Export flink ingress: $ingressOutputPath")
          ingressOutputPath
        }.getOrElse(throw getError.exception)
    }

    DockerImageBuildResponse(
      buildWorkspace,
      pushImageTag,
      podTemplatePaths,
      dockerFileTemplate.innerMainJarPath)
  }