in lib/logstash/outputs/logservice.rb [51:103]
# Builds the log producer from the plugin settings.
#
# Creates a producer config, copies the buffering and retry settings onto it,
# constructs the producer, attaches the project credentials/endpoint, and logs
# the effective settings at info level. The producer config and producer are
# kept in @producerConfig and @producer.
def register
  @producerConfig = LP.ProducerConfig.new
  config = @producerConfig
  config.setBatchCountThreshold(@max_buffer_items)
  config.setBatchSizeThresholdInBytes(@max_buffer_bytes)
  # The setting is expressed in seconds; the setter takes milliseconds.
  config.setLingerMs(@max_buffer_seconds * 1000)
  config.setRetries(@max_send_retry)
  config.setBaseRetryBackoffMs(@send_retry_interval)
  config.setTotalSizeInBytes(@total_size_in_bytes)

  @producer = LP.LogProducer.new(config)
  project_config = LP.ProjectConfig.new(@project, @endpoint, @access_key_id, @access_key_secret)
  @producer.putProjectConfig(project_config)

  @logger.info(
    "init logstash-output-logservice plugin",
    endpoint: @endpoint,
    project: @project,
    logstore: @logstore,
    topic: @topic,
    source: @source,
    max_buffer_bytes: @max_buffer_bytes
  )
end
public
# Converts one event into a log item and hands it to the producer.
#
# The event's hash form is read once; an empty hash is ignored. The log time
# is taken from the field named by @time_key, falling back to '@timestamp'
# (with a warning) when that field is nil. Every field except '__time__' is
# pushed onto the log item as a string pair; Hash values are serialized as
# JSON, everything else via to_s.
#
# Any StandardError raised along the way (unparseable time, producer failure,
# ...) is logged as a warning and swallowed, so a bad event does not propagate
# an exception to the caller.
#
# @param event [#to_hash] the event to ship
def receive(event)
  # All per-event scratch values are locals (not instance variables) so that
  # nothing leaks from one call into the next or is shared between callers.
  fields = event.to_hash
  return if fields.empty?

  log_item = LogCommon.LogItem.new

  time_value = fields[@time_key]
  if time_value.nil?
    time_value = fields['@timestamp']
    @logger.warn("The time_key is nil, use @timestamp")
  end

  parsed_time = Time.parse(time_value.to_s)
  log_item.SetTime(parsed_time.to_i)
  # Sub-second part is kept at millisecond precision, expressed in nanoseconds.
  log_item.SetTimeNsPart((parsed_time.nsec / 1_000_000) * 1_000_000)

  fields.each do |key, value|
    name = key.to_s
    # '__time__' is never forwarded as a regular field.
    next if name == '__time__'

    text = value.is_a?(Hash) ? value.to_json : value.to_s
    log_item.PushBack(name, text)
  end

  @producer.send(@project, @logstore, @topic, @source, log_item)
rescue => e
  @logger.warn("send log data fail", :exception => e)
end
# No-op flush hook: both arguments are ignored and nil is returned.
#
# NOTE(review): presumably kept so callers that expect a buffered-output style
# flush(events, close) entry point still find one; receive already forwards
# each event to the producer itself. Confirm before removing.
#
# @param events ignored
# @param close ignored (defaults to false)
# @return [nil]
def flush(events, close = false)
  nil
end