lib/utility/es_client.rb (66 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'logger'
require 'elasticsearch'
module Utility
class EsClient < ::Elasticsearch::Client
class IndexingFailedError < StandardError
def initialize(message, error = nil)
super(message)
@cause = error
end
attr_reader :cause
end
def initialize(es_config, &block)
super(connection_configs(es_config), &block)
end
def connection_configs(es_config)
configs = {}
configs[:api_key] = es_config[:api_key] if es_config[:api_key]
if es_config[:cloud_id]
configs[:cloud_id] = es_config[:cloud_id]
elsif es_config[:hosts]
configs[:hosts] = es_config[:hosts]
else
raise 'Either elasticsearch.cloud_id or elasticsearch.hosts should be configured.'
end
configs[:retry_on_failure] = es_config[:retry_on_failure] || false
configs[:request_timeout] = es_config[:request_timeout] || nil
configs[:log] = es_config[:log] || false
configs[:trace] = es_config[:trace] || false
# transport options
configs[:transport_options] = es_config[:transport_options] if es_config[:transport_options]
configs[:ca_fingerprint] = es_config[:ca_fingerprint] if es_config[:ca_fingerprint]
# headers
# these are necessary for cloud-hosted native connectors
configs[:headers] = es_config[:headers].to_h if es_config[:headers]
# if log or trace is activated, we use the application logger
configs[:logger] = if configs[:log] || configs[:trace]
Utility::Logger.logger
else
# silence!
::Logger.new(IO::NULL)
end
configs
end
def bulk(arguments = {})
raise_if_necessary(super(arguments))
end
private
def raise_if_necessary(response)
if response['errors']
first_error = nil
response['items'].each do |item|
%w[index delete].each do |op|
if item.has_key?(op) && item[op].has_key?('error')
first_error = item
break
end
end
end
if first_error
trace_id = Utility::Logger.generate_trace_id
Utility::Logger.error("Failed to index documents into Elasticsearch. First error in response is: #{first_error.to_json}")
short_message = Utility::Logger.abbreviated_message(first_error.to_json)
raise IndexingFailedError.new("Failed to index documents into Elasticsearch with an error '#{short_message}'. Look up the error ID [#{trace_id}] in the application logs to see the full error message.")
else
raise IndexingFailedError.new('Failed to index documents into Elasticsearch due to unknown error. Try enabling tracing for Elasticsearch and checking the logs.')
end
end
response
end
end
end