public Collection<RowData> lookup(RowData keyRow)

in flink-connector-mongodb/src/main/java/org/apache/flink/connector/mongodb/table/MongoRowDataLookupFunction.java [111:148]


    /**
     * Looks up the rows whose key fields match the values carried by the given key row.
     *
     * <p>Each attempt converts {@code keyRow} to a BSON document, requires every name in
     * {@code keyNames} to equal the corresponding value of that document, and restricts the
     * result to the projection built from {@code fieldNames}. When an attempt fails with a
     * {@link MongoException}, the method sleeps {@code retryIntervalMs * (attempt + 1)}
     * milliseconds and tries again; the attempt whose index equals {@code maxRetries} is the
     * last one.
     *
     * @param keyRow row holding the lookup key values
     * @return the converted rows returned by the query; empty when nothing matched, or when
     *     {@code maxRetries} is negative so that no attempt is made at all
     * @throws RuntimeException if the last attempt fails with a {@link MongoException} (kept
     *     as the cause), or if the thread is interrupted while waiting between attempts
     */
    public Collection<RowData> lookup(RowData keyRow) {
        int attempt = 0;
        while (attempt <= maxRetries) {
            try {
                BsonDocument keyDocument = lookupKeyRowConverter.convert(keyRow);

                // One equality condition per key field; all of them must hold.
                List<Bson> conditions = new ArrayList<>();
                for (String keyName : keyNames) {
                    conditions.add(eq(keyName, keyDocument.get(keyName)));
                }
                Bson criteria = and(conditions);

                Bson selectedFields = project(fieldNames);

                // The cursor is closed on every exit path, including a failed conversion.
                try (MongoCursor<BsonDocument> documents =
                        getMongoCollection().find(criteria).projection(selectedFields).cursor()) {
                    List<RowData> matches = new ArrayList<>();
                    documents.forEachRemaining(
                            document -> matches.add(mongoRowConverter.convert(document)));
                    return matches;
                }
            } catch (MongoException failure) {
                LOG.debug("MongoDB lookup error, retry times = {}", attempt, failure);
                if (attempt == maxRetries) {
                    // Retries are exhausted: surface the failure to the caller.
                    LOG.error("MongoDB lookup error", failure);
                    throw new RuntimeException("Execution of MongoDB lookup failed.", failure);
                }
                try {
                    // The wait grows linearly with the number of attempts made so far.
                    Thread.sleep(retryIntervalMs * (attempt + 1));
                } catch (InterruptedException interrupted) {
                    // Restore the interrupt flag before giving up.
                    Thread.currentThread().interrupt();
                    throw new RuntimeException(interrupted);
                }
            }
            attempt++;
        }
        // Only reached when the loop body never ran, i.e. maxRetries is negative.
        return Collections.emptyList();
    }