in flink-connector-hbase-2.2/src/main/java/org/apache/flink/connector/hbase2/source/HBaseDynamicTableSource.java [62:93]
/**
 * Builds the runtime provider used when this table is accessed in lookup mode.
 *
 * <p>Three preconditions are enforced through {@code checkArgument} before any lookup function
 * is created:
 *
 * <ol>
 *   <li>{@code context.getKeys()} holds exactly one entry, and that entry has length one;
 *   <li>the HBase schema declares a row key name;
 *   <li>the field name found at the requested key index equals that row key name.
 * </ol>
 *
 * <p>Depending on {@code lookupAsync}, either an {@link HBaseRowDataAsyncLookupFunction} or an
 * {@link HBaseRowDataLookupFunction} is instantiated. When {@code cache} is non-null the function
 * is handed to the matching partial-caching provider together with the cache; otherwise it is
 * handed to the plain provider.
 *
 * @param context lookup context supplying the key indices requested by the planner
 * @return the provider wrapping the sync or async HBase lookup function
 */
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // Only a single lookup key with a single-element index path is accepted.
    checkArgument(
            context.getKeys().length == 1 && context.getKeys()[0].length == 1,
            "Currently, HBase table can only be lookup by single rowkey.");
    checkArgument(
            hbaseSchema.getRowKeyName().isPresent(),
            "HBase schema must have a row key when used in lookup mode.");

    // Resolve the name of the field the lookup targets and compare it to the row key.
    final String lookupFieldName =
            DataType.getFieldNames(hbaseSchema.convertToDataType())
                    .get(context.getKeys()[0][0]);
    final String rowKeyName = hbaseSchema.getRowKeyName().get();
    checkArgument(
            lookupFieldName.equals(rowKeyName),
            "Currently, HBase table only supports lookup by rowkey field.");

    if (!lookupAsync) {
        // Synchronous path.
        final HBaseRowDataLookupFunction syncFunction =
                new HBaseRowDataLookupFunction(
                        conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes);
        if (cache == null) {
            return LookupFunctionProvider.of(syncFunction);
        }
        return PartialCachingLookupProvider.of(syncFunction, cache);
    }

    // Asynchronous path.
    final HBaseRowDataAsyncLookupFunction asyncFunction =
            new HBaseRowDataAsyncLookupFunction(
                    conf, tableName, hbaseSchema, nullStringLiteral, maxRetryTimes);
    if (cache == null) {
        return AsyncLookupFunctionProvider.of(asyncFunction);
    }
    return PartialCachingAsyncLookupProvider.of(asyncFunction, cache);
}