in holo-utils/find-incompatible-flink-jobs/src/main/java/com/alibaba/hologres/FindIncompatibleFlinkJobs.java [29:141]
/**
 * Scans the deployments of one workspace/namespace and prints the running SQL jobs that
 * {@code checkBinlog} / {@code checkRpc} report as possibly incompatible.
 *
 * <p>Expected arguments, in order: region, workspace url, access key id, access key secret,
 * and the check mode. The mode {@code "binlog"} selects the binlog check; any other value
 * selects the rpc check.
 *
 * @param args the five command-line arguments described above
 * @throws Exception if one of the client calls fails
 */
public static void main(String[] args) throws Exception {
    if (args.length != 5) {
        System.out.println("Need 5 params. Please usage: java -cp find-incompatible-flink-jobs-1.0.jar " +
                "com.alibaba.hologres.FindIncompatibleFlinkJobs <region> <workspace_url> <access_key_id> <access_key_secret> <binlog/rpc>");
        return;
    }
    String endpoint = REGION.get(args[0]);
    if (endpoint == null) {
        System.out.println("Invalid region");
        return;
    }
    String[] extractedResults = extractWorkspaceAndNamespace(args[1]);
    if (extractedResults == null) {
        System.out.println("Invalid url");
        return;
    }
    String workspaceId = extractedResults[0];
    String namespace = extractedResults[1];
    Config config = new com.aliyun.teaopenapi.models.Config();
    config.setEndpoint(endpoint);
    config.setAccessKeyId(args[2]);
    config.setAccessKeySecret(args[3]);
    // Whether to look for possibly incompatible binlog jobs, or possibly incompatible rpc-mode jobs.
    boolean binlog = "binlog".equals(args[4]);
    Client client = new Client(config);
    for (int pageIndex = 1; pageIndex <= 20; pageIndex++) {
        // Fetch one page of the deployed jobs.
        ListDeploymentsRequest listDeploymentsRequest = new ListDeploymentsRequest();
        listDeploymentsRequest.setPageIndex(pageIndex);
        listDeploymentsRequest.setPageSize(100);
        ListDeploymentsHeaders listDeploymentsHeaders = new ListDeploymentsHeaders();
        listDeploymentsHeaders.setWorkspace(workspaceId);
        ListDeploymentsResponse listDeploymentsResponse =
                client.listDeploymentsWithOptions(namespace, listDeploymentsRequest, listDeploymentsHeaders, new RuntimeOptions());
        if (listDeploymentsResponse.getBody() == null) {
            System.out.println("No deployments");
            // Nothing to iterate over; falling through would dereference the null body.
            break;
        }
        if (listDeploymentsResponse.getBody().getData() == null) {
            if (listDeploymentsResponse.getBody().getErrorMessage() != null) {
                System.out.println(listDeploymentsResponse.getBody().getErrorMessage());
            }
            break;
        }
        if (listDeploymentsResponse.getBody().getData().isEmpty()) {
            // An empty page means every deployment has been seen; skip the remaining requests.
            break;
        }
        for (Deployment deployment : listDeploymentsResponse.getBody().getData()) {
            String deploymentName = deployment.getName();
            String deploymentVersion = deployment.getEngineVersion();
            // Only running jobs are inspected. A missing summary or count is treated as "not running".
            Object runningCount = deployment.getJobSummary() == null ? null : deployment.getJobSummary().getRunning();
            if (!(runningCount instanceof Number) || ((Number) runningCount).intValue() != 1) {
                System.out.println(deploymentName + " not a running job, skip");
                continue;
            }
            // Only SQL jobs are inspected; jar jobs have to be verified by the user.
            if (deployment.getArtifact() == null || !"sqlScript".equalsIgnoreCase(deployment.getArtifact().getKind())) {
                System.out.println(deploymentName + " not sql job, skip");
                continue;
            }
            // From vvr-8.0.5-flink-1.17 on, binlog source tables are switched to jdbc mode automatically.
            if (engineVersionAtLeast(deploymentVersion, "vvr-8.0.5-flink-1.17")) {
                System.out.println(deploymentName + " check binlog/rpc: version is " + deploymentVersion + ", skip");
                continue;
            }
            // From vvr-6.0.7-flink-1.15 on, rpc mode is switched to jdbc mode automatically.
            if (!binlog && engineVersionAtLeast(deploymentVersion, "vvr-6.0.7-flink-1.15")) {
                System.out.println(deploymentName + " check rpc: version is " + deploymentVersion + ", skip");
                continue;
            }
            // Fetch the job instances of this deployment.
            ListJobsRequest listJobsRequest = new ListJobsRequest();
            listJobsRequest.setDeploymentId(deployment.getDeploymentId());
            listJobsRequest.setPageIndex(1);
            listJobsRequest.setPageSize(100);
            ListJobsHeaders listJobsHeaders = new ListJobsHeaders();
            listJobsHeaders.setWorkspace(workspaceId);
            ListJobsResponse listJobsResponse =
                    client.listJobsWithOptions(namespace, listJobsRequest, listJobsHeaders, new RuntimeOptions());
            if (listJobsResponse.getBody() == null || listJobsResponse.getBody().getData() == null) {
                continue;
            }
            for (Job job : listJobsResponse.getBody().getData()) {
                // Only running job instances are inspected.
                if (job.getStatus() == null || !"RUNNING".equalsIgnoreCase(job.getStatus().getCurrentJobStatus())) {
                    continue;
                }
                // Fetch the full job instance to read its SQL script.
                GetJobHeaders getJobHeaders = new GetJobHeaders();
                getJobHeaders.setWorkspace(workspaceId);
                GetJobResponse getJobResponse =
                        client.getJobWithOptions(namespace, job.jobId, getJobHeaders, new RuntimeOptions());
                if (getJobResponse.getBody() == null
                        || getJobResponse.getBody().getData() == null
                        || getJobResponse.getBody().getData().getArtifact() == null
                        || getJobResponse.getBody().getData().getArtifact().getSqlArtifact() == null) {
                    // No SQL script available for this instance, so there is nothing to check.
                    continue;
                }
                String sql = getJobResponse.getBody().getData().getArtifact().getSqlArtifact().sqlScript;
                if (binlog) {
                    checkBinlog(sql, deploymentName, deploymentVersion);
                } else {
                    checkRpc(sql, deploymentName, deploymentVersion);
                }
            }
        }
    }
    if (binlog) {
        System.out.println("\n\n--------------------以下是版本还小于8.0.5,也没有设置sdkmode = jdbc的hologres binlog 源表--------------------");
    } else {
        System.out.println("\n\n--------------------以下是版本还小于6.0.7,也没有设置sdkmode = jdbc的hologres rpc 维表/结果表----------------");
    }
    System.out.println("-------------------------------------------------------------------------------------------------------------");
    System.out.println("deploymentName version tableName");
    for (IncompatibleResult badResult : incompatibleResults) {
        System.out.println(badResult.deploymentName + " " + badResult.deploymentVersion + " " + badResult.tableName);
    }
}

