in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/AdbpgDynamicTableFactory.java [90:112]
public DynamicTableSource createDynamicTableSource(Context context) {
LOG.info("Start to create adbpg source.");
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig config = helper.getOptions();
LOG.info("Try to get and validate configuration.");
TableSchema tableSchema =
TableSchemaUtils.getPhysicalSchema(context.getCatalogTable().getSchema());
int fieldNum = tableSchema.getFieldCount();
String[] fieldNamesStr = new String[fieldNum];
for (int i = 0; i < fieldNum; i++) {
fieldNamesStr[i] = tableSchema.getFieldName(i).get();
}
LogicalType[] lts = new LogicalType[fieldNum];
for (int i = 0; i < fieldNum; i++) {
lts[i] = tableSchema.getFieldDataType(i).get().getLogicalType();
}
AdbpgOptions.validateSource(config, tableSchema);
LOG.info("Validation passed, adbpg source created successfully.");
return new AdbpgDynamicTableSource(fieldNum, fieldNamesStr, lts, config, tableSchema);
}