public TableSchemaReq derivationSQL()

in seatunnel-server/seatunnel-app/src/main/java/org/apache/seatunnel/app/service/impl/SchemaDerivationServiceImpl.java [66:136]


    /**
     * Derives the schema that a SQL transform would produce for the output of an input plugin.
     *
     * <p>The first output schema of the plugin is converted into a {@link CatalogTable}, a {@link
     * SQLTransform} is created for the query through the "Sql" transform factory, and the catalog
     * table produced by that transform is mapped back into request-level fields.
     *
     * @param jobVersionId job version id used to look up the plugin's task
     * @param inputPluginId id of the plugin whose output schema is the input of the query
     * @param sql holder of the query to derive the schema from
     * @return a schema carrying the derived fields and the name of the input table
     * @throws IllegalArgumentException if the plugin has no output schema
     */
    public TableSchemaReq derivationSQL(long jobVersionId, String inputPluginId, SQL sql) {

        PluginConfig pluginConfig = jobTaskService.getSingleTask(jobVersionId, inputPluginId);
        TableTransformFactory factory =
                FactoryUtil.discoverFactory(
                        Thread.currentThread().getContextClassLoader(),
                        TableTransformFactory.class,
                        "Sql");
        List<DatabaseTableSchemaReq> tableSchemaReqs = pluginConfig.getOutputSchema();
        // A missing list is reported the same way as an empty one instead of failing with an NPE.
        if (tableSchemaReqs == null || tableSchemaReqs.isEmpty()) {
            throw new IllegalArgumentException("outputSchema is empty, please add input plugin");
        }
        // Only the first output table takes part in the derivation.
        DatabaseTableSchemaReq tableSchema = tableSchemaReqs.get(0);
        CatalogTable table = buildSqlInputTable(tableSchema);

        Map<String, Object> config = new HashMap<>();
        config.put(SQLTransform.KEY_QUERY.key(), sql.getQuery());
        TableTransformFactoryContext context =
                new TableTransformFactoryContext(
                        Collections.singletonList(table),
                        ReadonlyConfig.fromMap(config),
                        Thread.currentThread().getContextClassLoader());
        TableTransform<SeaTunnelRow> transform = factory.createTransform(context);
        SQLTransform sqlTransform = (SQLTransform) transform.createTransform();
        CatalogTable result = sqlTransform.getProducedCatalogTable();

        TableSchemaReq tableSchemaRes = new TableSchemaReq();
        tableSchemaRes.setFields(toDerivedTableFields(result));
        tableSchemaRes.setTableName(tableSchema.getTableName());
        return tableSchemaRes;
    }

    /** Converts a request-level table schema into the catalog table handed to the SQL transform. */
    private CatalogTable buildSqlInputTable(DatabaseTableSchemaReq tableSchema) {
        TableSchema.Builder builder = TableSchema.builder();
        List<String> primaryKeys = new ArrayList<>();
        for (TableField f : tableSchema.getFields()) {
            // Null-safe: an unset primary-key flag counts as "not a primary key".
            if (Boolean.TRUE.equals(f.getPrimaryKey())) {
                primaryKeys.add(f.getName());
            }
            // NOTE(review): an unset nullable flag is treated as nullable here instead of
            // failing on unboxing - confirm this is the desired default.
            boolean nullable = !Boolean.FALSE.equals(f.getNullable());
            builder.column(
                    PhysicalColumn.of(
                            f.getName(),
                            stringToDataType(f.getOutputDataType()),
                            0,
                            nullable,
                            f.getDefaultValue(),
                            f.getComment()));
        }
        // Do not declare a primary key constraint that has no columns.
        if (!primaryKeys.isEmpty()) {
            builder.primaryKey(PrimaryKey.of("PrimaryKeys", primaryKeys));
        }

        return CatalogTable.of(
                TableIdentifier.of(
                        "default", tableSchema.getDatabase(), tableSchema.getTableName()),
                builder.build(),
                Collections.emptyMap(),
                Collections.emptyList(),
                tableSchema.getTableName());
    }

    /** Maps the columns of the catalog table produced by the transform to request-level fields. */
    private List<TableField> toDerivedTableFields(CatalogTable result) {
        TableSchema resultSchema = result.getTableSchema();
        List<String> primaryKeysList = new ArrayList<>();
        if (resultSchema.getPrimaryKey() != null) {
            primaryKeysList.addAll(resultSchema.getPrimaryKey().getColumnNames());
        }
        List<TableField> fields = new ArrayList<>();
        for (Column column : resultSchema.getColumns()) {
            // The same textual data type is reported as both the type and the output type.
            String dataType = column.getDataType().toString();
            Object defaultValue = column.getDefaultValue();

            TableField field = new TableField();
            field.setName(column.getName());
            field.setComment(column.getComment());
            field.setDefaultValue(defaultValue != null ? defaultValue.toString() : null);
            field.setNullable(column.isNullable());
            field.setOutputDataType(dataType);
            field.setPrimaryKey(primaryKeysList.contains(column.getName()));
            field.setType(dataType);
            fields.add(field);
        }
        return fields;
    }