#
# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.
#

# frozen_string_literal: true

require 'connectors/base/connector'
require 'core/filtering/validation_status'
require 'core/filtering/transform/transformation_target'
require 'connectors/mongodb/mongo_advanced_snippet_snake_case_transformer'
require 'connectors/mongodb/mongo_advanced_snippet_against_schema_validator'
require 'mongo'
require 'utility'

module Connectors
  module MongoDB
    class Connector < Connectors::Base::Connector

      ALLOWED_TOP_LEVEL_FILTER_KEYS = %w[find aggregate]

      AGGREGATE = 'aggregate'
      FIND = 'find'

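      # Number of documents requested per skip/limit page when iterating a find cursor (see yield_documents).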
      PAGE_SIZE = 100

      def self.service_type
        'mongodb'
      end

      def self.display_name
        'MongoDB'
      end

      def self.configurable_fields
        {
          :host => {
            :label => 'Server Hostname'
          },
          :user => {
            :label => 'Username'
          },
          :password => {
            :label => 'Password'
          },
          :database => {
            :label => 'Database'
          },
          :collection => {
            :label => 'Collection'
          },
          :direct_connection => {
            :label => 'Direct connection? (true/false)'
          }
        }
      end

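      # Validator that checks advanced filtering snippets against the MongoDB filtering schema.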
      def self.advanced_snippet_validators
        MongoAdvancedSnippetAgainstSchemaValidator
      end

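      # Transformers applied to the advanced filtering snippet; the snake_case transformer converts its keys to snake_case.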
      def self.filter_transformers
        {
          Core::Filtering::Transform::TransformationTarget::ADVANCED_SNIPPET => [MongoAdvancedSnippetSnakeCaseTransformer]
        }
      end

      def initialize(configuration: {}, job_description: nil)
        super

        @host = @configuration.dig(:host, :value)
        @database = @configuration.dig(:database, :value)
        @collection = @configuration.dig(:collection, :value)
        @user = @configuration.dig(:user, :value)
        @password = @configuration.dig(:password, :value)
        @direct_connection = @configuration.dig(:direct_connection, :value)
      end

      def yield_documents
        with_client do |client|
          # We page manually with skip().limit() so that Ruby can reclaim the memory of each page once it is no longer needed.
          # This gives us more control over memory usage (PAGE_SIZE can be tuned to lower peak consumption).
          # It's done because iterating with .find.each leads to overuse of memory - the whole result set appears to stay in memory
          # for the duration of the sync. Sometimes (not 100% confirmed) it even turns into a real leak, where the memory for these objects is never reclaimed.
          cursor_type, cursor_with_options = create_db_cursor_on_collection(client[@collection])
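          # cursor_with_options is either a [cursor, options] pair (advanced filtering) or a bare cursor (default find);
          # for a bare cursor the destructuring below leaves options as nil.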
          cursor, options = cursor_with_options

          case cursor_type
          when FIND
            skip = 0
            found_overall = 0

            # If filtering does not specify an overall limit, default to Float::INFINITY so ingestion is never cut short
            # (found_overall only ever increases and can never reach infinity, so the limit check below never triggers in that case).
            overall_limit = Float::INFINITY

            if options.present?
              # filtering may define a skip parameter
              skip = options.fetch(:skip, skip)
              # filtering may define a limit parameter, which acts as an overall limit
              # (not a page limit - pages exist purely for memory optimization)
              overall_limit = options.fetch(:limit, overall_limit)
            end

            overall_limit_reached = false

            loop do
              found_in_page = 0

              Utility::Logger.info("Requesting #{PAGE_SIZE} documents from MongoDB (starting at #{skip})")
              view = cursor.skip(skip).limit(PAGE_SIZE)
              view.each do |document|
                yield_with_handling_tolerable_errors do
                  yield serialize(document)
                  found_in_page += 1
                  found_overall += 1
                  overall_limit_reached = found_overall >= overall_limit && overall_limit != Float::INFINITY
                end
                break if overall_limit_reached
              end

              page_was_empty = found_in_page == 0

              break if page_was_empty || overall_limit_reached

              skip += PAGE_SIZE
            end
          when AGGREGATE
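            # the driver's aggregation cursor streams results in batches, so no manual paging is needed here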
            cursor.each do |document|
              yield_with_handling_tolerable_errors do
                yield serialize(document)
              end
            end
          else
            raise "Unknown retrieval function #{cursor_type} for MongoDB."
          end
        end
      end

      private

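      # Picks the retrieval strategy based on the advanced filtering configuration.
      # Returns the retrieval type (FIND or AGGREGATE) plus either a [cursor, options] pair or, for the default case, a bare find cursor.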
      def create_db_cursor_on_collection(collection)
        return [AGGREGATE, create_aggregate_cursor(collection)] if @advanced_filter_config[:aggregate].present?

        return [FIND, create_find_cursor(collection)] if @advanced_filter_config[:find].present?

        [FIND, collection.find]
      end

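      # Builds an aggregation cursor from the advanced snippet.
      # Illustrative snippet shape (values are made up):
      #   { aggregate: { pipeline: [{ '$match' => { 'status' => 'active' } }], options: {} } }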
      def create_aggregate_cursor(collection)
        aggregate = @advanced_filter_config[:aggregate]

        pipeline = aggregate[:pipeline] || []
        options = extract_options(aggregate)

        if pipeline.empty? && options.empty?
          Utility::Logger.warn("'Aggregate' was specified with an empty pipeline and empty options.")
        end

        [collection.aggregate(pipeline, options), options]
      end

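      # Builds a find cursor from the advanced snippet.
      # Illustrative snippet shape (values are made up):
      #   { find: { filter: { 'status' => 'active' }, options: { skip: 0, limit: 1000 } } }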
      def create_find_cursor(collection)
        find = @advanced_filter_config[:find]

        filter = find[:filter]
        options = extract_options(find)

        if !filter.nil? && filter.empty? && !options.present?
          Utility::Logger.warn("'Find' was specified with an empty filter and empty options.")
        end

        [collection.find(filter, options), options]
      end

      def extract_options(mongodb_function)
        mongodb_function[:options].present? ? mongodb_function[:options] : {}
      end

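      # Opening a client already verifies connectivity and that the configured database and collection exist.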
      def do_health_check
        with_client do |_client|
          Utility::Logger.debug("Mongo at #{@host}/#{@database} looks healthy.")
        end
      end

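      # Creates a Mongo::Client for the configured host, verifies that the configured database and collection exist,
      # and yields the client to the given block.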
      def with_client
        raise "Invalid value for 'Direct connection': #{@direct_connection}." unless %w[true false].include?(@direct_connection.to_s.strip.downcase)

        args = {
          database: @database,
          direct_connection: to_boolean(@direct_connection)
        }

        if @user.present? || @password.present?
          args[:user] = @user
          args[:password] = @password
        end

        Mongo::Client.new(@host, args) do |client|
          databases = client.database_names

          Utility::Logger.debug("Existing Databases: #{databases}")
          check_database_exists!(databases, @database)

          collections = client.database.collection_names

          Utility::Logger.debug("Existing Collections: #{collections}")
          check_collection_exists!(collections, @database, @collection)

          yield client
        end
      end

      def check_database_exists!(databases, database)
        return if databases.include?(database)

        raise "Database (#{database}) does not exist. Existing databases: #{databases.join(', ')}"
      end

      def check_collection_exists!(collections, database, collection)
        return if collections.include?(collection)

        raise "Collection (#{collection}) does not exist within database '#{database}'. Existing collections: #{collections.join(', ')}"
      end

      def serialize(mongodb_document)
        # This is deliberately lightweight serialization.
        # Problem: MongoDB has its own representation of things - e.g. ids are BSON::ObjectId, which serialized to JSON
        # would produce something like 'id': { '$oid': '536268a06d2d7019ba000000' }, which is not what we want to index.
        case mongodb_document
        when BSON::ObjectId
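          # e.g. BSON::ObjectId('536268a06d2d7019ba000000') becomes '536268a06d2d7019ba000000'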
          mongodb_document.to_s
        when BSON::Decimal128
          mongodb_document.to_big_decimal # potential problems with NaNs but also will get treated as a string by Elasticsearch anyway
        when String
          # handled explicitly so Strings pass through unchanged before the container branches below
          mongodb_document.to_s
        when Array
          mongodb_document.map { |v| serialize(v) }
        when Hash
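          # Elasticsearch reserves '_id', so remap MongoDB's '_id' key to 'id' and serialize nested values recursively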
          mongodb_document.map do |key, value|
            key = 'id' if key == '_id'
            remapped_value = serialize(value)
            [key, remapped_value]
          end.to_h
        else
          mongodb_document
        end
      end

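      # Accepts real booleans as well as common truthy strings ('true', 't', 'yes', 'y', '1') from configuration.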
      def to_boolean(value)
        value == true || value.to_s.strip.match?(/\A(true|t|yes|y|1)\z/i)
      end
    end
  end
end
