lib/core/scheduler.rb (160 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License; # you may not use this file except in compliance with the Elastic License. # # frozen_string_literal: true require 'time' require 'fugit' require 'core/connector_settings' require 'core/elastic_connector_actions' require 'core/filtering/validation_status' require 'utility/cron' require 'utility/logger' require 'utility/exception_tracking' module Core class Scheduler def initialize(poll_interval, heartbeat_interval) @poll_interval = poll_interval @heartbeat_interval = heartbeat_interval @is_shutting_down = false end def connector_settings raise 'Not implemented' end def when_triggered loop do connector_settings.each do |cs| if sync_triggered?(cs) yield cs, :sync end if heartbeat_triggered?(cs) yield cs, :heartbeat end if configuration_triggered?(cs) yield cs, :configuration end if filtering_validation_triggered?(cs) yield cs, :filter_validation end end rescue *Utility::AUTHORIZATION_ERRORS => e log_authorization_error(e) rescue StandardError => e log_standard_error(e) ensure if @is_shutting_down break end sleep_for_poll_interval end end def shutdown Utility::Logger.info("Shutting down scheduler #{self.class.name}.") @is_shutting_down = true end private def sync_triggered?(connector_settings, time_at_poll_start = Time.now) unless connector_settings.valid_index_name? Utility::Logger.warn("The index name of #{connector_settings.formatted} is invalid.") return false end unless connector_settings.connector_status_allows_sync? Utility::Logger.info("#{connector_settings.formatted.capitalize} is in status \"#{connector_settings.connector_status}\" and won't sync yet. Connector needs to be in one of the following statuses: #{Connectors::ConnectorStatus::STATUSES_ALLOWING_SYNC} to run.") return false end # Sync when sync_now flag is true for the connector if connector_settings.sync_now? Utility::Logger.info("#{connector_settings.formatted.capitalize} is manually triggered to sync now.") return true end schedule_triggered?(connector_settings.full_sync_scheduling, connector_settings.formatted, time_at_poll_start) end def heartbeat_triggered?(connector_settings) last_seen = connector_settings[:last_seen] return true if last_seen.nil? || last_seen.empty? last_seen = begin Time.parse(last_seen) rescue StandardError Utility::Logger.warn("Unable to parse last_seen #{last_seen}") nil end return true unless last_seen last_seen + @heartbeat_interval < Time.now end def configuration_triggered?(connector_settings) connector_settings.needs_service_type? || connector_settings.connector_status == Connectors::ConnectorStatus::CREATED end def filtering_validation_triggered?(connector_settings) unless connector_settings.any_filtering_feature_enabled? Utility::Logger.debug("#{connector_settings.formatted} all filtering features are disabled. Skip filtering validation.") return false end filtering = connector_settings.filtering unless filtering.present? Utility::Logger.debug("#{connector_settings.formatted} does not contain filtering to be validated.") return false end draft_filters = filtering[:draft] unless draft_filters.present? Utility::Logger.debug("#{connector_settings.formatted} does not contain a draft filter to be validated.") return false end validation = draft_filters[:validation] unless validation.present? Utility::Logger.warn("#{connector_settings.formatted} does not contain a validation object inside draft filtering. Check connectors index.") return false end unless validation[:state] == Core::Filtering::ValidationStatus::EDITED Utility::Logger.debug("#{connector_settings.formatted} filtering validation needs to be in state #{Core::Filtering::ValidationStatus::EDITED} to be able to validate it.") return false end true end def connector_registered?(service_type) if Connectors::REGISTRY.registered?(service_type) true else Utility::Logger.warn("The service type (#{service_type}) is not supported.") false end end def schedule_triggered?(scheduling_settings, identifier, time_at_poll_start = Time.now) # Don't sync if sync is explicitly disabled unless scheduling_settings.present? && scheduling_settings[:enabled] == true Utility::Logger.debug("#{identifier.capitalize} scheduling is disabled.") return false end current_schedule = scheduling_settings[:interval] # Don't sync if there is no actual scheduling interval if current_schedule.nil? || current_schedule.empty? Utility::Logger.warn("No sync schedule configured for #{identifier}.") return false end current_schedule = begin Utility::Cron.quartz_to_crontab(current_schedule) rescue StandardError => e Utility::ExceptionTracking.log_exception(e, "Unable to convert quartz (#{current_schedule}) to crontab.") return false end cron_parser = Fugit::Cron.parse(current_schedule) # Don't sync if the scheduling interval is non-parsable unless cron_parser Utility::Logger.error("Unable to parse sync schedule for #{identifier}: expression #{current_schedule} is not a valid Quartz Cron definition.") return false end next_trigger_time = cron_parser.next_time(time_at_poll_start) # Sync if next trigger happens before the next poll poll_window = time_at_poll_start + @poll_interval if next_trigger_time <= poll_window Utility::Logger.info("#{identifier.capitalize} sync is triggered by cron schedule #{current_schedule}.") return true else # log that a sync was not triggered, share the next trigger time and when poll interval was meant to end Utility::Logger.debug("Sync for #{identifier.capitalize} not triggered as #{next_trigger_time} occurs after the poll window #{poll_window}. Poll window began at #{time_at_poll_start}, poll interval is #{@poll_interval} seconds.") end false end def sleep_for_poll_interval if @poll_interval > 0 && !@is_shutting_down Utility::Logger.debug("Sleeping for #{@poll_interval} seconds in #{self.class}.") sleep(@poll_interval) end end def log_authorization_error(e) Utility::ExceptionTracking.log_exception(e, 'Could not retrieve connectors settings due to authorization error.') end def log_standard_error(e) Utility::ExceptionTracking.log_exception(e, 'Sync failed due to unexpected error.') end end end