lib/connectors/mongodb/connector.rb

#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#

# frozen_string_literal: true

require 'connectors/base/connector'
require 'core/filtering/validation_status'
require 'core/filtering/transform/transformation_target'
require 'connectors/mongodb/mongo_advanced_snippet_snake_case_transformer'
require 'connectors/mongodb/mongo_advanced_snippet_against_schema_validator'
require 'mongo'
require 'utility'

module Connectors
  module MongoDB
    class Connector < Connectors::Base::Connector
      ALLOWED_TOP_LEVEL_FILTER_KEYS = %w[find aggregate]

      AGGREGATE = 'aggregate'
      FIND = 'find'

      PAGE_SIZE = 100

      def self.service_type
        'mongodb'
      end

      def self.display_name
        'MongoDB'
      end

      def self.configurable_fields
        {
          :host => { :label => 'Server Hostname' },
          :user => { :label => 'Username' },
          :password => { :label => 'Password' },
          :database => { :label => 'Database' },
          :collection => { :label => 'Collection' },
          :direct_connection => { :label => 'Direct connection? (true/false)' }
        }
      end

      def self.advanced_snippet_validators
        MongoAdvancedSnippetAgainstSchemaValidator
      end

      def self.filter_transformers
        {
          Core::Filtering::Transform::TransformationTarget::ADVANCED_SNIPPET => [MongoAdvancedSnippetSnakeCaseTransformer]
        }
      end

      def initialize(configuration: {}, job_description: nil)
        super

        @host = @configuration.dig(:host, :value)
        @database = @configuration.dig(:database, :value)
        @collection = @configuration.dig(:collection, :value)
        @user = @configuration.dig(:user, :value)
        @password = @configuration.dig(:password, :value)
        @direct_connection = @configuration.dig(:direct_connection, :value)
      end
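
      # Illustration (not from the original file): the digs in #initialize imply that
      # the configuration hash nests every configurable field under a :value key, e.g.
      #
      #   {
      #     :host => { :value => 'mongodb://localhost:27017' },
      #     :database => { :value => 'sample_database' },
      #     :collection => { :value => 'sample_collection' },
      #     :user => { :value => nil },
      #     :password => { :value => nil },
      #     :direct_connection => { :value => 'false' }
      #   }
      #
      # The host, database and collection values above are placeholders, not defaults.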

      def yield_documents
        with_client do |client|
          # We do paging using skip().limit() here to let Ruby recycle the memory for each page pulled from the server once it's no longer needed.
          # This gives us more control over memory usage (the PAGE_SIZE constant can be adjusted to decrease maximum memory consumption).
          # It's done because using .find.each leads to memory overuse - the whole result set seems to stay in memory during the sync.
          # Sometimes (not 100% confirmed) it even leads to a real leak, where the memory for these objects is never recycled.
          cursor_type, cursor_with_options = create_db_cursor_on_collection(client[@collection])
          cursor, options = cursor_with_options
          case cursor_type
          when FIND
            skip = 0
            found_overall = 0

            # if filtering specifies no overall limit, use Float::INFINITY so ingestion is never capped
            # (found_overall only increases, so it can never reach infinity)
            overall_limit = Float::INFINITY

            if options.present?
              # there could be a skip parameter defined for filtering
              skip = options.fetch(:skip, skip)

              # there could be a limit parameter defined for filtering -> used as an overall limit (not a page limit, which was introduced for memory optimization)
              overall_limit = options.fetch(:limit, overall_limit)
            end

            overall_limit_reached = false

            loop do
              found_in_page = 0

              Utility::Logger.info("Requesting #{PAGE_SIZE} documents from MongoDB (Starting at #{skip})")

              view = cursor.skip(skip).limit(PAGE_SIZE)
              view.each do |document|
                yield_with_handling_tolerable_errors do
                  yield serialize(document)
                  found_in_page += 1
                  found_overall += 1

                  overall_limit_reached = found_overall >= overall_limit && overall_limit != Float::INFINITY
                end
                break if overall_limit_reached
              end

              page_was_empty = found_in_page == 0

              break if page_was_empty || overall_limit_reached

              skip += PAGE_SIZE
            end
          when AGGREGATE
            cursor.each do |document|
              yield_with_handling_tolerable_errors do
                yield serialize(document)
              end
            end
          else
            raise "Unknown retrieval function #{cursor_type} for MongoDB."
          end
        end
      end

      private

      def create_db_cursor_on_collection(collection)
        return [AGGREGATE, create_aggregate_cursor(collection)] if @advanced_filter_config[:aggregate].present?

        return [FIND, create_find_cursor(collection)] if @advanced_filter_config[:find].present?

        [FIND, collection.find]
      end

      def create_aggregate_cursor(collection)
        aggregate = @advanced_filter_config[:aggregate]

        pipeline = aggregate[:pipeline] || []
        options = extract_options(aggregate)

        if pipeline.empty? && options.empty?
          Utility::Logger.warn('\'Aggregate\' was specified with an empty pipeline and empty options.')
        end

        [collection.aggregate(pipeline, options), options]
      end

      def create_find_cursor(collection)
        find = @advanced_filter_config[:find]

        filter = find[:filter]
        options = extract_options(find)

        if !filter.nil? && filter.empty? && !options.present?
          Utility::Logger.warn('\'Find\' was specified with an empty filter and empty options.')
        end

        [collection.find(filter, options), options]
      end

      def extract_options(mongodb_function)
        mongodb_function[:options].present? ? mongodb_function[:options] : {}
      end
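
      # Illustration (not from the original file): the keys read by the three methods
      # above suggest that an advanced filtering snippet (@advanced_filter_config)
      # takes one of two top-level forms, e.g.
      #
      #   { :find => { :filter => { 'state' => 'active' }, :options => { :skip => 0, :limit => 1000 } } }
      #   { :aggregate => { :pipeline => [{ '$match' => { 'state' => 'active' } }], :options => {} } }
      #
      # The field names and values inside :filter and :pipeline are placeholders; the
      # actual schema is validated by MongoAdvancedSnippetAgainstSchemaValidator.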

      def do_health_check
        with_client do |_client|
          Utility::Logger.debug("Mongo at #{@host}/#{@database} looks healthy.")
        end
      end

      def with_client
        raise "Invalid value for 'Direct connection': #{@direct_connection}." unless %w[true false].include?(@direct_connection.to_s.strip.downcase)

        args = { database: @database, direct_connection: to_boolean(@direct_connection) }

        if @user.present? || @password.present?
          args[:user] = @user
          args[:password] = @password
        end

        Mongo::Client.new(@host, args) do |client|
          databases = client.database_names
          Utility::Logger.debug("Existing Databases: #{databases}")
          check_database_exists!(databases, @database)

          collections = client.database.collection_names
          Utility::Logger.debug("Existing Collections: #{collections}")
          check_collection_exists!(collections, @database, @collection)

          yield client
        end
      end

      def check_database_exists!(databases, database)
        return if databases.include?(database)

        raise "Database (#{database}) does not exist. Existing databases: #{databases.join(', ')}"
      end

      def check_collection_exists!(collections, database, collection)
        return if collections.include?(collection)

        raise "Collection (#{collection}) does not exist within database '#{database}'. Existing collections: #{collections.join(', ')}"
      end

      def serialize(mongodb_document)
        # This is some lazy serialization here.
        # Problem: MongoDB has its own formats - e.g. ids are BSON::ObjectId, which when serialized to JSON
        # produce something like: 'id': { '$oid': '536268a06d2d7019ba000000' }, which is not good for us.
        case mongodb_document
        when BSON::ObjectId
          mongodb_document.to_s
        when BSON::Decimal128
          mongodb_document.to_big_decimal # potential problems with NaNs, but it will be treated as a string by Elasticsearch anyway
        when String # it's here cause Strings are Arrays too :/
          mongodb_document.to_s
        when Array
          mongodb_document.map { |v| serialize(v) }
        when Hash
          mongodb_document.map do |key, value|
            key = 'id' if key == '_id'
            remapped_value = serialize(value)
            [key, remapped_value]
          end.to_h
        else
          mongodb_document
        end
      end

      def to_boolean(value)
        value == true || value =~ (/(true|t|yes|y|1)$/i)
      end
    end
  end
end
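
# --- Usage sketch (illustrative, not part of the original file) ---
# A minimal driver, assuming the base class stores the `configuration:` keyword
# argument as @configuration (as the digs in #initialize suggest), defaults
# @advanced_filter_config to an empty hash when no job filtering is given, and
# that the repository's load path resolves the requires above. The connection
# details below are placeholders.
if $PROGRAM_NAME == __FILE__
  connector = Connectors::MongoDB::Connector.new(
    configuration: {
      :host => { :value => 'mongodb://localhost:27017' },
      :user => { :value => nil },
      :password => { :value => nil },
      :database => { :value => 'sample_database' },
      :collection => { :value => 'sample_collection' },
      :direct_connection => { :value => 'false' }
    }
  )

  # Documents arrive already passed through #serialize, so '_id' is remapped to 'id'.
  connector.yield_documents do |document|
    puts document['id']
  end
end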