require 'connectors/connector_status'
require 'connectors/sync_status'
require 'core'
require 'utility'
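
# Specs for Core::SyncJobRunner: claiming a sync job, validating the stored
# configuration and active filtering, streaming extracted documents into the
# Elasticsearch sink, and reporting the terminal job state (done/cancel/error).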

describe Core::SyncJobRunner do
  let(:connector_id) { '123' }
  let(:service_type) { 'foo' }
  let(:request_pipeline) { Core::ConnectorSettings::DEFAULT_REQUEST_PIPELINE }
  let(:connector_status) { Connectors::ConnectorStatus::CONNECTED }
  let(:connector_stored_configuration) do
    # configuration as stored in Elasticsearch, with values already specified by the user
    {
      :lala => {
        :label => 'Lala',
        :value => 'hello'
      }
    }
  end
  let(:connector_default_configuration) do
    # configuration as returned by the connector class, with only default values
    {
      :lala => {
        :label => 'Lala',
        :value => nil
      }
    }
  end

  let(:connector_settings) { double }

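  # Active filtering as stored on the job: a single default domain carrying the
  # library's default simple rule and an empty advanced snippet.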
  let(:filtering) do
    [
      {
        'domain' => Core::Filtering::DEFAULT_DOMAIN,
        'rules' => [
          Core::Filtering::SimpleRule::DEFAULT_RULE.to_h
        ],
        'advanced_snippet' => {
          'value' => {}
        }
      }
    ]
  end

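  # Collaborators are plain (non-verifying) doubles; the methods the runner
  # interacts with are stubbed in the before(:each) block below.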
  let(:job) { double }
  let(:connector_class) { double }
  let(:connector_instance) { double }
  let(:sink) { double }

  let(:output_index_name) { 'test-ingest-index' }
  let(:existing_document_ids) { [] } # ids of documents that are already in the index
  let(:extracted_documents) { [] } # documents returned from 3rd-party system
  let(:connector_metadata) { { :foo => 'bar' } } # metadata reported by the connector

  let(:any_filtering_feature_enabled) { true }
  let(:filtering_rule_feature_enabled) { true }
  let(:filtering_validation_result) do
    {
      :state => Core::Filtering::ValidationStatus::VALID,
      :errors => []
    }
  end

  let(:connector_running) { false }
  let(:job_id) { 'job-123' }
  let(:job_canceling) { false }
  let(:job_in_progress) { true }
  let(:error_message) { nil }

  let(:extract_binary_content) { true }
  let(:reduce_whitespace) { true }
  let(:run_ml_inference) { true }
  let(:total_document_count) { 100 }
  let(:ingestion_stats) do
    {
      :indexed_document_count => 12,
      :deleted_document_count => 234,
      :indexed_document_volume => 0
    }
  end

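  # Queue limits the runner is expected to forward verbatim to its ingestion
  # classes (see the '#new' examples below).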
  let(:max_ingestion_queue_size) { 123 }
  let(:max_ingestion_queue_bytes) { 123456789 }

  subject { described_class.new(connector_settings, job, max_ingestion_queue_size, max_ingestion_queue_bytes) }

  before(:each) do
    allow(Core::ConnectorSettings).to receive(:fetch_by_id).with(connector_id).and_return(connector_settings)

    allow(Core::ConnectorJob).to receive(:fetch_by_id).with(job_id).and_return(job)
    allow(job).to receive(:id).and_return(job_id)
    allow(job).to receive(:make_running!)
    allow(job).to receive(:filtering).and_return(filtering)
    allow(job).to receive(:update_metadata)
    allow(job).to receive(:done!)
    allow(job).to receive(:cancel!)
    allow(job).to receive(:error!)
    allow(job).to receive(:canceling?).and_return(job_canceling)
    allow(job).to receive(:in_progress?).and_return(job_in_progress)
    allow(job).to receive(:index_name).and_return(output_index_name)
    allow(job).to receive(:service_type).and_return(service_type)
    allow(job).to receive(:extract_binary_content?).and_return(extract_binary_content)
    allow(job).to receive(:reduce_whitespace?).and_return(reduce_whitespace)
    allow(job).to receive(:run_ml_inference?).and_return(run_ml_inference)
    allow(job).to receive(:configuration).and_return(connector_stored_configuration)

    allow(Core::ElasticConnectorActions).to receive(:fetch_document_ids).and_return(existing_document_ids)
    allow(Core::ElasticConnectorActions).to receive(:update_connector_status)
    allow(Core::ElasticConnectorActions).to receive(:update_connector_sync_start)

    allow(Connectors::REGISTRY).to receive(:connector_class).and_return(connector_class)
    allow(Core::Ingestion::EsSink).to receive(:new).and_return(sink)
    allow(sink).to receive(:ingest)
    allow(sink).to receive(:delete)
    allow(sink).to receive(:flush)
    allow(sink).to receive(:ingestion_stats).and_return(ingestion_stats)

    allow(connector_settings).to receive(:id).and_return(connector_id)
    allow(connector_settings).to receive(:configuration).and_return(connector_stored_configuration)
    allow(connector_settings).to receive(:request_pipeline).and_return(request_pipeline)
    allow(connector_settings).to receive(:running?).and_return(connector_running)
    allow(connector_settings).to receive(:update_last_sync!)
    allow(connector_settings).to receive(:any_filtering_feature_enabled?).and_return(any_filtering_feature_enabled)
    allow(connector_settings).to receive(:filtering_rule_feature_enabled?).and_return(filtering_rule_feature_enabled)

    allow(connector_class).to receive(:configurable_fields).and_return(connector_default_configuration)
    allow(connector_class).to receive(:validate_filtering).and_return(filtering_validation_result)
    allow(connector_class).to receive(:new).and_return(connector_instance)

    allow(connector_instance).to receive(:metadata).and_return(connector_metadata)
    allow(connector_instance).to receive(:do_health_check!)
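    # Chaining .and_yield once per document makes the stubbed yield_documents
    # yield each extracted document, in order, to the runner's block.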
    allow_statement = allow(connector_instance).to receive(:yield_documents)
    extracted_documents.each { |document| allow_statement.and_yield(document) }

    # set to a large number so the periodic job check is skipped in most examples
    stub_const("#{described_class}::JOB_REPORTING_INTERVAL", 10000)
  end

  describe '#new' do
    let(:bulk_queue) { double }

    before(:each) do
      allow(Utility::BulkQueue).to receive(:new).and_return(bulk_queue)
    end

    it 'passes max_ingestion_queue_size and max_ingestion_queue_bytes to ingestion classes' do
      expect(Utility::BulkQueue).to receive(:new)
        .with(max_ingestion_queue_size, max_ingestion_queue_bytes)

      expect(Core::Ingestion::EsSink).to receive(:new)
        .with(anything, anything, bulk_queue, max_ingestion_queue_bytes)

      described_class.new(connector_settings, job, max_ingestion_queue_size, max_ingestion_queue_bytes)
    end
  end

  describe '#execute' do
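    # Shared examples covering the common outcomes of a sync run; the contexts
    # below mix them in as needed.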
    shared_examples_for 'claims the job' do
      it 'updates connector sync start and makes the job running' do
        expect(Core::ElasticConnectorActions).to receive(:update_connector_sync_start)
        expect(job).to receive(:make_running!)

        subject.execute
      end
    end

    shared_examples_for 'does not run a sync' do
      it 'does not complete the job or update the connector' do
        expect(job).to_not receive(:done!)
        expect(job).to_not receive(:cancel!)
        expect(job).to_not receive(:error!)
        expect(connector_settings).to_not receive(:update_last_sync!).with(job)

        subject.execute
      end
    end

    shared_examples_for 'sync stops with error' do
      it 'stops with error' do
        expect(job).to receive(:error!).with(error_message, ingestion_stats, connector_metadata)
        expect(connector_settings).to receive(:update_last_sync!).with(job)

        subject.execute
      end
    end

    shared_examples_for 'runs a full sync' do
      it 'finishes a sync job' do
        expect(job).to receive(:done!).with(ingestion_stats, connector_metadata)
        expect(connector_settings).to receive(:update_last_sync!).with(job)

        subject.execute
      end
    end

    context 'when the connector was already configured with a different set of configurable fields' do
      let(:connector_stored_configuration) do
        {
          :foo => {
            :label => 'Foo',
            :value => nil
          }
        }
      end

      let(:connector_default_configuration) do
        {
          :lala => {
            :label => 'Lala',
            :value => 'hello'
          }
        }
      end

      it 'raises an error' do
        expect { subject.execute }.to raise_error(Core::IncompatibleConfigurableFieldsError)
      end
    end

    context 'when connector is running' do
      let(:connector_running) { true }

      it_behaves_like 'does not run a sync'
    end

    context 'when failing to make connector running' do
      before(:each) do
        allow(Core::ElasticConnectorActions).to receive(:update_connector_sync_start).and_raise(StandardError)
      end

      it_behaves_like 'does not run a sync'
    end

    context 'when failing to make job running' do
      before(:each) do
        allow(job).to receive(:make_running!).and_raise(StandardError)
      end

      it_behaves_like 'does not run a sync'
    end

    context 'when filtering is in state invalid' do
      let(:error_message) { "Active filtering is not in valid state (current state: #{filtering_validation_result[:state]}) for connector #{connector_id}. Please check active filtering in connectors index." }
      let(:connector_metadata) { nil }
      let(:filtering_validation_result) do
        {
          :state => Core::Filtering::ValidationStatus::INVALID,
          :errors => []
        }
      end

      it_behaves_like 'sync stops with error'
    end

    context 'when filtering is in state edited' do
      let(:error_message) { "Active filtering is not in valid state (current state: #{filtering_validation_result[:state]}) for connector #{connector_id}. Please check active filtering in connectors index." }
      let(:connector_metadata) { nil }
      let(:filtering_validation_result) do
        {
          :state => Core::Filtering::ValidationStatus::EDITED,
          :errors => []
        }
      end

      it_behaves_like 'sync stops with error'
    end

    context 'when filtering is in state valid, but errors are present' do
      let(:error_message) { "Active filtering is in valid state, but errors were detected (errors: #{filtering_validation_result[:errors]}) for connector #{connector_id}. Please check active filtering in connectors index." }
      let(:connector_metadata) { nil }
      let(:filtering_validation_result) do
        {
          :state => Core::Filtering::ValidationStatus::VALID,
          :errors => ['Error']
        }
      end

      it_behaves_like 'sync stops with error'
    end

    it_behaves_like 'claims the job'

    it 'flushes the sink' do
      # Nothing is ingested in this example, but a flush is still expected:
      # flushing ensures the last buffered batch of documents always reaches the index.
      expect(sink).to receive(:flush)

      subject.execute
    end

    it_behaves_like 'runs a full sync'

    context 'when an error occurs' do
      let(:error_message) { 'error message' }
      before(:each) do
        allow(connector_instance).to receive(:do_health_check!).and_raise(StandardError.new(error_message))
      end

      it_behaves_like 'sync stops with error'
    end

    context 'when the sync thread did not finish execution' do
      let(:error_message) { 'Sync thread didn\'t finish execution. Check connector logs for more details.' }
      before(:each) do
        # An Exception (not a StandardError) is not rescued by the runner and is
        # treated as something that killed the sync thread.
        allow(connector_instance).to receive(:do_health_check!).and_raise(Exception.new('Oh no!'))
      end

      it 'records an error stating that the sync thread was killed' do
        # The exception is expected to propagate; asserting on it explicitly keeps
        # the test from failing for the wrong reason.
        expect { subject.execute }.to raise_exception

        expect(subject.instance_variable_get(:@sync_status)).to eq(Connectors::SyncStatus::ERROR)
        expect(subject.instance_variable_get(:@sync_error)).to eq(error_message)
      end
    end

    context 'when multiple documents are returned from the 3rd-party system' do
      let(:doc1) do
        {
          :id => 1,
          :title => 'Hello',
          :body => 'World'
        }
      end

      let(:doc2) do
        {
          :id => 2,
          :title => 'thanks',
          :body => 'for the fish'
        }
      end

      let(:extracted_documents) { [doc1, doc2] } # documents returned from 3rd-party system

      shared_examples_for 'indexes all docs' do
        it 'ingests every extracted document' do
          expect(sink).to receive(:ingest).with(doc1)
          expect(sink).to receive(:ingest).with(doc2)

          subject.execute
        end
      end

      shared_examples_for 'validates filtering' do
        it 'validates filtering via the connector class' do
          expect(connector_class).to receive(:validate_filtering)

          subject.execute
        end
      end

      shared_context 'exclude one document' do
        let(:additional_rules) do
          [
            Core::Filtering::SimpleRule.from_args('1', 'exclude', 'title', 'equals', 'Hello').to_h
          ]
        end

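        # Prepend the extra rules so they sit ahead of the default rule in the
        # job's active filter.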
        before(:each) do
          filtering[0]['rules'].unshift(*additional_rules)
        end
      end

      it_behaves_like 'validates filtering'
      it_behaves_like 'indexes all docs'
      it_behaves_like 'runs a full sync'

      context 'when filtering rule feature is enabled' do
        include_context 'exclude one document'

        let(:any_filtering_feature_enabled) { true }
        let(:filtering_rule_feature_enabled) { true }

        it 'does not ingest the excluded document' do
          expect(sink).to_not receive(:ingest).with(doc1)
          expect(sink).to receive(:ingest).with(doc2)

          subject.execute
        end

        context 'with non-matching rule' do
          let(:additional_rules) do
            [
              Core::Filtering::SimpleRule.from_args('1', 'exclude', 'foo', 'equals', 'Hello').to_h
            ]
          end

          it_behaves_like 'validates filtering'
          it_behaves_like 'indexes all docs'
          it_behaves_like 'runs a full sync'
        end
      end

      context 'when no filtering feature is enabled' do
        # the exclude rule will be ignored as no filtering feature is enabled
        include_context 'exclude one document'

        let(:any_filtering_feature_enabled) { false }
        let(:filtering_rule_feature_enabled) { false }

        # filtering won't be validated if all filtering features are disabled
        it 'does not validate filtering' do
          expect(connector_class).to_not receive(:validate_filtering)

          subject.execute
        end

        it_behaves_like 'indexes all docs'
        it_behaves_like 'runs a full sync'
      end

      context 'when another filtering feature is enabled but the filtering rule feature is disabled' do
        # the exclude rule will be ignored as rule filtering is disabled
        include_context 'exclude one document'

        let(:any_filtering_feature_enabled) { true }
        let(:filtering_rule_feature_enabled) { false }

        it_behaves_like 'validates filtering'
        it_behaves_like 'indexes all docs'
        it_behaves_like 'runs a full sync'
      end

      context 'when some documents were present before' do
        let(:existing_document_ids) { [3, 4, 'lala', 'some other id'] }

        it 'attempts to remove existing documents' do
          existing_document_ids.each do |id|
            expect(sink).to receive(:delete).with(id)
          end

          subject.execute
        end

        it_behaves_like 'runs a full sync'

        context 'when an error happens during sync' do
          let(:error_message) { 'whoops' }
          before(:each) do
            allow(sink).to receive(:flush).and_raise('whoops')
          end

          it_behaves_like 'sync stops with error'
        end
      end

      context 'with reporting' do
        before(:each) do
          # with a zero interval, the job is checked and metadata reported for every document
          stub_const("#{described_class}::JOB_REPORTING_INTERVAL", 0)
        end

        it 'reports metadata' do
          expect(job).to receive(:update_metadata).with(ingestion_stats, connector_metadata)

          subject.execute
        end

        context 'when connector is deleted' do
          before(:each) do
            allow(Core::ConnectorSettings).to receive(:fetch_by_id).and_return(nil)
          end

          it 'marks the job as error' do
            expect(job).to receive(:error!).with(Core::ConnectorNotFoundError.new(connector_id).message, ingestion_stats, connector_metadata)

            subject.execute
          end
        end

        context 'when job is deleted' do
          before(:each) do
            allow(Core::ConnectorJob).to receive(:fetch_by_id).and_return(nil)
          end

          it 'updates the connector' do
            expect(connector_settings).to receive(:update_last_sync!)

            subject.execute
          end
        end

        context 'when job is canceled' do
          let(:job_canceling) { true }

          it 'cancels the job' do
            expect(job).to receive(:cancel!).with(ingestion_stats, connector_metadata)
            expect(connector_settings).to receive(:update_last_sync!)

            subject.execute
          end
        end

        context 'when job is not in_progress' do
          let(:job_in_progress) { false }
          let(:job_status) { Connectors::SyncStatus::COMPLETED }

          before(:each) do
            allow(job).to receive(:status).and_return(job_status)
          end

          it 'marks the job as error' do
            expect(job).to receive(:error!).with(Core::ConnectorJobNotRunningError.new(job_id, job_status).message, ingestion_stats, connector_metadata)
            expect(connector_settings).to receive(:update_last_sync!)

            subject.execute
          end
        end
      end
    end
  end

  describe '#add_ingest_metadata' do
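    # add_ingest_metadata is private on the runner, hence the #send in the
    # examples below.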
    let(:document) { { 'body' => 'hello, world' } }

    it 'augments document data' do
      subject.send(:add_ingest_metadata, document)
      expect(document['_extract_binary_content']).to be
      expect(document['_reduce_whitespace']).to be
      expect(document['_run_ml_inference']).to be
    end

    context 'when the settings are false' do
      let(:extract_binary_content) { false }
      let(:reduce_whitespace) { false }
      let(:run_ml_inference) { false }

      it 'does not augment data' do
        subject.send(:add_ingest_metadata, document)
        expect(document['_extract_binary_content']).to_not be
        expect(document['_reduce_whitespace']).to_not be
        expect(document['_run_ml_inference']).to_not be
      end
    end
  end
end
