public Collection lookup()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/lookup/ElasticsearchRowDataLookupFunction.java [145:182]


    /**
     * Looks up the rows in Elasticsearch that match the given key row.
     *
     * <p>A bool query is built with one {@code must} term condition per entry of {@code
     * lookupKeys}; the value for key {@code i} is taken from field {@code i} of {@code keyRow}
     * via {@code converters[i]}. The query is installed on the shared {@code searchSourceBuilder}
     * / {@code searchRequest} and executed through {@code callBridge}.
     *
     * <p>Only a failed search (an {@link IOException}) is retried, up to {@code maxRetryTimes}
     * times. The wait before the next attempt is {@code 1000 ms * retry}, so the first retry is
     * issued immediately. A search that succeeds with zero hits is a final answer and is not
     * retried.
     *
     * @param keyRow row holding the lookup key values, positionally aligned with {@code
     *     lookupKeys}
     * @return the parsed hits, or an empty collection when the search matched nothing
     * @throws FlinkRuntimeException if the search still fails on the last allowed attempt, or if
     *     the thread is interrupted while waiting to retry
     */
    public Collection<RowData> lookup(RowData keyRow) {
        BoolQueryBuilder lookupCondition = new BoolQueryBuilder();
        for (int i = 0; i < lookupKeys.length; i++) {
            lookupCondition.must(
                    new TermQueryBuilder(lookupKeys[i], converters[i].toExternal(keyRow, i)));
        }
        searchSourceBuilder.query(lookupCondition);
        searchRequest.source(searchSourceBuilder);

        for (int retry = 0; retry <= maxRetryTimes; retry++) {
            try {
                Tuple2<String, String[]> searchResponse = callBridge.search(client, searchRequest);
                String[] result = searchResponse.f1;
                if (result.length == 0) {
                    // The search itself succeeded; re-running the identical query would only
                    // burn the retry budget that is meant for I/O failures.
                    return Collections.emptyList();
                }
                // Presized to the hit count, so no trimToSize() is needed afterwards.
                ArrayList<RowData> rows = new ArrayList<>(result.length);
                for (String s : result) {
                    rows.add(parseSearchResult(s));
                }
                return rows;
            } catch (IOException e) {
                LOG.error(String.format("Elasticsearch search error, retry times = %d", retry), e);
                if (retry >= maxRetryTimes) {
                    throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
                }
                try {
                    // Linear back-off: 0 ms before the first retry, then 1 s, 2 s, ...
                    Thread.sleep(1000L * retry);
                } catch (InterruptedException e1) {
                    // Restore the interrupt flag so code further up the stack can observe it.
                    Thread.currentThread().interrupt();
                    LOG.warn(
                            "Interrupted while waiting to retry failed elasticsearch search, aborting");
                    throw new FlinkRuntimeException(e1);
                }
            }
        }
        // Only reached when the loop body never runs (e.g. a negative maxRetryTimes).
        return Collections.emptyList();
    }