# spec/core/ingestion/es_sink_spec.rb
require 'spec_helper'

require 'core/ingestion/es_sink'
require 'utility/logger'
require 'core/connector_settings'
require 'utility/es_client'

RSpec::Matchers.define :array_of_size do |x|
  match { |actual| actual.size == x }
end
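
# A hypothetical usage of the matcher above, as an RSpec argument matcher
# (illustrative only; names here are examples, not taken from this file):
#
#   expect(bulk_queue).to receive(:add).with(array_of_size(2))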

describe Core::Ingestion::EsSink do
  subject { described_class.new(index_name, request_pipeline, bulk_queue) }

  let(:index_name) { 'some-index-name' }
  let(:request_pipeline) { Core::ConnectorSettings::DEFAULT_REQUEST_PIPELINE }
  let(:es_client) { double }
  let(:bulk_queue) { double }
  let(:serializer) { double }
  let(:document) { { :id => 15 } }
  let(:serialized_document) { "id: #{document[:id]}, text: 'hoho, haha!'" }
  let(:deleted_id) { 25 }

  context 'when all private methods are implemented' do
    before(:each) do
      allow(Utility::EsClient).to receive(:new).and_return(es_client)
      allow(es_client).to receive(:bulk)

      # Fully mocking the queue made these tests much harder to write,
      # so we stub only the methods the sink actually calls.
      allow(bulk_queue).to receive(:will_fit?).and_return(true)
      allow(bulk_queue).to receive(:add)
      allow(bulk_queue).to receive(:pop_all)

      allow(Elasticsearch::API).to receive(:serializer).and_return(serializer)
      allow(serializer).to receive(:dump).and_return('')
      allow(serializer).to receive(:dump).with(document).and_return(serialized_document)
    end
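
    # For orientation, the ingest path these stubs exercise presumably looks
    # something like this (a sketch, not the actual EsSink source):
    #
    #   def ingest(document)
    #     return Utility::Logger.warn('Ignoring nil or empty document') if document.nil? || document.empty?
    #
    #     serialized_document = serializer.dump(document)
    #     flush unless bulk_queue.will_fit?(operation, serialized_document)
    #     bulk_queue.add(operation, serialized_document)
    #   end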

    context '#ingest' do
      context 'when ingested document is nil' do
        let(:document) { nil }
        let(:serialized_document) { '' }

        it 'does not add the document to the queue' do
          expect(bulk_queue).to_not receive(:add)

          subject.ingest(document)
        end

        it 'produces a warning' do
          expect(Utility::Logger).to receive(:warn)

          subject.ingest(document)
        end
      end

      context 'when ingested document is empty' do
        let(:document) { {} }

        it 'does not add the document to the queue' do
          expect(bulk_queue).to_not receive(:add)

          subject.ingest(document)
        end

        it 'produces a warning' do
          expect(Utility::Logger).to receive(:warn)

          subject.ingest(document)
        end
      end

      context 'when ingested document is too big' do
        let(:document_id) { 15 }
        let(:document) { { 'id' => document_id, 'something' => 'something' } }
        let(:serialized_document) { 'id: 15, something: something' }
        let(:max_allowed_document_size) { 5 } # 5 bytes

        subject { described_class.new(index_name, request_pipeline, bulk_queue, max_allowed_document_size) }

        it 'does not add the document to the queue' do
          expect(bulk_queue).to_not receive(:add)

          subject.ingest(document)
        end

        it 'produces a warning that includes the document id' do
          expect(Utility::Logger).to receive(:warn).with(/#{document_id}/)

          subject.ingest(document)
        end
      end
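
      # The size guard above presumably compares the serialized payload
      # against the limit passed to the constructor, roughly (hypothetical):
      #
      #   if serialized_document.bytesize > max_allowed_document_size
      #     Utility::Logger.warn("Dropping document #{document['id']}: payload exceeds #{max_allowed_document_size} bytes")
      #     return
      #   end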

      context 'when ingested document is not empty' do
        context 'when bulk queue still has capacity' do
          it 'does not immediately send the document to Elasticsearch' do
            expect(es_client).to_not receive(:bulk)

            subject.ingest(document)
          end
        end

        context 'when bulk queue reports that it is full' do
          let(:document) do
            {
              :id => 1,
              'text' => 'hoho, haha!'
            }
          end

          let(:another_document) do
            {
              :id => 2,
              :text => 'work work!'
            }
          end

          let(:serialized_document) { "id: #{document[:id]}, text: 'hoho, haha!'" }
          let(:another_serialized_document) { "id: #{another_document[:id]}, text: 'work work!'" }

          before(:each) do
            # Emulated behaviour: the queue reports itself as full once the
            # first item has been added to it.
            allow(bulk_queue).to receive(:will_fit?).and_return(true, false)
            allow(bulk_queue).to receive(:pop_all).and_return([serialized_document])

            allow(serializer).to receive(:dump).with(document).and_return(serialized_document)
            allow(serializer).to receive(:dump).with(another_document).and_return(another_serialized_document)
          end

          it 'sends a bulk request with data returned from the bulk queue' do
            expect(es_client).to receive(:bulk).once

            subject.ingest(document)
            subject.ingest(another_document)
          end

          it 'pops existing documents before adding a new one' do
            expect(bulk_queue).to receive(:add)
              .with(anything, serialized_document)
              .ordered

            expect(bulk_queue).to receive(:pop_all)
              .ordered

            expect(bulk_queue).to receive(:add)
              .with(anything, another_serialized_document)
              .ordered

            subject.ingest(document)
            subject.ingest(another_document)
          end
        end
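
        # The ordered expectations above imply a flush-before-add strategy:
        # when the queue cannot fit the next item, the sink pops everything,
        # issues one bulk request, and only then queues the new item.
        # A hypothetical sketch of that branch:
        #
        #   unless bulk_queue.will_fit?(operation, serialized_document)
        #     es_client.bulk(:body => bulk_queue.pop_all, :pipeline => request_pipeline)
        #   end
        #   bulk_queue.add(operation, serialized_document)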
      end
    end

    context '#ingest_multiple' do
      let(:document1) { { :id => 1 } }
      let(:document2) { { :id => 2 } }
      let(:document3) { { :id => 3 } }

      it 'calls ingest for each passed document' do
        expect(subject).to receive(:ingest).with(document1)
        expect(subject).to receive(:ingest).with(document2)
        expect(subject).to receive(:ingest).with(document3)

        subject.ingest_multiple([document1, document2, document3])
      end
    end
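
    # Both *_multiple methods are exercised as thin wrappers around their
    # single-item counterparts; a minimal sketch, assuming simple delegation:
    #
    #   def ingest_multiple(documents)
    #     documents.each { |document| ingest(document) }
    #   end
    #
    #   def delete_multiple(ids)
    #     ids.each { |id| delete(id) }
    #   end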

    context '#delete' do
      context 'when id is not provided' do
        let(:deleted_id) { nil }

        it 'does not add the operation to the queue' do
          expect(bulk_queue).to_not receive(:add)

          subject.delete(deleted_id)
        end
      end

      context 'when id is provided' do
        let(:deleted_id) { 'something-nice!' }

        it 'adds an operation to the queue' do
          expect(bulk_queue).to receive(:add)

          subject.delete(deleted_id)
        end

        context 'when bulk queue still has capacity' do
          let(:id) { 15 }

          it 'does not immediately send the delete operation to Elasticsearch' do
            expect(es_client).to_not receive(:bulk)

            subject.delete(id)
          end
        end

        context 'when bulk queue reports that it is full' do
          let(:delete_id) { 10 }
          let(:serialized_delete_op) { 'delete: 10' }
          let(:another_delete_id) { 11 }
          let(:another_serialized_delete_op) { 'delete: 11' }

          before(:each) do
            # Emulated behaviour: the queue reports itself as full once the
            # first item has been added to it.
            allow(bulk_queue).to receive(:will_fit?).and_return(true, false)
            allow(bulk_queue).to receive(:pop_all).and_return(serialized_delete_op)

            allow(serializer).to receive(:dump)
              .with({ 'delete' => hash_including('_id' => delete_id) })
              .and_return(serialized_delete_op)
            allow(serializer).to receive(:dump)
              .with({ 'delete' => hash_including('_id' => another_delete_id) })
              .and_return(another_serialized_delete_op)
          end

          it 'sends out one batch of changes' do
            expect(es_client).to receive(:bulk)
              .once
              .with(hash_including(:body => a_string_including(serialized_delete_op)))

            subject.delete(delete_id)
            subject.delete(another_delete_id)
          end

          it 'pops existing operations before adding a new one' do
            expect(bulk_queue).to receive(:add)
              .with(serialized_delete_op)
              .ordered

            expect(bulk_queue).to receive(:pop_all)
              .ordered

            expect(bulk_queue).to receive(:add)
              .with(another_serialized_delete_op)
              .ordered

            subject.delete(delete_id)
            subject.delete(another_delete_id)
          end
        end
      end
    end
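
    # The serializer stubs above suggest deletes are queued as standard bulk
    # API delete actions, shaped roughly like this (the '_index' field is an
    # assumption; only '_id' is confirmed by the stubs):
    #
    #   { 'delete' => { '_index' => index_name, '_id' => deleted_id } }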

    context '#delete_multiple' do
      let(:id1) { 1 }
      let(:id2) { 2 }
      let(:id3) { 3 }

      it 'calls delete for each passed id' do
        expect(subject).to receive(:delete).with(id1)
        expect(subject).to receive(:delete).with(id2)
        expect(subject).to receive(:delete).with(id3)

        subject.delete_multiple([id1, id2, id3])
      end
    end

    context '#flush' do
      let(:operation) { 'bulk: delete something \n insert something else' }

      before(:each) do
        allow(bulk_queue).to receive(:pop_all).and_return(operation)
      end

      it 'sends data from the bulk queue to Elasticsearch' do
        expect(es_client).to receive(:bulk).with(hash_including(:body => operation))

        subject.flush
      end
    end
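
    # A plausible flush implementation matching the expectation above
    # (a sketch under these assumptions; the :pipeline argument in
    # particular is not verified by this spec):
    #
    #   def flush
    #     data = bulk_queue.pop_all
    #     es_client.bulk(:body => data, :pipeline => request_pipeline)
    #   end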

    context '#ingestion_stats' do
      context 'when flush was not triggered' do
        before(:each) do
          15.times do |id|
            subject.ingest({ :id => id })
          end

          25.times do |id|
            subject.delete(id)
          end
        end

        it 'returns empty stats' do
          stats = subject.ingestion_stats

          expect(stats[:indexed_document_count]).to eq(0)
          expect(stats[:deleted_document_count]).to eq(0)
          expect(stats[:indexed_document_volume]).to eq(0)
        end
      end
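
      # The empty stats above imply the counters are committed only when a
      # flush happens; a hypothetical accounting step inside flush:
      #
      #   @indexed_document_count += queued_index_ops_count
      #   @deleted_document_count += queued_delete_ops_count
      #   @indexed_document_volume += queued_serialized_bytes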

      context 'when flush was triggered' do
        let(:operation) { 'bulk: delete something \n insert something else' }

        before(:each) do
          allow(bulk_queue).to receive(:pop_all).and_return(operation)
        end

        context 'when nothing was ingested yet' do
          it 'returns empty stats' do
            stats = subject.ingestion_stats

            expect(stats[:indexed_document_count]).to eq(0)
            expect(stats[:deleted_document_count]).to eq(0)
            expect(stats[:indexed_document_volume]).to eq(0)
          end
        end

        context 'when some documents were ingested' do
          let(:document_count) { 5 }
          let(:serialized_object) { "doesn't matter" }

          before(:each) do
            allow(serializer).to receive(:dump).and_return(serialized_object)

            document_count.times do |id|
              subject.ingest({ :id => id })
            end

            subject.flush
          end

          it 'returns expected indexed_document_count' do
            stats = subject.ingestion_stats

            expect(stats[:indexed_document_count]).to eq(document_count)
          end

          it 'returns expected indexed_document_volume' do
            stats = subject.ingestion_stats

            expect(stats[:indexed_document_volume]).to eq(document_count * serialized_object.bytesize)
          end
        end

        context 'when some documents were deleted' do
          let(:deleted_count) { 5 }

          before(:each) do
            deleted_count.times do |id|
              subject.delete(id)
            end

            subject.flush
          end

          it 'returns expected deleted_document_count' do
            stats = subject.ingestion_stats

            expect(stats[:deleted_document_count]).to eq(deleted_count)
          end
        end
      end
    end
  end
end