public void execute()

in seatunnel-core/seatunnel-starter/src/main/java/org/apache/seatunnel/core/starter/seatunnel/command/ClientExecuteCommand.java [82:253]


    /**
     * Runs the single action selected by {@code clientCommandArgs} through a
     * {@code SeaTunnelClient}.
     *
     * <p>When the master type is {@code LOCAL}, a server is first created through
     * {@code createServerInLocal} under a name produced by {@code creatRandomClusterName}, and the
     * client is pointed at {@code localhost} on that server's port.
     *
     * <p>Actions are checked in this order and only the first match is executed: list jobs, print
     * running-job metrics, print one job's detail status, cancel jobs, print one job's metrics,
     * savepoint a job. When none matches, the job described by the config file is submitted (or
     * restored when a restore job id is given). Unless async mode is requested in a non-local
     * mode, the method then registers a shutdown hook, schedules the metrics runner (plus the
     * status runner outside local mode) and blocks until the job completes.
     *
     * <p>A statistics table is logged only when a metrics summary was collected, i.e. after a job
     * that finished without error. {@code closeClient()} is always invoked on the way out.
     *
     * @throws CommandExecuteException wrapping any exception raised after the SeaTunnel config has
     *     been loaded, including a job that ends with an error message or in {@code FAILED} status
     */
    public void execute() throws CommandExecuteException {
        JobMetricsRunner.JobMetricsSummary summary = null;
        // Both timestamps start as "now"; they are only reported when a summary exists.
        LocalDateTime jobStartedAt = LocalDateTime.now();
        LocalDateTime jobEndedAt = LocalDateTime.now();
        SeaTunnelConfig seaTunnelConfig = ConfigProvider.locateAndGetSeaTunnelConfig();
        try {
            String clusterName = clientCommandArgs.getClusterName();
            ClientConfig clientConfig = ConfigProvider.locateAndGetClientConfig();
            boolean localMode = clientCommandArgs.getMasterType().equals(MasterType.LOCAL);
            if (localMode) {
                // Fall back to the default cluster name when none was supplied.
                String baseClusterName;
                if (StringUtils.isNotEmpty(clusterName)) {
                    baseClusterName = clusterName;
                } else {
                    baseClusterName = Constant.DEFAULT_SEATUNNEL_CLUSTER_NAME;
                }
                clusterName = creatRandomClusterName(baseClusterName);
                instance = createServerInLocal(clusterName, seaTunnelConfig);
                int localPort =
                        instance.getCluster().getLocalMember().getSocketAddress().getPort();
                String localAddress = "localhost:" + localPort;
                clientConfig
                        .getNetworkConfig()
                        .setAddresses(Collections.singletonList(localAddress));
            }
            if (StringUtils.isNotEmpty(clusterName)) {
                seaTunnelConfig.getHazelcastConfig().setClusterName(clusterName);
                clientConfig.setClusterName(clusterName);
            }
            engineClient = new SeaTunnelClient(clientConfig);

            // Query/control actions: each one runs alone and then leaves through finally.
            if (clientCommandArgs.isListJob()) {
                String allJobStatus = engineClient.getJobClient().listJobStatus(true);
                System.out.println(allJobStatus);
                return;
            }
            if (clientCommandArgs.isGetRunningJobMetrics()) {
                String runningMetrics = engineClient.getJobClient().getRunningJobMetrics();
                System.out.println(runningMetrics);
                return;
            }
            if (clientCommandArgs.getJobId() != null) {
                long detailJobId = Long.parseLong(clientCommandArgs.getJobId());
                String detailStatus = engineClient.getJobClient().getJobDetailStatus(detailJobId);
                System.out.println(detailStatus);
                return;
            }
            if (clientCommandArgs.getCancelJobId() != null) {
                List<String> idsToCancel = clientCommandArgs.getCancelJobId();
                for (String idToCancel : idsToCancel) {
                    engineClient.getJobClient().cancelJob(Long.parseLong(idToCancel));
                }
                return;
            }
            if (clientCommandArgs.getMetricsJobId() != null) {
                long metricsJobId = Long.parseLong(clientCommandArgs.getMetricsJobId());
                String metricsText = engineClient.getJobClient().getJobMetrics(metricsJobId);
                System.out.println(metricsText);
                return;
            }
            if (clientCommandArgs.getSavePointJobId() != null) {
                long savePointJobId = Long.parseLong(clientCommandArgs.getSavePointJobId());
                engineClient.getJobClient().savePointJob(savePointJobId);
                return;
            }

            // No query/control action requested: submit or restore the configured job.
            Path configFile = FileUtils.getConfigPath(clientCommandArgs);
            checkConfigExist(configFile);
            JobConfig jobConfig = new JobConfig();
            jobConfig.setName(clientCommandArgs.getJobName());
            String configPath = configFile.toString();
            ClientJobExecutionEnvironment jobExecutionEnv;
            if (clientCommandArgs.getRestoreJobId() == null) {
                // A null custom id is passed through when the user did not choose one.
                Long customJobId = null;
                if (clientCommandArgs.getCustomJobId() != null) {
                    customJobId = Long.parseLong(clientCommandArgs.getCustomJobId());
                }
                jobExecutionEnv =
                        engineClient.createExecutionContext(
                                configPath,
                                clientCommandArgs.getVariables(),
                                jobConfig,
                                seaTunnelConfig,
                                customJobId);
            } else {
                jobExecutionEnv =
                        engineClient.restoreExecutionContext(
                                configPath,
                                clientCommandArgs.getVariables(),
                                jobConfig,
                                seaTunnelConfig,
                                Long.parseLong(clientCommandArgs.getRestoreJobId()));
            }

            // The reported start time is taken immediately before the job is executed.
            jobStartedAt = LocalDateTime.now();
            ClientJobProxy clientJobProxy = jobExecutionEnv.execute();
            if (clientCommandArgs.isAsync()) {
                if (!localMode) {
                    return;
                }
                log.warn("The job is running in local mode, can not use async mode.");
            }

            // On JVM shutdown, run shutdownHook for this job but wait at most 15 seconds for it.
            Runnable cancelOnSignal =
                    () -> {
                        log.info("run shutdown hook because get close signal");
                        shutdownHook(clientJobProxy);
                    };
            Runnable boundedCancel =
                    () -> {
                        CompletableFuture<Void> pending =
                                CompletableFuture.runAsync(cancelOnSignal);
                        try {
                            pending.get(15, TimeUnit.SECONDS);
                        } catch (Exception e) {
                            log.error("Cancel job failed.", e);
                        }
                    };
            Runtime.getRuntime().addShutdownHook(new Thread(boundedCancel));

            long jobId = clientJobProxy.getJobId();
            JobMetricsRunner jobMetricsRunner = new JobMetricsRunner(engineClient, jobId);
            // Two daemon threads: periodic metrics output and the one-shot status runner.
            executorService =
                    Executors.newScheduledThreadPool(
                            2,
                            new ThreadFactoryBuilder()
                                    .setNameFormat("job-metrics-runner-%d")
                                    .setDaemon(true)
                                    .build());
            long metricsIntervalSeconds =
                    seaTunnelConfig.getEngineConfig().getPrintJobMetricsInfoInterval();
            executorService.scheduleAtFixedRate(
                    jobMetricsRunner, 0, metricsIntervalSeconds, TimeUnit.SECONDS);

            // The job status runner is only scheduled outside LOCAL mode.
            if (!localMode) {
                executorService.schedule(
                        new JobStatusRunner(engineClient.getJobClient(), jobId),
                        0,
                        TimeUnit.SECONDS);
            }

            // Block until the job finishes, then fail loudly on an error message or FAILED status.
            JobResult jobResult = clientJobProxy.waitForJobCompleteV2();
            jobStatus = jobResult.getStatus();
            boolean hasErrorMessage = StringUtils.isNotEmpty(jobResult.getError());
            if (hasErrorMessage || jobResult.getStatus().equals(JobStatus.FAILED)) {
                throw new SeaTunnelEngineException(jobResult.getError());
            }

            // Only reached on success: record the end time and collect the final metrics.
            jobEndedAt = LocalDateTime.now();
            summary = engineClient.getJobMetricsSummary(jobId);
        } catch (Exception e) {
            throw new CommandExecuteException("SeaTunnel job executed failed", e);
        } finally {
            if (summary != null) {
                long elapsedSeconds = Duration.between(jobStartedAt, jobEndedAt).getSeconds();
                // "Total Failed Count" is reported as records read minus records written.
                log.info(
                        StringFormatUtils.formatTable(
                                "Job Statistic Information",
                                "Start Time",
                                DateTimeUtils.toString(
                                        jobStartedAt, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
                                "End Time",
                                DateTimeUtils.toString(
                                        jobEndedAt, DateTimeUtils.Formatter.YYYY_MM_DD_HH_MM_SS),
                                "Total Time(s)",
                                elapsedSeconds,
                                "Total Read Count",
                                summary.getSourceReadCount(),
                                "Total Write Count",
                                summary.getSinkWriteCount(),
                                "Total Failed Count",
                                summary.getSourceReadCount() - summary.getSinkWriteCount()));
            }
            closeClient();
        }
    }