lib/app/config.rb (109 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'config'
require_relative '../utility/logger'
# We look for places in this order:
# - CONNECTORS_CONFIG environment variable
# - here: /../../config/connectors.yml
CONFIG_FILE = ENV['CONNECTORS_CONFIG'] || File.join(__dir__, '../..', 'config', 'connectors.yml')
puts "Parsing #{CONFIG_FILE} configuration file."
::Config.setup do |config|
config.evaluate_erb_in_yaml = false
config.use_env = true
config.env_prefix = ''
config.schema do
required(:version).value(:string)
required(:repository).value(:string)
required(:revision).value(:string)
required(:elasticsearch).hash do
optional(:cloud_id).value(:string)
optional(:hosts).value(:string)
optional(:api_key).value(:string)
optional(:retry_on_failure).value(:integer)
optional(:request_timeout).value(:integer)
optional(:disable_warnings).value(:bool?)
optional(:trace).value(:bool?)
optional(:log).value(:bool?)
optional(:ca_fingerprint).value(:string)
optional(:transport_options).value(:hash)
optional(:headers).value(:hash)
end
optional(:thread_pool).hash do
optional(:min_threads).value(:integer, gteq?: 0)
optional(:max_threads).value(:integer, gteq?: 0)
optional(:max_queue).value(:integer, gteq?: 0)
end
required(:native_mode).value(:bool?)
optional(:connector_id).value(:string)
optional(:service_type).value(:string)
required(:log_level).value(:string)
required(:ecs_logging).value(:bool?)
optional(:poll_interval).value(:integer)
optional(:termination_timeout).value(:integer)
optional(:heartbeat_interval).value(:integer)
optional(:job_cleanup_interval).value(:integer)
optional(:max_ingestion_queue_size).value(:integer) # items
optional(:max_ingestion_queue_bytes).value(:integer) # bytes
end
end
::Config.load_and_set_settings(CONFIG_FILE)
module App
DEFAULT_PASSWORD = 'changeme'
# If it's on cloud (i.e. EnvVar ENT_SEARCH_CONFIG_PATH is set), elasticsearch config in ent-search will be used.
def self.ent_search_es_config
ent_search_config_path = ENV['ENT_SEARCH_CONFIG_PATH']
unless ent_search_config_path
Utility::Logger.info('ENT_SEARCH_CONFIG_PATH is not found, use connector service config.')
return nil
end
Utility::Logger.info("Found ENT_SEARCH_CONFIG_PATH, loading ent-search config from #{ent_search_config_path}")
ent_search_config = begin
YAML.load_file(ent_search_config_path)
rescue StandardError => e
Utility::Logger.error("Failed to load ent-search config #{ent_search_config_path}: #{e.message}")
return nil
end
unless ent_search_config.is_a?(Hash)
Utility::Logger.error("Invalid ent-search config: #{ent_search_config.inspect}")
return nil
end
host = ent_search_config['elasticsearch.host'] || ent_search_config.dig('elasticsearch', 'host')
username = ent_search_config['elasticsearch.username'] || ent_search_config.dig('elasticsearch', 'username')
password = ent_search_config['elasticsearch.password'] || ent_search_config.dig('elasticsearch', 'password')
missing_fields = []
missing_fields << 'elasticsearch.host' unless host
missing_fields << 'elasticsearch.username' unless username
missing_fields << 'elasticsearch.password' unless password
if missing_fields.any?
Utility::Logger.error("Incomplete elasticsearch config, missing #{missing_fields.join(', ')}")
return nil
end
uri = begin
URI.parse(host)
rescue URI::InvalidURIError => e
Utility::Logger.error("Failed to parse elasticsearch host #{host}: #{e.message}")
return nil
end
unless uri.is_a?(URI::HTTP) || uri.is_a?(URI::HTTPS)
Utility::Logger.error("Invalid elasticsearch host #{host}, it must be a http or https URI.")
return nil
end
headers = ent_search_config['elasticsearch.headers'] || ent_search_config.dig('elasticsearch', 'headers')
{
:hosts => [
{
scheme: uri.scheme,
user: username,
password: password,
host: uri.host,
port: uri.port
}
],
:headers => headers
}
end
Config = ::Settings.tap do |config|
if ent_search_config = ent_search_es_config
Utility::Logger.error('Overriding elasticsearch config with ent-search config')
original_es_config = config[:elasticsearch].to_h
original_es_config.delete(:cloud_id)
original_es_config.delete(:hosts)
original_es_config.delete(:api_key)
config[:elasticsearch] = original_es_config.merge(ent_search_config)
end
end
end