in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/application/impl/FlinkApplicationManageServiceImpl.java [394:481]
/**
 * Creates a new application by copying the settings of an existing one.
 *
 * <p>The new application takes its job name from {@code appParam}, and its program arguments
 * from {@code appParam} when they are non-null (otherwise from the source application). All
 * other copied settings come from the stored source application identified by
 * {@code appParam.getId()}. The copy is owned by the current user and starts in the
 * {@code ADDED} / {@code NEED_RELEASE} / {@code NONE} states. For Flink SQL / CDC jobs the
 * latest Flink SQL of the source is copied as well, and the effective configuration of the
 * source (if any) is copied as version 1 of the new application's configuration.
 *
 * @param appParam carries the id of the source application, the job name for the copy and,
 *     optionally, the program arguments for the copy
 * @return the id of the newly created application
 * @throws ApiAlertException if the job name is already in use, the source application or its
 *     Flink SQL cannot be found, or the new application cannot be saved
 */
public Long copy(FlinkApplication appParam) {
    String jobName = appParam.getJobName();
    ApiAlertException.throwIfFalse(
        !this.existsByJobName(jobName),
        "[StreamPark] Application names can't be repeated, copy application failed.");

    // The source id comes from the caller and may be stale; fail with a readable alert
    // instead of a NullPointerException on the first getter below.
    FlinkApplication persist = getById(appParam.getId());
    ApiAlertException.throwIfFalse(
        persist != null,
        "[StreamPark] The source application doesn't exist, copy application failed.");

    FlinkApplication newApp = new FlinkApplication();
    newApp.setJobName(jobName);
    // Only session-mode applications keep the cluster id of the source.
    newApp.setClusterId(
        FlinkDeployMode.isSessionMode(persist.getDeployModeEnum())
            ? persist.getClusterId()
            : null);
    // Arguments supplied with the copy request take precedence over the source's.
    newApp.setArgs(appParam.getArgs() != null ? appParam.getArgs() : persist.getArgs());
    newApp.setVersionId(persist.getVersionId());
    newApp.setFlinkClusterId(persist.getFlinkClusterId());
    newApp.setRestartSize(persist.getRestartSize());
    newApp.setJobType(persist.getJobType());
    newApp.setOptions(persist.getOptions());
    newApp.setDynamicProperties(persist.getDynamicProperties());
    newApp.setResolveOrder(persist.getResolveOrder());
    newApp.setDeployMode(persist.getDeployMode());
    newApp.setFlinkImage(persist.getFlinkImage());
    newApp.setK8sNamespace(persist.getK8sNamespace());
    newApp.setK8sRestExposedType(persist.getK8sRestExposedType());
    newApp.setK8sPodTemplate(persist.getK8sPodTemplate());
    newApp.setK8sJmPodTemplate(persist.getK8sJmPodTemplate());
    newApp.setK8sTmPodTemplate(persist.getK8sTmPodTemplate());
    newApp.setK8sHadoopIntegration(persist.getK8sHadoopIntegration());
    newApp.setDescription(persist.getDescription());
    newApp.setAlertId(persist.getAlertId());
    newApp.setCpFailureAction(persist.getCpFailureAction());
    newApp.setCpFailureRateInterval(persist.getCpFailureRateInterval());
    newApp.setCpMaxFailureInterval(persist.getCpMaxFailureInterval());
    newApp.setMainClass(persist.getMainClass());
    newApp.setAppType(persist.getAppType());
    newApp.setResourceFrom(persist.getResourceFrom());
    newApp.setProjectId(persist.getProjectId());
    newApp.setModule(persist.getModule());
    newApp.setUserId(ServiceHelper.getUserId());
    newApp.setState(FlinkAppStateEnum.ADDED.getValue());
    newApp.setRelease(ReleaseStateEnum.NEED_RELEASE.get());
    newApp.setOptionState(OptionStateEnum.NONE.getValue());
    newApp.setHotParams(persist.getHotParams());
    // createTime & modifyTime
    Date date = new Date();
    newApp.setCreateTime(date);
    newApp.setModifyTime(date);
    newApp.setJar(persist.getJar());
    newApp.setJarCheckSum(persist.getJarCheckSum());
    newApp.setTags(persist.getTags());
    newApp.setTeamId(persist.getTeamId());
    newApp.setDependency(persist.getDependency());

    // Resolve and validate the source SQL before anything is written, so that a source
    // without SQL is rejected up front rather than failing after the new rows exist.
    FlinkSql copyFlinkSql = null;
    if (newApp.isJobTypeFlinkSqlOrCDC()) {
        copyFlinkSql = flinkSqlService.getLatestFlinkSql(appParam.getId(), true);
        ApiAlertException.throwIfFalse(
            copyFlinkSql != null,
            "[StreamPark] The Flink SQL of the source application doesn't exist, copy application failed.");
    }

    Application application = applicationService.create(EngineTypeEnum.FLINK);
    newApp.setId(application.getId());
    if (!save(newApp)) {
        throw new ApiAlertException(
            "create application from copy failed, copy source app: " + persist.getJobName());
    }

    if (copyFlinkSql != null) {
        newApp.setFlinkSql(copyFlinkSql.getSql());
        newApp.setDependency(copyFlinkSql.getDependency());
        FlinkSql flinkSql = new FlinkSql(newApp);
        flinkSqlService.create(flinkSql);
    }

    // Copy the effective configuration of the source, if it has one, as version 1.
    FlinkApplicationConfig copyConfig = configService.getEffective(appParam.getId());
    if (copyConfig != null) {
        FlinkApplicationConfig config = new FlinkApplicationConfig();
        config.setAppId(newApp.getId());
        config.setFormat(copyConfig.getFormat());
        config.setContent(copyConfig.getContent());
        config.setCreateTime(new Date());
        config.setVersion(1);
        configService.save(config);
        configService.setLatestOrEffective(true, config.getId(), newApp.getId());
    }
    return newApp.getId();
}