in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java [66:136]
/**
 * Derives the output schema that the given SQL query produces when it is applied to the output
 * schema of an upstream plugin.
 *
 * <p>The task configuration of {@code inputPluginId} is loaded, its first output schema is
 * converted into a {@link CatalogTable}, the "Sql" transform is created on top of that table,
 * and the catalog table produced by the transform is converted back into a
 * {@link TableSchemaReq}.
 *
 * <p>Only the first entry of the plugin's output schema list is used. The returned schema
 * carries the table name of that input table.
 *
 * @param jobVersionId id of the job version the plugin belongs to
 * @param inputPluginId id of the plugin whose output schema feeds the SQL transform
 * @param sql holder of the SQL query to evaluate
 * @return the fields produced by the query together with the input table name
 * @throws IllegalArgumentException if the plugin has no output schema
 */
public TableSchemaReq derivationSQL(long jobVersionId, String inputPluginId, SQL sql) {
    PluginConfig pluginConfig = jobTaskService.getSingleTask(jobVersionId, inputPluginId);
    ClassLoader classLoader = Thread.currentThread().getContextClassLoader();
    TableTransformFactory factory =
            FactoryUtil.discoverFactory(classLoader, TableTransformFactory.class, "Sql");

    List<DatabaseTableSchemaReq> tableSchemaReqs = pluginConfig.getOutputSchema();
    // A missing schema list is reported the same way as an empty one instead of
    // surfacing as a NullPointerException.
    if (tableSchemaReqs == null || tableSchemaReqs.isEmpty()) {
        throw new IllegalArgumentException("outputSchema is empty, please add input plugin");
    }
    DatabaseTableSchemaReq tableSchema = tableSchemaReqs.get(0);

    // Rebuild the upstream schema as a catalog table the transform can consume.
    TableSchema.Builder builder = TableSchema.builder();
    List<String> primaryKeys = new ArrayList<>();
    for (TableField f : tableSchema.getFields()) {
        // Null-safe: a field without a primary-key flag is simply not a key column.
        if (Boolean.TRUE.equals(f.getPrimaryKey())) {
            primaryKeys.add(f.getName());
        }
        // Null-safe: a field without a nullable flag is treated as nullable; only an
        // explicit "false" marks the column NOT NULL.
        boolean nullable = !Boolean.FALSE.equals(f.getNullable());
        builder.column(
                PhysicalColumn.of(
                        f.getName(),
                        stringToDataType(f.getOutputDataType()),
                        0,
                        nullable,
                        f.getDefaultValue(),
                        f.getComment()));
    }
    builder.primaryKey(PrimaryKey.of("PrimaryKeys", primaryKeys));
    CatalogTable table =
            CatalogTable.of(
                    TableIdentifier.of(
                            "default", tableSchema.getDatabase(), tableSchema.getTableName()),
                    builder.build(),
                    Collections.emptyMap(),
                    Collections.emptyList(),
                    tableSchema.getTableName());

    // Configure the SQL transform with the user's query and let it compute its output table.
    Map<String, Object> config = new HashMap<>();
    config.put(SQLTransform.KEY_QUERY.key(), sql.getQuery());
    TableTransformFactoryContext context =
            new TableTransformFactoryContext(
                    Collections.singletonList(table),
                    ReadonlyConfig.fromMap(config),
                    classLoader);
    TableTransform<SeaTunnelRow> transform = factory.createTransform(context);
    SQLTransform sqlTransform = (SQLTransform) transform.createTransform();
    CatalogTable result = sqlTransform.getProducedCatalogTable();

    // The produced schema may have no primary key at all.
    List<String> primaryKeysList = new ArrayList<>();
    if (result.getTableSchema().getPrimaryKey() != null) {
        primaryKeysList.addAll(result.getTableSchema().getPrimaryKey().getColumnNames());
    }

    // Convert every produced column back into the request/response field model.
    List<TableField> fields = new ArrayList<>();
    for (Column column : result.getTableSchema().getColumns()) {
        String dataType = column.getDataType().toString();
        Object defaultValue = column.getDefaultValue();

        TableField field = new TableField();
        field.setName(column.getName());
        field.setComment(column.getComment());
        field.setDefaultValue(defaultValue != null ? defaultValue.toString() : null);
        field.setNullable(column.isNullable());
        // Both the declared type and the output type are the transform's data type.
        field.setOutputDataType(dataType);
        field.setPrimaryKey(primaryKeysList.contains(column.getName()));
        field.setType(dataType);
        fields.add(field);
    }

    TableSchemaReq tableSchemaRes = new TableSchemaReq();
    tableSchemaRes.setFields(fields);
    tableSchemaRes.setTableName(tableSchema.getTableName());
    return tableSchemaRes;
}