in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/YarnPerJobClient.scala [56:120]
/**
 * Deploys a Flink job cluster on YARN for the given request and reports the resulting
 * application.
 *
 * Steps, in order:
 *  1. resolve the cluster client factory for `flinkConfig` and create the YARN cluster
 *     descriptor;
 *  2. point the descriptor at the flink-dist jar and ship `$flinkHome/lib`;
 *  3. build the job graph from the user jar and deploy it with
 *     `YarnJobClusterEntrypoint` as the entry point;
 *  4. read the application id and web interface URL from the returned cluster client.
 *
 * The cluster descriptor and the cluster client are always closed before this method
 * returns, whether it succeeds or throws. The packaged program is closed only when
 * `submitRequest.safePackageProgram` is true. No exception is caught here: any failure
 * propagates to the caller after cleanup.
 *
 * @param submitRequest submission parameters (Flink home, user jar, application name, ...)
 * @param flinkConfig   effective Flink configuration used for the deployment
 * @return a [[SubmitResponse]] carrying the YARN application id, the configuration as a
 *         map, and the JobManager web interface URL
 */
override def doSubmit(
    submitRequest: SubmitRequest,
    flinkConfig: Configuration): SubmitResponse = {
  val flinkHome = submitRequest.flinkVersion.flinkHome
  val clusterClientServiceLoader = new DefaultClusterClientServiceLoader
  val clientFactory =
    clusterClientServiceLoader.getClusterClientFactory[ApplicationId](flinkConfig)
  // Both stay null until they are produced inside the try block; the finally block
  // hands them to Utils.close in whatever state they are in.
  var packagedProgram: PackagedProgram = null
  var clusterClient: ClusterClient[ApplicationId] = null
  val clusterDescriptor =
    clientFactory.createClusterDescriptor(flinkConfig).asInstanceOf[YarnClusterDescriptor]
  try {
    // The descriptor is configured inside the try so that a failure here (for example
    // the flink-dist jar cannot be resolved) still reaches the finally block and the
    // descriptor is closed instead of leaked.
    val flinkDistJar = FlinkUtils.getFlinkDistJar(flinkHome)
    clusterDescriptor.setLocalJarPath(new HadoopPath(flinkDistJar))
    clusterDescriptor.addShipFiles(List(new File(s"$flinkHome/lib")))

    clusterClient = {
      val clusterSpecification = clientFactory.getClusterSpecification(flinkConfig)
      logInfo(s"""
                 |------------------------<<specification>>-------------------------
                 |$clusterSpecification
                 |------------------------------------------------------------------
                 |""".stripMargin)
      val packageProgramJobGraph =
        super.getJobGraph(flinkConfig, submitRequest, submitRequest.userJarFile)
      // Assigned to the outer var right away so the finally block can close it even
      // if the deployment below fails.
      packagedProgram = packageProgramJobGraph._1
      val jobGraph = packageProgramJobGraph._2
      logInfo(s"""
                 |-------------------------<<applicationId>>------------------------
                 |jobGraph getJobID: ${jobGraph.getJobID.toString}
                 |__________________________________________________________________
                 |""".stripMargin)
      // NOTE(review): the trailing `true` is a positional flag of deployInternal, which
      // is defined outside this block; confirm its meaning against that definition.
      deployInternal(
        clusterDescriptor,
        clusterSpecification,
        submitRequest.effectiveAppName,
        classOf[YarnJobClusterEntrypoint].getName,
        jobGraph,
        true).getClusterClient
    }
    val applicationId = clusterClient.getClusterId
    val jobManagerUrl = clusterClient.getWebInterfaceURL
    logInfo(s"""
               |-------------------------<<applicationId>>------------------------
               |Flink Job Started: applicationId: $applicationId
               |__________________________________________________________________
               |""".stripMargin)
    SubmitResponse(applicationId.toString, flinkConfig.toMap, jobManagerUrl = jobManagerUrl)
  } finally {
    if (submitRequest.safePackageProgram) {
      Utils.close(packagedProgram)
    }
    Utils.close(clusterClient, clusterDescriptor)
  }
}