configure

in source/plugins/ruby/out_health_forward.rb [172:286]


    def configure(conf)
      compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')

      super

      unless @chunk_key_tag
        raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
      end

      @read_interval = @read_interval_msec / 1000.0
      @recover_sample_size = @recover_wait / @heartbeat_interval

      if @heartbeat_type == :tcp
        log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
        @heartbeat_type = :transport
      end

      if @dns_round_robin && @heartbeat_type == :udp
        raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
      end

      if @transport == :tls
        
        if @tls_cert_path && !@tls_cert_path.empty?
          @tls_ca_cert_path = @tls_cert_path
        end
        if @tls_ca_cert_path && !@tls_ca_cert_path.empty?
          @tls_ca_cert_path.each do |path|
            raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
            raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
          end
        end

        if @tls_insecure_mode
          log.warn "TLS transport is configured in insecure way"
          @tls_verify_hostname = false
          @tls_allow_self_signed_cert = true
        end

        if Fluent.windows?
          if (@tls_cert_path || @tls_ca_cert_path) && @tls_cert_logical_store_name
            raise Fluent::ConfigError, "specified both cert path and tls_cert_logical_store_name is not permitted"
          end
        else
          raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_logical_store_name
          raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_thumbprint
        end
      end

      @ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
      socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
      @connection_manager = Fluent::Plugin::ForwardOutput::ConnectionManager.new(
        log: @log,
        secure: !!@security,
        connection_factory: method(:create_transfer_socket),
        socket_cache: socket_cache,
      )

      configs = []

      
      conf.elements(name: 'server').each do |s|
        s.name = 'service'
      end

      unless conf.elements(name: 'service').empty?
        
        new_elem = Fluent::Config::Element.new('static_service_discovery', {}, {}, conf.elements(name: 'service'))
        configs << { type: :static, conf: new_elem }
      end

      conf.elements(name: 'service_discovery').each_with_index do |c, i|
        configs << { type: @service_discovery[i][:@type], conf: c }
      end

      service_discovery_create_manager(
        :out_forward_service_discovery_watcher,
        configurations: configs,
        load_balancer: Fluent::Plugin::ForwardOutput::LoadBalancer.new(log),
        custom_build_method: method(:build_node),
      )

      discovery_manager.services.each do |server|
        
        @nodes << server
        unless @heartbeat_type == :none
          begin
            server.validate_host_resolution!
          rescue => e
            raise unless @ignore_network_errors_at_startup
            log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
            server.disable!
          end
        end
      end

      unless @as_secondary
        if @compress == :gzip && @buffer.compress == :text
          @buffer.compress = :gzip
        elsif @compress == :text && @buffer.compress == :gzip
          log.info "buffer is compressed.  If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
        end
      end

      if discovery_manager.services.empty?
        raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
      end

      if !@keepalive && @keepalive_timeout
        log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
      end

      raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
    end