public LookupRuntimeProvider getLookupRuntimeProvider()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicSource.java [56:85]


    public LookupRuntimeProvider getLookupRuntimeProvider(LookupContext context) {
        // Elasticsearch only support non-nested look up keys
        String[] keyNames = new String[context.getKeys().length];
        for (int i = 0; i < keyNames.length; i++) {
            int[] innerKeyArr = context.getKeys()[i];
            Preconditions.checkArgument(
                    innerKeyArr.length == 1, "Elasticsearch only support non-nested look up keys");
            keyNames[i] = DataType.getFieldNames(physicalRowDataType).get(innerKeyArr[0]);
        }

        NetworkClientConfig networkClientConfig = buildNetworkClientConfig();

        ElasticsearchRowDataLookupFunction<?> lookupFunction =
                new ElasticsearchRowDataLookupFunction<>(
                        this.format.createRuntimeDecoder(context, physicalRowDataType),
                        lookupMaxRetryTimes,
                        config.getIndex(),
                        docType,
                        DataType.getFieldNames(physicalRowDataType).toArray(new String[0]),
                        DataType.getFieldDataTypes(physicalRowDataType).toArray(new DataType[0]),
                        keyNames,
                        config.getHosts(),
                        networkClientConfig,
                        apiCallBridge);
        if (lookupCache != null) {
            return PartialCachingLookupProvider.of(lookupFunction, lookupCache);
        } else {
            return LookupFunctionProvider.of(lookupFunction);
        }
    }