utils/replay_dlq.rb (90 lines of code) (raw):

#!/usr/bin/env ruby

require 'aws-sdk'
require 'optimist'
require 'awesome_print'
require 'json'

# Largest batch this script asks SQS to receive or send in a single call.
SQS_BATCH_LIMIT = 10

# Looks up the URL for the given queue_name.
#
# The name is used as a prefix search, so more than one queue may match; in
# that case a warning listing every match is printed and the first is used.
#
# @param client     SQS client used to issue list_queues
# @param queue_name [String] queue name (prefix) to search for
# @return [String] URL of the first matching queue
# @raise [StandardError] if queue_name is nil or no queue matches
def lookup_queue(client, queue_name)
  raise StandardError, "No queue name to search for!" if queue_name.nil?

  queue_urls = client.list_queues(queue_name_prefix: queue_name).queue_urls
  raise StandardError, "No queue found for #{queue_name}" if queue_urls.empty?

  # Count the URLs themselves, not the response object wrapping them.
  if queue_urls.length > 1
    puts("WARNING: found #{queue_urls.length} queues for #{queue_name}:")
    queue_urls.each { |entry| puts("\t#{entry}") }
  end
  queue_urls.first
end

# Receives messages from queue_url until page_size messages have been
# collected or a receive call comes back empty.
#
# @param client    SQS client used to issue receive_message
# @param queue_url [String] queue to read from
# @param page_size [Integer] maximum number of messages to return
# @return [Array] received message objects, never more than page_size
def get_next_page(client, queue_url, page_size)
  msgList = []
  while msgList.length < page_size
    # Only ask for what is still missing so the page never overshoots.
    batch_size = [SQS_BATCH_LIMIT, page_size - msgList.length].min
    msgs = client.receive_message(
      queue_url: queue_url,
      max_number_of_messages: batch_size
    ).messages
    break if msgs.empty?

    msgList.concat(msgs)
  end
  # ap msgList
  msgList
end

# Converts the Message objects in the list to Hashes for bulk send.
#
# @param msgList [Array] received message objects
# @return [Array<Hash>] one entry per message with :id, :message_body and
#   :message_attributes keys
def convert_message_list(msgList)
  msgList.map do |entry|
    {
      id: entry.message_id,
      message_body: entry.body,
      message_attributes: entry.message_attributes
    }
  end
end

# Parses the JSON document stored under the "Message" key of the (JSON)
# message body.
#
# @param msg message object responding to #body
# @return the parsed inner document
# @raise [JSON::ParserError] if the body or the inner document is not valid JSON
# @raise [TypeError] if the body has no String "Message" entry
def parse_body(msg)
  data = JSON.parse(msg.body)
  JSON.parse(data["Message"])
end

# Drops every message whose body cannot be parsed by parse_body, printing a
# warning for each one.
#
# @param msg_list [Array] received message objects
# @return [Array] the messages that parsed successfully
def filter_unparseable(msg_list)
  msg_list.select do |entry|
    begin
      parse_body(entry)
      true
    rescue JSON::ParserError, TypeError
      # TypeError covers bodies that are valid JSON but carry no String
      # "Message" entry; without it one such message aborts the whole run.
      puts("WARNING: Unparseable message found: #{entry.body}")
      false
    end
  end
end

# Pushes the list of messages to queue_url, and deletes each one from
# source_queue_url once it has been successfully pushed.
#
# @param client           SQS client used for send_message_batch / delete_message
# @param queue_url        [String] queue the messages are sent to
# @param source_queue_url [String] queue the messages were received from
# @param msgList          [Array] received message objects to relay
# @param keep             [Boolean] when truthy, nothing is deleted from the
#   source queue (positional argument, not a keyword)
def push_to_source(client, queue_url, source_queue_url, msgList, keep=false)
  # SQS supports only bulks of up to 10 messages
  msgList.each_slice(SQS_BATCH_LIMIT) do |msgSubList|
    results = client.send_message_batch(
      queue_url: queue_url,
      entries: convert_message_list(msgSubList)
    )
    puts("#{results.successful.length} messages sent, #{results.failed.length} messages failed")

    successful_ids = results.successful.map(&:id)
    puts("debug: successful_ids: #{successful_ids}")
    next if keep

    # Only the slice that was just sent can appear in this batch's results.
    msgSubList.each do |entry|
      next unless successful_ids.include?(entry.message_id)

      puts("debug: deleting message #{entry.message_id}; #{entry.receipt_handle} from #{source_queue_url}")
      client.delete_message(queue_url: source_queue_url, receipt_handle: entry.receipt_handle)
    end
  end
end

### START MAIN
opts = Optimist::options do
  opt :dlq, "name of dead-letter queue to read", :type=>:string
  opt :target, "target queue to write to", :type=>:string
  opt :limit, "limit to this number of messages", :type=>:integer, :default=>999999
  opt :pagesize,"shift this many messages at a time", :type=>:integer, :default=>30
  opt :region, "work in this region", :type=>:string, :default=>"eu-west-1"
  opt :keep, "don't delete messages from the DLQ after copying. Intended for use in testing.", :type=>:boolean, :default=>false
end

client = Aws::SQS::Client.new({:region=>opts.region})

dlq_url = lookup_queue(client, opts.dlq)
target_url = lookup_queue(client, opts.target)

ctr = 0
# Strictly less-than: once the limit is reached no further page is fetched.
while ctr < opts.limit
  # Never request more than the remaining budget allows.
  page_size = [opts.pagesize, opts.limit - ctr].min
  puts("Getting up to #{page_size} messages...")
  msgList = filter_unparseable(get_next_page(client, dlq_url, page_size))
  puts("Received #{msgList.length} messages")
  ctr += msgList.length
  # NOTE(review): a page made up solely of unparseable messages also ends the
  # run here, even if more messages remain in the DLQ.
  break if msgList.empty?

  # keep is a positional parameter; passing `keep: ...` would bind a Hash,
  # which is always truthy, and silently disable deletion.
  push_to_source(client, target_url, dlq_url, msgList, opts.keep)
end

puts("Relayed #{ctr} messages with a limit of #{opts.limit}")