in lib/aws/rails/sqs_active_job/poller.rb [66:100]
# Polls the SQS queue named by +@options[:queue]+ and submits every
# received message to +@executor+.
#
# The queue URL and SQS client are looked up through
# +Aws::Rails::SqsActiveJob.config+. The created poller is kept in
# +@poller+.
#
# @return [Object] whatever +Aws::SQS::QueuePoller#poll+ returns
def poll
  queue_name = @options[:queue]
  queue_url = Aws::Rails::SqsActiveJob.config.queue_url_for(queue_name)
  @logger.info "Polling on: #{queue_name} => #{queue_url}"
  client = Aws::Rails::SqsActiveJob.config.client
  @poller = Aws::SQS::QueuePoller.new(queue_url, client: client)

  # FIFO queues are always polled one message at a time, regardless of the
  # configured batch size (presumably to preserve ordering -- confirm).
  batch_size = Aws::Rails::SqsActiveJob.fifo?(queue_url) ? 1 : @options[:max_messages]
  single_message = batch_size == 1

  # skip_delete: the poller must not delete messages on its own; deletion is
  # presumably handled downstream once the job has run -- confirm against
  # the executor.
  poller_options = {
    skip_delete: true,
    max_number_of_messages: batch_size,
    visibility_timeout: @options[:visibility_timeout]
  }

  @poller.poll(poller_options) do |received|
    # With a limit of one the poller hands over a bare message instead of a
    # collection, so normalise to an array before iterating.
    batch = single_message ? [received] : received
    @logger.info "Processing batch of #{batch.length} messages"
    batch.each do |raw|
      wrapped = Aws::SQS::Message.new(
        queue_url: queue_url,
        receipt_handle: raw.receipt_handle,
        data: raw,
        client: client
      )
      @executor.execute(wrapped)
    end
  end
end