in seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java [82:253]
public void execute() throws CommandExecuteException {
JobMetricsRunner.JobMetricsSummary jobMetricsSummary = null;
LocalDateTime startTime = LocalDateTime.now();
LocalDateTime endTime = LocalDateTime.now();
SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
try {
String clusterName = clientCommandArgs.getClusterName();
ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
// get running mode
boolean isLocalMode = clientCommandArgs.getMasterType().equals(MasterType.LOCAL);
if (isLocalMode) {
clusterName =
creatRandomClusterName(
StringUtils.isNotEmpty(clusterName)
? clusterName
: Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME);
instance = createServerInLocal(clusterName, seaTunnelConfig);
int port = instance.getCluster().getLocalMember().getSocketAddress().getPort();
clientConfig
.getNetworkConfig()
.setAddresses(Collections.singletonList("localhost:" + port));
}
if (StringUtils.isNotEmpty(clusterName)) {
seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
clientConfig.setClusterName(clusterName);
}
engineClient = new SeaTunnelClient(clientConfig);
if (clientCommandArgs.isListJob()) {
String jobStatus = engineClient.getJobClient().listJobStatus(true);
System.out.println(jobStatus);
} else if (clientCommandArgs.isGetRunningJobMetrics()) {
String runningJobMetrics = engineClient.getJobClient().getRunningJobMetrics();
System.out.println(runningJobMetrics);
} else if (null != clientCommandArgs.getJobId()) {
String jobState =
engineClient
.getJobClient()
.getJobDetailStatus(Long.parseLong(clientCommandArgs.getJobId()));
System.out.println(jobState);
} else if (null != clientCommandArgs.getCancelJobId()) {
List<String> cancelJobIds = clientCommandArgs.getCancelJobId();
for (String cancelJobId : cancelJobIds) {
engineClient.getJobClient().cancelJob(Long.parseLong(cancelJobId));
}
} else if (null != clientCommandArgs.getMetricsJobId()) {
String jobMetrics =
engineClient
.getJobClient()
.getJobMetrics(Long.parseLong(clientCommandArgs.getMetricsJobId()));
System.out.println(jobMetrics);
} else if (null != clientCommandArgs.getSavePointJobId()) {
engineClient
.getJobClient()
.savePointJob(Long.parseLong(clientCommandArgs.getSavePointJobId()));
} else {
Path configFile = FileUtils.getConfigPath(clientCommandArgs);
checkConfigExist(configFile);
JobConfig jobConfig = new JobConfig();
ClientJobExecutionEnvironment jobExecutionEnv;
jobConfig.setName(clientCommandArgs.getJobName());
if (null != clientCommandArgs.getRestoreJobId()) {
jobExecutionEnv =
engineClient.restoreExecutionContext(
configFile.toString(),
clientCommandArgs.getVariables(),
jobConfig,
seaTunnelConfig,
Long.parseLong(clientCommandArgs.getRestoreJobId()));
} else {
jobExecutionEnv =
engineClient.createExecutionContext(
configFile.toString(),
clientCommandArgs.getVariables(),
jobConfig,
seaTunnelConfig,
clientCommandArgs.getCustomJobId() != null
? Long.parseLong(clientCommandArgs.getCustomJobId())
: null);
}
// get job start time
startTime = LocalDateTime.now();
// create job proxy
ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
if (clientCommandArgs.isAsync()) {
if (isLocalMode) {
log.warn("The job is running in local mode, can not use async mode.");
} else {
return;
}
}
// register cancelJob hook
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
log.info(
"run shutdown hook because get close signal");
shutdownHook(clientJobProxy);
});
try {
future.get(15, TimeUnit.SECONDS);
} catch (Exception e) {
log.error("Cancel job failed.", e);
}
}));
// get job id
long jobId = clientJobProxy.getJobId();
JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
executorService =
Executors.newScheduledThreadPool(
2,
new ThreadFactoryBuilder()
.setNameFormat("job-metrics-runner-%d")
.setDaemon(true)
.build());
executorService.scheduleAtFixedRate(
jobMetricsRunner,
0,
seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval(),
TimeUnit.SECONDS);
if (!isLocalMode) {
// LOCAL mode does not require running the job status runner
executorService.schedule(
new JobStatusRunner(engineClient.getJobClient(), jobId),
0,
TimeUnit.SECONDS);
}
// wait for job complete
JobResult jobResult = clientJobProxy.waitForJobCompleteV2();
jobStatus = jobResult.getStatus();
if (StringUtils.isNotEmpty(jobResult.getError())
|| jobResult.getStatus().equals(JobStatus.FAILED)) {
throw new SeaTunnelEngineException(jobResult.getError());
}
// get job end time
endTime = LocalDateTime.now();
// get job statistic information when job finished
jobMetricsSummary = engineClient.getJobMetricsSummary(jobId);
}
} catch (Exception e) {
throw new CommandExecuteException("SeaTunnel job executed failed", e);
} finally {
if (jobMetricsSummary != null) {
// print job statistics information when job finished
log.info(
StringFormatUtils.formatTable(
"Job Statistic Information",
"Start Time",
DateTimeUtils.toString(
startTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
"End Time",
DateTimeUtils.toString(
endTime, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
"Total Time(s)",
Duration.between(startTime, endTime).getSeconds(),
"Total Read Count",
jobMetricsSummary.getSourceReadCount(),
"Total Write Count",
jobMetricsSummary.getSinkWriteCount(),
"Total Failed Count",
jobMetricsSummary.getSourceReadCount()
- jobMetricsSummary.getSinkWriteCount()));
}
closeClient();
}
}