in streampark-console/streampark-console-service/src/main/java/org/apache/streampark/console/core/service/impl/ResourceServiceImpl.java [268:370]
public RestResponse checkResource(Resource resourceParam) throws JsonProcessingException {
ResourceType type = resourceParam.getResourceType();
Map<String, Serializable> resp = new HashMap<>(0);
resp.put("state", 0);
switch (type) {
case FLINK_APP:
// check main.
File jarFile;
try {
jarFile = getResourceJar(resourceParam);
} catch (Exception e) {
// get jarFile error
resp.put("state", 1);
resp.put("exception", Utils.stringifyException(e));
return RestResponse.success().data(resp);
}
Manifest manifest = Utils.getJarManifest(jarFile);
String mainClass = manifest.getMainAttributes().getValue("Main-Class");
if (mainClass == null) {
// main class is null
resp.put("state", 2);
return RestResponse.success().data(resp);
}
return RestResponse.success().data(resp);
case CONNECTOR:
// 1) get connector id
FlinkConnectorResource connectorResource;
ApiAlertException.throwIfFalse(
ResourceType.CONNECTOR.equals(resourceParam.getResourceType()),
"getConnectorId method error, resource not flink connector.");
List<File> jars;
File connector = null;
List<String> factories;
Dependency dependency = Dependency.toDependency(resourceParam.getResource());
// 1) get connector jar
if (!dependency.getPom().isEmpty()) {
Artifact artifact = dependency.toArtifact().get(0);
try {
jars = MavenTool.resolveArtifacts(artifact);
} catch (Exception e) {
// connector download is null
resp.put("state", 1);
resp.put("exception", Utils.stringifyException(e));
return RestResponse.success().data(resp);
}
String fileName = String.format("%s-%s.jar", artifact.artifactId(), artifact.version());
Optional<File> file = jars.stream().filter(x -> x.getName().equals(fileName)).findFirst();
if (file.isPresent()) {
connector = file.get();
}
} else {
// 2) jar
String jar = dependency.getJar().get(0).split(":")[1];
File file = new File(jar);
connector = file;
jars = Collections.singletonList(file);
}
// 2) parse connector Factory
try {
factories = getConnectorFactory(connector);
} catch (Exception e) {
// flink connector invalid
resp.put("state", 2);
resp.put("exception", Utils.stringifyException(e));
return RestResponse.success().data(resp);
}
// 3) get connector resource
connectorResource = getConnectorResource(jars, factories);
if (connectorResource == null) {
// connector is null
resp.put("state", 3);
return RestResponse.success().data(resp);
}
// 2) check connector exists
boolean exists =
existsFlinkConnector(resourceParam.getId(), connectorResource.getFactoryIdentifier());
if (exists) {
resp.put("state", 4);
resp.put("name", connectorResource.getFactoryIdentifier());
return RestResponse.success(resp);
}
if (resourceParam.getId() != null) {
Resource resource = getById(resourceParam.getId());
if (!resource.getResourceName().equals(connectorResource.getFactoryIdentifier())) {
resp.put("state", 5);
return RestResponse.success().data(resp);
}
}
resp.put("state", 0);
resp.put("connector", JacksonUtils.write(connectorResource));
return RestResponse.success().data(resp);
}
return RestResponse.success().data(resp);
}