in flink-connector-jdbc-core/src/main/java/org/apache/flink/connector/jdbc/core/table/source/JdbcDynamicTableSource.java [101:126]
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
    // Creates the runtime provider used for lookups against this source.
    //
    // Steps:
    //   1. Translate every lookup key path reported by the context into the name of the
    //      corresponding top-level field of physicalRowDataType.
    //   2. Build a JdbcRowDataLookupFunction from the source's options, retry limit, schema,
    //      resolved key names, predicates and pushdown parameters.
    //   3. Wrap the function in a PartialCachingLookupProvider when a cache is configured,
    //      otherwise in a plain LookupFunctionProvider.
    //
    // Fails the Preconditions.checkArgument check if any key path has a length other than 1,
    // because JDBC only supports non-nested lookup keys.

    // The field names and the key paths are loop-invariant: resolve them once and reuse them
    // both for key-name resolution and for the lookup function's constructor.
    final String[] fieldNames =
            DataType.getFieldNames(physicalRowDataType).toArray(new String[0]);
    final int[][] keys = context.getKeys();

    final String[] keyNames = new String[keys.length];
    for (int i = 0; i < keys.length; i++) {
        final int[] innerKeyArr = keys[i];
        // A key path with a single index addresses a top-level field; anything else is nested.
        Preconditions.checkArgument(
                innerKeyArr.length == 1, "JDBC only support non-nested look up keys");
        keyNames[i] = fieldNames[innerKeyArr[0]];
    }

    final RowType rowType = (RowType) physicalRowDataType.getLogicalType();
    final JdbcRowDataLookupFunction lookupFunction =
            new JdbcRowDataLookupFunction(
                    options,
                    lookupMaxRetryTimes,
                    fieldNames,
                    DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
                    keyNames,
                    rowType,
                    resolvedPredicates,
                    pushdownParams);

    // Only use the caching provider when a cache was configured for this source.
    if (cache != null) {
        return PartialCachingLookupProvider.of(lookupFunction, cache);
    }
    return LookupFunctionProvider.of(lookupFunction);
}