# lib/core/elastic_connector_actions.rb
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
#
require 'active_support/core_ext/hash'
require 'connectors/connector_status'
require 'connectors/job_trigger_method'
require 'connectors/sync_status'
require 'utility'
require 'elastic-transport'
module Core
# Raised when a sync is requested for a connector that already has a job in flight.
class JobAlreadyRunningError < StandardError
  def initialize(connector_id)
    message = "Sync job for connector '#{connector_id}' is already running."
    super(message)
  end
end
# Raised when the job index request did not report result == 'created'.
class JobNotCreatedError < StandardError
  def initialize(connector_id, response)
    message = "Sync job for connector '#{connector_id}' could not be created. Response: #{response}"
    super(message)
  end
end
# Raised when an optimistic-concurrency update lost the race: the stored
# document's seq_no/primary_term no longer match the ones we read.
class ConnectorVersionChangedError < StandardError
  def initialize(connector_id, seq_no, primary_term)
    message = "Version conflict: seq_no [#{seq_no}] and primary_term [#{primary_term}] do not match for connector '#{connector_id}'."
    super(message)
  end
end
class ElasticConnectorActions
class << self
# Flags the connector for an immediate sync (and makes sure scheduling is on).
def force_sync(connector_id)
  fields = { :scheduling => { :enabled => true }, :sync_now => true }
  update_connector_fields(connector_id, fields)
end
# Indexes a brand-new connector document and returns its generated _id.
def create_connector(index_name, service_type)
  document = {
    :scheduling => { :enabled => true },
    :index_name => index_name,
    :service_type => service_type
  }
  client.index(:index => Utility::Constants::CONNECTORS_INDEX, :body => document)['_id']
end
# Fetches a connector document by id (404s are swallowed by the client).
def get_connector(connector_id)
  # TODO: remove the usage of with_indifferent_access. Ideally this should return a hash or nil if not found
  response = client.get(:index => Utility::Constants::CONNECTORS_INDEX, :id => connector_id, :ignore => 404)
  response.with_indifferent_access
end
# Fetches a sync-job document by id (404s are swallowed by the client).
def get_job(job_id)
  # TODO: remove the usage of with_indifferent_access. Ideally this should return a hash or nil if not found
  response = client.get(:index => Utility::Constants::JOB_INDEX, :id => job_id, :ignore => 404)
  response.with_indifferent_access
end
# Reads the _meta section from the newest index behind the connectors alias,
# falling back to the default ingestion pipeline settings when absent.
def connectors_meta
  # TODO: remove the usage of with_indifferent_access. Ideally this should return a hash or nil if not found
  alias_mappings = client.indices.get_mapping(:index => Utility::Constants::CONNECTORS_INDEX, :ignore => 404).with_indifferent_access
  newest_index = get_latest_index_in_alias(Utility::Constants::CONNECTORS_INDEX, alias_mappings.keys)
  meta = alias_mappings.dig(newest_index, 'mappings', '_meta')
  meta || {
    :extract_binary_content => true,
    :name => 'ent-search-generic-ingestion',
    :reduce_whitespace => true,
    :run_ml_inference => false,
  }
end
# Runs a paginated search over the connectors index, sorted by name.
def search_connectors(query, page_size, offset)
  search_body = {
    :size => page_size,
    :from => offset,
    :query => query,
    :sort => ['name']
  }
  client.search(:index => Utility::Constants::CONNECTORS_INDEX, :ignore => 404, :body => search_body)
end
# Runs a paginated search over the jobs index, oldest first (sorted by created_at).
def search_jobs(query, page_size, offset)
  search_body = {
    :size => page_size,
    :from => offset,
    :query => query,
    :sort => ['created_at']
  }
  client.search(:index => Utility::Constants::JOB_INDEX, :ignore => 404, :body => search_body)
end
# Deletes every job document matching +query+ from the jobs index.
def delete_jobs_by_query(query)
  client.delete_by_query(:index => Utility::Constants::JOB_INDEX, :body => { :query => query })
end
# Deletes the given index (or list of indices), silently skipping any that do
# not exist (:ignore_unavailable).
def delete_indices(indices)
client.indices.delete(:index => indices, :ignore_unavailable => true)
end
# Partially updates the connector document's configuration field.
def update_connector_configuration(connector_id, configuration)
update_connector_fields(connector_id, :configuration => configuration)
end
# Turns scheduled syncing on with the given cron interval expression.
def enable_connector_scheduling(connector_id, cron_expression)
  update_connector_fields(connector_id, :scheduling => { :enabled => true, :interval => cron_expression })
