lib/core/connector_job.rb (190 lines of code) (raw):

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#

# frozen_string_literal: true

require 'active_support/core_ext/hash/indifferent_access'
require 'socket' # stdlib; Socket.gethostname is used in #make_running!
require 'connectors/sync_status'
require 'core/connector_settings'
require 'core/elastic_connector_actions'
require 'utility'

module Core
  # Wraps a single sync-job document from the Elasticsearch jobs index and
  # exposes typed accessors over its _source plus lifecycle transitions
  # (make_running! / done! / error! / cancel!).
  class ConnectorJob
    DEFAULT_PAGE_SIZE = 100
    # Seconds without a heartbeat (last_seen) before an active job counts as idle.
    IDLE_THRESHOLD = 60

    # Returns a ConnectorJob for +job_id+, or nil when the document is not found.
    def self.fetch_by_id(job_id)
      es_response = ElasticConnectorActions.get_job(job_id)
      return nil unless es_response[:found]

      new(es_response)
    end

    # All jobs in a pending status, optionally restricted to +connectors_ids+.
    def self.pending_jobs(connectors_ids: [], page_size: DEFAULT_PAGE_SIZE)
      status_term = { status: Connectors::SyncStatus::PENDING_STATUSES }

      query = { bool: { must: [{ terms: status_term }] } }
      return fetch_jobs_by_query(query, page_size) if connectors_ids.empty?

      query[:bool][:must] << { terms: { 'connector.id' => connectors_ids } }
      fetch_jobs_by_query(query, page_size)
    end

    # Jobs whose connector id is NOT in +connector_ids+ (jobs without a known owner).
    def self.orphaned_jobs(connector_ids = [], page_size = DEFAULT_PAGE_SIZE)
      query = { bool: { must_not: { terms: { 'connector.id': connector_ids } } } }
      fetch_jobs_by_query(query, page_size)
    end

    # Bulk-deletes the given jobs by document id.
    def self.delete_jobs(jobs)
      query = { terms: { '_id': jobs.map(&:id) } }
      ElasticConnectorActions.delete_jobs_by_query(query)
    end

    # Active jobs that have not reported a heartbeat within IDLE_THRESHOLD seconds.
    # When +connector_id+ is nil, all native connectors are considered.
    def self.idle_jobs(connector_id = nil, page_size = DEFAULT_PAGE_SIZE)
      connector_ids = if connector_id
                        [connector_id]
                      else
                        ConnectorSettings.fetch_native_connectors.map(&:id)
                      end
      query = {
        bool: {
          filter: [
            { terms: { 'connector.id': connector_ids } },
            { terms: { status: Connectors::SyncStatus::ACTIVE_STATUSES } },
            { range: { last_seen: { lte: "now-#{IDLE_THRESHOLD}s" } } }
          ]
        }
      }
      fetch_jobs_by_query(query, page_size)
    end

    # Intentionally a no-op: job creation is handled elsewhere.
    def self.enqueue(_connector_id)
      nil
    end

    def id
      @elasticsearch_response[:_id]
    end

    # Reads a field from the job document's _source.
    def [](property_name)
      @elasticsearch_response[:_source][property_name]
    end

    def error
      self[:error]
    end

    def status
      self[:status]
    end

    def in_progress?
      status == Connectors::SyncStatus::IN_PROGRESS
    end

    def canceling?
      status == Connectors::SyncStatus::CANCELING
    end

    def suspended?
      status == Connectors::SyncStatus::SUSPENDED
    end

    def canceled?
      status == Connectors::SyncStatus::CANCELED
    end

    def pending?
      Connectors::SyncStatus::PENDING_STATUSES.include?(status)
    end

    def active?
      Connectors::SyncStatus::ACTIVE_STATUSES.include?(status)
    end

    def terminated?
      Connectors::SyncStatus::TERMINAL_STATUSES.include?(status)
    end

    # Denormalized connector data embedded in the job document at creation time.
    def connector_snapshot
      self[:connector] || {}
    end

    def connector_id
      connector_snapshot[:id]
    end

    def index_name
      connector_snapshot[:index_name]
    end

    def language
      connector_snapshot[:language]
    end

    def service_type
      connector_snapshot[:service_type]
    end

    def configuration
      connector_snapshot[:configuration]
    end

    def filtering
      connector_snapshot[:filtering]
    end

    def pipeline
      connector_snapshot[:pipeline] || {}
    end

    def extract_binary_content?
      pipeline[:extract_binary_content]
    end

    def reduce_whitespace?
      pipeline[:reduce_whitespace]
    end

    def run_ml_inference?
      pipeline[:run_ml_inference]
    end

    # Lazily fetches (and memoizes) the live connector settings for this job.
    def connector
      @connector ||= ConnectorSettings.fetch_by_id(connector_id)
    end

    # Heartbeat: refreshes last_seen and merges in any ingestion stats/metadata.
    def update_metadata(ingestion_stats = {}, connector_metadata = {})
      ingestion_stats ||= {}
      doc = { :last_seen => Time.now }.merge(ingestion_stats)
      doc[:metadata] = connector_metadata if connector_metadata&.any?
      ElasticConnectorActions.update_job_fields(id, doc)
    end

    def done!(ingestion_stats = {}, connector_metadata = {})
      terminate!(Connectors::SyncStatus::COMPLETED, nil, ingestion_stats, connector_metadata)
    end

    def error!(message, ingestion_stats = {}, connector_metadata = {})
      terminate!(Connectors::SyncStatus::ERROR, message, ingestion_stats, connector_metadata)
    end

    def cancel!(ingestion_stats = {}, connector_metadata = {})
      terminate!(Connectors::SyncStatus::CANCELED, nil, ingestion_stats, connector_metadata)
    end

    # Re-reads the job doc and yields it with its _seq_no/_primary_term so the
    # caller can perform an optimistic-concurrency-controlled update.
    def with_concurrency_control
      response = ElasticConnectorActions.get_job(id)

      yield response, response['_seq_no'], response['_primary_term']
    end

    # Transitions the job to IN_PROGRESS, stamping start time and worker host.
    # Uses seq_no/primary_term so a concurrent claim of the same job fails.
    def make_running!
      with_concurrency_control do |es_doc, seq_no, primary_term|
        now = Time.now
        doc = {
          status: Connectors::SyncStatus::IN_PROGRESS,
          started_at: now,
          last_seen: now,
          worker_hostname: Socket.gethostname
        }

        ElasticConnectorActions.update_job_fields(es_doc[:_id], doc, seq_no, primary_term)
      end
    end

    def es_source
      @elasticsearch_response[:_source]
    end

    private

    # Pages through all jobs matching +query+, +page_size+ documents at a time.
    # (Note: `private` has no effect on `def self.` methods; kept as-is to
    # preserve the original interface.)
    def self.fetch_jobs_by_query(query, page_size)
      results = []
      offset = 0
      loop do
        response = ElasticConnectorActions.search_jobs(query, page_size, offset)

        hits = response.dig('hits', 'hits') || []
        total = response.dig('hits', 'total', 'value') || 0
        results += hits.map { |hit| new(hit) }
        # Break on an empty page as well: if the index shrinks mid-pagination,
        # `results.size >= total` may never hold and the offset would stop
        # advancing, looping forever.
        break if hits.empty? || results.size >= total

        offset += hits.size
      end
      results
    end

    def initialize(es_response)
      # TODO: remove the usage of with_indifferent_access. The initialize
      # method should expect a hash argument.
      @elasticsearch_response = es_response.with_indifferent_access
    end

    # Writes the terminal status, timestamps, final document count, stats and
    # optional metadata to the job document.
    def terminate!(status, error = nil, ingestion_stats = {}, connector_metadata = {})
      # Work on a copy so the caller's stats hash is not mutated as a side effect.
      stats = (ingestion_stats || {}).dup
      stats[:total_document_count] = ElasticConnectorActions.document_count(index_name)
      doc = {
        :last_seen => Time.now,
        :completed_at => Time.now,
        :status => status,
        :error => error
      }.merge(stats)
      doc[:canceled_at] = Time.now if status == Connectors::SyncStatus::CANCELED
      doc[:metadata] = connector_metadata if connector_metadata&.any?
      ElasticConnectorActions.update_job_fields(id, doc)
    end

    def seq_no
      @elasticsearch_response[:_seq_no]
    end

    def primary_term
      @elasticsearch_response[:_primary_term]
    end
  end
end