in flink-vvp-connector-adbpg/src/main/java/org/apache/flink/connector/jdbc/table/AdbpgDynamicTableSource.java [53:82]
/**
 * Builds the runtime provider used when this table is the lookup side of a join.
 *
 * <p>Every entry of {@code lookupContext.getKeys()} is an index path into the table's fields.
 * Only single-element paths (top-level fields) are accepted. For each key, the field name and
 * its logical type are both read at that same index from {@code fieldNamesStr} and
 * {@code lts}, so the name and the type always describe the same field.
 *
 * @param lookupContext supplies the index paths of the lookup keys
 * @return a provider wrapping an {@code AdbpgRowDataLookupFunction} configured with all field
 *     names/types of this source plus the names/types of the lookup keys
 */
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext lookupContext) {
    // Read the key paths once instead of calling getKeys() on every iteration.
    int[][] keyPaths = lookupContext.getKeys();
    String[] keyNames = new String[keyPaths.length];
    LogicalType[] keyTypes = new LogicalType[keyPaths.length];
    for (int i = 0; i < keyPaths.length; i++) {
        int[] innerKeyArr = keyPaths[i];
        // A path longer than one element would address a nested field, which is rejected.
        Preconditions.checkArgument(
                innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
        // Use the key's own index for both arrays; searching the type by name is
        // unnecessary and would pick the first match if a name were ever repeated.
        int fieldIndex = innerKeyArr[0];
        keyNames[i] = fieldNamesStr[fieldIndex];
        keyTypes[i] = lts[fieldIndex];
    }
    return TableFunctionProvider.of(
            new AdbpgRowDataLookupFunction(
                    fieldNum, fieldNamesStr, lts, keyNames, keyTypes, config));
}