in flink-connector-kudu/src/main/java/org/apache/flink/connector/kudu/table/KuduDynamicTableSource.java [85:104]
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // Builds the runtime provider for lookups against the Kudu table.
    //
    // Every entry of context.getKeys() is an index path. Only paths of length one are
    // accepted; the single index is mapped to the field name at that position in
    // physicalRowDataType. The resulting key names, together with tableInfo, the built
    // reader config, the full field-name list and lookupMaxRetryTimes, are handed to a
    // KuduRowDataLookupFunction.
    //
    // Returns a PartialCachingLookupProvider wrapping that function when a cache is
    // configured (cache != null), otherwise a plain LookupFunctionProvider.
    // A key path whose length is not 1 is rejected through checkArgument
    // (presumably Flink's Preconditions, i.e. IllegalArgumentException - confirm against
    // the file's static imports).

    // Both values are loop-invariant: fetch them once instead of once per key.
    final int[][] keys = context.getKeys();
    final java.util.List<String> fieldNames = DataType.getFieldNames(physicalRowDataType);

    final String[] keyNames = new String[keys.length];
    for (int i = 0; i < keys.length; i++) {
        final int[] innerKeyArr = keys[i];
        checkArgument(innerKeyArr.length == 1, "Kudu only supports non-nested lookup keys");
        keyNames[i] = fieldNames.get(innerKeyArr[0]);
    }

    final KuduRowDataLookupFunction lookupFunction =
            new KuduRowDataLookupFunction(
                    keyNames,
                    tableInfo,
                    configBuilder.build(),
                    fieldNames,
                    lookupMaxRetryTimes);

    return cache == null
            ? LookupFunctionProvider.of(lookupFunction)
            : PartialCachingLookupProvider.of(lookupFunction, cache);
}