lib/app/console_app.rb (232 lines of code) (raw):
#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#
# frozen_string_literal: true
$LOAD_PATH << '../'
require 'app/config'
require 'app/menu'
require 'app/preflight_check'
require 'app/worker'
require 'connectors/registry'
require 'utility'
require 'core'
module App
module ConsoleApp
extend self
INDEX_NAME_REGEXP = /[a-zA-Z]+[\d_\-a-zA-Z]*/
@commands = [
{ :command => :sync_now, :hint => 'start one-time synchronization NOW' },
{ :command => :register, :hint => 'register connector with Elasticsearch' },
{ :command => :scheduling_on, :hint => 'enable connector scheduling' },
{ :command => :scheduling_off, :hint => 'disable connector scheduling' },
{ :command => :set_configurable_field, :hint => 'update the values of configurable fields' },
{ :command => :read_configurable_fields, :hint => 'read the stored values of configurable fields' },
{ :command => :status, :hint => 'check the status of a third-party service' },
{ :command => :exit, :hint => 'end the program' }
]
def connector_id
App::Config[:connector_id]
end
def update_connector_id(connector_id)
App::Config[:connector_id] = connector_id
end
def start_sync_now
return unless connector_registered?
puts 'Initiating synchronization NOW...'
Core::ElasticConnectorActions.force_sync(connector_id)
puts "Successfully synced for connector #{connector_id}"
config_settings = Core::ConnectorSettings.fetch_by_id(connector_id)
Core::ElasticConnectorActions.ensure_content_index_exists(config_settings[:index_name])
Core::SyncJobRunner.new(config_settings, App::Config[:service_type]).execute
end
def show_status
return unless connector_registered?
connector = current_connector
puts 'Checking status...'
puts connector.source_status
puts
end
def register_connector
if connector_id.present?
puts "You already have registered a connector with ID: #{connector_id}. Registering a new connector will overwrite the existing one."
puts 'Are you sure you want to continue? (y/N)'
return false unless gets.chomp.strip.casecmp('y').zero?
end
puts 'Please enter index name for data ingestion. Use only letters, underscored and dashes.'
index_name = gets.chomp.strip
unless INDEX_NAME_REGEXP.match?(index_name)
puts "Index name #{index_name} contains symbols that aren't allowed!"
return false
end
puts 'Do you want to use ICU Analysis Plugin? (y/N)'
use_analysis_icu = gets.chomp.strip.casecmp('y').zero?
language_code = select_analyzer
# create the connector
created_id = create_connector(index_name, use_analysis_icu, language_code)
update_connector_id(created_id)
true
end
def validate_cronline(cronline)
!!Fugit::Cron.parse(Utility::Cron.quartz_to_crontab(cronline))
end
def enable_scheduling
return unless connector_registered?
previous_schedule = Core::ConnectorSettings.fetch_by_id(connector_id)&.full_sync_scheduling&.fetch(:interval, nil)
if previous_schedule.present?
puts "Please enter a valid crontab expression for scheduling. Previous schedule was: #{previous_schedule}."
else
puts 'Please enter a valid crontab expression for scheduling.'
end
cron_expression = gets.chomp.strip.downcase
unless validate_cronline(cron_expression)
puts "Quartz Cron expression #{cron_expression} isn't valid!"
return
end
Core::ElasticConnectorActions.enable_connector_scheduling(connector_id, cron_expression)
puts "Enabled scheduling for connector #{connector_id} with cron expression #{cron_expression}"
end
def disable_scheduling
return unless connector_registered?
puts "Are you sure you want to disable scheduling for connector #{connector_id}? (y/n)"
return unless gets.chomp.strip.casecmp('y').zero?
Core::ElasticConnectorActions.disable_connector_scheduling(connector_id)
puts "Disabled scheduling for connector #{connector_id}"
end
def connector_registered?(warn_if_not: true)
result = connector_id.present?
if warn_if_not && !result
'You have no connector ID yet. Register a new connector before continuing.'
end
result
end
def create_connector(index_name, use_analysis_icu = false, language_code = :en)
id = Core::ElasticConnectorActions.create_connector(index_name, App::Config[:service_type])
Core::ElasticConnectorActions.ensure_content_index_exists(index_name, use_analysis_icu, language_code)
puts "Successfully registered connector #{index_name} with ID #{id}"
connector_settings = Core::ConnectorSettings.fetch_by_id(id)
connector_settings.id
end
def read_command
menu = App::Menu.new('Please select the command:', @commands)
menu.select_command
rescue Interrupt
exit_normally
end
def select_analyzer
analyzers = App::Menu.new('Please select a language analyzer', supported_analyzers)
analyzers.select_command
rescue Interrupt
exit_normally
end
def supported_analyzers
@supported_analyzers ||= YAML.safe_load(
File.read(Utility::Elasticsearch::Index::TextAnalysisSettings::LANGUAGE_DATA_FILE_PATH),
symbolize_names: true
).map do |language_code, data|
{ :command => language_code, :hint => data[:name] }
end
puts @supported_analyzers
@supported_analyzers
end
def wait_for_keypress(message = nil)
if message.present?
puts message
end
puts 'Press any key to continue...'
gets
end
def current_connector
connector_settings = Core::ConnectorSettings.fetch_by_id(App::Config[:connector_id])
service_type = App::Config[:service_type]
if service_type.present?
return registry.connector(service_type, connector_settings.configuration)
end
puts 'You have not set connector service type in settings. Please do so before continuing.'
nil
end
def exit_normally(message = 'Kthxbye!... ¯\_(ツ)_/¯')
puts(message)
exit(true)
end
def registry
@registry = Connectors::REGISTRY
end
puts 'Hello Connectors 3.0!'
sleep(1)
def set_configurable_field
return unless connector_registered?
connector = current_connector
connector_class = connector.class
current_values = Core::ConnectorSettings.fetch_by_id(connector_id)&.configuration
return unless connector.present?
puts 'Provided configurable fields:'
configurable_fields = connector_class.configurable_fields
fields = configurable_fields.each_key.map do |key|
field = configurable_fields[key].with_indifferent_access
current_value = current_values&.fetch(key, nil)
{ :command => key, :hint => "#{field[:label]} (current value: #{current_value}, default: #{field[:value]})" }
end
menu = App::Menu.new('Please select the configurable field:', fields)
field_name = menu.select_command
field_label = configurable_fields.dig(field_name, :label)
puts 'Please enter the new value:'
new_value = gets.chomp.strip
Core::ElasticConnectorActions.set_configurable_field(connector_id, field_name, field_label, new_value)
Utility::Logger.debug("Successfully updated field #{field_name} for connector #{connector_id} to #{new_value}")
end
def read_configurable_fields
return unless connector_registered?
connector = current_connector
connector_class = connector.class
current_values = Core::ConnectorSettings.fetch_by_id(connector_id)&.configuration
return unless connector.present?
puts 'Persisted values of configurable fields:'
connector_class.configurable_fields.each_key.each do |key|
field = connector_class.configurable_fields[key].with_indifferent_access
current_value = current_values&.fetch(key, nil)
puts "* #{field[:label]} - current value: #{current_value}, default: #{field[:value]}"
end
end
Utility::Environment.set_execution_environment(App::Config) do
App::PreflightCheck.run!
loop do
command = read_command
case command
when :sync_now
start_sync_now
wait_for_keypress('Synchronization finished!')
when :status
show_status
wait_for_keypress('Status checked!')
when :register
if register_connector
wait_for_keypress('Please store connector ID in config file and restart the program.')
else
wait_for_keypress('Registration canceled!')
end
when :scheduling_on
enable_scheduling
wait_for_keypress('Scheduling enabled! Start synchronization to see it in action.')
when :scheduling_off
disable_scheduling
wait_for_keypress('Scheduling disabled! Starting synchronization will have no effect now.')
when :set_configurable_field
set_configurable_field
wait_for_keypress('Configurable field is updated!')
when :read_configurable_fields
read_configurable_fields
wait_for_keypress
when :exit
exit_normally
else
exit_normally('Sorry, this command is not yet implemented')
end
end
end
rescue App::PreflightCheck::CheckFailure => e
Utility::Logger.error("Preflight check failed: #{e.message}")
exit(-1)
rescue SystemExit
puts 'Exiting.'
rescue Interrupt
exit_normally
rescue StandardError => e
Utility::ExceptionTracking.log_exception(e)
exit(false)
end
end