def deploy()

in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/impl/KubernetesNativeSessionClient.scala [136:205]


  def deploy(deployRequest: DeployRequest): DeployResponse = {
    logInfo(
      s"""
         |--------------------------------------- kubernetes session start ---------------------------------------
         |    userFlinkHome    : ${deployRequest.flinkVersion.flinkHome}
         |    flinkVersion     : ${deployRequest.flinkVersion.version}
         |    execMode         : ${deployRequest.executionMode.name()}
         |    clusterId        : ${deployRequest.clusterId}
         |    namespace        : ${deployRequest.k8sDeployParam.kubernetesNamespace}
         |    exposedType      : ${deployRequest.k8sDeployParam.flinkRestExposedType}
         |    serviceAccount   : ${deployRequest.k8sDeployParam.serviceAccount}
         |    flinkImage       : ${deployRequest.k8sDeployParam.flinkImage}
         |    properties       : ${deployRequest.properties.mkString(" ")}
         |-------------------------------------------------------------------------------------------
         |""".stripMargin)
    var clusterDescriptor: KubernetesClusterDescriptor = null
    var client: ClusterClient[String] = null
    var kubeClient: FlinkKubeClient = null
    try {
      val flinkConfig =
        extractConfiguration(deployRequest.flinkVersion.flinkHome, deployRequest.properties)
      flinkConfig
        .safeSet(DeploymentOptions.TARGET, KubernetesDeploymentTarget.SESSION.getName)
        .safeSet(
          KubernetesConfigOptions.NAMESPACE,
          deployRequest.k8sDeployParam.kubernetesNamespace)
        .safeSet(
          KubernetesConfigOptions.KUBERNETES_SERVICE_ACCOUNT,
          deployRequest.k8sDeployParam.serviceAccount)
        .safeSet(
          KubernetesConfigOptions.REST_SERVICE_EXPOSED_TYPE,
          ServiceExposedType.valueOf(deployRequest.k8sDeployParam.flinkRestExposedType.getName))
        .safeSet(KubernetesConfigOptions.CLUSTER_ID, deployRequest.clusterId)
        .safeSet(KubernetesConfigOptions.CONTAINER_IMAGE, deployRequest.k8sDeployParam.flinkImage)
        .safeSet(
          KubernetesConfigOptions.KUBE_CONFIG_FILE,
          getDefaultKubernetesConf(deployRequest.k8sDeployParam.kubeConf))
        .safeSet(
          DeploymentOptionsInternal.CONF_DIR,
          s"${deployRequest.flinkVersion.flinkHome}/conf")

      val kubernetesClusterDescriptor = getK8sClusterDescriptorAndSpecification(flinkConfig)
      clusterDescriptor = kubernetesClusterDescriptor._1
      kubeClient = FlinkKubeClientFactory.getInstance.fromConfiguration(flinkConfig, "client")
      val kubeClientWrapper = new FlinkKubernetesClient(kubeClient)

      if (
        deployRequest.clusterId != null && kubeClientWrapper
          .getService(deployRequest.clusterId)
          .isPresent
      ) {
        client = clusterDescriptor.retrieve(deployRequest.clusterId).getClusterClient
      } else {
        client =
          clusterDescriptor.deploySessionCluster(kubernetesClusterDescriptor._2).getClusterClient
      }
      if (client.getWebInterfaceURL != null) {
        DeployResponse(client.getWebInterfaceURL, client.getClusterId)
      } else {
        null
      }
    } catch {
      case e: Exception =>
        logError(s"start flink session fail in ${deployRequest.executionMode} mode")
        e.printStackTrace()
        throw e
    } finally {
      Utils.close(client, clusterDescriptor, kubeClient)
    }
  }