gems/aws-sdk-sqs/spec/queue_poller_spec.rb (440 lines of code) (raw):
# frozen_string_literal: true
require_relative 'spec_helper'
module Aws
module SQS
describe QueuePoller do
let(:queue_url) { 'https://sqs.us-east-1.amazonaws.com/12345/test' }
let(:client) { Client.new(stub_responses: true) }
let(:options) { { client: client } }
let(:poller) { QueuePoller.new(queue_url, options) }
def sample_message(n = nil)
suffix = n ? "-#{n}" : ''
{
message_id: "id#{suffix}",
receipt_handle: "rh#{suffix}",
body: "body#{suffix}"
}
end
describe 'configuration' do
it 'raises an error on unknown configuration options' do
expect do
QueuePoller.new(queue_url, client: client, bad: 'option')
end.to raise_error(ArgumentError, 'invalid option :bad')
expect do
QueuePoller.new(queue_url, client: client).poll(bad: 'option') { |m| }
end.to raise_error(ArgumentError, 'invalid option :bad')
end
it 'is immutable' do
expect(poller.default_config).to be_frozen
expect(poller.default_config.request_params).to be_frozen
end
it 'has reasonable defaults' do
expect(poller.queue_url).to eq(queue_url)
expect(poller.default_config.idle_timeout).to be(nil)
expect(poller.default_config.skip_delete).to be(false)
expect(poller.default_config.before_request).to be(nil)
expect(poller.default_config.after_empty_receive).to be(nil)
expect(poller.default_config.request_params)
.to eq(
wait_time_seconds: 20,
max_number_of_messages: 1,
visibility_timeout: nil,
attribute_names: ['All'],
message_attribute_names: ['All']
)
end
it 'accepts configuration options to the constructor' do
client = double('client')
callback = double('callback')
poller = QueuePoller.new(
queue_url,
{
client: client,
idle_timeout: 60,
skip_delete: true,
before_request: callback,
after_empty_receive: callback,
wait_time_seconds: 10,
max_number_of_messages: 10,
visibility_timeout: 10,
attribute_names: ['attr-name'],
message_attribute_names: ['msg-attr-name']
}
)
expect(poller.client).to be(client)
expect(poller.default_config.idle_timeout).to eq(60)
expect(poller.default_config.skip_delete).to be(true)
expect(poller.default_config.before_request).to be(callback)
expect(poller.default_config.after_empty_receive).to be(callback)
expect(poller.default_config.request_params)
.to eq(
wait_time_seconds: 10,
max_number_of_messages: 10,
visibility_timeout: 10,
attribute_names: ['attr-name'],
message_attribute_names: ['msg-attr-name']
)
end
context 'max_number_of_messages validation' do
subject do
QueuePoller.new(
queue_url,
client: client,
max_number_of_messages: max_number_of_messages
)
end
let(:max_number_of_messages) { 1 }
it 'accepts a positive integer' do
expect(subject.default_config.request_params[:max_number_of_messages]).to eq(1)
end
[0, nil, 1.1, '1'].each do |value|
context "with `max_number_of_messages: #{value.inspect}`" do
let(:max_number_of_messages) { value }
it 'raises an error' do
expect { subject }.to raise_error(ArgumentError, /positive integer/)
end
end
end
end
end
describe '#poll' do
it 'receives messages in a loop' do
expect(client).to receive(:receive_message)
.exactly(2).times
.with(
{
queue_url: queue_url,
wait_time_seconds: 20,
max_number_of_messages: 1,
visibility_timeout: nil,
attribute_names: ['All'],
message_attribute_names: ['All']
}
)
.and_return(client.stub_data(:receive_message))
poller.before_request do |stats|
throw :stop_polling if stats.request_count >= 2
end
poller.poll { |msg| }
end
it 'yields received messages to the block' do
client.stub_responses(
:receive_message,
[
{ messages: [sample_message] },
{ messages: [] }
]
)
yielded = nil
poller.poll(idle_timeout: 0) do |msg|
yielded = msg
end
expect(yielded.body).to eq('body')
end
it 'yields an array when max messages is greater than 1' do
client.stub_responses(
:receive_message,
[
{ messages: [
sample_message(1),
sample_message(2)
] },
{ messages: [] }
]
)
yielded = nil
poller.poll(idle_timeout: 0, max_number_of_messages: 2) do |messages|
yielded = messages
end
expect(yielded.map(&:body)).to eq(%w[body-1 body-2])
end
it 'does not have duplicated messages and given the '\
'most recently received duplicated message' do
message_one = sample_message(1)
message_dup = message_one.dup.merge(receipt_handle: 'foo')
client.stub_responses(
:receive_message,
[{ messages: [
message_one,
message_dup
] },
{ messages: [] }]
)
yielded_arr = []
poller.poll(idle_timeout: 0) do |msg|
yielded_arr << msg
end
expect(yielded_arr.count).to eq(1)
expect(yielded_arr[0].receipt_handle).to eq(message_dup[:receipt_handle])
end
describe 'message deletion' do
it 'deletes the message at the end of the block' do
expect(client).to receive(:delete_message_batch)
.with(
queue_url: queue_url,
entries: [{ id: 'id', receipt_handle: 'rh' }]
)
client.stub_responses(
:receive_message,
[
{ messages: [sample_message] },
{ messages: [] }
]
)
poller.poll(idle_timeout: 0) { |msg| }
end
it 'supports batch deletion' do
expect(client).to receive(:delete_message_batch)
.with(
queue_url: queue_url,
entries: [
{ id: 'id-1', receipt_handle: 'rh-1' },
{ id: 'id-2', receipt_handle: 'rh-2' }
]
)
client.stub_responses(
:receive_message,
[
{ messages: [
sample_message(1),
sample_message(2)
] },
{ messages: [] }
]
)
poller.poll(idle_timeout: 0) { |msg| }
end
it 'can skip default delete behavior' do
expect(client).not_to receive(:delete_message_batch)
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [] }
]
)
poller.poll(idle_timeout: 0, skip_delete: true) { |msg| }
end
it 'skips delete when :skip_delete is thrown' do
expect(client).not_to receive(:delete_message_batch)
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [] }
]
)
poller.poll(idle_timeout: 0) { |_msg| throw :skip_delete }
end
it 'provides the ability to manually delete messages' do
expect(client).to receive(:delete_message).with(
queue_url: queue_url,
receipt_handle: 'rh'
)
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [] }
]
)
poller.poll(idle_timeout: 0, skip_delete: true) do |msg|
poller.delete_message(msg)
end
end
it 'provides the ability to manually delete message batches' do
expect(client).to receive(:delete_message_batch)
.with(
queue_url: queue_url,
entries: [
{ id: 'id-1', receipt_handle: 'rh-1' },
{ id: 'id-2', receipt_handle: 'rh-2' }
]
)
client.stub_responses(
:receive_message, [
{ messages: [
sample_message(1),
sample_message(2)
] },
{ messages: [] }
]
)
poller.poll(idle_timeout: 0, max_number_of_messages: 10, skip_delete: true) do |messages|
poller.delete_messages(messages)
end
end
end
describe 'after_empty_receive callback' do
it 'calls the callback when an empty message set is received' do
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [] }
]
)
resp = nil
callback = proc {|stats| }
expect(callback).to receive(:call).once { throw :stop_polling }
poller.after_empty_receive(&callback)
poller.poll{ |msg| }
end
end
describe 'visibility timeouts' do
it 'provides a method to update the visibility timeout of a message' do
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [] }
]
)
resp = nil
poller.poll(idle_timeout: 0) do |msg|
resp = poller.change_message_visibility_timeout(msg, 60)
end
expect(resp.context.operation_name.to_s).to eq('change_message_visibility')
expect(resp.context.params)
.to eq(
queue_url: queue_url,
receipt_handle: 'rh',
visibility_timeout: 60
)
end
end
describe 'stop polling' do
it 'polls until :stop_polling is thrown from #before_request' do
expect(client).to receive(:receive_message)
.exactly(10).times
.and_return(client.stub_data(:receive_message))
poller.before_request do |stats|
throw :stop_polling if stats.request_count == 10
end
poller.poll { |msg| }
end
it 'polls until :idle_timeout seconds have past without messages' do
now = Time.now
allow(Time).to receive(:now).and_return(now)
one_minute_later = now + 61
expect(client).to receive(:receive_message)
.exactly(10).times
.and_return(client.stub_data(:receive_message))
poller.before_request do |stats|
if stats.request_count == 9
allow(Time).to receive(:now)
.and_return(one_minute_later)
end
end
poller.poll(idle_timeout: 60) { |msg| }
end
end
describe 'tracking stats' do
it 'counts the number of requests made' do
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [] },
{ messages: [sample_message] },
{ messages: [] },
{ messages: [sample_message] }
]
)
poller.before_request do |stats|
throw :stop_polling if stats.received_message_count == 3
end
stats = poller.poll { |msg| }
expect(stats.request_count).to eq(5)
end
it 'counts the number of messages yielded in single mode' do
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [] }
]
)
stats = poller.poll(idle_timeout: 0) { |msg| }
expect(stats.received_message_count).to eq(5)
end
it 'counts the number of messages yielded in batch mode' do
client.stub_responses(
:receive_message, [
{ messages: [
sample_message,
sample_message,
sample_message
] },
{ messages: [
sample_message,
sample_message,
sample_message
] },
{ messages: [] }
]
)
stats = poller.poll(idle_timeout: 0, max_number_of_messages: 3) { |msgs| }
expect(stats.received_message_count).to eq(6)
end
it 'tracks when a message was most recently received' do
client.stub_responses(
:receive_message,
[
{ messages: [sample_message] },
{ messages: [] }
]
)
stats = poller.poll(idle_timeout: 0) { |msg| }
expect(stats.last_message_received_at).to be_kind_of(Time)
end
it 'has a nil value for last_message_received_at with no messages' do
stats = poller.poll(idle_timeout: 0) { |msg| }
expect(stats.last_message_received_at).to be(nil)
end
it 'tracks when polling started' do
stats = poller.poll(idle_timeout: 0) { |msg| }
expect(stats.polling_started_at).to be_kind_of(Time)
end
it 'tracks when polling stops' do
started = Time.now
poller.before_request do |stats|
expect(stats.polling_stopped_at).to be(nil)
end
stats = poller.poll(idle_timeout: 0) { |msg| }
expect(stats.polling_stopped_at > started).to be(true)
end
it 'yields a stats object to #poll' do
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [] }
]
)
yielded = nil
poller.poll(idle_timeout: 0) do |_msg, stats|
yielded = stats
end
expect(yielded).to be_kind_of(QueuePoller::PollerStats)
expect(yielded.request_count).to eq(4)
expect(yielded.received_message_count).to eq(3)
expect(yielded.last_message_received_at).to be_kind_of(Time)
end
it 'returns a stats object' do
client.stub_responses(
:receive_message, [
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [sample_message] },
{ messages: [] }
]
)
stats = poller.poll(idle_timeout: 0) { |msg| }
expect(stats).to be_kind_of(QueuePoller::PollerStats)
expect(stats.request_count).to eq(4)
expect(stats.received_message_count).to eq(3)
expect(stats.last_message_received_at).to be_kind_of(Time)
end
end
end
end
end
end