override def doSubmit()

in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala [56:120]


  override def doSubmit(
      submitRequest: SubmitRequest,
      flinkConfig: Configuration): SubmitResponse = {

    val flinkHome = submitRequest.flinkVersion.flinkHome

    val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
    val clientFactory =
      clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
    var packagedProgram: PackagedProgram = null
    var clusterClient: ClusterClient[ApplicationId] = null

    val clusterDescriptor = {
      val clusterDescriptor =
        clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[YarnClusterDescriptor]
      val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
      clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
      clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))
      clusterDescriptor
    }

    try {
      clusterClient = {
        val clusterSpecification = clientFactory.getClusterSpecification(flinkConfig)
        logInfo(s"""
                   |------------------------<<specification>>-------------------------
                   |$clusterSpecification
                   |------------------------------------------------------------------
                   |""".stripMargin)

        val packageProgramJobGraph =
          super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
        packagedProgram = packageProgramJobGraph._1
        val jobGraph = packageProgramJobGraph._2

        logInfo(s"""
                   |-------------------------<<applicationId>>------------------------
                   |jobGraph getJobID: ${jobGraph.getJobID.toString}
                   |__________________________________________________________________
                   |""".stripMargin)
        deployInternal(
          clusterDescriptor,
          clusterSpecification,
          submitRequest.effectiveAppName,
          classOf[YarnJobClusterEntrypoint].getName,
          jobGraph,
          true).getClusterClient

      }
      val applicationId = clusterClient.getClusterId
      val jobManagerUrl = clusterClient.getWebInterfaceURL
      logInfo(s"""
                 |-------------------------<<applicationId>>------------------------
                 |Flink Job Started: applicationId: $applicationId
                 |__________________________________________________________________
                 |""".stripMargin)

      SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl)
    } finally {
      if (submitRequest.safePackageProgram) {
        Utils.close(packagedProgram)
      }
      Utils.close(clusterClient, clusterDescriptor)
    }
  }