in flink-connector-elasticsearch-base/src/main/java/org/apache/flink/streaming/connectors/elasticsearch/ElasticsearchSinkBase.java [206:290]
/**
 * Creates the sink base: validates the collaborators, then splits the user-provided
 * configuration into bulk-processor settings (parsed into dedicated fields) and the remaining
 * entries, which are kept as the configuration for the Elasticsearch client.
 *
 * <p>The caller's map is never modified; all removals happen on an internal copy.
 *
 * @param callBridge bridge to the version-specific Elasticsearch API; must not be null
 * @param userConfig user configuration, possibly containing bulk-processor keys
 *     ({@code CONFIG_KEY_BULK_FLUSH_*}); must not be null
 * @param elasticsearchSinkFunction user function that turns elements into action requests;
 *     must not be null and must be serializable
 * @param failureHandler handler for failed action requests; must not be null and must be
 *     serializable
 */
public ElasticsearchSinkBase(
        ElasticsearchApiCallBridge<C> callBridge,
        Map<String, String> userConfig,
        ElasticsearchSinkFunction<T> elasticsearchSinkFunction,
        ActionRequestFailureHandler failureHandler) {

    this.callBridge = checkNotNull(callBridge);
    this.elasticsearchSinkFunction = checkNotNull(elasticsearchSinkFunction);
    this.failureHandler = checkNotNull(failureHandler);

    // We eagerly check that the user-provided sink function and failure handler are
    // serializable; otherwise users would merely get the non-informative error message
    // "ElasticsearchSinkBase is not serializable".
    checkArgument(
            InstantiationUtil.isSerializable(elasticsearchSinkFunction),
            "The implementation of the provided ElasticsearchSinkFunction is not serializable. "
                    + "The object probably contains or references non-serializable fields.");

    checkArgument(
            InstantiationUtil.isSerializable(failureHandler),
            "The implementation of the provided ActionRequestFailureHandler is not serializable. "
                    + "The object probably contains or references non-serializable fields.");

    // Extract and remove bulk processor related configuration from the user-provided config,
    // so that the resulting user config only contains configuration related to the
    // Elasticsearch client.
    checkNotNull(userConfig);

    // Copy the config so we can remove entries without side effects on the caller's map.
    userConfig = new HashMap<>(userConfig);

    ParameterTool params = ParameterTool.fromMap(userConfig);

    if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS)) {
        this.bulkProcessorFlushMaxActions = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
        userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_ACTIONS);
    } else {
        this.bulkProcessorFlushMaxActions = null;
    }

    if (params.has(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB)) {
        this.bulkProcessorFlushMaxSizeMb = params.getInt(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
        userConfig.remove(CONFIG_KEY_BULK_FLUSH_MAX_SIZE_MB);
    } else {
        this.bulkProcessorFlushMaxSizeMb = null;
    }

    if (params.has(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS)) {
        this.bulkProcessorFlushIntervalMillis =
                params.getLong(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
        userConfig.remove(CONFIG_KEY_BULK_FLUSH_INTERVAL_MS);
    } else {
        this.bulkProcessorFlushIntervalMillis = null;
    }

    // Backoff is enabled unless the user explicitly turns it off.
    boolean bulkProcessorFlushBackoffEnable =
            params.getBoolean(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE, true);
    userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_ENABLE);

    if (bulkProcessorFlushBackoffEnable) {
        this.bulkProcessorFlushBackoffPolicy = new BulkFlushBackoffPolicy();

        // Only override the policy settings that the user actually provided.
        if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)) {
            this.bulkProcessorFlushBackoffPolicy.setBackoffType(
                    FlushBackoffType.valueOf(params.get(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE)));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES)) {
            this.bulkProcessorFlushBackoffPolicy.setMaxRetryCount(
                    params.getInt(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES));
        }

        if (params.has(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY)) {
            this.bulkProcessorFlushBackoffPolicy.setDelayMillis(
                    params.getLong(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY));
        }
    } else {
        this.bulkProcessorFlushBackoffPolicy = null;
    }

    // The backoff keys are bulk-processor settings, not Elasticsearch client settings, so
    // strip them even when backoff is disabled; otherwise they would leak into the config
    // that is meant to contain only client-related entries.
    userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_TYPE);
    userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_RETRIES);
    userConfig.remove(CONFIG_KEY_BULK_FLUSH_BACKOFF_DELAY);

    // Keep the cleaned copy; without this assignment the extraction above would be discarded.
    this.userConfig = userConfig;
}