in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/connector/elasticsearch/table/ElasticsearchDynamicTableFactoryBase.java [135:158]
public DynamicTableSource createDynamicTableSource(Context context) {
final FactoryUtil.TableFactoryHelper helper =
FactoryUtil.createTableFactoryHelper(this, context);
final ReadableConfig options = helper.getOptions();
final DecodingFormat<DeserializationSchema<RowData>> format =
helper.discoverDecodingFormat(
DeserializationFormatFactory.class,
org.apache.flink.connector.elasticsearch.table.ElasticsearchConnectorOptions
.FORMAT_OPTION);
ElasticsearchConfiguration config = getConfiguration(helper);
helper.validate();
validateConfiguration(config);
return new ElasticsearchDynamicSource(
format,
config,
context.getPhysicalRowDataType(),
options.get(MAX_RETRIES),
capitalize(factoryIdentifier),
getElasticsearchApiCallBridge(),
getLookupCache(options),
getDocumentType(config));
}