private[this] def prepareConfig()

in streampark-flink/streampark-flink-client/streampark-flink-client-core/src/main/scala/org/apache/streampark/flink/client/trait/FlinkClientTrait.scala [98:188]


  /**
   * Assembles the Flink [[Configuration]] used to submit the job described by `submitRequest`.
   *
   * Starting from the configuration returned by `getCommandLineAndFlinkConfig`, the following is
   * applied in order:
   *  - job-type specific settings: the python options (plus a fallback for the
   *    `ENV_FLINK_OPT_DIR` environment variable) for PyFlink jobs, otherwise the program options
   *    derived from the user jar when one is present;
   *  - common options: job name, deploy target, savepoint path, main class, program args, job id;
   *  - the retained-checkpoint count taken from `getFlinkDefaultConfiguration` for the request's
   *    Flink home, unless the request carries that property itself;
   *  - savepoint restore options, when a non-blank savepoint path is given;
   *  - every request property whose key starts with `env.`; before that, `-Dfile.encoding=UTF-8`
   *    is prepended to the JVM options property if it is present and names no file encoding.
   *
   * Side effects visible here: the method may call `SystemPropertyUtils.setEnv` for
   * `ENV_FLINK_OPT_DIR` and may overwrite the JVM options entry of `submitRequest.properties`.
   *
   * @param submitRequest
   *   the submission request to translate into Flink options
   * @return
   *   the configuration obtained from `getCommandLineAndFlinkConfig`, populated as described above
   */
  private[this] def prepareConfig(submitRequest: SubmitRequest): Configuration = {

    val (cli, conf) = getCommandLineAndFlinkConfig(submitRequest)

    // PyFlink jobs: register the python venv / executables and make sure the opt dir is known.
    def configurePyFlink(): Unit = {
      val venvPath: String = Workspace.local.APP_PYTHON_VENV
      AssertUtils.required(FsOperator.lfs.exists(venvPath), s"$venvPath File does not exist")

      conf
        // python.archives
        .safeSet(PythonOptions.PYTHON_ARCHIVES, venvPath)
        // python.client.executable
        .safeSet(PythonOptions.PYTHON_CLIENT_EXECUTABLE, Constants.PYTHON_EXECUTABLE)
        // python.executable
        .safeSet(PythonOptions.PYTHON_EXECUTABLE, Constants.PYTHON_EXECUTABLE)

      // Fall back to "<flinkHome>/opt" when the opt-dir environment variable is not set.
      if (StringUtils.isBlank(System.getenv(ConfigConstants.ENV_FLINK_OPT_DIR))) {
        logWarn(s"Get environment variable ${ConfigConstants.ENV_FLINK_OPT_DIR} fail")
        val optDir = s"${submitRequest.flinkVersion.flinkHome}/opt"
        SystemPropertyUtils.setEnv(ConfigConstants.ENV_FLINK_OPT_DIR, optDir)
        logInfo(s"Set temporary environment variables ${ConfigConstants.ENV_FLINK_OPT_DIR} = $optDir")
      }
    }

    // Any other job type: derive the execution parameters from the user jar, if there is one.
    def configureUserJar(): Unit =
      Option(submitRequest.userJarFile).foreach { jar =>
        val jarUri = PackagedProgramUtils.resolveURI(jar.getAbsolutePath)
        val options = ProgramOptions.create(cli)
        ExecutionConfigAccessor
          .fromProgramOptions(options, Collections.singletonList(jarUri.toString))
          .applyToConfiguration(conf)
      }

    submitRequest.jobType match {
      case FlinkJobType.PYFLINK => configurePyFlink()
      case _ => configureUserJar()
    }

    // 1) common parameters
    conf
      .safeSet(PipelineOptions.NAME, submitRequest.effectiveAppName)
      .safeSet(DeploymentOptions.TARGET, submitRequest.deployMode.getName)
      .safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
      .safeSet(ApplicationConfiguration.APPLICATION_MAIN_CLASS, submitRequest.appMain)
      .safeSet(ApplicationConfiguration.APPLICATION_ARGS, extractProgramArgs(submitRequest))
      .safeSet(PipelineOptionsInternal.PIPELINE_FIXED_JOB_ID, submitRequest.jobId)

    // state.checkpoints.num-retained: use the Flink default config when the request has no value
    val retainedOption = CheckpointingOptions.MAX_RETAINED_CHECKPOINTS
    if (!submitRequest.hasProp(retainedOption.key())) {
      val defaults = getFlinkDefaultConfiguration(submitRequest.flinkVersion.flinkHome)
      conf.safeSet(retainedOption, defaults.get(retainedOption))
    }

    // 2) savepoint parameters
    if (StringUtils.isNotBlank(submitRequest.savePoint)) {
      conf.safeSet(SavepointConfigOptions.SAVEPOINT_PATH, submitRequest.savePoint)
      conf.setBoolean(
        SavepointConfigOptions.SAVEPOINT_IGNORE_UNCLAIMED_STATE,
        submitRequest.allowNonRestoredState)
      // The restore mode is applied only when one is requested and checkVersion accepts it.
      if (submitRequest.restoreMode != null) {
        if (submitRequest.flinkVersion.checkVersion(FlinkRestoreMode.SINCE_FLINK_VERSION)) {
          conf.setString(FlinkRestoreMode.RESTORE_MODE, submitRequest.restoreMode.getName)
        }
      }
    }

    // 3) env.xx.opts parameters
    if (MapUtils.isNotEmpty(submitRequest.properties)) {
      // Default the file encoding if the configured JVM options do not mention one.
      val jvmOptionsKey = CoreOptions.FLINK_JVM_OPTIONS.key()
      if (submitRequest.hasProp(jvmOptionsKey)) {
        val currentJvmOptions = submitRequest.getProp(jvmOptionsKey).toString
        if (!currentJvmOptions.contains("-Dfile.encoding=")) {
          submitRequest.properties.put(jvmOptionsKey, s"-Dfile.encoding=UTF-8 $currentJvmOptions")
        }
      }

      // Copy every "env."-prefixed property (including the entry adjusted above) into the config.
      submitRequest.properties
        .filter(_._1.startsWith("env."))
        .foreach {
          case (key, value) =>
            logInfo(s"env opts:  $key: $value")
            conf.setString(key, value.toString)
        }
    }

    conf
  }