qa/integration/services/logstash_service.rb (305 lines of code) (raw):

# Licensed to Elasticsearch B.V. under one or more contributor # license agreements. See the NOTICE file distributed with # this work for additional information regarding copyright # ownership. Elasticsearch B.V. licenses this file to you under # the Apache License, Version 2.0 (the "License"); you may # not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, # software distributed under the License is distributed on an # "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY # KIND, either express or implied. See the License for the # specific language governing permissions and limitations # under the License. require_relative "monitoring_api" require "childprocess" require "bundler" require "socket" require "shellwords" require "tempfile" require 'yaml' # A locally started Logstash service class LogstashService < Service LS_ROOT_DIR = File.join("..", "..", "..", "..") LS_VERSION_FILE = File.expand_path(File.join(LS_ROOT_DIR, "versions.yml"), __FILE__) LS_BUILD_DIR = File.join(LS_ROOT_DIR, "build") LS_BIN = File.join("bin", "logstash") LS_CONFIG_FILE = File.join("config", "logstash.yml") SETTINGS_CLI_FLAG = "--path.settings" STDIN_CONFIG = "input {stdin {}} output { }" RETRY_ATTEMPTS = 60 TIMEOUT_MAXIMUM = 60 * 10 # 10mins. class ProcessStatus < Struct.new(:exit_code, :stderr_and_stdout); end @process = nil attr_reader :logstash_home attr_reader :default_settings_file attr_writer :env_variables def initialize(settings, api_port = 9600) super("logstash", settings) # if you need to point to a LS in different path if @settings.is_set?("ls_home_abs_path") @logstash_home = @settings.get("ls_home_abs_path") else @logstash_home = clean_expand_built_tarball end puts "Using #{@logstash_home} as LS_HOME" @logstash_bin = File.join("#{@logstash_home}", LS_BIN) raise "Logstash binary not found in path #{@logstash_home}" unless File.file? @logstash_bin @default_settings_file = File.join(@logstash_home, LS_CONFIG_FILE) @monitoring_api = MonitoringAPI.new(api_port) end ## # @return [String] the path to a CLEAN expansion of the locally-built tarball def clean_expand_built_tarball build_dir = File.expand_path(LS_BUILD_DIR, __FILE__) # source of tarball target_dir = File.join(build_dir, "qa-fixture") # find the built tarball matching the current version, preferring non-SNAPSHOT ls_version = YAML.load_file(LS_VERSION_FILE).fetch("logstash") candidates = %W( logstash-#{ls_version}.tar.gz logstash-#{ls_version}-SNAPSHOT.tar.gz ) candidates.each do |tarball_candidate| tarball_candidate_path = File.join(build_dir, tarball_candidate) if File.exist?(tarball_candidate_path) expected_untar_directory = File.basename(tarball_candidate, ".tar.gz") result_logstash_home = File.join(target_dir, expected_untar_directory) if Dir.exist?(result_logstash_home) puts "expunging(#{result_logstash_home})" # FileUtils#rm_rf cannot be used here because it silently fails to remove the bundled jdk on MacOS expunge_result = `rm -rf #{Shellwords.escape(result_logstash_home)} 2>&1` fail("ERROR EXPUNGING: #{expunge_result}") unless $?.success? end puts "expanding(#{tarball_candidate_path})" FileUtils.mkdir_p(target_dir) unless Dir.exist?(target_dir) FileUtils.chdir(target_dir) do expand_result = `tar -xzf #{Shellwords.escape(tarball_candidate_path)} 2>&1` fail("ERROR EXPANDING: #{expand_result}") unless $?.success? end return result_logstash_home end end fail("failed to find any matching build tarballs (looked for `#{candidates}` in `#{build_dir}`)") end private :clean_expand_built_tarball def alive? if @process.nil? || @process.exited? raise "Logstash process is not up because of an error, or it stopped" else @process.alive? end end def exited? @process.exited? end def exit_code @process.exit_code end def pid @process.pid end # Starts a LS process in background with a given config file # and shuts it down after input is completely processed def start_background(config_file) spawn_logstash("-e", config_file) end # Given an input this pipes it to LS. Expects a stdin input in LS def start_with_input(config, input) Bundler.with_unbundled_env do `cat #{Shellwords.escape(input)} | LS_JAVA_HOME=#{java.lang.System.getProperty('java.home')} #{Shellwords.escape(@logstash_bin)} -e \'#{config}\'` end end def start_background_with_config_settings(config, settings_file) spawn_logstash("-f", "#{config}", "--path.settings", settings_file) end def start_with_config_string(config) spawn_logstash("-e", "#{config} ") end # Can start LS in stdin and can send messages to stdin # Useful to test metrics and such def start_with_stdin(pipeline_config = STDIN_CONFIG) spawn_logstash("-e", pipeline_config) wait_for_logstash end def write_to_stdin(input) if alive? @process.io.stdin.puts(input) end end # Spawn LS as a child process def spawn_logstash(*args) $stderr.puts "Starting Logstash #{Shellwords.escape(@logstash_bin)} #{Shellwords.join(args)}" Bundler.with_unbundled_env do out = Tempfile.new("duplex") out.sync = true @process = build_child_process(*args) # pipe STDOUT and STDERR to a file @process.io.stdout = @process.io.stderr = out @process.duplex = true # enable stdin to be written @env_variables.map { |k, v| @process.environment[k] = v} unless @env_variables.nil? if ENV['RUNTIME_JAVA_HOME'] logstash_java = @process.environment['LS_JAVA_HOME'] = ENV['RUNTIME_JAVA_HOME'] else ENV.delete('LS_JAVA_HOME') if ENV['LS_JAVA_HOME'] logstash_java = 'bundled java' end @process.io.inherit! @process.start puts "Logstash started with PID #{@process.pid}, using java: #{logstash_java}" if @process.alive? end end def build_child_process(*args) feature_config_dir = @settings.feature_config_dir # if we are using a feature flag and special settings dir to enable it, use it # If some tests is explicitly using --path.settings, ignore doing this, because the tests # chose to overwrite it. if feature_config_dir && !args.include?(SETTINGS_CLI_FLAG) args << "--path.settings" args << feature_config_dir puts "Found feature flag. Starting LS using --path.settings #{feature_config_dir}" end puts "Starting Logstash: #{@logstash_bin} #{args} (pwd: #{Dir.pwd})" ChildProcess.build(@logstash_bin, *args) end def teardown if !@process.nil? # todo: put this in a sleep-wait loop to kill it force kill @process.io.stdin.close rescue nil @process.stop @process = nil end end # check if LS HTTP port is open def is_port_open? begin s = TCPSocket.open("localhost", 9600) s.close return true rescue Errno::ECONNREFUSED, Errno::EHOSTUNREACH return false end end # check REST API is responsive def rest_active? result = monitoring_api.node_info !result.nil? rescue return false end def monitoring_api raise "Logstash is not up, but you asked for monitoring API" unless alive? @monitoring_api end # Wait until LS is started by repeatedly doing a socket connection to HTTP port def wait_for_logstash tries = RETRY_ATTEMPTS while tries > 0 if is_port_open? return else sleep 1 end tries -= 1 end raise "Logstash REST API did not come up after #{RETRY_ATTEMPTS}s." end # wait until LS respond to REST HTTP API request def wait_for_rest_api tries = RETRY_ATTEMPTS while tries > 0 if rest_active? return else sleep 1 end tries -= 1 end raise "Logstash REST API did not come up after #{RETRY_ATTEMPTS}s." end # this method only overwrites existing config with new config # it does not assume that LS pipeline is fully reloaded after a # config change. It is up to the caller to validate that. def reload_config(initial_config_file, reload_config_file) FileUtils.cp(reload_config_file, initial_config_file) end def get_version `LS_JAVA_HOME=#{java.lang.System.getProperty('java.home')} #{Shellwords.escape(@logstash_bin)} --version`.split("\n").last end def get_version_yml LS_VERSION_FILE end def process_id @process.pid end def application_settings_file feature_config_dir = @settings.feature_config_dir unless feature_config_dir @default_settings_file else File.join(feature_config_dir, "logstash.yml") end end def plugin_cli PluginCli.new(self) end def lock_file File.join(@logstash_home, "Gemfile.lock") end def run_cmd(cmd_args, change_dir = true, environment = {}) out = Tempfile.new("content") out.sync = true cmd, *args = cmd_args process = ChildProcess.build(cmd, *args) environment.each do |k, v| process.environment[k] = v end # JDK matrix tests value BUILD_JAVA_HOME to select the JDK to use to run the test code # forward this selection also in spawned Logstash if ENV.key?("BUILD_JAVA_HOME") && !process.environment.key?("LS_JAVA_HOME") process.environment["LS_JAVA_HOME"] = ENV["BUILD_JAVA_HOME"] end process.io.stdout = process.io.stderr = SynchronizedDelegate.new(out) Bundler.with_unbundled_env do if change_dir Dir.chdir(@logstash_home) do process.start end else process.start end end process.poll_for_exit(TIMEOUT_MAXIMUM) out.rewind ProcessStatus.new(process.exit_code, out.read) end def run(*args) run_cmd [@logstash_bin, *args] end ## # A `SynchronizedDelegate` wraps any object and ensures that exactly one # calling thread is invoking methods on it at a time. This is useful for our # clumsy setting of process io STDOUT and STDERR to the same IO object, which # can cause interleaved writes. class SynchronizedDelegate def initialize(obj) require "monitor" @mon = Monitor.new @obj = obj end def respond_to_missing?(method_name, include_private = false) @obj.respond_to?(method_name, include_private) || super end def method_missing(method_name, *args, &block) return super unless @obj.respond_to?(method_name) @mon.synchronize do @obj.public_send(method_name, *args, &block) end end end class PluginCli LOGSTASH_PLUGIN = File.join("bin", "logstash-plugin") attr_reader :logstash_plugin def initialize(logstash_service) @logstash = logstash_service @logstash_plugin = File.join(@logstash.logstash_home, LOGSTASH_PLUGIN) end def remove(plugin_name, *additional_plugins) plugin_list = Shellwords.shelljoin([plugin_name]+additional_plugins) run("remove #{plugin_list}") end def prepare_offline_pack(plugins, output_zip = nil) plugins = Array(plugins) if output_zip.nil? run("prepare-offline-pack #{plugins.join(" ")}") else run("prepare-offline-pack --output #{output_zip} #{plugins.join(" ")}") end end def list(*plugins, verbose: false) command = "list" command << " --verbose" if verbose command << " #{Shellwords.shelljoin(plugins)}" if plugins.any? run(command) end def install(plugin_name, *additional_plugins, version: nil, verify: true, preserve: false, local: false) args = [] args << "--no-verify" unless verify args << "--preserve" if preserve args << "--local" if local args << "--version" << version unless version.nil? args.concat(([plugin_name]+additional_plugins).flatten) run("install #{Shellwords.shelljoin(args)}") end def update(*plugin_list, level: nil, local: nil, verify: nil, conservative: nil) args = [] args << (verify ? "--verify" : "--no-verify") unless verify.nil? args << "--level" << "#{level}" unless level.nil? args << "--local" if local args << (conservative ? "--conservative" : "--no-conservative") unless conservative.nil? args.concat(plugin_list) run("update #{Shellwords.shelljoin(args)}") end def run(command) run_raw("#{logstash_plugin} #{command}") end def run_raw(cmd, change_dir = true, environment = {}) @logstash.run_cmd(Shellwords.shellsplit(cmd), change_dir, environment) end end end