public static void main()

in holo-utils/find-incompatible-flink-jobs/src/main/java/com/alibaba/hologres/FindIncompatibleFlinkJobs.java [29:141]


    public static void main(String[] args) throws Exception {
        if (args.length != 5) {
            System.out.println("Need 5 params. Please usage: java -cp find-incompatible-flink-jobs-1.0.jar " +
                    "com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <workspace_url> <access_key_id> <access_key_secret> <binlog/rpc>");
            return;
        }
        String endpoint = REGION.get(args[0]);
        if (endpoint == null) {
            System.out.println("Invalid region");
            return;
        }
        String[] extractedResults = extractWorkspaceAndNamespace(args[1]);
        if (extractedResults == null) {
            System.out.println("Invalid url");
            return;
        }
        String workspaceId = extractedResults[0];
        String namespace = extractedResults[1];

        Config config = new com.aliyun.teaopenapi.models.Config();
        config.setEndpoint(endpoint);
        config.setAccessKeyId(args[2]);
        config.setAccessKeySecret(args[3]);

        // 检查binlog的可能不兼容作业,还是检查rpc模式的可能不兼容作业
        boolean binlog = "binlog".equals(args[4]);

        Client client = new Client(config);

        for (int pageIndex = 1; pageIndex <= 20; pageIndex++) {
            // 获取已部署作业列表。
            ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
            listDeploymentsRequest.setPageIndex(pageIndex);
            listDeploymentsRequest.setPageSize(100);
            ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
            listDeploymentsHeaders.setWorkspace(workspaceId);
            ListDeploymentsResponse listDeploymentsResponse =
                    client.listDeploymentsWithOptions(namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());

            if (listDeploymentsResponse.getBody() == null) {
                System.out.println("No deployments");
            } else if (listDeploymentsResponse.getBody().getData() == null && listDeploymentsResponse.getBody().getErrorMessage() != null) {
                System.out.println(listDeploymentsResponse.getBody().getErrorMessage());
                break;
            }
            for (Deployment deployment : listDeploymentsResponse.getBody().getData()) {
                String deploymentName = deployment.getName();
                String deploymentVersion = deployment.getEngineVersion();

                // 只看运行中的作业
                if (deployment.getJobSummary().getRunning() != 1) {
                    System.out.println(deploymentName + " not a running job, skip");
                    continue;
                }
                // 只看sql作业, jar作业需要用户自行确认
                if (!"sqlScript".equalsIgnoreCase(deployment.getArtifact().getKind())) {
                    System.out.println(deploymentName + " not sql job, skip");
                    continue;
                }
                // vvr-8.0.5-flink-1.17 版本之后,会有自动切换源表消费binlog为jdbc模式的逻辑
                if (deploymentVersion.compareTo("vvr-8.0.5-flink-1.17") >= 0) {
                    System.out.println(deploymentName + " check binlog/rpc: version is " + deploymentVersion + ", skip");
                    continue;
                }
                // vvr-6.0.7-flink-1.15 版本之后,会有自动切换rpc模式为jdbc模式的逻辑
                if (!binlog && deploymentVersion.compareTo("vvr-6.0.7-flink-1.15") >= 0) {
                    System.out.println(deploymentName + " check rpc: version is " + deploymentVersion + ", skip");
                    continue;
                }
                // 获取作业实例列表。
                ListJobsRequest listJobsRequest = new ListJobsRequest();
                listJobsRequest.setDeploymentId(deployment.getDeploymentId());
                listJobsRequest.setPageIndex(1);
                listJobsRequest.setPageSize(100);
                ListJobsHeaders listJobsHeaders = new ListJobsHeaders();
                listJobsHeaders.setWorkspace(workspaceId);
                ListJobsResponse listJobsResponse =
                        client.listJobsWithOptions(namespace, listJobsRequest, listJobsHeaders, new RuntimeOptions());

                for (int i = 0; i < listJobsResponse.getBody().getData().size(); i++) {
                    Job job = listJobsResponse.getBody().getData().get(i);
                    // 只看运行中的作业
                    if (!"RUNNING".equalsIgnoreCase(job.getStatus().getCurrentJobStatus())) {
                        continue;
                    }

                    // 获取作业实例。
                    GetJobHeaders getJobHeaders = new GetJobHeaders();
                    getJobHeaders.setWorkspace(workspaceId);
                    GetJobResponse getJobResponse =
                            client.getJobWithOptions(namespace, job.jobId, getJobHeaders, new RuntimeOptions());

                    String sql = getJobResponse.getBody().getData().getArtifact().getSqlArtifact().sqlScript;
                    if (binlog) {
                        checkBinlog(sql, deploymentName, deploymentVersion);
                    } else {
                        checkRpc(sql, deploymentName, deploymentVersion);
                    }
                }
            }
        }

        if (binlog) {
            System.out.println("\n\n--------------------以下是版本还小于8.0.5,也没有设置sdkmode = jdbc的hologres binlog 源表--------------------");
        } else {
            System.out.println("\n\n--------------------以下是版本还小于6.0.7,也没有设置sdkmode = jdbc的hologres rpc 维表/结果表----------------");
        }
        System.out.println("-------------------------------------------------------------------------------------------------------------");
        System.out.println("deploymentName                       version                              tableName");
        for (IncompatibleResult badResult : incompatibleResults) {
            System.out.println(badResult.deploymentName + " " + badResult.deploymentVersion + " " + badResult.tableName);
        }
    }