lib/core/job_cleanup.rb (49 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'core'
require 'utility/logger'
module Core
  # Housekeeping for connector sync jobs. Two passes are run by .execute:
  #
  # 1. Orphaned jobs (whatever ConnectorJob.orphaned_jobs returns for the ids of
  #    all known connectors) are deleted, together with any content index they
  #    reference that no known connector uses.
  # 2. Idle jobs (whatever ConnectorJob.idle_jobs returns) are marked as errored
  #    and the owning connector's last-sync information is refreshed.
  class JobCleanUp
    class << self
      # Runs the orphaned-job pass followed by the idle-job pass.
      #
      # @param connector_id [Object, nil] forwarded to ConnectorJob.idle_jobs; when
      #   nil the log output refers to 'native connectors'
      def execute(connector_id = nil)
        process_orphaned_jobs
        process_idle_jobs(connector_id)
      end

      private

      # Deletes orphaned jobs and the content indices only they reference.
      # Exceptions raised by the collaborators are not rescued here.
      def process_orphaned_jobs
        Utility::Logger.debug('Start cleaning up orphaned jobs...')
        all_connectors = ConnectorSettings.fetch_all_connectors
        orphaned_jobs = ConnectorJob.orphaned_jobs(all_connectors.map(&:id))
        if orphaned_jobs.empty?
          Utility::Logger.debug('No orphaned jobs found. Skipping...')
          return
        end

        # Delete content indices in case they were re-created by a sync job.
        # Index names still used by a known connector are excluded, as are nils
        # and duplicates.
        content_indices = (orphaned_jobs.map(&:index_name) - all_connectors.map(&:index_name)).compact.uniq
        ElasticConnectorActions.delete_indices(content_indices) if content_indices.any?

        result = ConnectorJob.delete_jobs(orphaned_jobs)
        Utility::Logger.error("Error found when deleting jobs: #{result['failures']}") if result['failures']&.any?
        Utility::Logger.info("Successfully deleted #{result['deleted']} out of #{result['total']} orphaned jobs.")
      end

      # Marks every idle job as errored and refreshes its connector's last-sync data.
      # A failure while handling one job is logged and does not stop the remaining jobs.
      #
      # @param connector_id [Object, nil] forwarded to ConnectorJob.idle_jobs
      def process_idle_jobs(connector_id = nil)
        scope = connector_id ? "connector #{connector_id}" : 'native connectors'
        Utility::Logger.debug("Start cleaning up idle jobs for #{scope}...")
        idle_jobs = ConnectorJob.idle_jobs(connector_id)
        if idle_jobs.empty?
          Utility::Logger.debug('No idle jobs found. Skipping...')
          return
        end

        marked_count = 0
        idle_jobs.each do |job|
          job.error!('The job has not seen any update for some time.')
          # Count as soon as the job itself is marked: a failure in the connector
          # refresh below must not make the summary under-report marked jobs.
          marked_count += 1
          Utility::Logger.debug("Successfully marked job #{job.id} as error.")
          refresh_connector_last_sync(job.id)
        rescue StandardError => e
          Utility::ExceptionTracking.log_exception(e)
        end
        Utility::Logger.info("Successfully marked #{marked_count} out of #{idle_jobs.count} idle jobs as error.")
      end

      # Reloads the job by id and passes the reloaded job to its connector's
      # update_last_sync!. Logs a warning and does nothing else when either the
      # job or its connector cannot be found.
      #
      # @param job_id [Object] id of a job that was just marked as errored
      def refresh_connector_last_sync(job_id)
        reloaded_job = ConnectorJob.fetch_by_id(job_id)
        if reloaded_job.nil?
          Utility::Logger.warn("Could not found job by id #{job_id}")
          return
        end

        connector = reloaded_job.connector
        if connector.nil?
          Utility::Logger.warn("Could not found connector by id #{reloaded_job.connector_id}")
          return
        end

        connector.update_last_sync!(reloaded_job)
      end
    end
  end
end