in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala [136:205]
def deploy(deployRequest: DeployRequest): DeployResponse = {
logInfo(
s"""
|--------------------------------------- kubernetes session start ---------------------------------------
| userFlinkHome : ${deployRequest.flinkVersion.flinkHome}
| flinkVersion : ${deployRequest.flinkVersion.version}
| execMode : ${deployRequest.executionMode.name()}
| clusterId : ${deployRequest.clusterId}
| namespace : ${deployRequest.k8sDeployParam.kubernetesNamespace}
| exposedType : ${deployRequest.k8sDeployParam.flinkRestExposedType}
| serviceAccount : ${deployRequest.k8sDeployParam.serviceAccount}
| flinkImage : ${deployRequest.k8sDeployParam.flinkImage}
| properties : ${deployRequest.properties.mkString(" ")}
|-------------------------------------------------------------------------------------------
|""".stripMargin)
var clusterDescriptor: KubernetesClusterDescriptor = null
var client: ClusterClient[String] = null
var kubeClient: FlinkKubeClient = null
try {
val flinkConfig =
extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)
flinkConfig
.safeSet(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName)
.safeSet(
KubernetesConfigOptions.NAMESPACE,
deployRequest.k8sDeployParam.kubernetesNamespace)
.safeSet(
KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
deployRequest.k8sDeployParam.serviceAccount)
.safeSet(
KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
ServiceExposedType.valueOf(deployRequest.k8sDeployParam.flinkRestExposedType.getName))
.safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
.safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, deployRequest.k8sDeployParam.flinkImage)
.safeSet(
KubernetesConfigOptions.KUBE_CONFIG_FILE,
getDefaultKubernetesConf(deployRequest.k8sDeployParam.kubeConf))
.safeSet(
DeploymentOptionsInternal.CONF_DIR,
s"${deployRequest.flinkVersion.flinkHome}/conf")
val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
clusterDescriptor = kubernetesClusterDescriptor._1
kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)
if (
deployRequest.clusterId != null && kubeClientWrapper
.getService(deployRequest.clusterId)
.isPresent
) {
client = clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
} else {
client =
clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
}
if (client.getWebInterfaceURL != null) {
DeployResponse(client.getWebInterfaceURL, client.getClusterId)
} else {
null
}
} catch {
case e: Exception =>
logError(s"start flink session fail in ${deployRequest.executionMode} mode")
e.printStackTrace()
throw e
} finally {
Utils.close(client, clusterDescriptor, kubeClient)
}
}