end
# Turns scheduled syncing off for the connector.
def disable_connector_scheduling(connector_id)
  update_connector_fields(connector_id, :scheduling => { :enabled => false })
end
# Sets one configurable field (value + human-readable label) on the connector.
def set_configurable_field(connector_id, field_name, label, value)
  field_payload = { :value => value, :label => label }
  update_connector_configuration(connector_id, { field_name => field_payload })
end
# Merges per-domain filter validation results into the connector's stored
# filtering and persists it. Filtering may come back as a single hash or an
# array of per-domain filters; anything else is logged and skipped.
def update_filtering_validation(connector_id, filter_validation_results)
  return if filter_validation_results.empty?

  filtering = get_connector(connector_id).dig(:_source, :filtering)

  if filtering.is_a?(Hash)
    update_filter_validation(filtering, filter_validation_results)
  elsif filtering.is_a?(Array)
    # nothing to persist unless at least one stored domain has a new result
    return unless should_update_validations?(filter_validation_results, filtering)
    filtering.each { |filter| update_filter_validation(filter, filter_validation_results) }
  else
    Utility::Logger.warn("Elasticsearch returned invalid filtering format: #{filtering}. Skipping validation.")
    return
  end

  update_connector_fields(connector_id, { :filtering => filtering })
end
# Sets/clears the sync_now flag (stamping last_synced) under optimistic
# concurrency control, so a concurrent writer triggers a version conflict.
def update_connector_sync_now(connector_id, sync_now)
  concurrency = connector_with_concurrency_control(connector_id)
  update_connector_fields(
    connector_id,
    { sync_now: sync_now, last_synced: Time.now },
    concurrency[:seq_no],
    concurrency[:primary_term]
  )
end
# Marks the connector as having started a sync: status connected, last sync
# in progress, previous sync error cleared. Guarded by seq_no/primary_term.
def update_connector_sync_start(connector_id)
  concurrency = connector_with_concurrency_control(connector_id)
  sync_start_fields = {
    last_sync_status: Connectors::SyncStatus::IN_PROGRESS,
    last_sync_error: nil,
    status: Connectors::ConnectorStatus::CONNECTED
  }
  update_connector_fields(
    connector_id,
    sync_start_fields,
    concurrency[:seq_no],
    concurrency[:primary_term]
  )
end
# Stamps last_synced for one custom-scheduling entry, guarded by
# seq_no/primary_term so concurrent edits surface as version conflicts.
def update_connector_custom_scheduling_last_synced(connector_id, schedule_key)
  concurrency = connector_with_concurrency_control(connector_id)
  custom_scheduling_fields = {
    :custom_scheduling => {
      schedule_key => { :last_synced => Time.now }
    }
  }
  update_connector_fields(
    connector_id,
    custom_scheduling_fields,
    concurrency[:seq_no],
    concurrency[:primary_term]
  )
end
# Fetches the connector document together with the concurrency-control markers
# (_seq_no/_primary_term) needed for conditional updates.
#
# @return [Hash] { doc:, seq_no:, primary_term: }
def connector_with_concurrency_control(connector_id)
  response = client.get(
    :index => Utility::Constants::CONNECTORS_INDEX,
    :id => connector_id,
    :ignore => 404,
    :refresh => true
  )
  {
    doc: response,
    seq_no: response['_seq_no'],
    primary_term: response['_primary_term']
  }
end
# Creates a PENDING sync job document for the given connector settings,
# snapshotting the connector's filtering/pipeline/configuration into the job.
#
# @raise [JobNotCreatedError] when the index call does not report 'created'
def create_job(connector_settings:)
  trigger_method = connector_settings.sync_now? ? Connectors::JobTriggerMethod::ON_DEMAND : Connectors::JobTriggerMethod::SCHEDULED
  job_definition = {
    status: Connectors::SyncStatus::PENDING,
    created_at: Time.now,
    last_seen: Time.now,
    trigger_method: trigger_method,
    connector: {
      id: connector_settings.id,
      filtering: convert_connector_filtering_to_job_filtering(connector_settings.filtering),
      index_name: connector_settings.index_name,
      language: connector_settings[:language],
      pipeline: connector_settings[:pipeline],
      service_type: connector_settings.service_type,
      configuration: connector_settings.configuration
    }
  }
  response = client.index(index: Utility::Constants::JOB_INDEX, body: job_definition, refresh: true)
  raise JobNotCreatedError.new(connector_settings.id, response) unless response['result'] == 'created'
  response
