public DynamicTableSource createDynamicTableSource()

in flink-connector-kudu/src/main/java/org/apache/flink/connectors/kudu/table/dynamic/KuduDynamicTableSourceSinkFactory.java [197:222]


    /**
     * Builds the Kudu {@link DynamicTableSource} for the table described by {@code context}.
     *
     * <p>The connector options are read from the configuration returned by
     * {@code getReadableConfig(context)}. The lookup-related options (cache max rows, cache TTL,
     * max retries) are packed into a {@link KuduLookupOptions}; the master addresses and scan row
     * size are applied to a {@link KuduReaderConfig.Builder}, which is handed to the source
     * un-built.
     *
     * @param context factory context supplying the catalog table (schema and properties)
     * @return a {@link KuduDynamicTableSource} configured from the table's options
     */
    public DynamicTableSource createDynamicTableSource(Context context) {
        final ReadableConfig options = getReadableConfig(context);

        // Connection and scan settings.
        final String masters = options.get(KUDU_MASTERS);
        final int rowLimit = options.get(KUDU_SCAN_ROW_SIZE);

        // Lookup-join settings.
        final long cacheMaxRows = options.get(KUDU_LOOKUP_CACHE_MAX_ROWS);
        final long cacheTtl = options.get(KUDU_LOOKUP_CACHE_TTL);
        final int maxRetries = options.get(KUDU_LOOKUP_MAX_RETRIES);

        final KuduLookupOptions lookupOptions = KuduLookupOptions.Builder.options()
                .withCacheMaxSize(cacheMaxRows)
                .withCacheExpireMs(cacheTtl)
                .withMaxRetryTimes(maxRetries)
                .build();

        // The table info is created from the declared schema, while the source itself is given
        // the schema returned by getSchemaWithSqlTimestamp.
        final TableSchema declaredSchema = context.getCatalogTable().getSchema();
        final TableSchema sourceSchema = KuduTableUtils.getSchemaWithSqlTimestamp(declaredSchema);
        final KuduTableInfo kuduTable = KuduTableUtils.createTableInfo(
                options.get(KUDU_TABLE),
                declaredSchema,
                context.getCatalogTable().toProperties());

        final KuduReaderConfig.Builder readerConfig = KuduReaderConfig.Builder
                .setMasters(masters)
                .setRowLimit(rowLimit);

        final String[] projectedFields = sourceSchema.getFieldNames();
        return new KuduDynamicTableSource(
                readerConfig, kuduTable, sourceSchema, projectedFields, lookupOptions);
    }