in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java [316:409]
/**
 * Recursively checks that every source field referenced by the transform options of
 * {@code nextConfig}, and of each plugin reachable from it through {@code edgeMap}, exists in
 * the first output schema of its direct upstream plugin.
 *
 * <p>The walk stops at the first missing field. Plugins without transform options are not
 * validated themselves, but their downstream plugins are still visited.
 *
 * @param config the upstream plugin; its first output schema supplies the known field names
 * @param nextConfig the downstream plugin whose transform options are validated; a
 *     {@code null} value (an edge pointing at an id absent from {@code pluginMap}) is skipped
 * @param pluginMap plugin configurations keyed by plugin id, used to resolve downstream ids
 * @param edgeMap downstream plugin ids keyed by upstream plugin id
 * @return a failed {@link JobTaskCheckRes} carrying a {@code MISS_FIELD} schema error for the
 *     first missing field found, or {@code null} when no missing field is detected
 * @throws IOException declared by this method; presumably raised while parsing the transform
 *     options (confirm against {@code getTransformOption})
 * @throws SeatunnelException if the transform type has no supported option check
 * @throws IllegalArgumentException if the connector type does not name a {@code Transform}
 */
private JobTaskCheckRes checkNextTaskSchema(
        PluginConfig config,
        PluginConfig nextConfig,
        Map<String, PluginConfig> pluginMap,
        Map<String, List<String>> edgeMap)
        throws IOException {
    if (nextConfig == null) {
        // The recursion resolves ids through pluginMap.get(...); an unknown id yields null
        // and there is nothing to validate for it.
        return null;
    }

    Map<String, Object> options = nextConfig.getTransformOptions();
    if (options != null && !options.isEmpty()) {
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale
        // (e.g. Turkish upper-cases 'i' to a dotted capital, breaking "FieldMapper").
        Transform transform =
                Transform.valueOf(
                        nextConfig.getConnectorType().toUpperCase(java.util.Locale.ROOT));
        String transformOptionsStr = JsonUtils.toJsonString(options);
        List<TransformOption> transformOptions = new ArrayList<>();
        switch (transform) {
            case FIELDMAPPER:
                FieldMapperTransformOptions fieldMapperTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (fieldMapperTransformOptions != null) {
                    fillTransformOptions(
                            transformOptions, fieldMapperTransformOptions.getRenameFields());
                    fillTransformOptions(
                            transformOptions, fieldMapperTransformOptions.getChangeOrders());
                }
                break;
            case MULTIFIELDSPLIT:
                SplitTransformOptions splitTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (splitTransformOptions != null) {
                    fillTransformOptions(transformOptions, splitTransformOptions.getSplits());
                }
                break;
            case COPY:
                CopyTransformOptions copyTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (copyTransformOptions != null) {
                    fillTransformOptions(transformOptions, copyTransformOptions.getCopyList());
                }
                break;
            case SQL:
                SQLTransformOptions sqlTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (sqlTransformOptions != null) {
                    fillTransformOptions(
                            transformOptions,
                            Collections.singletonList(sqlTransformOptions.getSql()));
                }
                break;
            case JSONPATH:
                // No source-field options are collected for this transform type.
                break;
            case FILTERROWKIND:
            case REPLACE:
            default:
                throw new SeatunnelException(
                        SeatunnelErrorEnum.UNSUPPORTED_CONNECTOR_TYPE,
                        "Unsupported Transform Option " + transform);
        }

        List<DatabaseTableSchemaReq> outputSchemas = config.getOutputSchema();
        // Without an upstream schema there are no field names to compare against, so the
        // field check is skipped instead of failing on get(0).
        boolean hasUpstreamSchema = outputSchemas != null && !outputSchemas.isEmpty();
        if (!transformOptions.isEmpty() && hasUpstreamSchema) {
            // Only the first output schema of the upstream plugin is consulted.
            DatabaseTableSchemaReq databaseTableSchemaReq = outputSchemas.get(0);
            List<String> fields =
                    databaseTableSchemaReq.getFields().stream()
                            .map(TableField::getName)
                            .collect(Collectors.toList());
            Optional<TransformOption> missingOption =
                    transformOptions.stream()
                            .filter(option -> !fields.contains(option.getSourceFieldName()))
                            .findFirst();
            if (missingOption.isPresent()) {
                String sourceFieldName = missingOption.get().getSourceFieldName();
                return new JobTaskCheckRes(
                        false,
                        nextConfig.getPluginId(),
                        new SchemaError(
                                databaseTableSchemaReq.getDatabase(),
                                databaseTableSchemaReq.getTableName(),
                                sourceFieldName,
                                SchemaErrorType.MISS_FIELD),
                        null);
            }
        }
    }

    // Descend into every plugin fed by nextConfig; the first failure wins.
    List<String> nextConfigIds = edgeMap.get(nextConfig.getPluginId());
    if (nextConfigIds != null) {
        for (String nextConfigId : nextConfigIds) {
            JobTaskCheckRes res =
                    checkNextTaskSchema(
                            nextConfig, pluginMap.get(nextConfigId), pluginMap, edgeMap);
            if (res != null) {
                return res;
            }
        }
    }
    return null;
}