end
# Converts a connector's stored filtering (single hash or array of per-domain
# filters) into the flattened shape persisted on a job document.
#
# @return [Array<Hash>] one entry per domain; [] when filtering is nil/false
def convert_connector_filtering_to_job_filtering(connector_filtering)
  return [] unless connector_filtering

  # normalize: a lone filtering hash is treated as a one-element list
  domains = connector_filtering.is_a?(Array) ? connector_filtering : [connector_filtering]
  domains.map do |filtering_domain|
    snippet = filtering_domain.dig('active', 'advanced_snippet') || {}
    {
      'domain' => filtering_domain['domain'],
      'rules' => filtering_domain.dig('active', 'rules'),
      'advanced_snippet' => snippet['value'] || snippet,
      'warnings' => [] # TODO: in https://github.com/elastic/enterprise-search-team/issues/3174
    }
  end
end
# Persists the connector status; when transitioning to ERROR an error message
# is mandatory, and when leaving ERROR the stored error field is cleared.
#
# @raise [ArgumentError] if status is ERROR and no error_message was given
def update_connector_status(connector_id, status, error_message = nil)
  error_status = status == Connectors::ConnectorStatus::ERROR
  raise ArgumentError, 'error_message is required when status is error' if error_status && error_message.nil?

  update_connector_fields(connector_id, :status => status, :error => error_status ? error_message : nil)
end
# Returns the _id of every document in +index_name+, paging through a
# point-in-time (PIT) search so the enumeration sees a consistent snapshot.
#
# @param index_name [String] index (or alias) to enumerate
# @return [Array<String>] all document ids
def fetch_document_ids(index_name)
  page_size = 1000
  result = []
  pit_id = nil
  begin
    pit_id = client.open_point_in_time(:index => index_name, :keep_alive => '1m', :expand_wildcards => 'all')['id']
    body = {
      :query => { :match_all => {} },
      :sort => [{ :id => { :order => :asc } }],
      :pit => {
        :id => pit_id,
        :keep_alive => '1m'
      },
      :size => page_size,
      :_source => false
    }
    loop do
      response = client.search(:body => body)
      hits = response.dig('hits', 'hits') || []
      result += hits.map { |h| h['_id'] }
      # a short page means we've drained the index
      break if hits.size < page_size
      # continue after the last sort key; the server may refresh the PIT id,
      # so always carry the returned one into the next request
      body[:search_after] = hits.last['sort']
      body[:pit][:id] = response['pit_id']
    end
  ensure
    # BUGFIX: only close the PIT when it was actually opened. Previously, if
    # open_point_in_time raised, this ensure block called close with a nil id,
    # raising again and masking the original error.
    client.close_point_in_time(:index => index_name, :body => { :id => pit_id }) if pit_id
  end
  result
end
# Creates a content index with language-aware text analysis settings and the
# default connector text-field mappings, unless it already exists.
def ensure_content_index_exists(index_name, use_icu_locale = false, language_code = nil)
  analysis_settings = Utility::Elasticsearch::Index::TextAnalysisSettings.new(
    :language_code => language_code,
    :analysis_icu => use_icu_locale
  ).to_h
  text_mappings = Utility::Elasticsearch::Index::Mappings.default_text_fields_mappings(:connectors_index => true)
  ensure_index_exists(index_name, { settings: analysis_settings, mappings: text_mappings })
end
# Creates the index when missing. When it already exists, installs the given
# mappings only if the index currently has none; existing mappings are never
# overwritten.
def ensure_index_exists(index_name, body = {})
  unless client.indices.exists?(:index => index_name)
    client.indices.create(:index => index_name, :body => body)
    Utility::Logger.debug("Created index #{index_name}")
    return
  end

  return unless body[:mappings]

  Utility::Logger.debug("Index #{index_name} already exists. Checking mappings...")
  Utility::Logger.debug("New mappings: #{body[:mappings]}")
  existing_mappings = client.indices.get_mapping(:index => index_name)[index_name]['mappings']
  if existing_mappings.empty?
    Utility::Logger.debug("Index #{index_name} has no mappings. Adding mappings...")
    client.indices.put_mapping(:index => index_name, :body => body[:mappings], :expand_wildcards => 'all')
    Utility::Logger.debug("Index #{index_name} mappings added.")
  else
    Utility::Logger.debug("Index #{index_name} already has mappings: #{existing_mappings}. Skipping...")
  end
