in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java [119:141]
public DynamicTableSource createDynamicTableSource(Context context) {
final ReadableConfig config = getValidatedConfig(context);
final String tableName =
config.getOptional(TABLE_NAME)
.orElse(context.getObjectIdentifier().getObjectName());
final KuduTableInfo tableInfo =
KuduTableUtils.createTableInfo(
tableName,
context.getCatalogTable().getResolvedSchema(),
context.getCatalogTable().toProperties());
final KuduReaderConfig.Builder readerConfigBuilder =
KuduReaderConfig.Builder.setMasters(config.get(MASTERS))
.setRowLimit(config.get(SCAN_ROW_SIZE));
return new KuduDynamicTableSource(
readerConfigBuilder,
tableInfo,
context.getPhysicalRowDataType(),
config.get(LookupOptions.MAX_RETRIES),
getLookupCache(config));
}