x-pack/lib/monitoring/monitoring.rb (245 lines of code) (raw):

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one # or more contributor license agreements. Licensed under the Elastic License; # you may not use this file except in compliance with the Elastic License. require "logstash/agent" require "monitoring/internal_pipeline_source" require 'helpers/elasticsearch_options' module LogStash class MonitoringExtension < LogStash::UniversalPlugin include LogStash::Util::Loggable java_import java.util.concurrent.TimeUnit class TemplateData def initialize(node_uuid, system_api_version, es_settings, collection_interval, collection_timeout_interval, extended_performance_collection, config_collection) @system_api_version = system_api_version @node_uuid = node_uuid @collection_interval = collection_interval @collection_timeout_interval = collection_timeout_interval @extended_performance_collection = extended_performance_collection @config_collection = config_collection @es_hosts = es_settings['hosts'] @user = es_settings['user'] @password = es_settings['password'] @cloud_id = es_settings['cloud_id'] @cloud_auth = es_settings['cloud_auth'] @api_key = es_settings['api_key'] @proxy = es_settings['proxy'] @ssl_enabled = es_settings['ssl_enabled'] @ssl_certificate_authorities = es_settings['ssl_certificate_authorities'] @ca_trusted_fingerprint = es_settings['ca_trusted_fingerprint'] @ssl_truststore_path = es_settings['ssl_truststore_path'] @ssl_truststore_password = es_settings['ssl_truststore_password'] @ssl_keystore_path = es_settings['ssl_keystore_path'] @ssl_keystore_password = es_settings['ssl_keystore_password'] @ssl_verification_mode = es_settings.fetch('ssl_verification_mode', 'full') @ssl_certificate = es_settings['ssl_certificate'] @ssl_key = es_settings['ssl_key'] @ssl_cipher_suites = es_settings['ssl_cipher_suites'] @sniffing = es_settings['sniffing'] end attr_accessor :system_api_version, :es_hosts, :user, :password, :node_uuid, :cloud_id, :cloud_auth, :api_key attr_accessor :proxy, :ssl_enabled, :ssl_certificate_authorities, :ca_trusted_fingerprint, :ssl_truststore_path, :ssl_truststore_password attr_accessor :ssl_keystore_path, :ssl_keystore_password, :sniffing, :ssl_verification_mode, :ssl_cipher_suites, :ssl_certificate, :ssl_key def collection_interval TimeUnit::SECONDS.convert(@collection_interval, TimeUnit::NANOSECONDS) end def collection_timeout_interval TimeUnit::SECONDS.convert(@collection_timeout_interval, TimeUnit::NANOSECONDS) end def cloud_id? !!cloud_id end def cloud_auth? !!cloud_auth && cloud_id? end def proxy? proxy end def auth? user && password end def api_key? api_key end def ssl_enabled? ssl_enabled || ssl_certificate_authorities || ca_trusted_fingerprint || ssl_truststore_path? || ssl_keystore_path? || ssl_certificate? end def ssl_truststore_path? ssl_truststore_path && ssl_truststore_password end def ssl_keystore_path? ssl_keystore_path && ssl_keystore_password end def ssl_certificate? ssl_certificate && ssl_key end def extended_performance_collection? @extended_performance_collection end def config_collection? @config_collection end def get_binding binding end def monitoring_endpoint if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS) "/_bulk/" else "/_monitoring/bulk?system_id=logstash&system_api_version=#{system_api_version}&interval=1s" end end def monitoring_index if LogStash::MonitoringExtension.use_direct_shipping?(LogStash::SETTINGS) ".monitoring-logstash-#{system_api_version}-" + Time.now.utc.to_date.strftime("%Y.%m.%d") else "" #let the ES xpack's reporter to create it end end end class PipelineRegisterHook include LogStash::Util::Loggable, LogStash::Helpers::ElasticsearchOptions PIPELINE_ID = ".monitoring-logstash" API_VERSION = 7 def initialize # nothing to do here end def after_agent(runner) return unless monitoring_enabled?(runner.settings) deprecation_logger.deprecated( "Internal collectors option for Logstash monitoring is deprecated and targeted for removal in the next major version.\n"\ "Please configure Elastic Agent to monitor Logstash. Documentation can be found at: \n"\ "https://www.elastic.co/guide/en/logstash/current/monitoring-with-elastic-agent.html" ) logger.trace("registering the metrics pipeline") LogStash::SETTINGS.set("node.uuid", runner.agent.id) internal_pipeline_source = LogStash::Monitoring::InternalPipelineSource.new(setup_metrics_pipeline, runner.agent, LogStash::SETTINGS.clone) runner.source_loader.add_source(internal_pipeline_source) rescue => e logger.error("Failed to set up the metrics pipeline", :message => e.message, :backtrace => e.backtrace) raise e end # For versions prior to 6.3 the default value of "xpack.monitoring.enabled" was true # For versions 6.3+ the default of "xpack.monitoring.enabled" is false. # To help keep passivity, assume that if "xpack.monitoring.elasticsearch.hosts" has been set that monitoring should be enabled. # return true if xpack.monitoring.allow_legacy_collection=true and xpack.monitoring.enabled=true (explicitly) or xpack.monitoring.elasticsearch.hosts is configured def monitoring_enabled?(settings) log_warn_if_legacy_is_enabled_and_not_allowed(settings) return false unless settings.get_value("xpack.monitoring.allow_legacy_collection") return settings.get_value("monitoring.enabled") if settings.set?("monitoring.enabled") return settings.get_value("xpack.monitoring.enabled") if settings.set?("xpack.monitoring.enabled") if settings.set?("xpack.monitoring.elasticsearch.hosts") || settings.set?("xpack.monitoring.elasticsearch.cloud_id") logger.warn("xpack.monitoring.enabled has not been defined, but found elasticsearch configuration. Please explicitly set `xpack.monitoring.enabled: true` in logstash.yml") true else default = settings.get_default("xpack.monitoring.enabled") logger.trace("xpack.monitoring.enabled has not been defined, defaulting to default value: " + default.to_s) default # false as of 6.3 end end def log_warn_if_legacy_is_enabled_and_not_allowed(settings) allowed = settings.get_value("xpack.monitoring.allow_legacy_collection") legacy_monitoring_enabled = (settings.get_value("xpack.monitoring.enabled") || settings.get_value("monitoring.enabled")) if !allowed && legacy_monitoring_enabled logger.warn("You have enabled legacy internal monitoring. However, starting from version 9.0, this feature is deactivated and behind a feature flag. Set `xpack.monitoring.allow_legacy_collection` to `true` to allow access to the feature.") end end private :log_warn_if_legacy_is_enabled_and_not_allowed def setup_metrics_pipeline settings = LogStash::SETTINGS.clone # reset settings for the metrics pipeline settings.get_setting("path.config").reset settings.set("pipeline.id", PIPELINE_ID) settings.set("config.reload.automatic", false) settings.set("metric.collect", false) settings.set("queue.type", "memory") settings.set("pipeline.workers", 1) # this is a low throughput pipeline settings.set("pipeline.batch.size", 2) settings.set("pipeline.system", true) config = generate_pipeline_config(settings) logger.debug("compiled metrics pipeline config: ", :config => config) config_part = org.logstash.common.SourceWithMetadata.new("x-pack-metrics", "internal_pipeline_source", config) Java::OrgLogstashConfigIr::PipelineConfig.new(self.class, PIPELINE_ID.to_sym, [config_part], settings) end def generate_pipeline_config(settings) if settings.set?("xpack.monitoring.enabled") && settings.set?("monitoring.enabled") raise ArgumentError.new("\"xpack.monitoring.enabled\" is configured while also \"monitoring.enabled\"") end if any_set?(settings, /^xpack.monitoring/, "xpack.monitoring.allow_legacy_collection") && any_set?(settings, /^monitoring./) raise ArgumentError.new("\"xpack.monitoring.*\" settings can't be configured while using \"monitoring.*\"") end if MonitoringExtension.use_direct_shipping?(settings) opt = retrieve_collection_settings(settings) else opt = retrieve_collection_settings(settings, "xpack.") end es_settings = es_options_from_settings('monitoring', settings) data = TemplateData.new(LogStash::SETTINGS.get("node.uuid"), API_VERSION, es_settings, opt[:collection_interval], opt[:collection_timeout_interval], opt[:extended_performance_collection], opt[:config_collection]) template_path = ::File.join(::File.dirname(__FILE__), "..", "template.cfg.erb") template = ::File.read(template_path) ERB.new(template).result(data.get_binding) end private def retrieve_collection_settings(settings, prefix = "") opt = {} opt[:collection_interval] = settings.get("#{prefix}monitoring.collection.interval").to_nanos opt[:collection_timeout_interval] = settings.get("#{prefix}monitoring.collection.timeout_interval").to_nanos opt[:extended_performance_collection] = settings.get("#{prefix}monitoring.collection.pipeline.details.enabled") opt[:config_collection] = settings.get("#{prefix}monitoring.collection.config.enabled") opt end def any_set?(settings, regexp, to_avoid = []) !settings.get_subset(regexp).to_hash.keys.select{ |k| !to_avoid.include?(k)}.select { |k| settings.set?(k)}.empty? end end def self.use_direct_shipping?(settings) settings.get("monitoring.enabled") end public def initialize # nothing to do here end def register_hooks(hooks) logger.trace("registering hook") hooks.register_hooks(LogStash::Runner, PipelineRegisterHook.new) end def additionals_settings(settings) logger.trace("registering additionals_settings") register_monitoring_settings(settings, "xpack.") # (Experimental) Direct shipping settings register_monitoring_settings(settings) settings.register(LogStash::Setting::SettingString.new("node.uuid", "")) rescue => e logger.error e.message logger.error e.backtrace.to_s raise e end private def register_monitoring_settings(settings, prefix = "") if prefix == "xpack." settings.register(LogStash::Setting::Boolean.new("xpack.monitoring.allow_legacy_collection", false)) end settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.enabled", false)) settings.register(LogStash::Setting::ArrayCoercible.new("#{prefix}monitoring.elasticsearch.hosts", String, ["http://localhost:9200"])) settings.register(LogStash::Setting::TimeValue.new("#{prefix}monitoring.collection.interval", "10s")) settings.register(LogStash::Setting::TimeValue.new("#{prefix}monitoring.collection.timeout_interval", "10m")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.username", "logstash_system")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.password")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.proxy")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.cloud_id")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.cloud_auth")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.api_key")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.certificate_authority")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.ca_trusted_fingerprint")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.truststore.path")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.truststore.password")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.keystore.path")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.keystore.password")) settings.register(LogStash::Setting::SettingString.new("#{prefix}monitoring.elasticsearch.ssl.verification_mode", "full", true, ["none", "certificate", "full"])) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.certificate")) settings.register(LogStash::Setting::SettingNullableString.new("#{prefix}monitoring.elasticsearch.ssl.key")) settings.register(LogStash::Setting::ArrayCoercible.new("#{prefix}monitoring.elasticsearch.ssl.cipher_suites", String, [])) settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.elasticsearch.sniffing", false)) settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.collection.pipeline.details.enabled", true)) settings.register(LogStash::Setting::Boolean.new("#{prefix}monitoring.collection.config.enabled", true)) end end end