public DynamicTableSource createDynamicTableSource()

in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/AdbpgDynamicTableFactory.java [90:112]


    public DynamicTableSource createDynamicTableSource(Context context) {
        LOG.info("Start to create adbpg source.");
        final FactoryUtil.TableFactoryHelper helper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig config = helper.getOptions();

        LOG.info("Try to get and validate configuration.");
        TableSchema tableSchema =
                TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
        int fieldNum = tableSchema.getFieldCount();
        String[] fieldNamesStr = new String[fieldNum];
        for (int i = 0; i < fieldNum; i++) {
            fieldNamesStr[i] = tableSchema.getFieldName(i).get();
        }
        LogicalType[] lts = new LogicalType[fieldNum];
        for (int i = 0; i < fieldNum; i++) {
            lts[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
        }

        AdbpgOptions.validateSource(config, tableSchema);
        LOG.info("Validation passed, adbpg source created successfully.");
        return new AdbpgDynamicTableSource(fieldNum, fieldNamesStr, lts, config, tableSchema);
    }