in lib/logstash/outputs/opensearch/http_client_builder.rb [14:114]
# Builds an HTTP client from the plugin configuration.
#
# Translates the string-keyed settings in +params+ into the option hash
# handed to +create_http_client+, and rejects inconsistent combinations of
# action, versioning and upsert settings before the client is created.
#
# @param logger [#debug?, #debug] logger used for the path-normalization
#   debug message; also forwarded as the +:logger+ option
# @param hosts [Object] forwarded untouched as the +:hosts+ option
# @param params [Hash{String => Object}] plugin settings
# @return [Object] whatever +create_http_client+ returns
# @raise [LogStash::ConfigurationError] if external versioning is requested
#   without a version, or together with the create/update actions; if both
#   doc_as_upsert and scripted_upsert are set; or if the update action has no
#   (nil or empty) document_id
def self.build(logger, hosts, params)
  client_settings = {
    :pool_max => params["pool_max"],
    :pool_max_per_route => params["pool_max_per_route"],
    :check_connection_timeout => params["validate_after_inactivity"],
    :http_compression => params["http_compression"],
    :headers => params["custom_headers"] || {}
  }
  client_settings[:proxy] = params["proxy"] if params["proxy"]

  common_options = {
    :client_settings => client_settings,
    :metric => params["metric"],
    :resurrect_delay => params["resurrect_delay"]
  }

  if params["sniffing"]
    common_options[:sniffing] = true
    common_options[:sniffer_delay] = params["sniffing_delay"]
  end

  common_options[:timeout] = params["timeout"] if params["timeout"]
  common_options[:target_bulk_bytes] = params["target_bulk_bytes"]

  base_path = params["path"]
  client_settings[:path] = dedup_slashes("/#{base_path}/") if base_path

  # An explicitly configured endpoint path wins; otherwise the endpoint is
  # derived from the base path (a nil base path interpolates as "").
  resolve_path = lambda do |override_key, default_suffix|
    override = params[override_key]
    dedup_slashes(override ? "/#{override}" : "/#{base_path}#{default_suffix}")
  end
  common_options[:bulk_path] = resolve_path.call("bulk_path", "/_bulk")
  common_options[:sniffing_path] = resolve_path.call("sniffing_path", "/_nodes/http")
  common_options[:healthcheck_path] = resolve_path.call("healthcheck_path", "")

  client_settings[:parameters] = params["parameters"] if params["parameters"]

  if logger.debug?
    logger.debug("Normalizing http path", :path => base_path, :normalized => client_settings[:path])
  end

  client_settings.merge!(setup_ssl(logger, params))
  common_options.merge!(setup_basic_auth(logger, params))

  action = params["action"]
  external_versioning = %w[external external_gt external_gte].include?(params["version_type"])

  if external_versioning && params["version"].nil?
    raise LogStash::ConfigurationError,
          "External versioning requires the presence of a version number."
  end

  if action == 'create' && external_versioning
    raise LogStash::ConfigurationError,
          "External versioning is not supported by the create action."
  end

  if params["doc_as_upsert"] && params["scripted_upsert"]
    raise LogStash::ConfigurationError,
          "doc_as_upsert and scripted_upsert are mutually exclusive."
  end

  if action == 'update'
    # A document_id that is present but nil is as unusable as a missing or
    # empty one, so all three are rejected.
    if params["document_id"].to_s.empty?
      raise LogStash::ConfigurationError,
            "Specifying action => 'update' needs a document_id."
    end

    if external_versioning
      raise LogStash::ConfigurationError,
            "External versioning is not supported by the update action."
    end

    # Scripting/upsert options are only forwarded for the update action.
    common_options.merge!(
      :doc_as_upsert => params["doc_as_upsert"],
      :script_var_name => params["script_var_name"],
      :script_type => params["script_type"],
      :script_lang => params["script_lang"],
      :scripted_upsert => params["scripted_upsert"]
    )
  end

  create_http_client(
    common_options.merge(
      :hosts => hosts,
      :logger => logger,
      :auth_type => params["auth_type"]
    )
  )
end