spec/lib/es/bulk_queue_spec.rb (124 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License 2.0;
# you may not use this file except in compliance with the Elastic License 2.0.
#
# frozen_string_literal: true
RSpec.describe(ES::BulkQueue) do
let(:subject) { described_class.new(count_threshold, size_threshold, system_logger) }
let(:system_logger) { double }
let(:count_threshold) { 50 }
let(:size_threshold) { 999_999_999 } # super high default
before(:each) do
allow(system_logger).to receive(:debug)
allow(system_logger).to receive(:error)
end
describe '#add' do
before(:each) do
allow(subject).to receive(:will_fit?)
.and_return(true)
end
context 'when new element won\'t fit any more' do
before(:each) do
allow(subject).to receive(:will_fit?)
.and_return(false)
end
it 'raises an error' do
expect { subject.add('some-op') }.to raise_error(Errors::BulkQueueOverflowError)
end
end
context 'when only operation is added' do
let(:op) { 'do: something' }
it 'adds only the operation to the buffer' do
subject.add(op)
expect(subject.pop_all).to include(op)
end
end
context 'when operation with payload is added' do
let(:op) { 'index: id=12' }
let(:payload) { 'text: something, counter: 15' }
it 'adds both operation and payload to the buffer' do
subject.add(op, payload)
buffer = subject.pop_all
expect(buffer).to include(op)
expect(buffer).to include(payload)
end
end
end
describe '#will_fit?' do
let(:op) { 'hello: world' }
context 'when thresholds are not reached' do
it 'returns true' do
expect(subject.will_fit?(op)).to eq(true)
end
end
context 'when too many items were added to the queue' do
let(:count_threshold) { 10 }
before(:each) do
4.times.each do |i|
subject.add("op: #{i}")
end
6.times.each do |i|
subject.add("op-w-payload: #{i}, payload: #{i}")
end
end
it 'returns false' do
expect(subject.will_fit?(op)).to eq(false)
end
end
context 'when size of items added to the queue is too big' do
let(:big_operation) { 'this_is: a big operation' }
let(:big_operation_bytesize) { 26 }
let(:size_threshold) { (big_operation_bytesize * 5) - 1 } # only 4 big operations will fit
before(:each) do
allow(subject).to receive(:bytesize).and_call_original
4.times do
subject.add(big_operation)
end
end
it 'returns false' do
expect(subject.will_fit?(big_operation)).to eq(false)
end
end
end
describe '#pop_all' do
context 'when queue is empty' do
it 'returns empty array' do
expect(subject.pop_all).to eq([])
end
end
context 'when some operations were added to the queue' do
before(:each) do
25.times do |i|
subject.add("some_op: #{i}")
end
end
it 'cleans up the queue' do
subject.pop_all
expect(subject.pop_all).to eq([])
end
end
end
describe '#current_stats' do
let(:op_count) { 15 }
let(:big_operation) { 'this_is: a big operation' }
let(:big_operation_bytesize) { 26 }
before(:each) do
allow(subject).to receive(:bytesize).and_call_original
op_count.times do
subject.add(big_operation)
end
end
it 'returns expected number of operations' do
expect(subject.current_stats[:current_op_count]).to eq(op_count)
end
it 'returns expected size of operations' do
expect(subject.current_stats[:current_buffer_size]).to eq(op_count * big_operation_bytesize)
end
context 'when queue is popped' do
before(:each) do
subject.pop_all
end
it 'returns expected number of operations' do
expect(subject.current_stats[:current_op_count]).to eq(0)
end
it 'returns expected size of operations' do
expect(subject.current_stats[:current_buffer_size]).to eq(0)
end
end
end
end