in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/lookup/ElasticsearchRowDataLookupFunction.java [83:125]
public ElasticsearchRowDataLookupFunction(
DeserializationSchema<RowData> deserializationSchema,
int maxRetryTimes,
String index,
String type,
String[] producedNames,
DataType[] producedTypes,
String[] lookupKeys,
List<HttpHost> hosts,
NetworkClientConfig networkClientConfig,
ElasticsearchApiCallBridge<C> callBridge) {
checkNotNull(deserializationSchema, "No DeserializationSchema supplied.");
checkNotNull(maxRetryTimes, "No maxRetryTimes supplied.");
checkNotNull(producedNames, "No fieldNames supplied.");
checkNotNull(producedTypes, "No fieldTypes supplied.");
checkNotNull(lookupKeys, "No keyNames supplied.");
checkNotNull(hosts, "No hosts supplied.");
checkNotNull(networkClientConfig, "No networkClientConfig supplied.");
checkNotNull(callBridge, "No ElasticsearchApiCallBridge supplied.");
this.deserializationSchema = deserializationSchema;
this.maxRetryTimes = maxRetryTimes;
this.index = index;
this.type = type;
this.producedNames = producedNames;
this.lookupKeys = lookupKeys;
this.converters = new DataFormatConverters.DataFormatConverter[lookupKeys.length];
Map<String, Integer> nameToIndex =
IntStream.range(0, producedNames.length)
.boxed()
.collect(Collectors.toMap(i -> producedNames[i], i -> i));
for (int i = 0; i < lookupKeys.length; i++) {
Integer position = nameToIndex.get(lookupKeys[i]);
Preconditions.checkArgument(
position != null, "Lookup keys %s not selected", Arrays.toString(lookupKeys));
converters[i] = DataFormatConverters.getConverterForDataType(producedTypes[position]);
}
this.networkClientConfig = networkClientConfig;
this.hosts = hosts;
this.callBridge = callBridge;
}