in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java [105:185]
/**
 * Validates the env and the plugin/edge graph of a job, failing on the first problem found.
 *
 * <p>Checks run in this order:
 *
 * <ol>
 *   <li>the job version's env is not empty;
 *   <li>plugin ids are unique;
 *   <li>every plugin is referenced by at least one edge;
 *   <li>both endpoints of every edge refer to a declared plugin;
 *   <li>the set of transform plugins used as an edge input equals the set of transform
 *       plugins used as an edge target;
 *   <li>the (input type, target type) pair of every edge is accepted by {@code
 *       pluginTypeMatch}.
 * </ol>
 *
 * @param version job version whose env is checked
 * @param jobTaskInfo plugins and edges describing the job graph
 * @throws SeatunnelException with {@link SeatunnelErrorEnum#ERROR_CONFIG} when any check fails
 */
private void checkConfigIntegrity(JobVersion version, JobTaskInfo jobTaskInfo) {
    if (StringUtils.isEmpty(version.getEnv())) {
        throw new SeatunnelException(
                SeatunnelErrorEnum.ERROR_CONFIG,
                "job env can't be empty, please change config");
    }

    // Index plugins by id. A duplicate id is reported as a config error instead of the
    // IllegalStateException the two-argument toMap would raise.
    Map<String, PluginConfig> pluginConfigMap =
            jobTaskInfo.getPlugins().stream()
                    .collect(
                            Collectors.toMap(
                                    PluginConfig::getPluginId,
                                    Function.identity(),
                                    (first, second) -> {
                                        throw new SeatunnelException(
                                                SeatunnelErrorEnum.ERROR_CONFIG,
                                                "duplicate plugin id '"
                                                        + first.getPluginId()
                                                        + "'");
                                    }));

    // Hash set so the per-plugin membership test below is constant time.
    HashSet<String> pluginIdsOnEdges =
            Stream.concat(
                            jobTaskInfo.getEdges().stream().map(Edge::getInputPluginId),
                            jobTaskInfo.getEdges().stream().map(Edge::getTargetPluginId))
                    .collect(Collectors.toCollection(HashSet::new));

    for (PluginConfig plugin : jobTaskInfo.getPlugins()) {
        if (!pluginIdsOnEdges.contains(plugin.getPluginId())) {
            throw new SeatunnelException(
                    SeatunnelErrorEnum.ERROR_CONFIG,
                    "plugin '" + plugin.getName() + "' is not used in any edge");
        }
    }

    // Every edge endpoint must resolve to a declared plugin; this also guarantees that the
    // pluginConfigMap.get(...) calls further down never return null.
    for (Edge edge : jobTaskInfo.getEdges()) {
        if (!pluginConfigMap.containsKey(edge.getInputPluginId())) {
            throw new SeatunnelException(
                    SeatunnelErrorEnum.ERROR_CONFIG, "input plugin not found");
        }
        if (!pluginConfigMap.containsKey(edge.getTargetPluginId())) {
            throw new SeatunnelException(
                    SeatunnelErrorEnum.ERROR_CONFIG, "target plugin not found");
        }
    }

    HashSet<String> transformsUsedAsInput =
            jobTaskInfo.getEdges().stream()
                    .map(Edge::getInputPluginId)
                    .filter(id -> pluginConfigMap.get(id).getType().equals(PluginType.TRANSFORM))
                    .collect(Collectors.toCollection(HashSet::new));
    HashSet<String> transformsUsedAsTarget =
            jobTaskInfo.getEdges().stream()
                    .map(Edge::getTargetPluginId)
                    .filter(id -> pluginConfigMap.get(id).getType().equals(PluginType.TRANSFORM))
                    .collect(Collectors.toCollection(HashSet::new));
    // A transform that appears on only one side of the edges is dangling.
    if (!transformsUsedAsInput.equals(transformsUsedAsTarget)) {
        throw new SeatunnelException(
                SeatunnelErrorEnum.ERROR_CONFIG, "transform plugin must be connected");
    }

    for (Edge edge : jobTaskInfo.getEdges()) {
        if (!pluginTypeMatch(
                pluginConfigMap.get(edge.getInputPluginId()).getType(),
                pluginConfigMap.get(edge.getTargetPluginId()).getType())) {
            throw new SeatunnelException(
                    SeatunnelErrorEnum.ERROR_CONFIG,
                    "plugin line not match, please check plugin line");
        }
    }
}