in inlong-manager/manager-plugins/base/src/main/java/org/apache/inlong/manager/plugin/flink/IntegrationTaskRunner.java [50:161]
public void run() {
TaskCommitType commitType = TaskCommitType.getInstance(this.commitType);
if (commitType == null) {
commitType = TaskCommitType.START_NOW;
}
switch (commitType) {
case START_NOW:
try {
String jobId = flinkService.submit(flinkInfo);
flinkInfo.setJobId(jobId);
log.info("Start job {} success in backend", jobId);
} catch (Exception e) {
String msg = String.format("Start job %s failed in backend exception[%s]", flinkInfo.getJobId(),
getExceptionStackMsg(e));
log.warn(msg);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(msg);
}
break;
case RESUME:
try {
String jobId = flinkService.restore(flinkInfo);
log.info("Restore job {} success in backend", jobId);
} catch (Exception e) {
String msg = String.format("Restore job %s failed in backend exception[%s]", flinkInfo.getJobId(),
getExceptionStackMsg(e));
log.warn(msg);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(msg);
}
break;
case RESTART:
try {
StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
FlinkConfig flinkConfig = flinkService.getFlinkConfig();
stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
String location = flinkService.stopJob(flinkInfo, stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
log.info("the jobId: {} savepoint: {} ", flinkInfo.getJobId(), location);
int times = 0;
while (times < TRY_MAX_TIMES) {
JobStatus jobStatus = flinkService.getJobStatus(flinkInfo);
// restore job
if (jobStatus == FINISHED) {
try {
String jobId = flinkService.restore(flinkInfo);
log.info("Restore job {} success in backend", jobId);
} catch (Exception e) {
log.error("Restore job failed in backend", e);
}
break;
}
log.info("Try start job but the job {} is {}", flinkInfo.getJobId(), jobStatus.toString());
try {
Thread.sleep(INTERVAL * 1000);
} catch (Exception e) {
e.printStackTrace();
}
times++;
}
log.info("Restart job {} success in backend", flinkInfo.getJobId());
} catch (Exception e) {
String msg = String.format("Restart job %s failed in backend exception[%s]", flinkInfo.getJobId(),
getExceptionStackMsg(e));
log.warn(msg);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(msg);
}
break;
case STOP:
try {
StopWithSavepointRequest stopWithSavepointRequest = new StopWithSavepointRequest();
FlinkConfig flinkConfig = flinkService.getFlinkConfig();
stopWithSavepointRequest.setDrain(flinkConfig.isDrain());
stopWithSavepointRequest.setTargetDirectory(flinkConfig.getSavepointDirectory());
String location = flinkService.stopJob(flinkInfo, stopWithSavepointRequest);
flinkInfo.setSavepointPath(location);
log.info("the jobId {} savepoint: {} ", flinkInfo.getJobId(), location);
} catch (Exception e) {
String msg = String.format("stop job %s failed in backend exception[%s]", flinkInfo.getJobId(),
getExceptionStackMsg(e));
log.warn(msg);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(msg);
}
break;
case DELETE:
try {
flinkService.cancelJob(flinkInfo);
log.info("delete job {} success in backend", flinkInfo.getJobId());
JobStatus jobStatus = flinkService.getJobStatus(flinkInfo);
if (jobStatus.isTerminalState()) {
log.info("delete job {} success in backend", flinkInfo.getJobId());
} else {
log.info("delete job {} failed in backend", flinkInfo.getJobId());
}
} catch (Exception e) {
String msg = String.format("delete job %s failed in backend exception[%s]", flinkInfo.getJobId(),
getExceptionStackMsg(e));
log.warn(msg);
flinkInfo.setException(true);
flinkInfo.setExceptionMsg(msg);
}
break;
default:
String msg = "not found commitType";
flinkInfo.setException(true);
log.warn(msg);
flinkInfo.setExceptionMsg(msg);
}
}