/**
 * Tells whether {@code engineVersion} is the same as or newer than {@code minimum}.
 *
 * <p>When both strings look like {@code vvr-<major>.<minor>...-<suffix>}, only the numeric
 * version right after {@code vvr-} is compared, component by component, so that for example
 * {@code vvr-8.0.10} and {@code vvr-11.1} rank above {@code vvr-8.0.5}. When either string
 * cannot be parsed that way, the plain string comparison is used instead.
 *
 * @param engineVersion the version to test; {@code null} yields {@code false} so that a
 *                      deployment with an unknown version is still checked rather than skipped
 * @param minimum       the lowest version that counts as "at least"
 * @return {@code true} if {@code engineVersion} is greater than or equal to {@code minimum}
 */
private static boolean engineVersionAtLeast(String engineVersion, String minimum) {
    if (engineVersion == null) {
        return false;
    }
    int[] actual = parseVvrVersionNumbers(engineVersion);
    int[] required = parseVvrVersionNumbers(minimum);
    if (actual == null || required == null) {
        return engineVersion.compareTo(minimum) >= 0;
    }
    int length = Math.max(actual.length, required.length);
    for (int i = 0; i < length; i++) {
        // Missing trailing components count as zero, e.g. "11.1" is compared as "11.1.0".
        int left = i < actual.length ? actual[i] : 0;
        int right = i < required.length ? required[i] : 0;
        if (left != right) {
            return left > right;
        }
    }
    return true;
}

/**
 * Extracts the dotted numeric version that follows the {@code vvr-} prefix.
 *
 * @param engineVersion a string such as {@code vvr-8.0.5-flink-1.17}
 * @return the numeric components (for the example: 8, 0, 5), or {@code null} if the string
 *         does not start with {@code vvr-} or a component is not a number
 */
private static int[] parseVvrVersionNumbers(String engineVersion) {
    String prefix = "vvr-";
    if (engineVersion == null || !engineVersion.startsWith(prefix)) {
        return null;
    }
    int end = engineVersion.indexOf('-', prefix.length());
    String numeric = end < 0
            ? engineVersion.substring(prefix.length())
            : engineVersion.substring(prefix.length(), end);
    String[] parts = numeric.split("\\.");
    int[] numbers = new int[parts.length];
    for (int i = 0; i < parts.length; i++) {
        try {
            numbers[i] = Integer.parseInt(parts[i]);
        } catch (NumberFormatException ignored) {
            // Not a plain number: report the version as unparseable so the caller falls back.
            return null;
        }
    }
    return numbers;
}