lib/core/connector_settings.rb (154 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'active_support/core_ext/hash/indifferent_access'
require 'connectors/connector_status'
require 'connectors/sync_status'
require 'core/elastic_connector_actions'
require 'utility'
module Core
  # Wraps a connector document returned by Elasticsearch and exposes its
  # fields through small reader methods. Class-level helpers fetch one or
  # many connectors through ElasticConnectorActions, and `update_last_sync!`
  # writes the outcome of a sync job back to the connector document.
  class ConnectorSettings
    DEFAULT_REQUEST_PIPELINE = 'ent-search-generic-ingestion'
    DEFAULT_EXTRACT_BINARY_CONTENT = true
    DEFAULT_REDUCE_WHITESPACE = true
    DEFAULT_RUN_ML_INFERENCE = true
    DEFAULT_FILTERING = {}
    DEFAULT_PAGE_SIZE = 100

    # Fetches a single connector by id.
    #
    # @param connector_id id handed to ElasticConnectorActions.get_connector
    # @return [ConnectorSettings, nil] nil when the response is not marked as found
    def self.fetch_by_id(connector_id)
      es_response = ElasticConnectorActions.get_connector(connector_id)
      return nil unless es_response[:found]

      connectors_meta = ElasticConnectorActions.connectors_meta
      new(es_response, connectors_meta)
    end

    # Fetches connectors flagged `is_native` whose service type is one of the
    # connectors registered in Connectors::REGISTRY.
    #
    # @param page_size [Integer] number of connectors requested per search call
    # @return [Array<ConnectorSettings>]
    def self.fetch_native_connectors(page_size = DEFAULT_PAGE_SIZE)
      # Loaded lazily: the registry is only required if it is not defined yet.
      require 'connectors/registry' unless defined?(Connectors::REGISTRY)

      query = {
        bool: {
          filter: [
            { term: { is_native: true } },
            { terms: { service_type: Connectors::REGISTRY.registered_connectors } }
          ]
        }
      }
      fetch_connectors_by_query(query, page_size)
    end

    # Fetches connectors whose service type equals
    # Utility::Constants::CRAWLER_SERVICE_TYPE.
    #
    # @param page_size [Integer] number of connectors requested per search call
    # @return [Array<ConnectorSettings>]
    def self.fetch_crawler_connectors(page_size = DEFAULT_PAGE_SIZE)
      query = { term: { service_type: Utility::Constants::CRAWLER_SERVICE_TYPE } }
      fetch_connectors_by_query(query, page_size)
    end

    # Fetches every connector (`match_all` query).
    #
    # @param page_size [Integer] number of connectors requested per search call
    # @return [Array<ConnectorSettings>]
    def self.fetch_all_connectors(page_size = DEFAULT_PAGE_SIZE)
      query = { match_all: {} }
      fetch_connectors_by_query(query, page_size)
    end

    # @return the `_id` of the wrapped Elasticsearch response
    def id
      @elasticsearch_response[:_id]
    end

    # Reads one property from the document `_source`.
    #
    # @param property_name key to look up inside `_source`
    # @return the stored value, or nil when the property (or `_source` itself) is absent
    def [](property_name)
      # `dig` keeps a response without `_source` from raising NoMethodError.
      @elasticsearch_response.dig(:_source, property_name)
    end

    # @return the `features` property, or an empty hash when it is not set
    def features
      self[:features] || {}
    end

    # The `.dig` lookup is the current layout of the features hash; the
    # right-hand side of the OR supports the legacy flat flags.
    # NOTE(review): the legacy branch can presumably be dropped once existing
    # documents are migrated - confirm before removing.
    def filtering_rule_feature_enabled?
      !!features.dig(:sync_rules, :basic, :enabled) || !!features[:filtering_rules]
    end

    # Same dual lookup as `filtering_rule_feature_enabled?`, for the advanced
    # sync rules flag and its legacy `filtering_advanced_config` counterpart.
    def filtering_advanced_config_feature_enabled?
      !!features.dig(:sync_rules, :advanced, :enabled) || !!features[:filtering_advanced_config]
    end

    # @return [Boolean] true when either filtering feature flag is enabled
    def any_filtering_feature_enabled?
      filtering_rule_feature_enabled? || filtering_advanced_config_feature_enabled?
    end

    def index_name
      self[:index_name]
    end

    def connector_status
      self[:status]
    end

    # @return [Boolean] whether the current status is listed in
    #   Connectors::ConnectorStatus::STATUSES_ALLOWING_SYNC
    def connector_status_allows_sync?
      Connectors::ConnectorStatus::STATUSES_ALLOWING_SYNC.include?(connector_status)
    end

    def service_type
      self[:service_type]
    end

    def configuration
      self[:configuration]
    end

    # @return the `scheduling` property, or an empty hash when it is not set
    def scheduling_settings
      self[:scheduling] || {}
    end

    # @return the `:full` entry of the scheduling settings (nil when absent)
    def full_sync_scheduling
      scheduling_settings[:full]
    end

    def custom_scheduling_settings
      self[:custom_scheduling]
    end

    # Only a literal `true` counts; any other stored value yields false.
    def sync_now?
      self[:sync_now] == true
    end

    def last_synced
      self[:last_synced]
    end

    # @return the filter extracted by Utility::Filtering.extract_filter
    def filtering
      # assume for now, that first object in filtering array or a filter object itself is the only filtering object
      filtering = @elasticsearch_response.dig(:_source, :filtering)
      Utility::Filtering.extract_filter(filtering)
    end

    # Resolves the ingest pipeline name from three candidates, in this order:
    # the connector's own `pipeline.name`, the `pipeline.default_name` from the
    # connectors meta, and DEFAULT_REQUEST_PIPELINE. The choice between them is
    # delegated to Utility::Common.return_if_present.
    def request_pipeline
      Utility::Common.return_if_present(
        @elasticsearch_response.dig(:_source, :pipeline, :name),
        @connectors_meta.dig(:pipeline, :default_name),
        DEFAULT_REQUEST_PIPELINE
      )
    end

    # @return [String] short human-readable description, e.g. for log messages
    def formatted
      properties = ["ID: #{id}"]
      properties << "Service type: #{service_type}" if service_type
      "connector (#{properties.join(', ')})"
    end

    # @return [Boolean] true when the service type is nil, empty or only whitespace
    def needs_service_type?
      service_type.to_s.strip.empty?
    end

    # @return true/false, or nil when no index name is set
    def valid_index_name?
      index_name&.start_with?(Utility::Constants::CONTENT_INDEX_PREFIX)
    end

    # A connector is ready when its service type is registered, its index name
    # is valid and its status allows syncing.
    # NOTE(review): unlike `fetch_native_connectors`, this does not require
    # 'connectors/registry' itself - it relies on Connectors::REGISTRY being loaded.
    def ready_for_sync?
      Connectors::REGISTRY.registered?(service_type) &&
        valid_index_name? &&
        connector_status_allows_sync?
    end

    # @return [Boolean] true when `last_sync_status` is Connectors::SyncStatus::IN_PROGRESS
    def running?
      self[:last_sync_status] == Connectors::SyncStatus::IN_PROGRESS
    end

    # Writes the outcome of a sync job to the connector document via
    # ElasticConnectorActions.update_connector_fields.
    #
    # @param job the finished job, or nil when it could not be found; it must
    #   respond to `status`, `error`, `terminated?` and `[]`
    def update_last_sync!(job)
      # if job is nil, connector still needs to be updated, to avoid it stuck at in_progress
      job_status = job&.status || Connectors::SyncStatus::ERROR
      job_error = job.nil? ? "Couldn't find the job" : job.error
      # An errored job must always carry some error text.
      job_error ||= 'unknown error' if job_status == Connectors::SyncStatus::ERROR

      new_connector_status =
        if job_status == Connectors::SyncStatus::ERROR
          Connectors::ConnectorStatus::ERROR
        else
          Connectors::ConnectorStatus::CONNECTED
        end

      doc = {
        :last_sync_status => job_status,
        :last_synced => Time.now,
        :last_sync_error => job_error,
        :status => new_connector_status,
        :error => job_error
      }

      # Document counts are only copied over for terminated jobs.
      if job&.terminated?
        doc[:last_indexed_document_count] = job[:indexed_document_count]
        doc[:last_deleted_document_count] = job[:deleted_document_count]
      end

      Core::ElasticConnectorActions.update_connector_fields(id, doc)
    end

    private

    # @param es_response [Hash] raw Elasticsearch document (or search hit)
    # @param connectors_meta [Hash] connectors meta as returned by ElasticConnectorActions.connectors_meta
    def initialize(es_response, connectors_meta)
      @elasticsearch_response = es_response.with_indifferent_access
      @connectors_meta = connectors_meta.with_indifferent_access
    end

    # Runs `query` page by page and wraps every hit in a ConnectorSettings.
    #
    # NOTE: the bare `private` above only affects instance methods, so this
    # singleton method remains publicly callable.
    #
    # @param query [Hash] Elasticsearch query passed to search_connectors
    # @param page_size [Integer] number of hits requested per call
    # @return [Array<ConnectorSettings>]
    def self.fetch_connectors_by_query(query, page_size)
      connectors_meta = ElasticConnectorActions.connectors_meta
      results = []
      offset = 0

      loop do
        response = ElasticConnectorActions.search_connectors(query, page_size, offset)
        hits = response.dig('hits', 'hits') || []
        total = response.dig('hits', 'total', 'value') || 0

        results.concat(hits.map { |hit| Core::ConnectorSettings.new(hit, connectors_meta) })

        # An empty page means there is nothing left to read, even if the
        # reported total is larger; without this check the offset would stop
        # advancing and the loop would never finish.
        break if hits.empty? || results.size >= total

        offset += hits.size
      end

      results
    end
  end
end