lib/app/preflight_check.rb (112 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
require 'app/version'
require 'utility'
require 'faraday'
module App
class PreflightCheck
class CheckFailure < StandardError; end
class UnhealthyCluster < StandardError; end
STARTUP_RETRY_INTERVAL = 5
STARTUP_RETRY_TIMEOUT = 600
class << self
def run!
check_es_connection!
check_es_version!
check_system_indices!
check_single_connector!
end
private
#-------------------------------------------------------------------------------------------------
# Checks to make sure we can connect to Elasticsearch and make API requests to it
def check_es_connection!
check_es_connection_with_retries!(
:retry_interval => STARTUP_RETRY_INTERVAL,
:retry_timeout => STARTUP_RETRY_TIMEOUT
)
end
#-------------------------------------------------------------------------------------------------
# Ensures that the version of Elasticsearch is compatible with connector service
def check_es_version!
info = client.info
version = info.dig('version', 'number')
fail_check!("Cannot retrieve version from Elasticsearch response:\n#{info.to_json}") unless version
if match_es_version?(version)
Utility::Logger.info("Connector service version (#{App::VERSION}) matches Elasticsearch version (#{version}).")
else
fail_check!("Connector service (#{App::VERSION}) is required to run with the same major and minor version of Elasticsearch (#{version}).")
end
end
#-------------------------------------------------------------------------------------------------
# Ensures that the required system indices of connector service exist
def check_system_indices!
check_system_indices_with_retries!(
:retry_interval => STARTUP_RETRY_INTERVAL,
:retry_timeout => STARTUP_RETRY_TIMEOUT
)
end
#-------------------------------------------------------------------------------------------------
# Ensures the connector is supported when running in non-native mode
def check_single_connector!
if App::Config.native_mode
Utility::Logger.info('Skip single connector check for native mode.')
elsif !Connectors::REGISTRY.registered?(App::Config.service_type)
fail_check!("The service type #{App::Config.service_type} is not supported. Terminating...")
end
end
def check_es_connection_with_retries!(retry_interval:, retry_timeout:)
started_at = Time.now
begin
response = client.cluster.health
Utility::Logger.info('Successfully connected to Elasticsearch')
case response['status']
when 'green'
Utility::Logger.info('Elasticsearch is running and healthy.')
when 'yellow'
Utility::Logger.warn('Elasticsearch is running but the status is yellow.')
when 'red'
raise UnhealthyCluster, 'Elasticsearch is running but unhealthy.'
else
raise UnhealthyCluster, "Unexpected cluster status: #{response['status']}"
end
rescue *Utility::AUTHORIZATION_ERRORS => e
Utility::ExceptionTracking.log_exception(e)
fail_check!("Elasticsearch returned 'Unauthorized' response. Check your authentication details. Terminating...")
rescue *App::RETRYABLE_CONNECTION_ERRORS => e
Utility::Logger.warn('Could not connect to Elasticsearch. Make sure it is running and healthy.')
Utility::Logger.debug("Error: #{e.full_message}")
sleep(retry_interval)
time_elapsed = Time.now - started_at
retry if time_elapsed < retry_timeout
# If we ran out of time, there is not much we can do but shut down
fail_check!("Could not connect to Elasticsearch after #{time_elapsed.to_i} seconds. Terminating...")
end
end
def match_es_version?(es_version)
parse_minor_version(App::VERSION) == parse_minor_version(es_version)
end
def parse_minor_version(version)
version.split('.').slice(0, 2).join('.')
end
def check_system_indices_with_retries!(retry_interval:, retry_timeout:)
started_at = Time.now
loop do
if client.indices.exists?(:index => Utility::Constants::CONNECTORS_INDEX) && client.indices.exists?(:index => Utility::Constants::JOB_INDEX)
Utility::Logger.info("Found system indices #{Utility::Constants::CONNECTORS_INDEX} and #{Utility::Constants::JOB_INDEX}.")
return
end
Utility::Logger.warn('Required system indices for connector service don\'t exist. Make sure to run Kibana first to create system indices.')
sleep(retry_interval)
time_elapsed = Time.now - started_at
if time_elapsed > retry_timeout
fail_check!("Could not find required system indices after #{time_elapsed.to_i} seconds. Terminating...")
end
end
end
def client
@client ||= Utility::EsClient.new(App::Config[:elasticsearch])
end
def fail_check!(message)
raise CheckFailure, message
end
end
end
RETRYABLE_CONNECTION_ERRORS = [
::Faraday::ConnectionFailed,
::Faraday::ClientError,
::Errno::ECONNREFUSED,
::SocketError,
::Errno::ECONNRESET,
App::PreflightCheck::UnhealthyCluster,
::HTTPClient::KeepAliveDisconnected
]
end