lib/core/elastic_connector_actions.rb (525 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License; # you may not use this file except in compliance with the Elastic License. # # frozen_string_literal: true # require 'active_support/core_ext/hash' require 'connectors/connector_status' require 'connectors/job_trigger_method' require 'connectors/sync_status' require 'utility' require 'elastic-transport' module Core class JobAlreadyRunningError < StandardError def initialize(connector_id) super("Sync job for connector '#{connector_id}' is already running.") end end class JobNotCreatedError < StandardError def initialize(connector_id, response) super("Sync job for connector '#{connector_id}' could not be created. Response: #{response}") end end class ConnectorVersionChangedError < StandardError def initialize(connector_id, seq_no, primary_term) super("Version conflict: seq_no [#{seq_no}] and primary_term [#{primary_term}] do not match for connector '#{connector_id}'.") end end class ElasticConnectorActions class << self def force_sync(connector_id) update_connector_fields(connector_id, :scheduling => { :enabled => true }, :sync_now => true) end def create_connector(index_name, service_type) body = { :scheduling => { :enabled => true }, :index_name => index_name, :service_type => service_type } response = client.index(:index => Utility::Constants::CONNECTORS_INDEX, :body => body) response['_id'] end def get_connector(connector_id) # TODO: remove the usage of with_indifferent_access. Ideally this should return a hash or nil if not found client.get(:index => Utility::Constants::CONNECTORS_INDEX, :id => connector_id, :ignore => 404).with_indifferent_access end def get_job(job_id) # TODO: remove the usage of with_indifferent_access. Ideally this should return a hash or nil if not found client.get(:index => Utility::Constants::JOB_INDEX, :id => job_id, :ignore => 404).with_indifferent_access end def connectors_meta # TODO: remove the usage of with_indifferent_access. Ideally this should return a hash or nil if not found alias_mappings = client.indices.get_mapping(:index => Utility::Constants::CONNECTORS_INDEX, :ignore => 404).with_indifferent_access index = get_latest_index_in_alias(Utility::Constants::CONNECTORS_INDEX, alias_mappings.keys) alias_mappings.dig(index, 'mappings', '_meta') || { :extract_binary_content => true, :name => 'ent-search-generic-ingestion', :reduce_whitespace => true, :run_ml_inference => false, } end def search_connectors(query, page_size, offset) client.search( :index => Utility::Constants::CONNECTORS_INDEX, :ignore => 404, :body => { :size => page_size, :from => offset, :query => query, :sort => ['name'] } ) end def search_jobs(query, page_size, offset) client.search( :index => Utility::Constants::JOB_INDEX, :ignore => 404, :body => { :size => page_size, :from => offset, :query => query, :sort => ['created_at'] } ) end def delete_jobs_by_query(query) client.delete_by_query( :index => Utility::Constants::JOB_INDEX, :body => { :query => query } ) end def delete_indices(indices) client.indices.delete(:index => indices, :ignore_unavailable => true) end def update_connector_configuration(connector_id, configuration) update_connector_fields(connector_id, :configuration => configuration) end def enable_connector_scheduling(connector_id, cron_expression) payload = { :enabled => true, :interval => cron_expression } update_connector_fields(connector_id, :scheduling => payload) end def disable_connector_scheduling(connector_id) payload = { :enabled => false } update_connector_fields(connector_id, :scheduling => payload) end def set_configurable_field(connector_id, field_name, label, value) payload = { field_name => { :value => value, :label => label } } update_connector_configuration(connector_id, payload) end def update_filtering_validation(connector_id, filter_validation_results) return if filter_validation_results.empty? filtering = get_connector(connector_id).dig(:_source, :filtering) case filtering when Hash update_filter_validation(filtering, filter_validation_results) when Array return unless should_update_validations?(filter_validation_results, filtering) filtering.each do |filter| update_filter_validation(filter, filter_validation_results) end else Utility::Logger.warn("Elasticsearch returned invalid filtering format: #{filtering}. Skipping validation.") return end update_connector_fields(connector_id, { :filtering => filtering }) end def update_connector_sync_now(connector_id, sync_now) doc = connector_with_concurrency_control(connector_id) body = { sync_now: sync_now, last_synced: Time.now } update_connector_fields( connector_id, body, doc[:seq_no], doc[:primary_term] ) end def update_connector_sync_start(connector_id) doc = connector_with_concurrency_control(connector_id) body = { last_sync_status: Connectors::SyncStatus::IN_PROGRESS, last_sync_error: nil, status: Connectors::ConnectorStatus::CONNECTED } update_connector_fields( connector_id, body, doc[:seq_no], doc[:primary_term] ) end def update_connector_custom_scheduling_last_synced(connector_id, schedule_key) doc = connector_with_concurrency_control(connector_id) body = { :custom_scheduling => { schedule_key => { :last_synced => Time.now } } } update_connector_fields( connector_id, body, doc[:seq_no], doc[:primary_term] ) end def connector_with_concurrency_control(connector_id) seq_no = nil primary_term = nil doc = client.get( :index => Utility::Constants::CONNECTORS_INDEX, :id => connector_id, :ignore => 404, :refresh => true ).tap do |response| seq_no = response['_seq_no'] primary_term = response['_primary_term'] end { doc: doc, seq_no: seq_no, primary_term: primary_term } end def create_job(connector_settings:) body = { status: Connectors::SyncStatus::PENDING, created_at: Time.now, last_seen: Time.now, trigger_method: connector_settings.sync_now? ? Connectors::JobTriggerMethod::ON_DEMAND : Connectors::JobTriggerMethod::SCHEDULED, connector: { id: connector_settings.id, filtering: convert_connector_filtering_to_job_filtering(connector_settings.filtering), index_name: connector_settings.index_name, language: connector_settings[:language], pipeline: connector_settings[:pipeline], service_type: connector_settings.service_type, configuration: connector_settings.configuration } } index_response = client.index(index: Utility::Constants::JOB_INDEX, body: body, refresh: true) return index_response if index_response['result'] == 'created' raise JobNotCreatedError.new(connector_settings.id, index_response) end def convert_connector_filtering_to_job_filtering(connector_filtering) return [] unless connector_filtering connector_filtering = [connector_filtering] unless connector_filtering.is_a?(Array) connector_filtering.each_with_object([]) do |filtering_domain, job_filtering| snippet = filtering_domain.dig('active', 'advanced_snippet') || {} job_filtering << { 'domain' => filtering_domain['domain'], 'rules' => filtering_domain.dig('active', 'rules'), 'advanced_snippet' => snippet['value'] || snippet, 'warnings' => [] # TODO: in https://github.com/elastic/enterprise-search-team/issues/3174 } end end def update_connector_status(connector_id, status, error_message = nil) if status == Connectors::ConnectorStatus::ERROR && error_message.nil? raise ArgumentError, 'error_message is required when status is error' end body = { :status => status, :error => status == Connectors::ConnectorStatus::ERROR ? error_message : nil } update_connector_fields(connector_id, body) end def fetch_document_ids(index_name) page_size = 1000 result = [] begin pit_id = client.open_point_in_time(:index => index_name, :keep_alive => '1m', :expand_wildcards => 'all')['id'] body = { :query => { :match_all => {} }, :sort => [{ :id => { :order => :asc } }], :pit => { :id => pit_id, :keep_alive => '1m' }, :size => page_size, :_source => false } loop do response = client.search(:body => body) hits = response.dig('hits', 'hits') || [] ids = hits.map { |h| h['_id'] } result += ids break if hits.size < page_size body[:search_after] = hits.last['sort'] body[:pit][:id] = response['pit_id'] end ensure client.close_point_in_time(:index => index_name, :body => { :id => pit_id }) end result end def ensure_content_index_exists(index_name, use_icu_locale = false, language_code = nil) settings = Utility::Elasticsearch::Index::TextAnalysisSettings.new(:language_code => language_code, :analysis_icu => use_icu_locale).to_h mappings = Utility::Elasticsearch::Index::Mappings.default_text_fields_mappings(:connectors_index => true) body_payload = { settings: settings, mappings: mappings } ensure_index_exists(index_name, body_payload) end def ensure_index_exists(index_name, body = {}) if client.indices.exists?(:index => index_name) return unless body[:mappings] Utility::Logger.debug("Index #{index_name} already exists. Checking mappings...") Utility::Logger.debug("New mappings: #{body[:mappings]}") response = client.indices.get_mapping(:index => index_name) existing = response[index_name]['mappings'] if existing.empty? Utility::Logger.debug("Index #{index_name} has no mappings. Adding mappings...") client.indices.put_mapping(:index => index_name, :body => body[:mappings], :expand_wildcards => 'all') Utility::Logger.debug("Index #{index_name} mappings added.") else Utility::Logger.debug("Index #{index_name} already has mappings: #{existing}. Skipping...") end else client.indices.create(:index => index_name, :body => body) Utility::Logger.debug("Created index #{index_name}") end end def system_index_body(alias_name: nil, mappings: nil) body = { :settings => { :index => { :hidden => true, :number_of_replicas => 0, :auto_expand_replicas => '0-5' } } } body[:aliases] = { alias_name => { :is_write_index => true } } unless alias_name.nil? || alias_name.empty? body[:mappings] = mappings unless mappings.nil? body end # DO NOT USE this method # Creation of connector index should be handled by Kibana, this method is only used by ftest.rb def ensure_connectors_index_exists mappings = { :dynamic => false, :properties => { :api_key_id => { :type => :keyword }, :configuration => { :type => :object }, :custom_schedule => { :type => :object }, :description => { :type => :text }, :error => { :type => :keyword }, :features => { :properties => { :filtering_advanced_config => { :type => :boolean }, :filtering_rules => { :type => :boolean } } }, :filtering => { :properties => { :domain => { :type => :keyword }, :active => { :properties => { :rules => { :properties => { :id => { :type => :keyword }, :policy => { :type => :keyword }, :field => { :type => :keyword }, :rule => { :type => :keyword }, :value => { :type => :keyword }, :order => { :type => :short }, :created_at => { :type => :date }, :updated_at => { :type => :date } } }, :advanced_snippet => { :properties => { :value => { :type => :object }, :created_at => { :type => :date }, :updated_at => { :type => :date } } }, :validation => { :properties => { :state => { :type => :keyword }, :errors => { :properties => { :ids => { :type => :keyword }, :messages => { :type => :text } } } } } } }, :draft => { :properties => { :rules => { :properties => { :id => { :type => :keyword }, :policy => { :type => :keyword }, :field => { :type => :keyword }, :rule => { :type => :keyword }, :value => { :type => :keyword }, :order => { :type => :short }, :created_at => { :type => :date }, :updated_at => { :type => :date } } }, :advanced_snippet => { :properties => { :value => { :type => :object }, :created_at => { :type => :date }, :updated_at => { :type => :date } } }, :validation => { :properties => { :state => { :type => :keyword }, :errors => { :properties => { :ids => { :type => :keyword }, :messages => { :type => :text } } } } } } } } }, :index_name => { :type => :keyword }, :is_native => { :type => :boolean }, :language => { :type => :keyword }, :last_seen => { :type => :date }, :last_sync_error => { :type => :keyword }, :last_sync_status => { :type => :keyword }, :last_synced => { :type => :date }, :last_deleted_document_count => { :type => :long }, :last_indexed_document_count => { :type => :long }, :name => { :type => :keyword }, :pipeline => { :properties => { :extract_binary_content => { :type => :boolean }, :name => { :type => :keyword }, :reduce_whitespace => { :type => :boolean }, :run_ml_inference => { :type => :boolean } } }, :scheduling => { :properties => { :enabled => { :type => :boolean }, :interval => { :type => :text } } }, :service_type => { :type => :keyword }, :status => { :type => :keyword }, :sync_now => { :type => :boolean } } } ensure_index_exists("#{Utility::Constants::CONNECTORS_INDEX}-v1", system_index_body(:alias_name => Utility::Constants::CONNECTORS_INDEX, :mappings => mappings)) end # DO NOT USE this method # Creation of job index should be handled by Kibana, this method is only used by ftest.rb def ensure_job_index_exists mappings = { :dynamic => false, :properties => { :cancelation_requested_at => { :type => :date }, :canceled_at => { :type => :date }, :completed_at => { :type => :date }, :connector => { :properties => { :configuration => { :type => :object }, :filtering => { :properties => { :domain => { :type => :keyword }, :rules => { :properties => { :id => { :type => :keyword }, :policy => { :type => :keyword }, :field => { :type => :keyword }, :rule => { :type => :keyword }, :value => { :type => :keyword }, :order => { :type => :short }, :created_at => { :type => :date }, :updated_at => { :type => :date } } }, :advanced_snippet => { :properties => { :value => { :type => :object }, :created_at => { :type => :date }, :updated_at => { :type => :date } } }, :warnings => { :properties => { :ids => { :type => :keyword }, :messages => { :type => :text } } } } }, :id => { :type => :keyword }, :index_name => { :type => :keyword }, :language => { :type => :keyword }, :pipeline => { :properties => { :extract_binary_content => { :type => :boolean }, :name => { :type => :keyword }, :reduce_whitespace => { :type => :boolean }, :run_ml_inference => { :type => :boolean } } }, :service_type => { :type => :keyword } } }, :created_at => { :type => :date }, :deleted_document_count => { :type => :integer }, :error => { :type => :text }, :indexed_document_count => { :type => :integer }, :indexed_document_volume => { :type => :integer }, :last_seen => { :type => :date }, :metadata => { :type => :object }, :started_at => { :type => :date }, :status => { :type => :keyword }, :total_document_count => { :type => :integer }, :trigger_method => { :type => :keyword }, :worker_hostname => { :type => :keyword } } } ensure_index_exists("#{Utility::Constants::JOB_INDEX}-v1", system_index_body(:alias_name => Utility::Constants::JOB_INDEX, :mappings => mappings)) end def update_connector_fields(connector_id, doc = {}, seq_no = nil, primary_term = nil) update_doc_fields(Utility::Constants::CONNECTORS_INDEX, connector_id, doc, seq_no, primary_term) end def update_job_fields(job_id, doc = {}, seq_no = nil, primary_term = nil) update_doc_fields(Utility::Constants::JOB_INDEX, job_id, doc, seq_no, primary_term) end def document_count(index_name) client.indices.refresh(:index => index_name, :ignore_unavailable => true) client.count(:index => index_name, :ignore_unavailable => true)['count'] end private def should_update_validations?(domain_validations, filtering) domains_present = filtering.collect { |filter| filter[:domain] } domains_to_update = domain_validations.keys # non-empty intersection -> domains to update present !(domains_present & domains_to_update).empty? end def client @client ||= Utility::EsClient.new(App::Config[:elasticsearch]) end def get_latest_index_in_alias(alias_name, indicies) index_versions = indicies.map { |index| index.gsub("#{alias_name}-v", '').to_i } index_version = index_versions.max # gets the largest suffix number "#{alias_name}-v#{index_version}" end def update_filter_validation(filter, domain_validations) domain = filter[:domain] if domain_validations.key?(domain) new_validation_state = { :draft => { :validation => domain_validations[domain] } } filter.deep_merge!(new_validation_state) end end def update_doc_fields(index, id, doc = {}, seq_no = nil, primary_term = nil) return if doc.empty? update_args = { :index => index, :id => id, :body => { :doc => doc }, :refresh => true, :retry_on_conflict => 3 } if seq_no && primary_term update_args[:if_seq_no] = seq_no update_args[:if_primary_term] = primary_term update_args.delete(:retry_on_conflict) end begin client.update(update_args) rescue Elastic::Transport::Transport::Errors::Conflict # VersionConflictException # see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#optimistic-concurrency-control-index raise ConnectorVersionChangedError.new(id, seq_no, primary_term) end end end end end