x-pack/lib/geoip_database_management/downloader.rb

# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.

require_relative '../../../lib/bootstrap/util/compress'
require 'logstash/util/loggable'

require_relative 'util'
require_relative 'metadata'

require "json"
require "zlib"
require "stud/try"
require "down"
require "fileutils"
require 'uri'

module LogStash module GeoipDatabaseManagement class Downloader
  include GeoipDatabaseManagement::Constants
  include GeoipDatabaseManagement::Util
  include LogStash::Util::Loggable

  class BadResponseCodeError < Error
    attr_reader :response_code, :response_body

    def initialize(response_code, response_body)
      @response_code = response_code
      @response_body = response_body
    end

    def message
      "GeoIP service response code '#{response_code}', body '#{response_body}'"
    end
  end

  attr_reader :list_databases_url

  ##
  # @param metadata [Metadata]
  # @param service_endpoint [URI,String]
  def initialize(metadata, service_endpoint)
    logger.trace("init", metadata: metadata, endpoint: service_endpoint)
    @metadata = metadata
    @paths = metadata.paths

    service_endpoint = URI(service_endpoint).dup.freeze
    if service_endpoint.query&.chars&.any?
      logger.warn("GeoIP endpoint URI includes query parameter, which will be ignored: `#{safe_uri service_endpoint}`")
    end

    @list_databases_url = service_endpoint.merge("?key=#{uuid}&elastic_geoip_service_tos=agree").freeze
  end

  public

  # Checks for available updates and downloads them, then unzips and validates
  # each downloaded file. If a download fails, valid_download is false for that entry.
  # @return [Array] of [database_type, valid_download, dirname, new_database_path] tuples
  def fetch_databases(db_types)
    dirname = Time.now.to_i.to_s
    check_update(db_types)
      .map do |database_type, db_info|
        begin
          new_zip_path = download_database(database_type, dirname, db_info)
          new_database_path = unzip(database_type, dirname, new_zip_path)
          assert_database!(new_database_path)
          [database_type, true, dirname, new_database_path]
        rescue => e
          logger.error("failed to fetch #{database_type} database", error_details(e, logger))
          [database_type, false, nil, nil]
        end
      end
  end

  private

  # Calls the infra endpoint to get the md5 of the latest databases and
  # verifies it against the local metadata.
  # @return [Array] of [database_type, db_info] pairs for databases needing an update
  def check_update(db_types)
    return enum_for(:check_update, db_types).to_a unless block_given?

    res = rest_client.get(list_databases_url)
    logger.debug("check update", :endpoint => safe_uri(list_databases_url).to_s, :response => res.code)
    if res.code < 200 || res.code > 299
      raise BadResponseCodeError.new(res.code, res.body)
    end
    service_resp = JSON.parse(res.body)

    db_types.each do |database_type|
      db_info = service_resp.find { |db| db['name'].eql?("#{GEOLITE}#{database_type}.#{GZ_EXT}") }
      if db_info.nil?
        logger.debug("Database service did not include #{database_type}")
      elsif @metadata.database_path(database_type).nil?
        logger.debug("Local #{database_type} database is not present.")
        yield(database_type, db_info)
      elsif @metadata.gz_md5(database_type) == db_info['md5_hash']
        logger.debug("Local #{database_type} database is up-to-date.")
      else
        logger.debug("Updated #{database_type} database is available.")
        yield(database_type, db_info)
      end
    end
  end

  def download_database(database_type, dirname, db_info)
    Stud.try(3.times) do
      FileUtils.mkdir_p(@paths.resolve(dirname))
      zip_path = @paths.gz(database_type, dirname)

      actual_url = resolve_download_url(db_info['url']).to_s
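      # NOTE: `logger.debug?` short-circuits `&&`, so the URL is only
      # interpolated into a log message when debug logging is enabled.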
      logger.debug? && logger.debug("download #{actual_url}")

      options = { destination: zip_path }
      options.merge!({proxy: ENV['http_proxy']}) if ENV.include?('http_proxy')
      Down.download(actual_url, **options)
      raise "the new download has the wrong checksum" if md5(zip_path) != db_info['md5_hash']

      logger.debug("new database downloaded", :path => zip_path)
      zip_path
    end
  end

  # Extracts all files and folders from the .tgz into the per-download
  # directory under path.data.
  # @return [String] the path to the extracted database file
  def unzip(database_type, dirname, zip_path)
    temp_path = ::File.join(@paths.resolve(dirname), database_type)
    LogStash::Util::Tar.extract(zip_path, temp_path)
    FileUtils.cp_r(::File.join(temp_path, '.'), @paths.resolve(dirname))
    FileUtils.rm_r(temp_path)

    @paths.db(database_type, dirname)
  end

  # Memoized HTTP client for the list-databases endpoint, honoring the
  # http_proxy environment variable when present.
  def rest_client
    @client ||= begin
      client_options = {
        request_timeout: 15,
        connect_timeout: 5
      }
      client_options[:proxy] = ENV['http_proxy'] if ENV.include?('http_proxy')
      Manticore::Client.new(client_options)
    end
  end

  # The node UUID persisted under path.data, or "UNSET" if it cannot be read.
  def uuid
    @uuid ||= ::File.read(::File.join(LogStash::SETTINGS.get("path.data"), "uuid")) rescue "UNSET"
  end

  # Resolves a possibly-relative download URL against the list-databases endpoint.
  def resolve_download_url(possibly_relative_url)
    list_databases_url.merge(possibly_relative_url)
  end

  # Validates that the extracted file exists and contains the MaxMind DB
  # binary metadata marker.
  def assert_database!(database_path)
    raise "failed to load database #{database_path} because it does not exist" unless file_exist?(database_path)
    raise "failed to load database #{database_path} because it does not appear to be a MaxMind DB" unless scan_binary_file(database_path, "\xab\xcd\xefMaxMind.com")
  end

  def safe_uri(unsafe)
    LogStash::Util::SafeURI.new(unsafe)
  end

  ##
  # Scans a binary file for the given verbatim byte sequence
  # without loading the entire binary file into memory, by scanning
  # in chunks.
  def scan_binary_file(file_path, byte_sequence)
    byte_sequence = byte_sequence.b
    partial_size = [byte_sequence.bytesize, 1024].max

    ::File.open(file_path, 'r:BINARY') do |io|
      a, b = ''.b, ''.b # two binary buffers
      until io.eof?
        io.readpartial(partial_size, b)
        bridged_chunk = (a + b) # bridge adjacent chunks so a match spanning a boundary is found
        return true if bridged_chunk.include?(byte_sequence)

        a, b = b, a # swap buffers before continuing
      end
    end

    false
  end
end end end
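
# --------------------------------------------------------------------------
# Usage sketch (illustrative only, not part of this file). The endpoint URL
# and the `Metadata` constructor wiring below are assumptions made for the
# sake of the example; consult the real callers of Downloader for the actual
# construction of `paths` and the configured service endpoint.
#
#   metadata   = LogStash::GeoipDatabaseManagement::Metadata.new(paths)   # hypothetical wiring
#   downloader = LogStash::GeoipDatabaseManagement::Downloader.new(
#     metadata,
#     "https://geoip.elastic.co/v1/database"   # assumed endpoint; may differ
#   )
#
#   downloader.fetch_databases(%w(City ASN)).each do |type, ok, dirname, db_path|
#     if ok
#       puts "fetched #{type} database into #{dirname}: #{db_path}"
#     else
#       puts "failed to fetch #{type} database"
#     end
#   end
# --------------------------------------------------------------------------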