x-pack/lib/geoip_database_management/manager.rb:

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.

require_relative 'constants'
require_relative 'util'
require_relative 'data_path'
require_relative 'metadata'
require_relative 'downloader'
require_relative 'subscription'
require_relative 'db_info'
require_relative 'metric'

require "logstash/util/loggable"
require "singleton"
require "concurrent/set"

##
# The GeoipDatabaseManagement's Manager can be used by plugins to acquire
# a subscription to an auto-updating, EULA-compliant Geoip database.
# The Manager is lazy by default, and begins maintaining local databases
# on disk when the first subscription is started.
#
# Once started, it queries an Elastic database service daily to discover
# available updates, loading discovered updates in the background and notifying
# all subscribers before eventually removing databases from disk that are no
# longer assigned to any subscribers.
#
# The terms of the MaxMind EULA are enforced, ensuring that databases that
# have not been synchronized with the service in more than 30 days are not
# assigned to subscribers so they can be freed. After 25 days without sync
# the manager begins emitting warning messages.
#
# The provided Metric namespace is populated with information about the
# current state of managed databases, age-since-sync, etc.
#
# @example Subscribe to a database
#   sub = Manager.instance.subscribe_database_path("City")
#   sub.observe(construct: -> (db_info) { ... },
#               on_update: -> (db_info) { ... },
#               on_expire: -> (        ) { ... })
#   sub.release! # unsubscribe
module LogStash
  module GeoipDatabaseManagement
    class Manager
      include Constants
      include Util
      include LogStash::Util::Loggable
      include Singleton

      def initialize
        @start_lock = Mutex.new

        @enabled = LogStash::SETTINGS.get("xpack.geoip.downloader.enabled")
        @endpoint = LogStash::SETTINGS.get("xpack.geoip.downloader.endpoint")
        @poll_interval = LogStash::SETTINGS.get("xpack.geoip.downloader.poll.interval")

        data_directory = ::File.expand_path("geoip_database_management", LogStash::SETTINGS.get_value('path.data'))
        @data_path = GeoipDatabaseManagement::DataPath.new(data_directory)
        @metadata = Metadata.new(data_path)

        unless enabled?
          if @metadata.exist?
            logger.info("database manager is disabled; removing managed databases from disk")
            metadata.delete
            clean_up_database
          else
            logger.info("database manager is disabled.")
          end
        end
      end

      ##
      # @param database_type [String] one of `GeoipDatabaseManagement::DB_TYPES`
      # @return [Subscription] the observer
      def subscribe_database_path(database_type)
        fail ArgumentError, "unsupported database type `#{database_type}`" unless DB_TYPES.include?(database_type)

        return nil unless enabled?

        ensure_started!

        @states.fetch(database_type).subscribe
      end

      ##
      # @return [Boolean] true unless the database management feature has been disabled
      def enabled?
        @enabled
      end

      ##
      # @return [Enumerable<String>] the types of databases that can be subscribed to
      def supported_database_types
        DB_TYPES
      end

      ##
      # @api internal
      def database_metric=(database_metric)
        @database_metric = database_metric
      end

      ##
      # @api internal
      def running?
        @start_lock.synchronize { @download_task&.running? }
      end

      protected

      attr_reader :endpoint
      attr_reader :poll_interval
      attr_reader :data_path
      attr_reader :metadata

      def database_metric
        logger.debug("Database Metric is nil. No geoip metrics are available. Please report the bug") if @database_metric.nil?
        @database_metric ||= LogStash::GeoipDatabaseManagement::Metric.new(LogStash::Instrument::NamespacedNullMetric.new)
      end

      def downloader
        @downloader ||= Downloader.new(metadata, endpoint)
      end

      def ensure_started!
        @start_lock.synchronize do
          return if @download_task

          setup
          execute_download_job

          logger.debug "spawning periodic check for updates (#{poll_interval})"
          @download_task = Concurrent::TimerTask.execute(execution_interval: poll_interval.to_seconds) do
            periodic_sync
          end
        end
      end

      def periodic_sync
        LogStash::Util::set_thread_name 'geoip database sync task' do
          LogStash::Util::with_logging_thread_context("pipeline.id" => nil) do
            logger.debug "running database update check"
            execute_download_job
          end
        end
      end

      def clean_up_database(excluded_dirnames = [])
        return unless ::Dir.exist?(data_path.root)

        protected_dirnames = excluded_dirnames.uniq
        existing_dirnames = ::Dir.children(data_path.root)
                                 .select { |f| ::File.directory? ::File.join(data_path.root, f) }

        (existing_dirnames - protected_dirnames).each do |dirname|
          dir_path = data_path.resolve(dirname)
          FileUtils.rm_r(dir_path)
          logger.info("Stale database directory `#{dir_path}` has been deleted")
        end
      end

      def setup
        FileUtils.mkdir_p(data_path.root)
        metadata.touch

        @states = DB_TYPES.each_with_object({}) do |type, memo|
          db_info = if metadata.has_type?(type)
                      DbInfo.new(path: metadata.database_path(type))
                    else
                      DbInfo::PENDING
                    end
          memo[type] = State.new(db_info)
        end

        database_metric.initialize_metrics(metadata.get_all)
      end

      def execute_download_job
        success_cnt = 0

        database_metric.set_download_status_updating

        updated_db = downloader.fetch_databases(DB_TYPES)
        updated_db.each do |database_type, valid_download, dirname, new_database_path|
          if valid_download
            metadata.save_metadata(database_type, dirname, gz_md5: md5(data_path.gz(database_type, dirname)))
            @states[database_type].update!(new_database_path) do |previous_db_info|
              logger.info("managed geoip database has been updated on disk",
                          :database_type => database_type, :database_path => new_database_path)
            end
            success_cnt += 1
          end
        end

        updated_types = updated_db.map { |database_type, valid_download, dirname, new_database_path| database_type }
        (DB_TYPES - updated_types).each do |unchange_type|
          metadata.update_timestamp(unchange_type)
          success_cnt += 1
        end
      rescue => e
        logger.error("failed to sync databases", error_details(e, logger))
      ensure
        check_age
        clean_up_database(metadata.dirnames)
        database_metric.update_download_stats(success_cnt == DB_TYPES.size)
      end

      def check_age(database_types = DB_TYPES)
        deferred_deletions = []
        database_types.map do |database_type|
          db_metadata = metadata.get_metadata(database_type).last
          if db_metadata.nil?
            logger.debug("No metadata for #{database_type}...")
            next
          end

          check_at = db_metadata[Metadata::Column::CHECK_AT].to_i
          days_without_update = time_diff_in_days(check_at)

          case
          when days_without_update >= 30
            @states[database_type].expire! do |db_info|
              logger.error("The managed MaxMind GeoIP #{database_type} database hasn't been synchronized in #{days_without_update} days "\
                           "and #{db_info.expired? ? "has been" : "will be"} removed in order to remain compliant with the MaxMind EULA. "\
                           "Logstash is unable to get a newer version from the internet. "\
                           "Please check the network settings and allow Logstash to access the internet to download the latest database. "\
                           "Alternatively you can switch to a self-managed GeoIP database service (`xpack.geoip.download.endpoint`), or "\
                           "configure each plugin with a self-managed database which you can download from https://dev.maxmind.com/geoip/geoip2/geolite2/ ")
            end
            deferred_deletions << metadata.database_path(database_type)
            metadata.unset_path(database_type)
            database_status = Metric::DATABASE_EXPIRED
          when days_without_update >= 25
            logger.warn("The MaxMind GeoIP #{database_type} database hasn't been synchronized in #{days_without_update} days. "\
                        "Logstash will remove access to the stale database in #{30 - days_without_update} days in order to remain compliant with the MaxMind EULA. "\
                        "Please check the network settings and allow Logstash to access the internet to download the latest database.")
            database_status = Metric::DATABASE_TO_BE_EXPIRED
          else
            logger.trace("The MaxMind GeoIP #{database_type} database passed age check", :days_without_update => days_without_update)
            database_status = Metric::DATABASE_UP_TO_DATE
          end

          database_metric.update_database_status(database_type, database_status, db_metadata, days_without_update)
        end
      ensure
        deferred_deletions.compact.each do |path|
          FileUtils.rm(path, force: true)
          logger.debug("Removed database file `#{path}`")
        end
      end

      ##
      # @api testing
      def shutdown!
        @start_lock.synchronize do
          return unless @download_task&.running?

          @download_task.shutdown
          10.times do
            break unless @download_task.running?
            sleep 1
          end
          @states.values.each(&:delete_observers)
        end
      end
      private :shutdown!

      ##
      # @api testing
      def current_db_info(database_type)
        current_state(database_type)&.db_info
      end
      private :current_db_info

      ##
      # @api testing
      def current_state(database_type)
        @states&.dig(database_type)
      end
      private :current_state

      ##
      # @api private
      class State
        attr_reader :db_info

        require 'observer' # ruby stdlib
        include Observable

        def initialize(db_info)
          @db_info = db_info
        end

        ##
        # @api internal
        def subscribe
          synchronize do
            subscription = Subscription.new(@db_info, self)
            add_observer(subscription, :notify)
            subscription
          end
        end

        def unsubscribe(observer)
          synchronize do
            delete_observer(observer)
          end
        end

        ##
        # @param new_database_path [String]
        # @yieldparam previous_db_info [DbInfo]
        # @yieldreturn [void]
        def update!(new_database_path)
          synchronize do
            previous_db_info, @db_info = @db_info, DbInfo.new(path: new_database_path)
            changed
            yield(previous_db_info) if block_given?
            notify_observers(@db_info)
          end
        end

        ##
        # @yieldparam previous_path [String]
        # @yieldparam was_expired [Boolean]
        # @yieldreturn [void]
        def expire!
          synchronize do
            previous_db_info, @db_info = @db_info, DbInfo::EXPIRED
            changed
            yield(previous_db_info) if block_given?
            notify_observers(@db_info)
          end
        end

        ##
        # @api internal
        def release!(subscription)
          synchronize do
            delete_observer(subscription)
          end
        end

        private

        def synchronize(&block)
          LogStash::Util.synchronize(self) do
            yield
          end
        end
      end
    end
  end
end