lib/core/connector_job.rb
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'active_support/core_ext/hash/indifferent_access'
require 'connectors/sync_status'
require 'core/connector_settings'
require 'core/elastic_connector_actions'
require 'socket' # stdlib; needed for Socket.gethostname in #make_running!
require 'utility'
module Core
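# Wraps a single document from the connector sync jobs index and exposes its
# metadata, status predicates and lifecycle transitions (make_running!, done!,
# error!, cancel!). A rough usage sketch (the job id and the stats key below
# are illustrative, not values the framework guarantees):
#
#   job = Core::ConnectorJob.fetch_by_id('some-job-id') # => ConnectorJob or nil
#   job.make_running!                                   # pending -> in_progress
#   job.update_metadata({ :indexed_document_count => 10 })
#   job.done!({ :indexed_document_count => 10 })        # or job.error!('boom') / job.cancel!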
class ConnectorJob
DEFAULT_PAGE_SIZE = 100 # page size used when paging through job search results
IDLE_THRESHOLD = 60 # seconds without a last_seen update before an active job counts as idle
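# Fetches a single job document by id and wraps it, or returns nil if it does not exist.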
def self.fetch_by_id(job_id)
es_response = ElasticConnectorActions.get_job(job_id)
return nil unless es_response[:found]
new(es_response)
end
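# Returns jobs still in a pending status, optionally restricted to the given
# connector ids. A sketch of how a scheduler loop might consume them (the
# connector id is illustrative):
#
#   Core::ConnectorJob.pending_jobs(connectors_ids: ['connector-1']).each(&:make_running!)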
def self.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE)
status_term = { status: Connectors::SyncStatus::PENDING_STATUSES }
query = { bool: { must: [{ terms: status_term }] } }
return fetch_jobs_by_query(query, page_size) if connectors_ids.empty?
query[:bool][:must] << { terms: { 'connector.id' => connectors_ids } }
fetch_jobs_by_query(query, page_size)
end
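# Returns jobs whose connector id is NOT in the given list, i.e. jobs left
# behind by connectors that no longer exist.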
def self.orphaned_jobs(connector_ids = [], page_size = DEFAULT_PAGE_SIZE)
query = { bool: { must_not: { terms: { 'connector.id': connector_ids } } } }
fetch_jobs_by_query(query, page_size)
end
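# Deletes the given jobs from the index in a single delete-by-query request.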
def self.delete_jobs(jobs)
query = { terms: { '_id': jobs.map(&:id) } }
ElasticConnectorActions.delete_jobs_by_query(query)
end
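# Returns active jobs that have not reported a heartbeat (last_seen) within
# IDLE_THRESHOLD seconds, for a single connector or for all native connectors.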
def self.idle_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE)
connector_ids = if connector_id
[connector_id]
else
ConnectorSettings.fetch_native_connectors.map(&:id)
end
query = {
bool: {
filter: [
{ terms: { 'connector.id': connector_ids } },
{ terms: { status: Connectors::SyncStatus::ACTIVE_STATUSES } },
{ range: { last_seen: { lte: "now-#{IDLE_THRESHOLD}s" } } }
]
}
}
fetch_jobs_by_query(query, page_size)
end
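# No-op placeholder: always returns nil (job creation appears to be handled elsewhere).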
def self.enqueue(_connector_id)
nil
end
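# Elasticsearch _id of the underlying job document.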
def id
@elasticsearch_response[:_id]
end
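# Reads a top-level field from the job document's _source.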
def [](property_name)
@elasticsearch_response[:_source][property_name]
end
def error
self[:error]
end
def status
self[:status]
end
def in_progress?
status == Connectors::SyncStatus::IN_PROGRESS
end
def canceling?
status == Connectors::SyncStatus::CANCELING
end
def suspended?
status == Connectors::SyncStatus::SUSPENDED
end
def canceled?
status == Connectors::SyncStatus::CANCELED
end
def pending?
Connectors::SyncStatus::PENDING_STATUSES.include?(status)
end
def active?
Connectors::SyncStatus::ACTIVE_STATUSES.include?(status)
end
def terminated?
Connectors::SyncStatus::TERMINAL_STATUSES.include?(status)
end
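# Snapshot of the connector embedded in the job document's 'connector' field.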
def connector_snapshot
self[:connector] || {}
end
def connector_id
connector_snapshot[:id]
end
def index_name
connector_snapshot[:index_name]
end
def language
connector_snapshot[:language]
end
def service_type
connector_snapshot[:service_type]
end
def configuration
connector_snapshot[:configuration]
end
def filtering
connector_snapshot[:filtering]
end
def pipeline
connector_snapshot[:pipeline] || {}
end
def extract_binary_content?
pipeline[:extract_binary_content]
end
def reduce_whitespace?
pipeline[:reduce_whitespace]
end
def run_ml_inference?
pipeline[:run_ml_inference]
end
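# Lazily fetches the live ConnectorSettings document for this job's connector.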
def connector
@connector ||= ConnectorSettings.fetch_by_id(connector_id)
end
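# Heartbeats the job: bumps last_seen and merges in ingestion stats and, when
# present, connector metadata.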
def update_metadata(ingestion_stats = {}, connector_metadata = {})
ingestion_stats ||= {}
doc = { :last_seen => Time.now }.merge(ingestion_stats)
doc[:metadata] = connector_metadata if connector_metadata&.any?
ElasticConnectorActions.update_job_fields(id, doc)
end
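# done!, error! and cancel! finalize the job via terminate! with the matching
# terminal status (completed, error or canceled).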
def done!(ingestion_stats = {}, connector_metadata = {})
terminate!(Connectors::SyncStatus::COMPLETED, nil, ingestion_stats, connector_metadata)
end
def error!(message, ingestion_stats = {}, connector_metadata = {})
terminate!(Connectors::SyncStatus::ERROR, message, ingestion_stats, connector_metadata)
end
def cancel!(ingestion_stats = {}, connector_metadata = {})
terminate!(Connectors::SyncStatus::CANCELED, nil, ingestion_stats, connector_metadata)
end
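# Re-fetches the job document and yields it together with its _seq_no and
# _primary_term so the caller can perform an optimistic-concurrency update.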
def with_concurrency_control
response = ElasticConnectorActions.get_job(id)
yield response, response['_seq_no'], response['_primary_term']
end
def make_running!
with_concurrency_control do |es_doc, seq_no, primary_term|
now = Time.now
doc = {
status: Connectors::SyncStatus::IN_PROGRESS,
started_at: now,
last_seen: now,
worker_hostname: Socket.gethostname
}
ElasticConnectorActions.update_job_fields(es_doc[:_id], doc, seq_no, primary_term)
end
end
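# Raw _source of the job document.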
def es_source
@elasticsearch_response[:_source]
end
private
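# NOTE: `private` does not apply to class methods defined with `def self.`;
# fetch_jobs_by_query stays callable from outside unless marked with
# private_class_method.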
def self.fetch_jobs_by_query(query, page_size)
results = []
offset = 0
loop do
response = ElasticConnectorActions.search_jobs(query, page_size, offset)
hits = response.dig('hits', 'hits') || []
total = response.dig('hits', 'total', 'value') || 0
results += hits.map { |hit| new(hit) }
# Stop once all matching jobs are collected, or when a page comes back empty
# (guards against looping forever if the reported total exceeds the retrievable hits).
break if hits.empty? || results.size >= total
offset += hits.size
end
results
end
def initialize(es_response)
# TODO: remove the usage of with_indifferent_access. The initialize method should expect a hash argument
@elasticsearch_response = es_response.with_indifferent_access
end
def terminate!(status, error = nil, ingestion_stats = {}, connector_metadata = {})
ingestion_stats ||= {}
ingestion_stats[:total_document_count] = ElasticConnectorActions.document_count(index_name)
now = Time.now
doc = {
:last_seen => now,
:completed_at => now,
:status => status,
:error => error
}.merge(ingestion_stats)
doc[:canceled_at] = now if status == Connectors::SyncStatus::CANCELED
doc[:metadata] = connector_metadata if connector_metadata&.any?
ElasticConnectorActions.update_job_fields(id, doc)
end
def seq_no
@elasticsearch_response[:_seq_no]
end
def primary_term
@elasticsearch_response[:_primary_term]
end
end
end