in source/plugins/ruby/out_health_forward.rb [172:286]
def configure(conf)
compat_parameters_convert(conf, :buffer, default_chunk_key: 'tag')
super
unless @chunk_key_tag
raise Fluent::ConfigError, "buffer chunk key must include 'tag' for forward output"
end
@read_interval = @read_interval_msec / 1000.0
@recover_sample_size = @recover_wait / @heartbeat_interval
if @heartbeat_type == :tcp
log.warn "'heartbeat_type tcp' is deprecated. use 'transport' instead."
@heartbeat_type = :transport
end
if @dns_round_robin && @heartbeat_type == :udp
raise Fluent::ConfigError, "forward output heartbeat type must be 'transport' or 'none' to use dns_round_robin option"
end
if @transport == :tls
if @tls_cert_path && !@tls_cert_path.empty?
@tls_ca_cert_path = @tls_cert_path
end
if @tls_ca_cert_path && !@tls_ca_cert_path.empty?
@tls_ca_cert_path.each do |path|
raise Fluent::ConfigError, "specified cert path does not exist:#{path}" unless File.exist?(path)
raise Fluent::ConfigError, "specified cert path is not readable:#{path}" unless File.readable?(path)
end
end
if @tls_insecure_mode
log.warn "TLS transport is configured in insecure way"
@tls_verify_hostname = false
@tls_allow_self_signed_cert = true
end
if Fluent.windows?
if (@tls_cert_path || @tls_ca_cert_path) && @tls_cert_logical_store_name
raise Fluent::ConfigError, "specified both cert path and tls_cert_logical_store_name is not permitted"
end
else
raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_logical_store_name
raise Fluent::ConfigError, "This parameter is for only Windows" if @tls_cert_thumbprint
end
end
@ack_handler = @require_ack_response ? AckHandler.new(timeout: @ack_response_timeout, log: @log, read_length: @read_length) : nil
socket_cache = @keepalive ? SocketCache.new(@keepalive_timeout, @log) : nil
@connection_manager = Fluent::Plugin::ForwardOutput::ConnectionManager.new(
log: @log,
secure: !!@security,
connection_factory: method(:create_transfer_socket),
socket_cache: socket_cache,
)
configs = []
conf.elements(name: 'server').each do |s|
s.name = 'service'
end
unless conf.elements(name: 'service').empty?
new_elem = Fluent::Config::Element.new('static_service_discovery', {}, {}, conf.elements(name: 'service'))
configs << { type: :static, conf: new_elem }
end
conf.elements(name: 'service_discovery').each_with_index do |c, i|
configs << { type: @service_discovery[i][:@type], conf: c }
end
service_discovery_create_manager(
:out_forward_service_discovery_watcher,
configurations: configs,
load_balancer: Fluent::Plugin::ForwardOutput::LoadBalancer.new(log),
custom_build_method: method(:build_node),
)
discovery_manager.services.each do |server|
@nodes << server
unless @heartbeat_type == :none
begin
server.validate_host_resolution!
rescue => e
raise unless @ignore_network_errors_at_startup
log.warn "failed to resolve node name when configured", server: (server.name || server.host), error: e
server.disable!
end
end
end
unless @as_secondary
if @compress == :gzip && @buffer.compress == :text
@buffer.compress = :gzip
elsif @compress == :text && @buffer.compress == :gzip
log.info "buffer is compressed. If you also want to save the bandwidth of a network, Add `compress` configuration in <match>"
end
end
if discovery_manager.services.empty?
raise Fluent::ConfigError, "forward output plugin requires at least one node is required. Add <server> or <service_discovery>"
end
if !@keepalive && @keepalive_timeout
log.warn('The value of keepalive_timeout is ignored. if you want to use keepalive, please add `keepalive true` to your conf.')
end
raise Fluent::ConfigError, "ack_response_timeout must be a positive integer" if @ack_response_timeout < 1
end