lib/connectors/base/connector.rb (97 lines of code) (raw):

# # Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License; # you may not use this file except in compliance with the Elastic License. # # frozen_string_literal: true require 'active_support/core_ext/hash/indifferent_access' require 'app/config' require 'bson' require 'core/ingestion' require 'connectors/tolerable_error_helper' require 'core/filtering/advanced_snippet/advanced_snippet_validator' require 'core/filtering/simple_rules/validation/no_conflicting_policies_rules_validator' require 'core/filtering/simple_rules/validation/single_rule_against_schema_validator' require 'core/filtering/transform/filter_transformer_facade' require 'core/filtering/transform/transformation_target' require 'core/filtering/filter_validator' require 'core/filtering/processing_stage' require 'core/filtering/validation_status' require 'utility' require 'utility/filtering' module Connectors module Base class Connector def self.display_name raise 'Not implemented for this connector' end # Used as a framework util method, don't override def self.configurable_fields_indifferent_access configurable_fields.with_indifferent_access end def self.configurable_fields {} end def self.service_type raise 'Not implemented for this connector' end def self.kibana_features [ { :feature => :sync_rules, :subfeature => :basic, :enabled => true }, { :feature => :sync_rules, :subfeature => :advanced, :enabled => true } ] end def self.advanced_snippet_validators Core::Filtering::AdvancedSnippet::AdvancedSnippetValidator end def self.simple_rules_validators { Core::Filtering::ProcessingStage::ALL => [ Core::Filtering::SimpleRules::Validation::SingleRuleAgainstSchemaValidator, Core::Filtering::SimpleRules::Validation::NoConflictingPoliciesRulesValidator ] } end def self.filter_transformers { Core::Filtering::Transform::TransformationTarget::ADVANCED_SNIPPET => [], Core::Filtering::Transform::TransformationTarget::RULES => [] } end def self.validate_filtering(filtering = {}) filter = Utility::Filtering.extract_filter(filtering) filter_validator = Core::Filtering::FilterValidator.new(snippet_validator_classes: advanced_snippet_validators, rules_validator_classes: simple_rules_validators, rules_pre_processing_active: Utility::Filtering.rule_pre_processing_active?(filter)) filter_validator.is_filter_valid(filter) end attr_reader :rules, :advanced_filter_config def initialize(configuration: {}, job_description: nil) error_monitor = Utility::ErrorMonitor.new @tolerable_error_helper = Connectors::TolerableErrorHelper.new(error_monitor) @configuration = job_description&.configuration&.dup || configuration&.dup || {} @job_description = job_description&.dup filter = Utility::Filtering.extract_filter(@job_description&.filtering) filter = Core::Filtering::Transform::FilterTransformerFacade.new(filter, self.class.filter_transformers).transform @rules = filter[:rules] || [] # regression bug, we need to keep indifferent access here until we get rid of symbols in the connectors @advanced_filter_config = filter[:advanced_snippet]&.with_indifferent_access || {} end def yield_documents; end def yield_with_handling_tolerable_errors(identifier: nil, &block) @tolerable_error_helper.yield_single_document(identifier: identifier, &block) end def do_health_check raise 'Not implemented for this connector' end def do_health_check! do_health_check rescue StandardError => e Utility::ExceptionTracking.log_exception(e, "Connector for service #{self.class.service_type} failed the health check for 3rd-party service.") raise Utility::HealthCheckFailedError.new, e.message end def is_healthy? do_health_check true rescue StandardError => e Utility::ExceptionTracking.log_exception(e, "Connector for service #{self.class.service_type} failed the health check for 3rd-party service.") false end def metadata {} end end end end