self.build

in lib/logstash/outputs/amazon_es/http_client_builder.rb [8:113]


    def self.build(logger, hosts, params)
      client_settings = {
        :pool_max => params["pool_max"],
        :pool_max_per_route => params["pool_max_per_route"],
        :check_connection_timeout => params["validate_after_inactivity"],
        :http_compression => params["http_compression"],
        :headers => params["custom_headers"]
      }
      
      client_settings[:proxy] = params["proxy"] if params["proxy"]
      
      common_options = {
        :client_settings => client_settings,
        :metric => params["metric"],
        :resurrect_delay => params["resurrect_delay"]
      }

      if params["sniffing"]
        common_options[:sniffing] = true
        common_options[:sniffer_delay] = params["sniffing_delay"]
      end

      common_options[:timeout] = params["timeout"] if params["timeout"]

      if params["path"]
        client_settings[:path] = dedup_slashes("/#{params["path"]}/")
      end

      common_options[:bulk_path] = if params["bulk_path"]
         dedup_slashes("/#{params["bulk_path"]}")
      else
         dedup_slashes("/#{params["path"]}/_bulk")
      end

      common_options[:sniffing_path] = if params["sniffing_path"]
         dedup_slashes("/#{params["sniffing_path"]}")
      else
         dedup_slashes("/#{params["path"]}/_nodes/http")
      end

      common_options[:healthcheck_path] = if params["healthcheck_path"]
         dedup_slashes("/#{params["healthcheck_path"]}")
      else
         dedup_slashes("/#{params["path"]}")
      end

      if params["parameters"]
        client_settings[:parameters] = params["parameters"]
      end

      logger.debug? && logger.debug("Normalizing http path", :path => params["path"], :normalized => client_settings[:path])

      client_settings.merge! setup_ssl(logger, params)
      common_options.merge! setup_basic_auth(logger, params)

      external_version_types = ["external", "external_gt", "external_gte"]
      
      raise(
        LogStash::ConfigurationError,
        "External versioning requires the presence of a version number."
      ) if external_version_types.include?(params.fetch('version_type', '')) and params.fetch("version", nil) == nil
 

      
      raise(
        LogStash::ConfigurationError,
        "External versioning is not supported by the create action."
      ) if params['action'] == 'create' and external_version_types.include?(params.fetch('version_type', ''))

      
      raise( LogStash::ConfigurationError,
        "doc_as_upsert and scripted_upsert are mutually exclusive."
      ) if params["doc_as_upsert"] and params["scripted_upsert"]

      raise(
        LogStash::ConfigurationError,
        "Specifying action => 'update' needs a document_id."
      ) if params['action'] == 'update' and params.fetch('document_id', '') == ''

      raise(
        LogStash::ConfigurationError,
        "External versioning is not supported by the update action. See https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-update.html."
      ) if params['action'] == 'update' and external_version_types.include?(params.fetch('version_type', ''))

      
      update_options = {
        :doc_as_upsert => params["doc_as_upsert"],
        :script_var_name => params["script_var_name"],
        :script_type => params["script_type"],
        :script_lang => params["script_lang"],
        :scripted_upsert => params["scripted_upsert"]
      }
      common_options.merge! update_options if params["action"] == 'update'
      create_http_client(common_options.merge(:hosts => hosts,
                                              :logger => logger,
                                              :protocol => params["protocol"],
                                              :port => params["port"],
                                              :region => params["region"],
                                              :service_name => params["service_name"],
                                              :aws_access_key_id => params["aws_access_key_id"],
                                              :aws_secret_access_key => params["aws_secret_access_key"],
                                              :max_bulk_bytes => params["max_bulk_bytes"],
                                              :skip_healthcheck => params["skip_healthcheck"])
                                              )
    end