# Copyright Elasticsearch B.V. and/or licensed to Elasticsearch B.V. under one
# or more contributor license agreements. Licensed under the Elastic License;
# you may not use this file except in compliance with the Elastic License.

require_relative "../spec_helper"
require "stud/temporary"
require "rspec/wait"

describe "Read configuration from elasticsearch" do
  before :each do
    @elasticsearch_service = elasticsearch

    @temporary_directory = Stud::Temporary.directory

    @pipelines = {
      "super_generator" => "input { generator { count => 100 } tcp { port => 6000 } } output { file { path => '#{@temporary_directory}/super_generator' }}",
      "another_generator" => "input { generator { count => 100 } tcp { port => 6002 } } output { file { path => '#{@temporary_directory}/another_generator' }}",
      "hello" => nil
    }

    cleanup_system_indices(@pipelines.keys)
    cleanup_elasticsearch(".monitoring-logstash*")

    @pipelines.each do |pipeline_id, config|
      push_elasticsearch_config(pipeline_id, config) unless config.nil?
    end

    @logstash_service = logstash("bin/logstash -w 1", {
      :settings => {
        "xpack.management.enabled" => true,
        "xpack.management.pipeline.id" => @pipelines.keys + ["*"],
        "xpack.management.logstash.poll_interval" => "1s",
        "xpack.management.elasticsearch.hosts" => ["http://localhost:9200"],
        "xpack.management.elasticsearch.username" => "elastic",
        "xpack.management.elasticsearch.password" => elastic_password,
        "xpack.monitoring.allow_legacy_collection" => true,
        "xpack.monitoring.elasticsearch.username" => "elastic",
        "xpack.monitoring.elasticsearch.password" => elastic_password

      }, # will be saved in the logstash.yml
      :belzebuth => {
        :wait_condition => /Pipelines running/,
        :timeout => 5 * 60 # Fail safe, this mean something went wrong if we hit this before the wait_condition
      }
    })
  end

  let(:max_retry) { 100 }

  it "fetches the multiples configuration from elasticsearch" do
    elasticsearch_client.indices.refresh

    Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
      @pipelines.keys do |pipeline_id|
        expect(File.exist?(File.join(@temporary_directory, pipeline_id))).to be_truthy
      end
    end
  end

  it "fetches new pipelines" do
    temporary_file = File.join(Stud::Temporary.directory, "hello.log")
    new_config = "input { generator { count => 10000 } tcp { port => 6008 }} output { file { path => '#{temporary_file}' } }"

    expect(File.exist?(temporary_file)).to be_falsey
    push_elasticsearch_config("hello", new_config)
    elasticsearch_client.indices.refresh

    Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
      expect(File.exist?(temporary_file)).to be_truthy
    end
  end

  it "should immediately register a new pipeline state document when the pipeline is reloaded" do
    wait(40).for do
      count_hashes(@pipelines.keys)
    end.to eq(2)

    pipelines = {
      "super_generator" => "input { generator { count => 100 } tcp { port => 6005 } } output { file { path => '#{@temporary_directory}/super_generator_new' }}",
      "another_generator" => "input { generator { count => 100 } tcp { port => 6006 } } output { file { path => '#{@temporary_directory}/another_generator_new' }}"
    }

    pipelines.each do |pipeline_id, config|
      expect(File.exist?(File.join(@temporary_directory, "#{pipeline_id}_new"))).to be_falsey
      push_elasticsearch_config(pipeline_id, config)
    end

    wait(40).for do
      count_hashes(@pipelines.keys)
    end.to eq(4)
  end

  it "add new pipelines" do
    temporary_file = File.join(Stud::Temporary.directory, "wildcard_pipeline.log")
    new_config = "input { generator { count => 10000 } tcp { port => 6008 }} output { file { path => '#{temporary_file}' } }"

    expect(File.exist?(temporary_file)).to be_falsey
    push_elasticsearch_config("wildcard_pipeline", new_config)
    elasticsearch_client.indices.refresh

    Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
      expect(File.exist?(temporary_file)).to be_truthy
    end
  end

  # Returns the number of hashes for the list of pipelines
  # Returns nil if the response is bad
  # This can happen if ES is not yet up or if the data is not yet in ES
  def count_hashes(pipelines)
    elasticsearch_client.indices.refresh

    query = {
      "size": 0,
      "query": {
        "term": {
          "type": {
            "value": "logstash_state"
          }
        }
      },
      "aggs": {
        "pipeline_ids": {
          "terms": {
            "field": "logstash_state.pipeline.id",
            "size": 10
          },
          "aggs": {
            "pipeline_hashes": {
              "terms": {
                "field": "logstash_state.pipeline.hash",
                "size": 10
              }
            }
          }
        }
      }
    }

    begin
      res = elasticsearch_client.search(index: '.monitoring-logstash-*', body: query)
    rescue Elastic::Transport::Transport::Errors::ServiceUnavailable
      return nil
    end

    begin
      hashes = res["aggregations"]["pipeline_ids"]["buckets"]
          .select {|b| pipelines.include?(b["key"])}
          .map {|b| b["pipeline_hashes"] }

      puts hashes.inspect
      hashes
          .flat_map {|b| b["buckets"]}
          .size
    rescue
      # In the case that some part of the path is missing just return nil
      nil
    end
  end

  it "reloads the configuration when its different from the running pipeline" do
    pipelines = {
      "super_generator" => "input { generator { count => 100 } tcp { port => 6005 } } output { file { path => '#{@temporary_directory}/super_generator_new' }}",
      "another_generator" => "input { generator { count => 100 } tcp { port => 6006 } } output { file { path => '#{@temporary_directory}/another_generator_new' }}"
    }

    pipelines.each do |pipeline_id, config|
      expect(File.exist?(File.join(@temporary_directory, "#{pipeline_id}_new"))).to be_falsey
      push_elasticsearch_config(pipeline_id, config)
    end

    elasticsearch_client.indices.refresh

    pipelines.keys.each do |pipeline_id|
      Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
        expect(File.exist?(File.join(@temporary_directory, "#{pipeline_id}_new"))).to be_truthy
      end
    end
  end

  it "should pick up recreated pipeline with the same config string and different metadata" do
    elasticsearch_client.indices.refresh

    pipeline_id = @pipelines.keys[0]
    config = @pipelines.values[0]
    file = File.join(@temporary_directory, pipeline_id)

    Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
      expect(File.exist?(file)).to be_truthy
    end

    cleanup_system_indices([pipeline_id])
    File.delete(file)
    expect(File.exist?(file)).to be_falsey

    push_elasticsearch_config(pipeline_id, config, "2")
    elasticsearch_client.indices.refresh

    Stud.try(max_retry.times, [RSpec::Expectations::ExpectationNotMetError]) do
      expect(File.exist?(file)).to be_truthy
    end
  end

  after :each do
    @logstash_service.stop if !!@logstash_service
    @elasticsearch_service.stop if !!@elasticsearch_service
  end
end
