in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoRowDataLookupFunction.java [111:148]
/**
 * Looks up the documents whose key fields equal the values carried by {@code keyRow} and
 * returns them converted to rows.
 *
 * <p>The query is the conjunction of one equality filter per entry of {@code keyNames}, and
 * the result is projected onto {@code fieldNames}. When the query raises a {@code
 * MongoException} the lookup is attempted again after sleeping {@code retryIntervalMs *
 * (retry + 1)} milliseconds, for at most {@code maxRetries} retries.
 *
 * @param keyRow row holding the lookup key values
 * @return the converted matching rows; empty when the cursor yields no document
 * @throws RuntimeException if the lookup still fails on the last attempt, or if the thread is
 *     interrupted while waiting before a retry
 */
public Collection<RowData> lookup(RowData keyRow) {
    // The query and the projection depend only on the key row, so build them once instead of
    // rebuilding them on every retry attempt.
    BsonDocument lookupValues = lookupKeyRowConverter.convert(keyRow);
    List<Bson> filters =
            keyNames.stream()
                    .map(name -> eq(name, lookupValues.get(name)))
                    .collect(Collectors.toList());
    Bson query = and(filters);
    Bson projection = project(fieldNames);

    for (int retry = 0; retry <= maxRetries; retry++) {
        // The collection is fetched inside the loop so that each attempt goes through
        // getMongoCollection() again, exactly as before.
        try (MongoCursor<BsonDocument> cursor =
                getMongoCollection().find(query).projection(projection).cursor()) {
            // A fresh list per attempt: rows read before a failure are never carried over.
            List<RowData> rows = new ArrayList<>();
            while (cursor.hasNext()) {
                RowData row = mongoRowConverter.convert(cursor.next());
                rows.add(row);
            }
            return rows;
        } catch (MongoException e) {
            LOG.debug("MongoDB lookup error, retry times = {}", retry, e);
            if (retry == maxRetries) {
                LOG.error("MongoDB lookup error", e);
                throw new RuntimeException("Execution of MongoDB lookup failed.", e);
            }
            try {
                // Linear backoff: the pause grows with the number of attempts already made.
                Thread.sleep(retryIntervalMs * (retry + 1));
            } catch (InterruptedException e1) {
                // Restore the interrupt flag before surfacing the interruption to the caller.
                Thread.currentThread().interrupt();
                throw new RuntimeException(e1);
            }
        }
    }
    // Only reached when the loop body never runs (maxRetries is negative).
    return Collections.emptyList();
}