protected void runJar()

in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [849:890]


    /**
     * Submits an already uploaded jar to the session cluster as a job and waits for the run
     * request to complete, bounded by the operator's configured Flink client timeout.
     *
     * <p>{@code deleteJar(conf, jarId)} is always invoked afterwards, whether the submission
     * succeeded or failed.
     *
     * @param job job spec supplying the entry class, program arguments, parallelism and the
     *     allow-non-restored-state flag for the run request
     * @param jobID id passed along with the run request
     * @param response jar upload response; the jar id is taken from the part of its filename
     *     after the last {@code '/'}
     * @param conf configuration used to create the cluster client and to read the Flink version;
     *     for Flink 1.17 and newer its full map is also sent with the run request
     * @param savepoint savepoint path handed to the run request as-is (the code passes it through
     *     unchecked, so {@code null} is forwarded unchanged)
     * @throws FlinkRuntimeException wrapping any exception raised while creating the client,
     *     sending the request or waiting for its result; if the wait was interrupted, the
     *     thread's interrupt status is restored before this method exits
     */
    protected void runJar(
            JobSpec job,
            JobID jobID,
            JarUploadResponseBody response,
            Configuration conf,
            String savepoint) {
        // The jar id is everything after the last '/' of the uploaded filename (the whole
        // filename when it contains no '/').
        String uploadedFilename = response.getFilename();
        String jarId = uploadedFilename.substring(uploadedFilename.lastIndexOf("/") + 1);

        // Remembered so the interrupt flag can be restored *after* the cleanup in finally;
        // setting it earlier would make the cleanup run on an already-interrupted thread.
        boolean interrupted = false;
        try (var clusterClient = getClusterClient(conf)) {
            JarRunHeaders headers = JarRunHeaders.getInstance();
            JarRunMessageParameters parameters = headers.getUnresolvedMessageParameters();
            parameters.jarIdPathParameter.resolve(jarId);

            var flinkVersion = conf.get(FLINK_VERSION);
            // Exactly one of the two RestoreMode arguments is non-null, chosen by version.
            // NOTE(review): presumably these map to a legacy and a newer restore/claim mode
            // field of the request body - confirm against JarRunRequestBody.
            boolean atLeast120 = flinkVersion.isEqualOrNewer(FlinkVersion.v1_20);
            JarRunRequestBody runRequestBody =
                    new JarRunRequestBody(
                            job.getEntryClass(),
                            null,
                            job.getArgs() == null ? null : Arrays.asList(job.getArgs()),
                            // Non-positive parallelism is sent as null (i.e. not specified).
                            job.getParallelism() > 0 ? job.getParallelism() : null,
                            jobID,
                            job.getAllowNonRestoredState(),
                            savepoint,
                            atLeast120 ? null : RestoreMode.DEFAULT,
                            atLeast120 ? RestoreMode.DEFAULT : null,
                            // The configuration map is only sent for Flink 1.17 and newer.
                            flinkVersion.isEqualOrNewer(FlinkVersion.v1_17)
                                    ? conf.toMap()
                                    : null);
            LOG.info("Submitting job: {} to session cluster.", jobID);
            clusterClient
                    .sendRequest(headers, parameters, runRequestBody)
                    .get(operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
        } catch (Exception e) {
            interrupted = e instanceof InterruptedException;
            LOG.error("Failed to submit job to session cluster.", e);
            throw new FlinkRuntimeException(e);
        } finally {
            try {
                deleteJar(conf, jarId);
            } finally {
                if (interrupted) {
                    // Do not swallow the interrupt: make it visible to callers again.
                    Thread.currentThread().interrupt();
                }
            }
        }
    }