in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoRowDataLookupFunction.java [72:98]
/**
 * Creates the lookup function and prepares the two converters it relies on: one built from
 * {@code rowType} via {@code BsonToRowDataConverters}, and one built from the lookup key
 * columns via {@code RowDataToBsonConverters}.
 *
 * <p>The type of every lookup key is resolved by locating the key name in {@code fieldNames}
 * and taking the entry at the same position in {@code fieldTypes}; the two lists are therefore
 * expected to be position-aligned.
 *
 * @param connectionOptions connection options, must not be null
 * @param maxRetries retry count, stored as given
 * @param retryIntervalMs retry interval in milliseconds, stored as given
 * @param fieldNames names of all fields, must not be null
 * @param fieldTypes data types of all fields, in the same order as {@code fieldNames}, must not
 *     be null
 * @param keyNames names of the lookup key fields, must not be null; each one must occur in
 *     {@code fieldNames}
 * @param rowType row type handed to {@code BsonToRowDataConverters.createConverter}
 * @throws NullPointerException if {@code connectionOptions}, {@code fieldNames}, {@code
 *     fieldTypes} or {@code keyNames} is null
 * @throws IllegalArgumentException if a key name is not contained in {@code fieldNames} or has
 *     no corresponding entry in {@code fieldTypes}
 */
public MongoRowDataLookupFunction(
        MongoConnectionOptions connectionOptions,
        int maxRetries,
        long retryIntervalMs,
        List<String> fieldNames,
        List<DataType> fieldTypes,
        List<String> keyNames,
        RowType rowType) {
    checkNotNull(fieldNames, "No fieldNames supplied.");
    checkNotNull(fieldTypes, "No fieldTypes supplied.");
    checkNotNull(keyNames, "No keyNames supplied.");
    this.connectionOptions = checkNotNull(connectionOptions);
    this.maxRetries = maxRetries;
    this.retryIntervalMs = retryIntervalMs;
    this.fieldNames = fieldNames;
    this.mongoRowConverter = BsonToRowDataConverters.createConverter(rowType);
    this.keyNames = keyNames;

    LogicalType[] keyTypes = new LogicalType[keyNames.size()];
    int position = 0;
    for (String keyName : keyNames) {
        int fieldIndex = fieldNames.indexOf(keyName);
        // indexOf yields -1 for an unknown key; without this guard fieldTypes.get(-1) would
        // fail with an IndexOutOfBoundsException that names neither the key nor the schema.
        if (fieldIndex < 0 || fieldIndex >= fieldTypes.size()) {
            throw new IllegalArgumentException(
                    String.format(
                            "Lookup key '%s' cannot be resolved to a field type. "
                                    + "Field names: %s, number of field types: %d.",
                            keyName, fieldNames, fieldTypes.size()));
        }
        keyTypes[position++] = fieldTypes.get(fieldIndex).getLogicalType();
    }

    this.lookupKeyRowConverter =
            RowDataToBsonConverters.createConverter(
                    RowType.of(keyTypes, keyNames.toArray(new String[0])));
}