lib/es/client.rb (198 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#

# frozen_string_literal: true

require 'fileutils'
require 'elasticsearch'
require 'active_support/core_ext/integer/time'
require 'uri'

module ES
  # Elasticsearch client used by the crawler.
  #
  # Extends ::Elasticsearch::Client with crawler-specific connection settings
  # (user agent, auth, SSL, compression), bounded retries with exponential
  # back-off, bulk-response error detection, and persistence of failed bulk
  # payloads to disk.
  class Client < ::Elasticsearch::Client
    USER_AGENT = 'elastic-web-crawler-'
    DEFAULT_RETRY_ON_FAILURE = 3 # retry count
    DEFAULT_DELAY_ON_RETRY = 2 # in seconds
    DEFAULT_REQUEST_TIMEOUT = 10 # in seconds
    FAILED_BULKS_DIR = 'output/failed_payloads'
    # Keys under which a bulk response item can report its per-operation result.
    BULK_OPERATIONS = %w[index create update delete].freeze

    # Raised when a bulk request reports indexing errors.
    class IndexingFailedError < StandardError
      def initialize(message, error = nil)
        super(message)
        @cause = error
      end

      attr_reader :cause
    end

    # @param es_config [Hash] connection, auth, SSL, compression and retry settings
    # @param system_logger [#debug, #info, #warn, #error] destination for client log messages
    # @param crawler_version [String] appended to the user-agent header
    # @param crawl_id [String] names the subdirectory where failed bulk payloads are saved
    def initialize(es_config, system_logger, crawler_version, crawl_id, &)
      # Both are read while building the connection config, so set them before super.
      @system_logger = system_logger
      @crawl_id = crawl_id
      super(connection_config(es_config, crawler_version), &)
    end

    # Builds the option hash handed to ::Elasticsearch::Client.
    # Side effect: stores the retry settings in @max_retries / @retry_delay.
    def connection_config(es_config, crawler_version)
      config = {
        request_timeout: es_config.fetch(:request_timeout, DEFAULT_REQUEST_TIMEOUT),
        reload_on_failure: es_config.fetch(:reload_on_failure, false),
        transport_options: {
          headers: {
            'user-agent': "#{USER_AGENT}#{crawler_version}",
            'X-elastic-product-origin': 'crawler'
          }
        }
      }

      @max_retries, @retry_delay = get_retry_configuration(es_config)

      config.merge!(configure_host_port(es_config))
      config.merge!(configure_auth(es_config))
      # SSL settings live under transport_options too, so they must be deep-merged.
      config.deep_merge!(configure_ssl(es_config))
      config.merge!(configure_compression(es_config))

      config
    end

    # Sends a bulk request with retries. If every attempt fails, the payload is
    # saved to disk (best-effort) and the last error is re-raised.
    #
    # @raise [IndexingFailedError] when the final bulk response reports errors
    def bulk(payload = {})
      execute_with_retry(description: 'Bulk index') do
        raise_if_necessary(super(payload))
      end
    rescue StandardError => e
      store_failed_payload(payload)
      raise e
    end

    # Perform a search query with pagination and return all hits.
    #
    # ES paginates search results using a combination of `sort` and `search_after`.
    # Queries are repeated until a page comes back empty, which is how we know
    # pagination is complete. The query must include a `sort` clause for
    # pagination to advance. If hits carry no sort values, the hits gathered so
    # far are returned instead of re-issuing the same query forever.
    #
    # The caller's query hash is not modified; `search_after` is set on a copy.
    #
    # @param index_name [String]
    # @param query [Hash] search body (should include `sort`)
    # @return [Array] every hit across all pages
    def paginated_search(index_name, query) # rubocop:disable Metrics/MethodLength
      page_query = query.dup
      results = []

      loop do
        response = execute_with_retry(description: 'Search') do
          search(index: [index_name], body: page_query)
        end

        hits = response['hits']['hits']
        return results if hits.empty?

        results.push(*hits)

        search_after = hits.last['sort']
        if search_after.nil?
          @system_logger.warn('Search hits have no sort values; cannot paginate further without a sort clause.')
          return results
        end

        page_query['search_after'] = search_after
      end
    end

    # Delete-by-query wrapped in the client's retry policy.
    def delete_by_query(index:, body:, refresh: true)
      execute_with_retry(description: 'Delete by query') do
        super(index:, body:, refresh:)
      end
    end

    private

    # Splits the configured host (which may include a scheme and port) into
    # scheme/host/port options. Nil entries are dropped.
    # NOTE(review): a host with a port but no scheme (e.g. 'localhost:9200') is
    # parsed by URI as scheme 'localhost'. Confirm that hosts always include a scheme.
    def configure_host_port(es_config)
      host = es_config[:host]
      port = es_config[:port]

      uri = URI.parse(host)
      scheme = uri.scheme
      host = uri.host || host
      # Port from separate argument takes precedence over port in hostname
      port ||= uri.port

      { scheme:, host:, port: }.compact
    end

    # Returns [retry_count, delay_on_retry].
    # `false` disables retries. `true`, non-integer or negative counts fall back
    # to the default. Non-positive or non-integer delays also fall back to the default.
    def get_retry_configuration(es_config)
      retry_count = es_config.fetch(:retry_on_failure, DEFAULT_RETRY_ON_FAILURE)
      # Handle alternative retry count values
      if retry_count == false
        retry_count = 0
      elsif retry_count == true || !retry_count.is_a?(Integer) || retry_count.negative?
        retry_count = DEFAULT_RETRY_ON_FAILURE
      end

      delay_on_retry = es_config.fetch(:delay_on_retry, DEFAULT_DELAY_ON_RETRY)
      delay_on_retry = DEFAULT_DELAY_ON_RETRY unless delay_on_retry.is_a?(Integer) && delay_on_retry.positive?

      @system_logger.debug(
        "Elasticsearch client retry configuration: #{retry_count} retries with #{delay_on_retry}s delay"
      )
      [retry_count, delay_on_retry]
    end

    # API key takes precedence over username/password. Returns {} when neither is set.
    def configure_auth(es_config)
      if es_config[:api_key]
        @system_logger.info('ES connections will be authorized with configured API key')
        { api_key: es_config[:api_key] }
      elsif es_config[:username] || es_config[:password]
        @system_logger.info('ES connections will use configured username/password')
        { user: es_config[:username], password: es_config[:password] }
      else
        @system_logger.info('ES connections will use no authentication')
        {}
      end
    end

    # Builds SSL options. An explicit `ssl_verify: false` disables verification
    # and ignores (with a warning) any CA/hostname options.
    def configure_ssl(es_config)
      # See: https://www.rubydoc.info/gems/faraday/Faraday/SSLOptions
      ssl_config = { ca_fingerprint: es_config[:ca_fingerprint], transport_options: {} }.compact

      if es_config[:ssl_verify] == false
        if es_config[:ca_path] || es_config[:ca_file] || es_config[:verify_hostname]
          @system_logger.warn(
            'SSL verification is disabled, but SSL verification options are configured. These options will be ignored.'
          )
        end

        ssl_config[:transport_options][:ssl] = { verify: false }
      else
        # SSL Verification is enabled (or default)
        ssl_config[:transport_options][:ssl] = {
          ca_file: es_config[:ca_file],
          ca_path: es_config[:ca_path],
          verify: es_config[:ssl_verify]
        }.compact
      end

      @system_logger.debug("ES connection SSL config: #{ssl_config}")
      ssl_config
    end

    # Compression is enabled unless explicitly set to false.
    def configure_compression(es_config)
      compress = es_config[:compression] != false
      @system_logger.debug("ES connection compression is #{compress ? 'enabled' : 'disabled'}")
      { compression: compress }
    end

    # Raises if a bulk response has `errors` set; otherwise returns the response.
    #
    # @raise [IndexingFailedError] whose message includes the first failing item
    #   when one can be located, or the full response otherwise
    def raise_if_necessary(response) # rubocop:disable Metrics/MethodLength
      unless response['errors']
        @system_logger.debug('No errors found in bulk response.')
        return response
      end

      first_error = first_failed_item(response)
      @system_logger.warn("Errors found in bulk response. Full response: #{response}")

      # TODO: add trace logging
      # TODO: consider logging all errors instead of just first
      if first_error
        raise IndexingFailedError,
              "Failed to index documents into Elasticsearch with an error '#{first_error.to_json}'."
      end

      raise IndexingFailedError,
            "Failed to index documents into Elasticsearch due to unknown error. Full response: #{response}"
    end

    # Returns the first bulk response item whose operation result contains an
    # `error` key, or nil if none does.
    def first_failed_item(response)
      Array(response['items']).find do |item|
        BULK_OPERATIONS.any? { |op| item[op]&.key?('error') }
      end
    end

    # Writes the bulk payload body to FAILED_BULKS_DIR/<crawl_id>/<timestamp>,
    # one entry per line.
    #
    # Best-effort: this runs while an indexing error is propagating, so failures
    # here are logged rather than raised, to avoid masking the original error.
    def store_failed_payload(payload)
      dir = "#{FAILED_BULKS_DIR}/#{@crawl_id}"
      FileUtils.mkdir_p(dir) # no-op when the directory already exists
      full_path = unused_payload_path(dir)

      File.open(full_path, 'w') do |file|
        # Array() accepts an array of entries, a single (e.g. NDJSON) string, or nil.
        Array(payload[:body]).each { |item| file.puts(item) }
      end

      @system_logger.warn("Saved failed bulk payload to #{full_path}")
    rescue StandardError => e
      @system_logger.error("Failed to save failed bulk payload: '#{e.message}'.")
    end

    # Returns a not-yet-existing path in `dir` named after the current time
    # (second resolution). A numeric suffix is appended on collision, so two
    # failures in the same second do not overwrite each other.
    def unused_payload_path(dir)
      base = File.join(dir, Time.now.strftime('%Y%m%d%H%M%S'))
      return base unless File.exist?(base)

      (1..).each do |n|
        candidate = "#{base}_#{n}"
        return candidate unless File.exist?(candidate)
      end
    end

    # Runs the block, retrying on StandardError up to @max_retries times.
    # Waits @retry_delay**attempt seconds between attempts (exponential back-off
    # when the delay is greater than 1). Re-raises the last error when attempts
    # are exhausted.
    def execute_with_retry(description:) # rubocop:disable Metrics/MethodLength
      try = 0
      max_tries = 1 + @max_retries

      begin
        yield
      rescue StandardError => e
        try += 1
        if try < max_tries
          wait_time = @retry_delay**try
          @system_logger.warn(
            "#{description} attempt #{try}/#{max_tries} failed: '#{e.message}'. Retrying in #{wait_time.to_f}s.."
          )
          sleep(wait_time)
          retry
        else
          log_final_failure(description:, tries: try, error: e)
          raise e
        end
      end
    end

    # Logs the terminal failure, phrased according to whether retries were enabled.
    def log_final_failure(description:, tries:, error:)
      if @max_retries.nonzero?
        @system_logger.error("#{description} failed after #{tries} attempts: '#{error.message}'.")
      else
        @system_logger.error("#{description} failed: '#{error.message}'. Retries disabled.")
      end
    end
  end
end