public DynamicTableSource createDynamicTableSource()

in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java [135:158]


    /**
     * Builds the Elasticsearch-backed {@link DynamicTableSource} for the given table context.
     *
     * <p>The decoding format is discovered first, then the connector configuration is obtained
     * from the factory helper. Both the helper's own validation and {@code validateConfiguration}
     * run before the source is instantiated.
     *
     * @param context context handed to this factory for the table being created
     * @return a newly constructed {@code ElasticsearchDynamicSource}
     */
    public DynamicTableSource createDynamicTableSource(Context context) {
        final FactoryUtil.TableFactoryHelper factoryHelper =
                FactoryUtil.createTableFactoryHelper(this, context);
        final ReadableConfig tableOptions = factoryHelper.getOptions();

        // Resolve the deserialization format selected through the format option.
        final DecodingFormat<DeserializationSchema<RowData>> decodingFormat =
                factoryHelper.discoverDecodingFormat(
                        DeserializationFormatFactory.class,
                        org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions
                                .FORMAT_OPTION);

        // The configuration is obtained before the helper validates, then checked separately.
        final ElasticsearchConfiguration esConfig = getConfiguration(factoryHelper);
        factoryHelper.validate();
        validateConfiguration(esConfig);

        // Constructor arguments are evaluated in the same left-to-right order as before.
        final DynamicTableSource source =
                new ElasticsearchDynamicSource(
                        decodingFormat,
                        esConfig,
                        context.getPhysicalRowDataType(),
                        tableOptions.get(MAX_RETRIES),
                        capitalize(factoryIdentifier),
                        getElasticsearchApiCallBridge(),
                        getLookupCache(tableOptions),
                        getDocumentType(esConfig));
        return source;
    }