in flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/HBase2DynamicTableFactory.java [71:111]
/**
 * Creates the HBase {@link DynamicTableSource} for the table described by {@code context}.
 *
 * <p>The factory options are validated through the table factory helper (excluding {@code
 * PROPERTIES_PREFIX}) and the primary key is checked before any option is read. A lookup cache
 * is attached when the legacy cache options are both positive or when the lookup cache type is
 * {@code PARTIAL}; if both apply, the cache built from the {@code PARTIAL} configuration is the
 * one that is used. Otherwise the source is created with a {@code null} cache.
 *
 * @param context factory context supplying the physical row type, primary key indexes and
 *     table options
 * @return the configured HBase table source
 */
public DynamicTableSource createDynamicTableSource(Context context) {
    // Validate first so that misconfiguration is reported before anything is built.
    final TableFactoryHelper factoryHelper = createTableFactoryHelper(this, context);
    factoryHelper.validateExcept(PROPERTIES_PREFIX);
    final ReadableConfig options = factoryHelper.getOptions();
    validatePrimaryKey(context.getPhysicalRowDataType(), context.getPrimaryKeyIndexes());

    // Plain connector settings read from the table options.
    final String hTableName = options.get(TABLE_NAME);
    final Configuration hbaseClientConf = getHBaseConfiguration(options);
    final String nullLiteral = options.get(NULL_STRING_LITERAL);
    final HBaseTableSchema schema =
            HBaseTableSchema.fromDataType(context.getPhysicalRowDataType());

    // Backward compatibility: the legacy options enable caching only when both the row limit
    // and the TTL are strictly positive.
    final boolean legacyCacheConfigured =
            options.get(LOOKUP_CACHE_MAX_ROWS) > 0
                    && options.get(LOOKUP_CACHE_TTL).compareTo(Duration.ZERO) > 0;
    LookupCache lookupCache =
            legacyCacheConfigured
                    ? DefaultLookupCache.newBuilder()
                            .maximumSize(options.get(LOOKUP_CACHE_MAX_ROWS))
                            .expireAfterWrite(options.get(LOOKUP_CACHE_TTL))
                            .build()
                    : null;

    // A PARTIAL cache type replaces whatever the legacy options produced.
    final boolean partialCacheRequested =
            options.get(LookupOptions.CACHE_TYPE).equals(LookupOptions.LookupCacheType.PARTIAL);
    if (partialCacheRequested) {
        lookupCache = DefaultLookupCache.fromConfig(options);
    }

    return new HBaseDynamicTableSource(
            hbaseClientConf,
            hTableName,
            schema,
            nullLiteral,
            options.get(LookupOptions.MAX_RETRIES),
            options.get(LOOKUP_ASYNC),
            lookupCache);
}