in flink-connector-elasticsearch7/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/table/Elasticsearch7DynamicSink.java [133:184]
public SinkFunctionProvider getSinkRuntimeProvider(Context context) {
return () -> {
SerializationSchema<RowData> format =
this.format.createRuntimeEncoder(context, schema.toRowDataType());
final RowElasticsearchSinkFunction upsertFunction =
new RowElasticsearchSinkFunction(
IndexGeneratorFactory.createIndexGenerator(
config.getIndex(), schema, localTimeZoneId),
null, // this is deprecated in es 7+
format,
XContentType.JSON,
REQUEST_FACTORY,
KeyExtractor.createKeyExtractor(schema, config.getKeyDelimiter()));
final ElasticsearchSink.Builder<RowData> builder =
builderProvider.createBuilder(config.getHosts(), upsertFunction);
builder.setFailureHandler(config.getFailureHandler());
builder.setBulkFlushMaxActions(config.getBulkFlushMaxActions());
builder.setBulkFlushMaxSizeMb((int) (config.getBulkFlushMaxByteSize() >> 20));
builder.setBulkFlushInterval(config.getBulkFlushInterval());
builder.setBulkFlushBackoff(config.isBulkFlushBackoffEnabled());
config.getBulkFlushBackoffType().ifPresent(builder::setBulkFlushBackoffType);
config.getBulkFlushBackoffRetries().ifPresent(builder::setBulkFlushBackoffRetries);
config.getBulkFlushBackoffDelay().ifPresent(builder::setBulkFlushBackoffDelay);
// we must overwrite the default factory which is defined with a lambda because of a bug
// in shading lambda serialization shading see FLINK-18006
if (config.getUsername().isPresent()
&& config.getPassword().isPresent()
&& !StringUtils.isNullOrWhitespaceOnly(config.getUsername().get())
&& !StringUtils.isNullOrWhitespaceOnly(config.getPassword().get())) {
builder.setRestClientFactory(
new AuthRestClientFactory(
config.getPathPrefix().orElse(null),
config.getUsername().get(),
config.getPassword().get()));
} else {
builder.setRestClientFactory(
new DefaultRestClientFactory(config.getPathPrefix().orElse(null)));
}
final ElasticsearchSink<RowData> sink = builder.build();
if (config.isDisableFlushOnCheckpoint()) {
sink.disableFlushOnCheckpoint();
}
return sink;
};
}