end
# Builds the create-index body for a hidden system index (0 replicas,
# auto-expanding up to 5), optionally adding a write alias and mappings.
#
# @param alias_name [String, nil] write alias; skipped when nil or empty
# @param mappings [Hash, nil] mappings section; skipped when nil
# @return [Hash] body suitable for indices.create
def system_index_body(alias_name: nil, mappings: nil)
  payload = {
    :settings => {
      :index => {
        :hidden => true,
        :number_of_replicas => 0,
        :auto_expand_replicas => '0-5'
      }
    }
  }
  unless alias_name.nil? || alias_name.empty?
    payload[:aliases] = { alias_name => { :is_write_index => true } }
  end
  payload[:mappings] = mappings unless mappings.nil?
  payload
end
# DO NOT USE this method
# Creation of connector index should be handled by Kibana, this method is only used by ftest.rb
#
# Creates the hidden "<CONNECTORS_INDEX>-v1" system index with the full static
# connector-document mapping and aliases it to the connectors alias.
def ensure_connectors_index_exists
# :dynamic => false: unknown fields are stored but not indexed/searchable.
mappings = {
:dynamic => false,
:properties => {
:api_key_id => { :type => :keyword },
:configuration => { :type => :object },
:custom_schedule => { :type => :object },
:description => { :type => :text },
:error => { :type => :keyword },
:features => {
:properties => {
:filtering_advanced_config => { :type => :boolean },
:filtering_rules => { :type => :boolean }
}
},
# filtering holds, per domain, the currently "active" rule set and a "draft"
# being edited; both share the same rules/advanced_snippet/validation shape.
:filtering => {
:properties => {
:domain => { :type => :keyword },
:active => {
:properties => {
:rules => {
:properties => {
:id => { :type => :keyword },
:policy => { :type => :keyword },
:field => { :type => :keyword },
:rule => { :type => :keyword },
:value => { :type => :keyword },
:order => { :type => :short },
:created_at => { :type => :date },
:updated_at => { :type => :date }
}
},
:advanced_snippet => {
:properties => {
:value => { :type => :object },
:created_at => { :type => :date },
:updated_at => { :type => :date }
}
},
:validation => {
:properties => {
:state => { :type => :keyword },
:errors => {
:properties => {
:ids => { :type => :keyword },
:messages => { :type => :text }
}
}
}
}
}
},
:draft => {
:properties => {
:rules => {
:properties => {
:id => { :type => :keyword },
:policy => { :type => :keyword },
:field => { :type => :keyword },
:rule => { :type => :keyword },
:value => { :type => :keyword },
:order => { :type => :short },
:created_at => { :type => :date },
:updated_at => { :type => :date }
}
},
:advanced_snippet => {
:properties => {
:value => { :type => :object },
:created_at => { :type => :date },
:updated_at => { :type => :date }
}
},
:validation => {
:properties => {
:state => { :type => :keyword },
:errors => {
:properties => {
:ids => { :type => :keyword },
:messages => { :type => :text }
}
}
}
}
}
}
}
},
:index_name => { :type => :keyword },
:is_native => { :type => :boolean },
:language => { :type => :keyword },
:last_seen => { :type => :date },
:last_sync_error => { :type => :keyword },
:last_sync_status => { :type => :keyword },
:last_synced => { :type => :date },
:last_deleted_document_count => { :type => :long },
:last_indexed_document_count => { :type => :long },
:name => { :type => :keyword },
:pipeline => {
:properties => {
:extract_binary_content => { :type => :boolean },
:name => { :type => :keyword },
:reduce_whitespace => { :type => :boolean },
:run_ml_inference => { :type => :boolean }
}
},
:scheduling => {
:properties => {
:enabled => { :type => :boolean },
:interval => { :type => :text }
}
},
:service_type => { :type => :keyword },
:status => { :type => :keyword },
:sync_now => { :type => :boolean }
}
}
ensure_index_exists("#{Utility::Constants::CONNECTORS_INDEX}-v1", system_index_body(:alias_name => Utility::Constants::CONNECTORS_INDEX, :mappings => mappings))
end
# DO NOT USE this method
# Creation of job index should be handled by Kibana, this method is only used by ftest.rb
#
# Creates the hidden "<JOB_INDEX>-v1" system index with the static sync-job
# mapping and aliases it to the jobs alias.
def ensure_job_index_exists
# :dynamic => false: unknown fields are stored but not indexed/searchable.
mappings = {
:dynamic => false,
:properties => {
:cancelation_requested_at => { :type => :date },
:canceled_at => { :type => :date },
:completed_at => { :type => :date },
# snapshot of the connector at job-creation time (see create_job)
:connector => {
:properties => {
:configuration => { :type => :object },
:filtering => {
:properties => {
:domain => { :type => :keyword },
:rules => {
:properties => {
:id => { :type => :keyword },
:policy => { :type => :keyword },
:field => { :type => :keyword },
:rule => { :type => :keyword },
:value => { :type => :keyword },
:order => { :type => :short },
:created_at => { :type => :date },
:updated_at => { :type => :date }
}
},
:advanced_snippet => {
:properties => {
:value => { :type => :object },
:created_at => { :type => :date },
:updated_at => { :type => :date }
}
},
:warnings => {
:properties => {
:ids => { :type => :keyword },
:messages => { :type => :text }
}
}
}
},
:id => { :type => :keyword },
:index_name => { :type => :keyword },
:language => { :type => :keyword },
:pipeline => {
:properties => {
:extract_binary_content => { :type => :boolean },
:name => { :type => :keyword },
:reduce_whitespace => { :type => :boolean },
:run_ml_inference => { :type => :boolean }
}
},
:service_type => { :type => :keyword }
}
},
:created_at => { :type => :date },
:deleted_document_count => { :type => :integer },
:error => { :type => :text },
:indexed_document_count => { :type => :integer },
:indexed_document_volume => { :type => :integer },
:last_seen => { :type => :date },
:metadata => { :type => :object },
:started_at => { :type => :date },
:status => { :type => :keyword },
:total_document_count => { :type => :integer },
:trigger_method => { :type => :keyword },
:worker_hostname => { :type => :keyword }
}
}
ensure_index_exists("#{Utility::Constants::JOB_INDEX}-v1", system_index_body(:alias_name => Utility::Constants::JOB_INDEX, :mappings => mappings))
end
# Partial update of a connector document; pass seq_no + primary_term together
# to make the update conditional (see update_doc_fields).
def update_connector_fields(connector_id, doc = {}, seq_no = nil, primary_term = nil)
update_doc_fields(Utility::Constants::CONNECTORS_INDEX, connector_id, doc, seq_no, primary_term)
end
# Partial update of a job document; pass seq_no + primary_term together to
# make the update conditional (see update_doc_fields).
def update_job_fields(job_id, doc = {}, seq_no = nil, primary_term = nil)
update_doc_fields(Utility::Constants::JOB_INDEX, job_id, doc, seq_no, primary_term)
end
# Refreshes the index first so the count reflects all recent writes, then
# returns the number of documents (0-tolerant if the index is missing).
def document_count(index_name)
  client.indices.refresh(:index => index_name, :ignore_unavailable => true)
  count_response = client.count(:index => index_name, :ignore_unavailable => true)
  count_response['count']
