utils/replay_dlq.rb (90 lines of code) (raw):
#!/usr/bin/env ruby
require 'aws-sdk'
require 'optimist'
require 'awesome_print'
require 'json'
# Looks up the URL for the given queue_name.
#
# The lookup is a prefix search (`queue_name_prefix`), so it can return several
# queues, e.g. "orders" also matches "orders-dlq". A URL ending in exactly
# "/<queue_name>" is preferred; when there is none, the first URL returned is
# used. A warning listing every candidate is printed whenever more than one
# queue matched.
#
# @param client [#list_queues] SQS client
# @param queue_name [String] name (or prefix) of the queue to find
# @return [String] URL of the selected queue
# @raise [StandardError] if queue_name is nil or no queue matches
def lookup_queue(client, queue_name)
  raise StandardError, "No queue name to search for!" if queue_name.nil?

  # Count the URLs themselves: calling length on the response object would
  # measure the response struct, not the number of queues found.
  queue_urls = Array(client.list_queues(queue_name_prefix: queue_name).queue_urls)
  raise StandardError, "No queue found for #{queue_name}" if queue_urls.empty?

  if queue_urls.length > 1
    puts("WARNING: found #{queue_urls.length} queues for #{queue_name}:")
    queue_urls.each { |entry| puts("\t#{entry}") }
  end

  exact_match = queue_urls.find { |entry| entry.end_with?("/#{queue_name}") }
  exact_match || queue_urls.first
end
# Collects up to page_size messages from a queue by issuing repeated receive calls.
#
# Stops as soon as the page is full or a receive call comes back empty.
#
# @param client [#receive_message] SQS client
# @param queue_url [String] URL of the queue to read from
# @param page_size [Integer] maximum number of messages to return
# @return [Array] the received messages, at most page_size of them
def get_next_page(client, queue_url, page_size)
  page = []
  while page.length < page_size
    # A single receive call may ask for at most 10 messages; never ask for more
    # than the page still needs, otherwise the page overshoots page_size.
    wanted = [10, page_size - page.length].min
    msgs = client.receive_message({
      queue_url: queue_url,
      max_number_of_messages: wanted
    }).messages
    break if msgs.empty?

    page.concat(msgs)
  end
  page
end
# Turns received message objects into the entry hashes used for a bulk send.
# Every entry reuses the message id as its :id and carries the body and the
# message attributes over unchanged.
def convert_message_list(msgList)
  msgList.each_with_object([]) do |message, entries|
    entry = {}
    entry[:id] = message.message_id
    entry[:message_body] = message.body
    entry[:message_attributes] = message.message_attributes
    entries << entry
  end
end
# Decodes the payload nested inside a message body.
#
# The body must be a JSON object whose "Message" field is a string that itself
# contains JSON (this looks like an SNS-style notification envelope; confirm
# against the producer).
#
# @param msg [#body] message whose body holds the JSON envelope
# @return [Object] the parsed content of the "Message" field
# @raise [JSON::ParserError] if the body or the nested payload is not valid JSON,
#   or if the body is valid JSON but not an object with a string "Message" field
def parse_body(msg)
  envelope = JSON.parse(msg.body)
  payload = envelope["Message"] if envelope.is_a?(Hash)
  # Report a malformed envelope as a parse failure so callers that rescue
  # JSON::ParserError handle it, instead of a TypeError/NoMethodError escaping.
  unless payload.is_a?(String)
    raise JSON::ParserError, 'message body has no JSON string under "Message"'
  end

  JSON.parse(payload)
end
# Returns the messages from msg_list whose body parse_body can decode.
# Messages that raise JSON::ParserError are dropped and a warning containing
# their body is printed.
def filter_unparseable(msg_list)
  msg_list.reject do |candidate|
    broken = false
    begin
      parse_body(candidate)
    rescue JSON::ParserError
      puts("WARNING: Unparseable message found: #{candidate.body}")
      broken = true
    end
    broken
  end
end
# Sends the messages to queue_url in batches and, unless keep is set, deletes
# every message that was sent successfully from source_queue_url.
#
# @param client [#send_message_batch, #delete_message] SQS client
# @param queue_url [String] URL of the queue the messages are sent to
# @param source_queue_url [String] URL of the queue the messages were received
#   from; deletes are issued against this queue
# @param msgList [Array] received messages; message_id and receipt_handle are
#   read here, the rest is up to convert_message_list
# @param keep [Boolean, Hash] when true nothing is deleted. A Hash of the form
#   {keep: flag} is accepted too (see below).
def push_to_source(client, queue_url, source_queue_url, msgList, keep=false)
  # A call written as `push_to_source(..., keep: flag)` reaches a positional
  # parameter as the Hash {keep: flag}, which is always truthy. Unwrap it so
  # that `keep: false` really deletes the relayed messages.
  keep = keep.fetch(:keep, false) if keep.is_a?(Hash)

  # SQS supports only bulks of up to 10 messages
  msgList.each_slice(10) do |batch|
    results = client.send_message_batch({
      queue_url: queue_url,
      entries: convert_message_list(batch)
    })
    puts("#{results.successful.length} messages sent, #{results.failed.length} messages failed")
    successful_ids = results.successful.map(&:id)
    puts("debug: successful_ids: #{successful_ids}")
    next if keep

    # The ids just reported can only belong to this batch, so there is no need
    # to rescan the whole message list.
    batch.each do |entry|
      next unless successful_ids.include?(entry.message_id)

      puts("debug: deleting message #{entry.message_id}; #{entry.receipt_handle} from #{source_queue_url}")
      client.delete_message({queue_url: source_queue_url, receipt_handle: entry.receipt_handle})
    end
  end
end
### START MAIN
# Reads messages from the dead-letter queue page by page, drops the ones whose
# body cannot be parsed, sends the rest to the target queue and (unless --keep
# is given) deletes them from the dead-letter queue. Stops when --limit
# messages have been handled or a page yields no parseable message.
opts = Optimist::options do
  opt :dlq, "name of dead-letter queue to read", :type=>:string
  opt :target, "target queue to write to", :type=>:string
  opt :limit, "limit to this number of messages", :type=>:integer, :default=>999999
  opt :pagesize,"shift this many messages at a time", :type=>:integer, :default=>30
  opt :region, "work in this region", :type=>:string, :default=>"eu-west-1"
  opt :keep, "don't delete messages from the DLQ after copying. Intended for use in testing.", :type=>:boolean, :default=>false
end

client = Aws::SQS::Client.new({:region=>opts.region})
dlq_url = lookup_queue(client, opts.dlq)
target_url = lookup_queue(client, opts.target)

ctr = 0
# Strictly below the limit: with `<=` another page would be fetched after the
# limit had already been reached.
while ctr < opts.limit
  remaining = opts.limit - ctr
  page_size = [opts.pagesize, remaining].min
  puts("Getting up to #{page_size} messages...")
  # Trim the page to the remaining allowance in case it holds more messages
  # than were asked for, so the limit is never exceeded. Trimmed messages are
  # simply not relayed or deleted.
  msgList = filter_unparseable(get_next_page(client, dlq_url, page_size)).first(remaining)
  puts("Received #{msgList.length} messages")
  ctr += msgList.length
  break if msgList.empty?

  # keep is a positional parameter of push_to_source; passing `keep: value`
  # would hand it an always-truthy Hash and nothing would ever be deleted.
  push_to_source(client, target_url, dlq_url, msgList, opts.keep)
end
puts("Relayed #{ctr} messages with a limit of #{opts.limit}")