in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/lookup/ElasticsearchRowDataLookupFunction.java [145:182]
public Collection<RowData> lookup(RowData keyRow) {
BoolQueryBuilder lookupCondition = new BoolQueryBuilder();
for (int i = 0; i < lookupKeys.length; i++) {
lookupCondition.must(
new TermQueryBuilder(lookupKeys[i], converters[i].toExternal(keyRow, i)));
}
searchSourceBuilder.query(lookupCondition);
searchRequest.source(searchSourceBuilder);
for (int retry = 0; retry <= maxRetryTimes; retry++) {
try {
ArrayList<RowData> rows = new ArrayList<>();
Tuple2<String, String[]> searchResponse = callBridge.search(client, searchRequest);
if (searchResponse.f1.length > 0) {
String[] result = searchResponse.f1;
for (String s : result) {
RowData row = parseSearchResult(s);
rows.add(row);
}
rows.trimToSize();
return rows;
}
} catch (IOException e) {
LOG.error(String.format("Elasticsearch search error, retry times = %d", retry), e);
if (retry >= maxRetryTimes) {
throw new FlinkRuntimeException("Execution of Elasticsearch search failed.", e);
}
try {
Thread.sleep(1000L * retry);
} catch (InterruptedException e1) {
LOG.warn(
"Interrupted while waiting to retry failed elasticsearch search, aborting");
throw new FlinkRuntimeException(e1);
}
}
}
return Collections.emptyList();
}