lib/gitlab/sidekiq_middleware/pause_control/pause_control_service.rb (87 lines of code) (raw):
# frozen_string_literal: true
module Gitlab
module SidekiqMiddleware
module PauseControl
class PauseControlService
# Class for managing queues for paused workers
# When a worker is paused all jobs are saved in a separate sorted sets in redis
LIMIT = 1000
PROJECT_CONTEXT_KEY = "#{Gitlab::ApplicationContext::LOG_KEY}.project".freeze
def initialize(worker_name)
@worker_name = worker_name
worker_name = @worker_name.underscore
@redis_set_key = "sidekiq:pause_control:paused_jobs:zset:{#{worker_name}}"
@redis_score_key = "sidekiq:pause_control:paused_jobs:score:{#{worker_name}}"
end
class << self
def add_to_waiting_queue!(worker_name, args, context)
new(worker_name).add_to_waiting_queue!(args, context)
end
def has_jobs_in_waiting_queue?(worker_name)
new(worker_name).has_jobs_in_waiting_queue?
end
def resume_processing!(worker_name)
new(worker_name).resume_processing!
end
def queue_size(worker_name)
new(worker_name).queue_size
end
end
def add_to_waiting_queue!(args, context)
with_redis do |redis|
redis.zadd(redis_set_key, generate_unique_score(redis), serialize(args, context))
end
end
def queue_size
with_redis { |redis| redis.zcard(redis_set_key) }
end
def has_jobs_in_waiting_queue?
with_redis { |redis| redis.exists?(redis_set_key) } # rubocop:disable CodeReuse/ActiveRecord
end
def resume_processing!(iterations: 1)
with_redis do |redis|
iterations.times do
jobs_with_scores = next_batch_from_waiting_queue(redis)
break if jobs_with_scores.empty?
parsed_jobs = jobs_with_scores.map { |j, _| deserialize(j) }
parsed_jobs.each { |j| send_to_processing_queue(j) }
remove_jobs_from_waiting_queue(redis, jobs_with_scores)
end
size = queue_size
redis.del(redis_score_key, redis_set_key) if size == 0
size
end
end
private
attr_reader :worker_name, :redis_set_key, :redis_score_key
def with_redis(&blk)
Gitlab::Redis::SharedState.with(&blk) # rubocop:disable CodeReuse/ActiveRecord
end
def serialize(args, context)
{
args: args,
# Only include part of the context that would not prevent deduplication
context: context.slice(PROJECT_CONTEXT_KEY)
}.to_json
end
def deserialize(json)
Gitlab::Json.parse(json)
end
def send_to_processing_queue(job)
Gitlab::ApplicationContext.with_raw_context(job['context']) do
args = job['args']
Gitlab::SidekiqLogging::PauseControlLogger.instance.resumed_log(worker_name, args)
worker_name.safe_constantize&.perform_async(*args)
end
end
def generate_unique_score(redis)
redis.incr(redis_score_key)
end
def next_batch_from_waiting_queue(redis)
redis.zrangebyscore(redis_set_key, '-inf', '+inf', limit: [0, LIMIT], with_scores: true)
end
def remove_jobs_from_waiting_queue(redis, jobs_with_scores)
first_score = jobs_with_scores.first.last
last_score = jobs_with_scores.last.last
redis.zremrangebyscore(redis_set_key, first_score, last_score)
end
end
end
end
end