in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java [484:606]
public boolean update(FlinkApplication appParam) {
  /*
   * Updates the persisted application identified by appParam.getId() with the values carried by
   * appParam.
   *
   * Steps, in order:
   *   1. If the stored application is bound to a cluster (REMOTE, YARN_SESSION,
   *      KUBERNETES_NATIVE_SESSION), reject the update unless that cluster is RUNNING.
   *   2. Validate the yarn queue via validateQueueIfNeeded.
   *   3. Mark the application as NEED_RELEASE and decide whether a rebuild is required
   *      (jar changed, pod template changed, flink version changed, yarn application mode changed).
   *   4. Copy the editable fields from appParam onto the stored application.
   *   5. Persist: Flink SQL/CDC jobs are delegated to updateFlinkSqlJob; all other jobs are saved
   *      with updateById.
   *
   * Returns true on every non-exceptional path. Throws ApiAlertException when the bound cluster is
   * not running or the queue validation fails, and RuntimeException (wrapping the IOException) when
   * the uploaded jar's checksum cannot be computed.
   */
  FlinkApplication application = getById(appParam.getId());

  /* If the original mode is remote, k8s-session, yarn-session, check cluster status */
  FlinkDeployMode flinkDeployMode = application.getDeployModeEnum();
  switch (flinkDeployMode) {
    case REMOTE:
    case YARN_SESSION:
    case KUBERNETES_NATIVE_SESSION:
      FlinkCluster flinkCluster = flinkClusterService.getById(application.getFlinkClusterId());
      ApiAlertException.throwIfFalse(
          flinkClusterWatcher.getClusterState(flinkCluster) == ClusterState.RUNNING,
          "[StreamPark] update failed, because bind flink cluster not running");
      break;
    default:
      break;
  }

  boolean success = validateQueueIfNeeded(application, appParam);
  ApiAlertException.throwIfFalse(
      success,
      String.format(ERROR_APP_QUEUE_HINT, appParam.getYarnQueue(), appParam.getTeamId()));

  application.setRelease(ReleaseStateEnum.NEED_RELEASE.get());

  // 1) jar job jar file changed
  if (application.isResourceFromUpload()) {
    if (!Objects.equals(application.getJar(), appParam.getJar())) {
      // A different jar name always requires a rebuild.
      application.setBuild(true);
    } else {
      // Same jar name: compare the CRC32 of the file in the app temp dir (if present)
      // with the stored checksum to detect a re-upload with changed content.
      File jarFile = new File(WebUtils.getAppTempDir(), appParam.getJar());
      if (jarFile.exists()) {
        try {
          long checkSum = org.apache.commons.io.FileUtils.checksumCRC32(jarFile);
          if (!Objects.equals(checkSum, application.getJarCheckSum())) {
            application.setBuild(true);
            application.setJarCheckSum(checkSum);
          }
        } catch (IOException e) {
          // Pass the exception as the last argument so the stack trace is logged as well.
          log.error("Error in checksumCRC32 for {}.", jarFile, e);
          throw new RuntimeException(e);
        }
      }
    }
  }

  // 2) k8s podTemplate changed.
  // Only evaluated when a rebuild has not been requested yet, same as checks 3) and 4).
  // The previous guard was inverted (it required build to already be true), which made this
  // check a no-op.
  if (!application.getBuild() && isK8sPodTemplateChanged(application, appParam)) {
    application.setBuild(true);
  }

  // 3) flink version changed
  if (!application.getBuild()
      && !Objects.equals(application.getVersionId(), appParam.getVersionId())) {
    application.setBuild(true);
  }

  // 4) yarn application mode change
  if (!application.getBuild() && isYarnApplicationModeChange(application, appParam)) {
    application.setBuild(true);
  }

  // The job type is not editable: the stored value is written back onto the request object.
  appParam.setJobType(application.getJobType());

  // changes to the following parameters need to be re-release to take effect
  application.setJobName(appParam.getJobName());
  application.setVersionId(appParam.getVersionId());
  application.setArgs(appParam.getArgs());
  application.setOptions(appParam.getOptions());
  application.setDynamicProperties(appParam.getDynamicProperties());
  application.setResolveOrder(appParam.getResolveOrder());
  application.setDeployMode(appParam.getDeployMode());
  application.setFlinkImage(appParam.getFlinkImage());
  application.setK8sNamespace(appParam.getK8sNamespace());
  application.updateHotParams(appParam);
  application.setK8sRestExposedType(appParam.getK8sRestExposedType());
  application.setK8sPodTemplate(appParam.getK8sPodTemplate());
  application.setK8sJmPodTemplate(appParam.getK8sJmPodTemplate());
  application.setK8sTmPodTemplate(appParam.getK8sTmPodTemplate());
  application.setK8sHadoopIntegration(appParam.getK8sHadoopIntegration());

  // changes to the following parameters do not affect running tasks
  application.setModifyTime(new Date());
  application.setDescription(appParam.getDescription());
  application.setAlertId(appParam.getAlertId());
  application.setRestartSize(appParam.getRestartSize());
  application.setCpFailureAction(appParam.getCpFailureAction());
  application.setCpFailureRateInterval(appParam.getCpFailureRateInterval());
  application.setCpMaxFailureInterval(appParam.getCpMaxFailureInterval());
  application.setTags(appParam.getTags());

  // Deploy-mode specific fields, keyed on the NEW deploy mode from the request.
  switch (appParam.getDeployModeEnum()) {
    case YARN_APPLICATION:
    case YARN_PER_JOB:
      application.setHadoopUser(appParam.getHadoopUser());
      break;
    case KUBERNETES_NATIVE_APPLICATION:
      // This mode is not bound to a cluster record, so the cluster id is cleared.
      application.setFlinkClusterId(null);
      break;
    case REMOTE:
    case YARN_SESSION:
    case KUBERNETES_NATIVE_SESSION:
      application.setFlinkClusterId(appParam.getFlinkClusterId());
      break;
    default:
      break;
  }

  // Flink Sql job...
  // NOTE(review): this path returns without calling updateById here; presumably
  // updateFlinkSqlJob persists the application itself - confirm.
  if (application.isJobTypeFlinkSqlOrCDC()) {
    updateFlinkSqlJob(application, appParam);
    return true;
  }

  if (application.isAppTypeStreamPark()) {
    configService.update(appParam, application.isRunning());
  } else {
    application.setJar(appParam.getJar());
    application.setMainClass(appParam.getMainClass());
  }
  this.updateById(application);
  return true;
}