end
private
# True when at least one domain present in the stored filters also has a new
# validation result to merge.
def should_update_validations?(domain_validations, filtering)
  filtering.any? { |filter| domain_validations.keys.include?(filter[:domain]) }
end
# Memoized Elasticsearch client, built from the app-level elasticsearch config.
def client
@client ||= Utility::EsClient.new(App::Config[:elasticsearch])
end
# Given the concrete indices behind a versioned alias ("<alias>-vN"), returns
# the name carrying the highest version suffix.
def get_latest_index_in_alias(alias_name, indicies)
  highest_version = indicies.collect { |index_name| index_name.gsub("#{alias_name}-v", '').to_i }.max
  "#{alias_name}-v#{highest_version}"
end
# Deep-merges the validation result for this filter's domain into its draft
# section, in place. No-op when there is no result for the domain.
def update_filter_validation(filter, domain_validations)
  domain = filter[:domain]
  return unless domain_validations.key?(domain)

  filter.deep_merge!(:draft => { :validation => domain_validations[domain] })
end
# Partial-updates a document. Without concurrency markers the client retries
# conflicts up to 3 times; with both seq_no and primary_term the update is
# conditional and a conflict is raised as ConnectorVersionChangedError.
#
# @raise [ConnectorVersionChangedError] on a version conflict during a
#   conditional update
def update_doc_fields(index, id, doc = {}, seq_no = nil, primary_term = nil)
  return if doc.empty?

  update_args = {
    :index => index,
    :id => id,
    :body => { :doc => doc },
    :refresh => true,
    :retry_on_conflict => 3
  }
  if seq_no && primary_term
    # Optimistic concurrency control: surface conflicts instead of retrying.
    update_args.delete(:retry_on_conflict)
    update_args[:if_seq_no] = seq_no
    update_args[:if_primary_term] = primary_term
  end
  client.update(update_args)
rescue Elastic::Transport::Transport::Errors::Conflict
  # VersionConflictException
  # see https://www.elastic.co/guide/en/elasticsearch/reference/current/docs-index_.html#optimistic-concurrency-control-index
  raise ConnectorVersionChangedError.new(id, seq_no, primary_term)
end
end
end
end