in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java [56:85]
public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
// Elasticsearch only support non-nested look up keys
String[] keyNames = new String[context.getKeys().length];
for (int i = 0; i < keyNames.length; i++) {
int[] innerKeyArr = context.getKeys()[i];
Preconditions.checkArgument(
innerKeyArr.length == 1, "Elasticsearch only support non-nested look up keys");
keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
}
NetworkClientConfig networkClientConfig = buildNetworkClientConfig();
ElasticsearchRowDataLookupFunction<?> lookupFunction =
new ElasticsearchRowDataLookupFunction<>(
this.format.createRuntimeDecoder(context, physicalRowDataType),
lookupMaxRetryTimes,
config.getIndex(),
docType,
DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
keyNames,
config.getHosts(),
networkClientConfig,
apiCallBridge);
if (lookupCache != null) {
return PartialCachingLookupProvider.of(lookupFunction, lookupCache);
} else {
return LookupFunctionProvider.of(lookupFunction);
}
}