in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/JobTaskServiceImpl.java [308:395]
/**
 * Checks that the transform options of {@code nextConfig} only reference source fields that
 * exist in the first output schema of the upstream {@code config}, then continues with the
 * plugin that follows {@code nextConfig} in the job graph.
 *
 * <p>The check is skipped for a plugin whose transform options are {@code null} or empty; the
 * walk still proceeds to its successor.
 *
 * @param config upstream plugin whose first output schema supplies the available field names
 * @param nextConfig downstream plugin whose transform options are validated
 * @param pluginMap lookup used to resolve the successor named by {@code edgeMap}
 * @param edgeMap maps a plugin id to the {@code pluginMap} key of its successor
 * @return a failed {@link JobTaskCheckRes} carrying a {@code MISS_FIELD} schema error for the
 *     first transform option whose source field is absent from the upstream schema, or
 *     {@code null} when the end of the chain is reached without finding one
 * @throws IOException propagated from serializing the transform options (and possibly from
 *     parsing them in {@code getTransformOption})
 * @throws SeatunnelException if the transform type is not one of the handled kinds
 */
private JobTaskCheckRes checkNextTaskSchema(
        PluginConfig config,
        PluginConfig nextConfig,
        Map<String, PluginConfig> pluginMap,
        Map<String, String> edgeMap)
        throws IOException {
    Map<String, Object> options = nextConfig.getTransformOptions();
    if (options != null && !options.isEmpty()) {
        // Locale.ROOT keeps the enum lookup independent of the JVM default locale; with a
        // Turkish default locale "fieldmapper".toUpperCase() would not match FIELDMAPPER.
        Transform transform =
                Transform.valueOf(
                        nextConfig.getConnectorType().toUpperCase(java.util.Locale.ROOT));
        String transformOptionsStr = OBJECT_MAPPER.writeValueAsString(options);
        List<TransformOption> transformOptions = new ArrayList<>();
        switch (transform) {
            case FIELDMAPPER:
                FieldMapperTransformOptions fieldMapperTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (fieldMapperTransformOptions != null) {
                    fillTransformOptions(
                            transformOptions, fieldMapperTransformOptions.getRenameFields());
                    fillTransformOptions(
                            transformOptions, fieldMapperTransformOptions.getChangeOrders());
                }
                break;
            case MULTIFIELDSPLIT:
                SplitTransformOptions splitTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (splitTransformOptions != null) {
                    fillTransformOptions(transformOptions, splitTransformOptions.getSplits());
                }
                break;
            case COPY:
                CopyTransformOptions copyTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (copyTransformOptions != null) {
                    fillTransformOptions(transformOptions, copyTransformOptions.getCopyList());
                }
                break;
            case SQL:
                SQLTransformOptions sqlTransformOptions =
                        getTransformOption(transform, transformOptionsStr);
                if (sqlTransformOptions != null) {
                    fillTransformOptions(
                            transformOptions,
                            Collections.singletonList(sqlTransformOptions.getSql()));
                }
                break;
            case FILTERROWKIND:
            case REPLACE:
            default:
                // FILTERROWKIND and REPLACE deliberately share the "unsupported" path.
                throw new SeatunnelException(
                        SeatunnelErrorEnum.UNSUPPORTED_CONNECTOR_TYPE,
                        "Unsupported Transform Option " + transform);
        }
        if (!transformOptions.isEmpty()) {
            // NOTE(review): assumes the upstream plugin has at least one output schema;
            // an empty list would throw here - confirm callers guarantee this.
            DatabaseTableSchemaReq databaseTableSchemaReq = config.getOutputSchema().get(0);
            List<String> fields =
                    databaseTableSchemaReq.getFields().stream()
                            .map(TableField::getName)
                            .collect(Collectors.toList());
            // Only the first option with an unknown source field is reported.
            Optional<TransformOption> transformOption =
                    transformOptions.stream()
                            .filter(option -> !fields.contains(option.getSourceFieldName()))
                            .findFirst();
            if (transformOption.isPresent()) {
                String sourceFieldName = transformOption.get().getSourceFieldName();
                return new JobTaskCheckRes(
                        false,
                        nextConfig.getPluginId(),
                        new SchemaError(
                                databaseTableSchemaReq.getDatabase(),
                                databaseTableSchemaReq.getTableName(),
                                sourceFieldName,
                                SchemaErrorType.MISS_FIELD),
                        null);
            }
        }
    }
    if (edgeMap.containsKey(nextConfig.getPluginId())) {
        // Step one edge downstream: nextConfig becomes the upstream side of the next check.
        // NOTE(review): pluginMap.get(...) returns null when the edge target is not in
        // pluginMap, which would fail on the first dereference in the recursive call -
        // confirm the graph is validated before this method runs.
        return checkNextTaskSchema(
                nextConfig,
                pluginMap.get(edgeMap.get(nextConfig.getPluginId())),
                pluginMap,
                edgeMap);
    }
    return null;
}