public DynamicTableSource createDynamicTableSource()

in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableFactory.java [119:141]


    /**
     * Creates the Kudu {@link DynamicTableSource} for the table described by {@code context}.
     *
     * <p>The Kudu table name is taken from the {@code TABLE_NAME} option when it is set; otherwise
     * the object name of the catalog identifier is used. The reader configuration is built from
     * the {@code MASTERS} and {@code SCAN_ROW_SIZE} options.
     *
     * @param context factory context supplying the catalog table, its identifier and the physical
     *     row data type
     * @return a {@code KuduDynamicTableSource} wired with the reader config builder, the table
     *     info, the lookup retry count and the lookup cache
     */
    public DynamicTableSource createDynamicTableSource(Context context) {
        final ReadableConfig options = getValidatedConfig(context);

        // An explicitly configured table name wins over the catalog object name.
        final String kuduTableName =
                options.getOptional(TABLE_NAME)
                        .orElse(context.getObjectIdentifier().getObjectName());

        // Table info is derived from the resolved schema plus the table's properties.
        final KuduTableInfo kuduTable =
                KuduTableUtils.createTableInfo(
                        kuduTableName,
                        context.getCatalogTable().getResolvedSchema(),
                        context.getCatalogTable().toProperties());

        // Start the reader builder from the master addresses, then apply the row limit.
        final KuduReaderConfig.Builder mastersOnlyBuilder =
                KuduReaderConfig.Builder.setMasters(options.get(MASTERS));
        final KuduReaderConfig.Builder readerBuilder =
                mastersOnlyBuilder.setRowLimit(options.get(SCAN_ROW_SIZE));

        return new KuduDynamicTableSource(
                readerBuilder,
                kuduTable,
                context.getPhysicalRowDataType(),
                options.get(LookupOptions.MAX_RETRIES),
                getLookupCache(options));
    }