# spec/lib/es/client_spec.rb
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
# frozen_string_literal: true
require 'elasticsearch'
require 'elastic/transport/transport/errors'
RSpec.describe(ES::Client) do
let(:system_logger) { double }
let(:host) { 'http://notreallyaserver' }
let(:port) { '9200' }
let(:elastic_product_headers) { { 'x-elastic-product': 'Elasticsearch' } }
let(:index_name) { 'fantastic_index_name' }
let(:config) do
{
elasticsearch: {
username: 'user',
password: 'pw',
api_key: 'key',
host:,
port:
}
}.deep_symbolize_keys
end
subject { described_class.new(config[:elasticsearch], system_logger, '0.0.0-test', 'crawl-id') }
before(:each) do
stub_request(:get, "#{host}:#{port}/")
.to_return(status: 403, body: '', headers: {})
stub_request(:get, "#{host}:#{port}/_cluster/health")
# TODO: make a factory or something for system_logger mocks
allow(system_logger).to receive(:info)
allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:warn)
allow(system_logger).to receive(:error)
end
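# The WebMock stubs above keep every example offline: the root endpoint answers
# with a 403 and the unconfigured health stub falls back to WebMock's default
# empty 200 response, so no example below ever reaches a real cluster.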
describe 'retry configuration' do
context 'with default settings' do
let(:config) { { elasticsearch: { host:, port: } } }
it 'sets default retry values' do
expect(subject.instance_variable_get(:@max_retries)).to eq(3)
expect(subject.instance_variable_get(:@retry_delay)).to eq(2)
expect(system_logger).to have_received(:debug).with(
'Elasticsearch client retry configuration: 3 retries with 2s delay'
)
end
end
context 'with custom retry settings' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: 5, delay_on_retry: 1 } } }
it 'sets custom retry values' do
expect(subject.instance_variable_get(:@max_retries)).to eq(5)
expect(subject.instance_variable_get(:@retry_delay)).to eq(1)
expect(system_logger).to have_received(:debug).with(
'Elasticsearch client retry configuration: 5 retries with 1s delay'
)
end
end
context 'with retry_on_failure: true' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: true } } }
it 'sets default retry count' do
expect(subject.instance_variable_get(:@max_retries)).to eq(3)
expect(system_logger).to have_received(:debug).with(
'Elasticsearch client retry configuration: 3 retries with 2s delay'
)
end
end
context 'with retry_on_failure: false' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: false } } }
it 'sets zero retries' do
expect(subject.instance_variable_get(:@max_retries)).to eq(0)
expect(system_logger).to have_received(:debug).with(
'Elasticsearch client retry configuration: 0 retries with 2s delay'
)
end
end
context 'with invalid retry_on_failure' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: 'invalid' } } }
it 'sets default retry count' do
expect(subject.instance_variable_get(:@max_retries)).to eq(3)
expect(system_logger).to have_received(:debug).with(
'Elasticsearch client retry configuration: 3 retries with 2s delay'
)
end
end
context 'with invalid delay_on_retry' do
let(:config) { { elasticsearch: { host:, port:, delay_on_retry: 'invalid' } } }
it 'sets default retry delay' do
expect(subject.instance_variable_get(:@retry_delay)).to eq(2)
expect(system_logger).to have_received(:debug).with(
'Elasticsearch client retry configuration: 3 retries with 2s delay'
)
end
end
end
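# A minimal sketch, reconstructed from the 'retry configuration' specs above and
# not taken from the actual ES::Client source, of the retry-option handling they
# pin down: an Integer retry_on_failure is used as-is, true falls back to the
# default of 3, false disables retries, and anything else is treated as the
# default. DEFAULT_MAX_RETRIES is a hypothetical name.
#
#   @max_retries =
#     case config[:retry_on_failure]
#     when Integer then config[:retry_on_failure]
#     when true    then DEFAULT_MAX_RETRIES # 3
#     when false   then 0
#     else              DEFAULT_MAX_RETRIES
#     end
#   # delay_on_retry behaves the same way around its default of 2 seconds.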
describe '#connection_config' do
context 'when configuring Elasticsearch client' do
it 'handles full URL with scheme, host and port' do
new_config = {
host: 'https://localhost:9201'
}
result = subject.connection_config(new_config, '0.0.0-foo')
expect(result[:scheme]).to eq('https')
expect(result[:host]).to eq('localhost')
expect(result[:port]).to eq(9201)
end
it 'handles URL with scheme and host' do
new_config = {
host: 'https://localhost'
}
result = subject.connection_config(new_config, '0.0.0-foo')
expect(result[:scheme]).to eq('https')
expect(result[:host]).to eq('localhost')
end
it 'handles host with port' do
new_config = {
host: 'localhost',
port: 9201
}
result = subject.connection_config(new_config, '0.0.0-foo')
expect(result[:scheme]).to be_nil
expect(result[:host]).to eq('localhost')
expect(result[:port]).to eq(9201)
end
it 'handles host only' do
new_config = {
host: 'localhost'
}
result = subject.connection_config(new_config, '0.0.0-foo')
expect(result[:scheme]).to be_nil
expect(result[:host]).to eq('localhost')
expect(result[:port]).to be_nil
end
it 'gives precedence to separate port over port in host' do
new_config = {
host: 'https://localhost:9201',
port: 9300
}
result = subject.connection_config(new_config, '0.0.0-foo')
expect(result[:port]).to eq(9300)
end
end
context 'when ssl verification is disabled' do
it 'configures Elasticsearch client with ssl verification disabled' do
config[:elasticsearch][:ssl_verify] = false
result = subject.connection_config(config[:elasticsearch], '0.0.0-foo')
expect(result[:transport_options][:ssl][:verify]).to eq(false)
expect(result[:transport_options][:ssl][:ca_path]).to be_nil
expect(result[:transport_options][:ssl][:ca_fingerprint]).to be_nil
end
end
context 'when ca_file is configured' do
let(:ca_file) { '/my/local/certificate.crt' }
it 'configures Elasticsearch client with ca_file' do
config[:elasticsearch][:ca_file] = ca_file
result = subject.connection_config(config[:elasticsearch], '0.0.0-foo')
expect(result[:transport_options][:ssl][:ca_file]).to eq(ca_file)
end
end
context 'when ca_path is configured' do
let(:ca_path) { '/my/local/certificates' }
it 'configures Elasticsearch client with ca_path' do
config[:elasticsearch][:ca_path] = ca_path
result = subject.connection_config(config[:elasticsearch], '0.0.0-foo')
expect(result[:transport_options][:ssl][:ca_path]).to eq(ca_path)
end
end
context 'when ca_fingerprint is configured' do
let(:ca_fingerprint) { '64F2593F...' }
it 'configures Elasticsearch client with ca_fingerprint' do
config[:elasticsearch][:ca_fingerprint] = ca_fingerprint
result = subject.connection_config(config[:elasticsearch], '0.0.0-foo')
expect(result[:ca_fingerprint]).to eq(ca_fingerprint)
# Also ensure that SSL Verification has not been implicitly disabled
expect(result[:transport_options][:ssl][:verify]).to be_nil
end
end
context 'when API key is not present' do
it 'initialises with username and password' do
config[:elasticsearch][:api_key] = nil
result = subject.connection_config(config[:elasticsearch], '0.0.0-foo')
expect(result[:api_key]).to be_nil
expect(result[:user]).to eq('user')
expect(result[:password]).to eq('pw')
expect(result[:transport_options][:headers][:'user-agent']).to eq('elastic-web-crawler-0.0.0-foo')
end
end
context 'when API key is present' do
it 'overrides username and password' do
result = subject.connection_config(config[:elasticsearch], '0.0.0-bar')
expect(result[:host]).to eq('notreallyaserver')
expect(result[:port]).to eq('9200')
expect(result[:api_key]).to eq('key')
expect(result[:user]).to be_nil
expect(result[:password]).to be_nil
expect(result[:transport_options][:headers][:'user-agent']).to eq('elastic-web-crawler-0.0.0-bar')
end
end
xcontext 'when headers are present' do
# TODO: implement when we support headers in config
let(:headers) do
{
something: 'something'
}
end
it 'configures Elasticsearch client with headers' do
config[:elasticsearch][:headers] = headers
result = subject.connection_config(config[:elasticsearch], '0.0.0-test')
expect(result[:headers]).to eq(headers)
end
end
context 'when headers are not present' do
it 'configures Elasticsearch client with no headers' do
config[:elasticsearch][:headers] = nil
result = subject.connection_config(config[:elasticsearch], '0.0.0-test')
expect(result).to_not have_key(:headers)
end
end
context 'when compression setting is not present' do
it 'defaults compression to true' do
config[:elasticsearch].delete(:compression) # Ensure it's not set
result = subject.connection_config(config[:elasticsearch], '0.0.0-test')
expect(result[:compression]).to be true
end
end
context 'when compression setting is true' do
it 'sets compression to true' do
config[:elasticsearch][:compression] = true
result = subject.connection_config(config[:elasticsearch], '0.0.0-test')
expect(result[:compression]).to be true
end
end
context 'when compression setting is false' do
it 'sets compression to false' do
config[:elasticsearch][:compression] = false
result = subject.connection_config(config[:elasticsearch], '0.0.0-test')
expect(result[:compression]).to be false
end
end
end
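# Taken together, the #connection_config examples above assume a return hash
# shaped roughly like the sketch below (an inference from the assertions, not
# the method's documented contract); the values shown are the ones used in this
# spec.
#
#   {
#     host: 'notreallyaserver',
#     port: '9200',
#     api_key: 'key',          # or :user / :password when no API key is set
#     compression: true,       # defaults to true
#     transport_options: {
#       ssl: { verify: nil, ca_file: nil, ca_path: nil },
#       headers: { 'user-agent': 'elastic-web-crawler-0.0.0-foo' }
#     }
#   }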
describe '#execute_with_retry' do
let(:description) { 'Test Operation' }
let(:block_result) { { success: true } }
let(:block_spy) { spy('block') }
let(:error_class) { Class.new(StandardError) }
before do
allow(block_spy).to receive(:call).and_return(block_result)
end
def execute_retry(&block)
subject.send(:execute_with_retry, description:, &block)
end
context 'with default settings (3 retries, 2s delay)' do
let(:config) { { elasticsearch: { host:, port: } } } # Use default retry config
it 'succeeds on the first try without sleeping or logging warnings' do
expect(subject).not_to receive(:sleep)
expect(system_logger).not_to receive(:warn)
expect(system_logger).not_to receive(:error)
result = execute_retry { block_spy.call }
expect(result).to eq(block_result)
expect(block_spy).to have_received(:call).once
end
end
context 'when success requires retries' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: 2, delay_on_retry: 1 } } }
let(:attempt_counter) { double(count: 0) }
before do
allow(attempt_counter).to receive(:count).and_return(0, 1, 2) # Simulate 3 calls total
allow(block_spy).to receive(:call) do
count = attempt_counter.count
raise error_class, "Failed attempt #{count + 1}" if count < 2
block_result # Success on the 3rd attempt (index 2)
end
end
it 'succeeds after retrying, sleeps with exponential backoff, and logs warnings' do
expect(system_logger).to receive(:warn).with(
%r{#{description} attempt 1/3 failed: 'Failed attempt 1'. Retrying in 1.0s..}
).ordered
expect(subject).to receive(:sleep).with(1.0**1).ordered
expect(system_logger).to receive(:warn).with(
%r{#{description} attempt 2/3 failed: 'Failed attempt 2'. Retrying in 1.0s..}
).ordered
expect(subject).to receive(:sleep).with(1.0**2).ordered
expect(system_logger).not_to receive(:error)
result = execute_retry { block_spy.call }
expect(result).to eq(block_result)
expect(block_spy).to have_received(:call).exactly(3).times
end
end
context 'when all retries fail' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: 2, delay_on_retry: 1 } } }
before do
allow(block_spy).to receive(:call).and_raise(error_class, 'Persistent failure')
end
it 'raises the original error after exhausting retries, sleeps, logs warnings and final error' do
expect(system_logger).to receive(:warn).with(
%r{#{description} attempt 1/3 failed: 'Persistent failure'. Retrying in 1.0s..}
).ordered
expect(subject).to receive(:sleep).with(1.0**1).ordered
expect(system_logger).to receive(:warn).with(
%r{#{description} attempt 2/3 failed: 'Persistent failure'. Retrying in 1.0s..}
).ordered
expect(subject).to receive(:sleep).with(1.0**2).ordered
expect(system_logger).to receive(:error).with(
/#{description} failed after 3 attempts: 'Persistent failure'/
).ordered
expect do
execute_retry { block_spy.call }
end.to raise_error(error_class, 'Persistent failure')
expect(block_spy).to have_received(:call).exactly(3).times
end
end
context 'when retries are disabled' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: false } } }
before do
allow(block_spy).to receive(:call).and_raise(error_class, 'Immediate failure')
end
it 'fails immediately, logs specific error, and does not sleep or log warnings' do
expect(subject).not_to receive(:sleep)
expect(system_logger).not_to receive(:warn)
expect(system_logger).to receive(:error).with(
/#{description} failed: 'Immediate failure'. Retries disabled./
)
expect do
execute_retry { block_spy.call }
end.to raise_error(error_class, 'Immediate failure')
expect(block_spy).to have_received(:call).once
end
end
end
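# The expectations above imply an #execute_with_retry along these lines. This is
# an illustrative sketch reconstructed from the assertions, not the actual
# ES::Client source; the instance variable names are guesses.
#
#   def execute_with_retry(description:)
#     attempt = 0
#     begin
#       yield
#     rescue StandardError => e
#       attempt += 1
#       if @max_retries.zero?
#         @system_logger.error("#{description} failed: '#{e.message}'. Retries disabled.")
#         raise
#       elsif attempt > @max_retries
#         @system_logger.error("#{description} failed after #{attempt} attempts: '#{e.message}'")
#         raise
#       else
#         delay = @retry_delay.to_f**attempt
#         @system_logger.warn(
#           "#{description} attempt #{attempt}/#{@max_retries + 1} failed: '#{e.message}'. Retrying in #{delay}s..."
#         )
#         sleep(delay)
#         retry
#       end
#     end
#   end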
describe '#bulk' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: 1, delay_on_retry: 1 } } }
let(:payload) do
{
body: [
{ index: { _index: index_name, _id: '123' } },
{ id: '123', title: 'Foo', body: 'bar' }
]
}
end
context 'when successful' do
before :each do
stub_request(:post, "#{host}:#{port}/_bulk").to_return(status: 200, headers: elastic_product_headers)
end
it 'sends bulk request' do
result = subject.bulk(payload)
expect(result.status).to eq(200)
end
end
context 'when the underlying client call fails' do
let(:error) { Elastic::Transport::Transport::ServerError.new('[500] {"error":"boom"}') }
let(:file_double) { double('File', puts: nil, close: nil) }
before do
allow(subject.transport).to receive(:perform_request).and_return(
double(status: 200, body: '{"version":{"number":"8.13.0"}}', headers: elastic_product_headers)
)
allow(subject.transport).to receive(:perform_request).with('POST', '_bulk', any_args).and_raise(error)
allow(File).to receive(:open).with(%r{output/failed_payloads/crawl-id/\d{14}}, 'w').and_yield(file_double)
end
it 'saves the payload after persistent errors' do
expect(subject).to receive(:execute_with_retry).with(description: 'Bulk index').and_call_original
expect { subject.bulk(payload) }.to raise_error(error)
expect(file_double).to have_received(:puts).with(payload[:body].first)
expect(file_double).to have_received(:puts).with(payload[:body].second)
end
end
end
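# The File stub above encodes this spec's assumption about failure handling in
# #bulk: once retries are exhausted, the payload lines are written one by one to
# output/failed_payloads/<crawl id>/<timestamp> before the error is re-raised.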
describe '#paginated_search' do
let(:size) { Crawler::OutputSink::Elasticsearch::SEARCH_PAGINATION_SIZE }
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: 1, delay_on_retry: 1 } } }
let(:query) do
{
_source: ['url'],
query: {
range: {
last_crawled_at: {
lt: Time.now.rfc3339
}
}
},
size:,
sort: [{ last_crawled_at: 'asc' }]
}.deep_stringify_keys
end
let(:hit1) do
{
_id: '1234',
_source: { url: 'https://www.elastic.co/search-labs' },
sort: [1]
}.deep_stringify_keys
end
let(:hit2) do
{
_id: '5678',
_source: { url: 'https://www.elastic.co/search-labs/tutorials' },
sort: [2]
}.deep_stringify_keys
end
let(:empty_response) do
{
hits: {
hits: []
}
}.deep_stringify_keys
end
let(:full_response) do
{ hits: { hits: [hit1, hit2] } }.deep_stringify_keys
end
context 'when successful' do
before do
allow(subject)
.to receive(:search).and_return(full_response, empty_response)
end
it 'sends search requests without error' do
expect(subject).to receive(:search).twice
results = subject.paginated_search(index_name, query)
expect(results).to match_array([hit1, hit2])
end
end
context 'when successful with pagination' do
let(:size) { 1 }
let(:first_response) do
{
hits: {
hits: [hit1]
}
}.deep_stringify_keys
end
let(:second_response) do
{
hits: {
hits: [hit2]
}
}.deep_stringify_keys
end
before do
allow(subject)
.to receive(:search).and_return(first_response, second_response, empty_response)
end
it 'paginates through all results without error' do
expect(subject).to receive(:search).exactly(3).times
results = subject.paginated_search(index_name, query)
expect(results).to match_array([hit1, hit2])
end
end
context 'when the underlying search call fails' do
let(:error) { Elastic::Transport::Transport::ServerError.new('[503] {"error":"unavailable"}') }
before do
allow(subject).to receive(:search).and_raise(error)
end
it 'calls execute_with_retry and raises the error' do
expect(subject).to receive(:execute_with_retry).with(description: 'Search').and_call_original
expect { subject.paginated_search(index_name, query) }.to raise_error(error)
end
end
end
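# The #paginated_search examples above model search_after-style pagination: each
# stubbed hit carries a 'sort' value and the client is expected to keep
# searching until a page comes back with an empty hits array. A rough sketch of
# that loop, under that assumption (the search signature here is a guess):
#
#   results = []
#   loop do
#     hits = search(index_name, query).dig('hits', 'hits')
#     break if hits.empty?
#     results.concat(hits)
#     query['search_after'] = hits.last['sort']
#   end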
describe '#delete_by_query' do
let(:config) { { elasticsearch: { host:, port:, retry_on_failure: 1, delay_on_retry: 1 } } }
let(:delete_url) { %r{#{host}:#{port}/#{index_name}/_delete_by_query} }
let(:query) { { query: { match_all: {} } } }
let(:success_response) do
{ status: 200, body: '{"deleted": 10}',
headers: elastic_product_headers.merge('Content-Type' => 'application/json') }
end
context 'when successful' do
before do
stub_request(:post, delete_url).to_return(success_response)
end
it 'calls execute_with_retry and performs the delete' do
expect(subject).to receive(:execute_with_retry).with(description: 'Delete by query').and_call_original
expect { subject.delete_by_query(index: index_name, body: query) }.not_to raise_error
end
end
context 'when the underlying delete call fails' do
let(:error) { Elastic::Transport::Transport::ServerError.new('[500] {"error":"delete_failed"}') }
before do
allow(subject.transport).to receive(:perform_request).with(
'POST', "#{index_name}/_delete_by_query", { refresh: true }, query, anything
).and_raise(error)
end
it 'calls execute_with_retry and raises the error' do
expect(subject).to receive(:execute_with_retry).with(description: 'Delete by query').and_call_original
expect { subject.delete_by_query(index: index_name, body: query) }.to raise_error(error)
end
end
end
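# The transport stub above reflects the request this spec expects from
# #delete_by_query: a POST to <index>/_delete_by_query with refresh: true,
# wrapped in the same execute_with_retry path as the other operations.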
end