# frozen_string_literal: true

module Aws
  module Record
    # +Aws::Record::TableConfig+ provides a DSL for describing and modifying
    # the remote configuration of your DynamoDB tables. A table configuration
    # object can perform intelligent comparisons and incremental migrations
    # versus the current remote configuration, if the table exists, or do a full
    # create if it does not. In this manner, table configuration becomes fully
    # declarative.
    #
    # @example A basic model with configuration.
    #   class Model
    #     include Aws::Record
    #     string_attr :uuid, hash_key: true
    #   end
    #
    #   table_config = Aws::Record::TableConfig.define do |t|
    #     t.model_class Model
    #     t.read_capacity_units 10
    #     t.write_capacity_units 5
    #   end
    #
    # @example A basic model with pay per request billing.
    #   class Model
    #     include Aws::Record
    #     string_attr :uuid, hash_key: true
    #   end
    #
    #   table_config = Aws::Record::TableConfig.define do |t|
    #     t.model_class Model
    #     t.billing_mode "PAY_PER_REQUEST"
    #   end
    #
    # @example Running a conditional migration on a basic model.
    #   table_config = Aws::Record::TableConfig.define do |t|
    #     t.model_class Model
    #     t.read_capacity_units 10
    #     t.write_capacity_units 5
    #   end
    #
    #   table_config.migrate! unless table_config.compatible?
    #
    # @example A model with a global secondary index.
    #   class Forum
    #     include Aws::Record
    #     string_attr     :forum_uuid, hash_key: true
    #     integer_attr    :post_id,    range_key: true
    #     string_attr     :post_title
    #     string_attr     :post_body
    #     string_attr     :author_username
    #     datetime_attr   :created_date
    #     datetime_attr   :updated_date
    #     string_set_attr :tags
    #     map_attr        :metadata, default_value: {}
    #
    #     global_secondary_index(
    #       :title,
    #       hash_key:  :forum_uuid,
    #       range_key: :post_title,
    #       projection: {
    #         projection_type: "ALL"
    #       }
    #     )
    #   end
    #
    #   table_config = Aws::Record::TableConfig.define do |t|
    #     t.model_class Forum
    #     t.read_capacity_units 10
    #     t.write_capacity_units 5
    #
    #     t.global_secondary_index(:title) do |i|
    #       i.read_capacity_units 5
    #       i.write_capacity_units 5
    #     end
    #   end
    #
    # @example A model with a Time to Live attribute
    #   class ExpiringTokens
    #     string_attr :token_uuid, hash_key: true
    #     epoch_time_attr :ttl
    #   end
    #
    #   table_config = Aws::Record::TableConfig.define do |t|
    #     t.model_class ExpiringTokens
    #     t.read_capacity_units 10
    #     t.write_capacity_units 1
    #     t.ttl_attribute :ttl
    #   end
    #
    class TableConfig
      attr_accessor :client

      class << self
        # Creates a new table configuration, using a DSL in the provided block.
        # The DSL has the following methods:
        # * +#model_class+ A class name reference to the +Aws::Record+ model
        #   class.
        # * +#read_capacity_units+ Sets the read capacity units for the table.
        # * +#write_capacity_units+ Sets the write capacity units for the table.
        # * +#global_secondary_index(index_symbol, &block)+ Defines a global
        #   secondary index with capacity attributes in a block:
        #   * +#read_capacity_units+ Sets the read capacity units for the
        #     index.
        #   * +#write_capacity_units+ Sets the write capacity units for the
        #     index.
        # * +#ttl_attribute+ Sets the attribute ID to be used as the TTL
        #   attribute, and if present, TTL will be enabled for the table.
        # * +#billing_mode+ Sets the billing mode, with the current supported
        #   options being "PROVISIONED" and "PAY_PER_REQUEST". If using
        #   "PAY_PER_REQUEST" you must not set provisioned throughput values,
        #   and if using "PROVISIONED" you must set provisioned throughput
        #   values. Default assumption is "PROVISIONED".
        #
        # @example Defining a migration with a GSI.
        #   class Forum
        #     include Aws::Record
        #     string_attr     :forum_uuid, hash_key: true
        #     integer_attr    :post_id,    range_key: true
        #     string_attr     :post_title
        #     string_attr     :post_body
        #     string_attr     :author_username
        #     datetime_attr   :created_date
        #     datetime_attr   :updated_date
        #     string_set_attr :tags
        #     map_attr        :metadata, default_value: {}
        #
        #     global_secondary_index(
        #       :title,
        #       hash_key:  :forum_uuid,
        #       range_key: :post_title,
        #       projection_type: "ALL"
        #     )
        #   end
        #
        #   table_config = Aws::Record::TableConfig.define do |t|
        #     t.model_class Forum
        #     t.read_capacity_units 10
        #     t.write_capacity_units 5
        #
        #     t.global_secondary_index(:title) do |i|
        #       i.read_capacity_units 5
        #       i.write_capacity_units 5
        #     end
        #   end
        def define(&block)
          cfg = TableConfig.new
          cfg.instance_eval(&block)
          cfg.configure_client
          cfg
        end
      end

      # @api private
      def initialize
        @client_options = {}
        @global_secondary_indexes = {}
        @billing_mode = 'PROVISIONED' # default
      end

      # @api private
      def model_class(model)
        @model_class = model
      end

      # @api private
      def read_capacity_units(units)
        @read_capacity_units = units
      end

      # @api private
      def write_capacity_units(units)
        @write_capacity_units = units
      end

      # @api private
      def global_secondary_index(name, &block)
        gsi = GlobalSecondaryIndex.new
        gsi.instance_eval(&block) if block_given?
        @global_secondary_indexes[name] = gsi
      end

      # @api private
      def client_options(opts)
        @client_options = opts
      end

      # @api private
      def configure_client
        @client = Aws::DynamoDB::Client.new(@client_options)
        @client.config.user_agent_frameworks << 'aws-record'
      end

      # @api private
      def ttl_attribute(attribute_symbol)
        attribute = @model_class.attributes.attribute_for(attribute_symbol)
        raise ArgumentError, "Invalid attribute #{attribute_symbol} for #{@model_class}" unless attribute

        @ttl_attribute = attribute.database_name
      end

      # @api private
      def billing_mode(mode)
        @billing_mode = mode
      end

      # Performs a migration, if needed, against the remote table. If
      # +#compatible?+ would return true, the remote table already has the same
      # throughput, key schema, attribute definitions, and global secondary
      # indexes, so no further API calls are made. Otherwise, a DynamoDB table
      # will be created or updated to match your declared configuration.
      def migrate!
        _validate_required_configuration
        begin
          resp = @client.describe_table(table_name: @model_class.table_name)
          if _compatible_check(resp)
            nil
          else
            # Gotcha: You need separate migrations for indexes and throughput
            unless _throughput_equal(resp)
              @client.update_table(_update_throughput_opts(resp))
              @client.wait_until(
                :table_exists,
                table_name: @model_class.table_name
              )
            end
            unless _gsi_superset(resp)
              @client.update_table(_update_index_opts(resp))
              @client.wait_until(
                :table_exists,
                table_name: @model_class.table_name
              )
            end
          end
        rescue DynamoDB::Errors::ResourceNotFoundException
          # Code Smell: Exception as control flow.
          # Can I use SDK ability to skip raising an exception for this?
          @client.create_table(_create_table_opts)
          @client.wait_until(:table_exists, table_name: @model_class.table_name)
        end
        # At this stage, we have a table and need to check for after-effects to
        # apply.
        # First up is TTL attribute. Since this migration is not exact match,
        # we will only alter TTL status if we have a TTL attribute defined. We
        # may someday support explicit TTL deletion, but we do not yet do this.
        return unless @ttl_attribute
        return if _ttl_compatibility_check

        client.update_time_to_live(
          table_name: @model_class.table_name,
          time_to_live_specification: {
            enabled: true,
            attribute_name: @ttl_attribute
          }
        )
        # Else TTL is compatible and we are done.
        # Else our work is done.
      end

      # Checks the remote table for compatibility. Similar to +#exact_match?+,
      # this will return +false+ if the remote table does not exist. It also
      # checks the keys, declared global secondary indexes, declared attribute
      # definitions, and throughput for exact matches. However, if the remote
      # end has additional attribute definitions and global secondary indexes
      # not defined in your config, will still return +true+. This allows for a
      # check that is friendly to single table inheritance use cases.
      #
      # @return [Boolean] true if remote is compatible, false otherwise.
      def compatible?
        resp = @client.describe_table(table_name: @model_class.table_name)
        _compatible_check(resp) && _ttl_compatibility_check
      rescue DynamoDB::Errors::ResourceNotFoundException
        false
      end

      # Checks against the remote table's configuration. If the remote table
      # does not exist, guaranteed +false+. Otherwise, will check if the remote
      # throughput, keys, attribute definitions, and global secondary indexes
      # are exactly equal to your declared configuration.
      #
      # @return [Boolean] true if remote is an exact match, false otherwise.
      def exact_match?
        resp = @client.describe_table(table_name: @model_class.table_name)
        _throughput_equal(resp) &&
          _keys_equal(resp) &&
          _ad_equal(resp) &&
          _gsi_equal(resp) &&
          _ttl_match_check
      rescue DynamoDB::Errors::ResourceNotFoundException
        false
      end

      private

      def _ttl_compatibility_check
        if @ttl_attribute
          ttl_status = @client.describe_time_to_live(
            table_name: @model_class.table_name
          )
          desc = ttl_status.time_to_live_description
          %w[ENABLED ENABLING].include?(desc.time_to_live_status) &&
            desc.attribute_name == @ttl_attribute
        else
          true
        end
      end

      def _ttl_match_check
        ttl_status = @client.describe_time_to_live(
          table_name: @model_class.table_name
        )
        desc = ttl_status.time_to_live_description
        if @ttl_attribute
          %w[ENABLED ENABLING].include?(desc.time_to_live_status) &&
            desc.attribute_name == @ttl_attribute
        else
          !%w[ENABLED ENABLING].include?(desc.time_to_live_status) ||
            desc.attribute_name.nil?
        end
      end

      def _compatible_check(resp)
        _throughput_equal(resp) &&
          _keys_equal(resp) &&
          _ad_superset(resp) &&
          _gsi_superset(resp)
      end

      def _create_table_opts
        opts = {
          table_name: @model_class.table_name
        }
        if @billing_mode == 'PROVISIONED'
          opts[:provisioned_throughput] = {
            read_capacity_units: @read_capacity_units,
            write_capacity_units: @write_capacity_units
          }
        elsif @billing_mode == 'PAY_PER_REQUEST'
          opts[:billing_mode] = @billing_mode
        else
          raise ArgumentError, "Unsupported billing mode #{@billing_mode}"
        end
        opts[:key_schema] = _key_schema
        opts[:attribute_definitions] = _attribute_definitions
        gsi = _global_secondary_indexes
        opts[:global_secondary_indexes] = gsi unless gsi.empty?
        opts
      end

      def _add_global_secondary_index_throughput(opts, resp_gsis)
        gsis = resp_gsis.map(&:index_name)
        gsi_updates = []
        gsis.each do |index_name|
          lgsi = @global_secondary_indexes[index_name.to_sym]
          gsi_updates << {
            update: {
              index_name: index_name,
              provisioned_throughput: lgsi.provisioned_throughput
            }
          }
        end
        opts[:global_secondary_index_updates] = gsi_updates
        true
      end

      def _update_throughput_opts(resp)
        if @billing_mode == 'PROVISIONED'
          opts = {
            table_name: @model_class.table_name,
            provisioned_throughput: {
              read_capacity_units: @read_capacity_units,
              write_capacity_units: @write_capacity_units
            }
          }
          # special case: we have global secondary indexes existing, and they
          # need provisioned capacity to be set within this call
          if !resp.table.billing_mode_summary.nil? &&
             resp.table.billing_mode_summary.billing_mode == 'PAY_PER_REQUEST'
            opts[:billing_mode] = @billing_mode
            if resp.table.global_secondary_indexes
              resp_gsis = resp.table.global_secondary_indexes
              _add_global_secondary_index_throughput(opts, resp_gsis)
            end
          end
          opts
        elsif @billing_mode == 'PAY_PER_REQUEST'
          {
            table_name: @model_class.table_name,
            billing_mode: 'PAY_PER_REQUEST'
          }
        else
          raise ArgumentError, "Unsupported billing mode #{@billing_mode}"
        end
      end

      def _update_index_opts(resp)
        gsi_updates, attribute_definitions = _gsi_updates(resp)
        opts = {
          table_name: @model_class.table_name,
          global_secondary_index_updates: gsi_updates
        }
        opts[:attribute_definitions] = attribute_definitions unless attribute_definitions.empty?
        opts
      end

      def _gsi_updates(resp)
        gsi_updates = []
        attributes_referenced = Set.new
        remote_gsis = resp.table.global_secondary_indexes
        local_gsis = _global_secondary_indexes
        remote_idx, local_idx = _gsi_index_names(remote_gsis, local_gsis)
        create_candidates = local_idx - remote_idx
        update_candidates = local_idx.intersection(remote_idx)
        create_candidates.each do |index_name|
          gsi = @model_class.global_secondary_indexes_for_migration.find do |i|
            i[:index_name].to_s == index_name
          end
          gsi[:key_schema].each do |k|
            attributes_referenced.add(k[:attribute_name])
          end
          if @billing_mode == 'PROVISIONED'
            lgsi = @global_secondary_indexes[index_name.to_sym]
            gsi[:provisioned_throughput] = lgsi.provisioned_throughput
          end
          gsi_updates << {
            create: gsi
          }
        end
        # we don't currently update anything other than throughput
        if @billing_mode == 'PROVISIONED'
          update_candidates.each do |index_name|
            lgsi = @global_secondary_indexes[index_name.to_sym]
            gsi_updates << {
              update: {
                index_name: index_name,
                provisioned_throughput: lgsi.provisioned_throughput
              }
            }
          end
        end
        attribute_definitions = _attribute_definitions
        incremental_attributes = attributes_referenced.map do |attr_name|
          attribute_definitions.find do |ad|
            ad[:attribute_name] == attr_name
          end
        end
        [gsi_updates, incremental_attributes]
      end

      def _key_schema
        _keys.map do |type, attr|
          {
            attribute_name: attr.database_name,
            key_type: type == :hash ? 'HASH' : 'RANGE'
          }
        end
      end

      def _attribute_definitions
        attribute_definitions = _keys.map do |_type, attr|
          {
            attribute_name: attr.database_name,
            attribute_type: attr.dynamodb_type
          }
        end
        @model_class.global_secondary_indexes.each_value do |attributes|
          gsi_keys = [attributes[:hash_key]]
          gsi_keys << attributes[:range_key] if attributes[:range_key]
          gsi_keys.each do |name|
            attribute = @model_class.attributes.attribute_for(name)
            exists = attribute_definitions.any? do |ad|
              ad[:attribute_name] == attribute.database_name
            end
            next if exists

            attribute_definitions << {
              attribute_name: attribute.database_name,
              attribute_type: attribute.dynamodb_type
            }
          end
        end
        attribute_definitions
      end

      def _keys
        @model_class.keys.each_with_object({}) do |(type, name), acc|
          acc[type] = @model_class.attributes.attribute_for(name)
          acc
        end
      end

      def _throughput_equal(resp)
        if @billing_mode == 'PAY_PER_REQUEST'
          !resp.table.billing_mode_summary.nil? &&
            resp.table.billing_mode_summary.billing_mode == 'PAY_PER_REQUEST'
        else
          expected = resp.table.provisioned_throughput.to_h
          actual = {
            read_capacity_units: @read_capacity_units,
            write_capacity_units: @write_capacity_units
          }
          actual.all? do |k, v|
            expected[k] == v
          end
        end
      end

      def _keys_equal(resp)
        remote_key_schema = resp.table.key_schema.map(&:to_h)
        _array_unsorted_eql(remote_key_schema, _key_schema)
      end

      def _ad_equal(resp)
        remote_ad = resp.table.attribute_definitions.map(&:to_h)
        _array_unsorted_eql(remote_ad, _attribute_definitions)
      end

      def _ad_superset(resp)
        remote_ad = resp.table.attribute_definitions.map(&:to_h)
        _attribute_definitions.all? do |attribute_definition|
          remote_ad.include?(attribute_definition)
        end
      end

      def _gsi_superset(resp)
        remote_gsis = resp.table.global_secondary_indexes
        local_gsis = _global_secondary_indexes
        remote_idx, local_idx = _gsi_index_names(remote_gsis, local_gsis)
        if local_idx.subset?(remote_idx)
          _gsi_set_compare(remote_gsis, local_gsis)
        else
          # If we have any local indexes not on the remote table,
          # guaranteed false.
          false
        end
      end

      def _gsi_equal(resp)
        remote_gsis = resp.table.global_secondary_indexes
        local_gsis = _global_secondary_indexes
        remote_idx, local_idx = _gsi_index_names(remote_gsis, local_gsis)
        if local_idx == remote_idx
          _gsi_set_compare(remote_gsis, local_gsis)
        else
          false
        end
      end

      def _gsi_set_compare(remote_gsis, local_gsis)
        local_gsis.all? do |lgsi|
          rgsi = remote_gsis.find do |r|
            r.index_name == lgsi[:index_name].to_s
          end

          remote_key_schema = rgsi.key_schema.map(&:to_h)
          ks_match = _array_unsorted_eql(remote_key_schema, lgsi[:key_schema])

          # Throughput Check: Dependent on Billing Mode
          rpt = rgsi.provisioned_throughput.to_h
          lpt = lgsi[:provisioned_throughput]
          if @billing_mode == 'PROVISIONED'
            pt_match = lpt.all? do |k, v|
              rpt[k] == v
            end
          elsif @billing_mode == 'PAY_PER_REQUEST'
            pt_match = lpt.nil?
          else
            raise ArgumentError, "Unsupported billing mode #{@billing_mode}"
          end

          rp = rgsi.projection.to_h
          lp = lgsi[:projection]
          rp[:non_key_attributes]&.sort!
          lp[:non_key_attributes]&.sort!
          p_match = rp == lp

          ks_match && pt_match && p_match
        end
      end

      def _gsi_index_names(remote, local)
        remote_index_names = Set.new
        local_index_names = Set.new
        remote&.each do |gsi|
          remote_index_names.add(gsi.index_name)
        end
        local&.each do |gsi|
          local_index_names.add(gsi[:index_name].to_s)
        end
        [remote_index_names, local_index_names]
      end

      def _global_secondary_indexes
        gsis = []
        model_gsis = @model_class.global_secondary_indexes_for_migration
        gsi_config = @global_secondary_indexes
        model_gsis&.each do |mgsi|
          config = gsi_config[mgsi[:index_name]]
          gsis << if @billing_mode == 'PROVISIONED'
                    mgsi.merge(
                      provisioned_throughput: config.provisioned_throughput
                    )
                  else
                    mgsi
                  end
        end
        gsis
      end

      def _array_unsorted_eql(a, b)
        a.all? { |x| b.include?(x) } && b.all? { |x| a.include?(x) }
      end

      def _validate_required_configuration
        missing_config = []
        missing_config << 'model_class' unless @model_class
        if @billing_mode == 'PROVISIONED'
          missing_config << 'read_capacity_units' unless @read_capacity_units
          missing_config << 'write_capacity_units' unless @write_capacity_units
        elsif @read_capacity_units || @write_capacity_units
          raise ArgumentError, "Cannot have billing mode #{@billing_mode} with provisioned capacity."
        end
        return if missing_config.empty?

        msg = missing_config.join(', ')
        raise Errors::MissingRequiredConfiguration, "Missing: #{msg}"
      end

      # @api private
      class GlobalSecondaryIndex
        attr_reader :provisioned_throughput

        def initialize
          @provisioned_throughput = {}
        end

        def read_capacity_units(units)
          @provisioned_throughput[:read_capacity_units] = units
        end

        def write_capacity_units(units)
          @provisioned_throughput[:write_capacity_units] = units
        end
      end
    end
  end
end
