in flink-kubernetes-operator/src/main/java/org/apache/flink/kubernetes/operator/service/AbstractFlinkService.java [849:890]
/**
 * Submits an already uploaded jar for execution and removes the jar afterwards.
 *
 * <p>The jar id is the part of the uploaded file name that follows the last {@code '/'}. A
 * {@link JarRunRequestBody} is assembled from the job spec and sent through a cluster client;
 * the call then blocks for at most {@code operatorConfig.getFlinkClientTimeout()} (in seconds).
 * Whether or not the submission succeeds, {@code deleteJar} is invoked for the jar id.
 *
 * @param job job spec providing the entry class, arguments, parallelism and the
 *     allow-non-restored-state flag
 * @param jobID id passed to the run request and used in the submission log line
 * @param response jar upload response whose file name yields the jar id
 * @param conf configuration used to create the cluster client, to read {@code FLINK_VERSION},
 *     and (for versions 1.17 and newer) forwarded to the request as a map
 * @param savepoint savepoint reference forwarded unchanged to the run request
 * @throws FlinkRuntimeException wrapping any exception raised while building or sending the
 *     request, or while waiting for the response
 */
protected void runJar(
        JobSpec job,
        JobID jobID,
        JarUploadResponseBody response,
        Configuration conf,
        String savepoint) {
    // Keep only the last path segment of the uploaded file name.
    String uploadedName = response.getFilename();
    String jarId = uploadedName.substring(uploadedName.lastIndexOf("/") + 1);

    try (var client = getClusterClient(conf)) {
        JarRunHeaders runHeaders = JarRunHeaders.getInstance();
        JarRunMessageParameters runParameters = runHeaders.getUnresolvedMessageParameters();
        runParameters.jarIdPathParameter.resolve(jarId);

        var version = conf.get(FLINK_VERSION);

        // Arguments are optional; a missing array is passed on as null rather than an empty list.
        var rawArgs = job.getArgs();
        var programArgs = rawArgs == null ? null : Arrays.asList(rawArgs);

        // Only a positive parallelism is forwarded; otherwise the request carries null.
        var requestedParallelism = job.getParallelism();
        Integer parallelism = requestedParallelism > 0 ? requestedParallelism : null;

        // Exactly one of the two restore-mode arguments is populated, selected by version.
        // NOTE(review): presumably the legacy field vs. its 1.20+ replacement - confirm against
        // the JarRunRequestBody constructor.
        boolean atLeast120 = version.isEqualOrNewer(FlinkVersion.v1_20);
        var modeBefore120 = atLeast120 ? null : RestoreMode.DEFAULT;
        var modeFrom120 = atLeast120 ? RestoreMode.DEFAULT : null;

        // The configuration map is only attached for versions 1.17 and newer.
        var forwardedConf = version.isEqualOrNewer(FlinkVersion.v1_17) ? conf.toMap() : null;

        var runRequest =
                new JarRunRequestBody(
                        job.getEntryClass(),
                        null,
                        programArgs,
                        parallelism,
                        jobID,
                        job.getAllowNonRestoredState(),
                        savepoint,
                        modeBefore120,
                        modeFrom120,
                        forwardedConf);

        LOG.info("Submitting job: {} to session cluster.", jobID);
        var pendingResponse = client.sendRequest(runHeaders, runParameters, runRequest);
        pendingResponse.get(
                operatorConfig.getFlinkClientTimeout().toSeconds(), TimeUnit.SECONDS);
    } catch (Exception e) {
        LOG.error("Failed to submit job to session cluster.", e);
        throw new FlinkRuntimeException(e);
    } finally {
        // The uploaded jar is removed regardless of the submission outcome.
        deleteJar(conf, jarId);
    }
}