lib/active_job/queue_adapters/sqs_adapter/params.rb (63 lines of code) (raw):

# frozen_string_literal: true module ActiveJob module QueueAdapters class SqsAdapter # Build request parameter of Aws::SQS::Client # @api private class Params class << self def assured_delay_seconds(timestamp) delay = (timestamp.to_f - Time.now.to_f).floor delay = 0 if delay.negative? raise ArgumentError, 'Unable to queue a job with a delay great than 15 minutes' if delay > 15.minutes delay end end def initialize(job, body) @job = job @body = body || job.serialize end def queue_url @queue_url ||= Aws::ActiveJob::SQS.config.url_for(@job.queue_name) end def entry if Aws::ActiveJob::SQS.fifo?(queue_url) default_entry.merge(options_for_fifo) else default_entry end end private def default_entry { message_body: ActiveSupport::JSON.dump(@body), message_attributes: message_attributes } end def message_attributes { 'aws_sqs_active_job_class' => { string_value: @job.class.to_s, data_type: 'String' }, 'aws_sqs_active_job_version' => { string_value: Aws::ActiveJob::SQS::VERSION, data_type: 'String' } } end def options_for_fifo options = {} options[:message_deduplication_id] = Digest::SHA256.hexdigest(ActiveSupport::JSON.dump(deduplication_body)) message_group_id = @job.message_group_id if @job.respond_to?(:message_group_id) message_group_id ||= Aws::ActiveJob::SQS.config.message_group_id_for(@job.queue_name) options[:message_group_id] = message_group_id options end def deduplication_body ex_dedup_keys = @job.excluded_deduplication_keys if @job.respond_to?(:excluded_deduplication_keys) ex_dedup_keys ||= Aws::ActiveJob::SQS.config.excluded_deduplication_keys_for(@job.queue_name) @body.except(*ex_dedup_keys) end end end end end