public DynamicTableSource createDynamicTableSource()

in flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java [72:112]


    /**
     * Builds the {@link HBaseDynamicTableSource} for the table described by {@code context}.
     *
     * <p>The table options are validated (with {@code PROPERTIES_PREFIX} excepted) and the primary
     * key is checked against the physical row type before the individual option values are read.
     * A lookup cache is then chosen in two steps: the legacy max-rows/TTL options build one when
     * both are positive, and a {@code PARTIAL} cache type replaces that result with a cache built
     * from the table options. When neither applies, {@code null} is passed as the cache.
     *
     * @param context factory context supplying the table options, the physical row data type and
     *     the primary key indexes
     * @return the configured HBase table source
     */
    public DynamicTableSource createDynamicTableSource(Context context) {
        final TableFactoryHelper factoryHelper = createTableFactoryHelper(this, context);
        factoryHelper.validateExcept(PROPERTIES_PREFIX);

        final ReadableConfig options = factoryHelper.getOptions();
        validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

        final String hbaseTableName = options.get(TABLE_NAME);
        final Configuration hbaseConfiguration = getHBaseConfiguration(options);
        final String nullLiteral = options.get(NULL_STRING_LITERAL);
        final HBaseTableSchema schema =
                HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());

        // Backward compatible to legacy caching options: a cache is only built from them when
        // both the row limit and the TTL are strictly positive.
        final boolean legacyCacheEnabled =
                options.get(LOOKUP_CACHE_MAX_ROWS) > 0
                        && options.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) > 0;

        LookupCache lookupCache = null;
        if (legacyCacheEnabled) {
            lookupCache =
                    DefaultLookupCache.newBuilder()
                            .maximumSize(options.get(LOOKUP_CACHE_MAX_ROWS))
                            .expireAfterWrite(options.get(LOOKUP_CACHE_TTL))
                            .build();
        }

        // Checked after the legacy branch on purpose: a PARTIAL cache type overrides whatever
        // the legacy options produced.
        final boolean partialCacheRequested =
                options.get(LookupOptions.CACHE_TYPE)
                        .equals(LookupOptions.LookupCacheType.PARTIAL);
        if (partialCacheRequested) {
            lookupCache = DefaultLookupCache.fromConfig(options);
        }

        return new HBaseDynamicTableSource(
                hbaseConfiguration,
                hbaseTableName,
                schema,
                nullLiteral,
                options.get(LookupOptions.MAX_RETRIES),
                options.get(LOOKUP_ASYNC),
                lookupCache);
    }