in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoDynamicTableSource.java [92:115]
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
final List<String> keyNames = new ArrayList<>(context.getKeys().length);
for (int[] innerKeyArr : context.getKeys()) {
Preconditions.checkArgument(
innerKeyArr.length == 1, "MongoDB only support non-nested look up keys yet");
keyNames.add(DataType.getFieldNames(producedDataType).get(innerKeyArr[0]));
}
final RowType rowType = (RowType) producedDataType.getLogicalType();
MongoRowDataLookupFunction lookupFunction =
new MongoRowDataLookupFunction(
connectionOptions,
lookupMaxRetries,
lookupRetryIntervalMs,
DataType.getFieldNames(producedDataType),
DataType.getFieldDataTypes(producedDataType),
keyNames,
rowType);
if (lookupCache != null) {
return PartialCachingLookupProvider.of(lookupFunction, lookupCache);
} else {
return LookupFunctionProvider.of(lookupFunction);
}
}