# encoding: utf-8
require "logstash/outputs/base"
require "logstash/namespace"
require "logstash/environment"
require "stud/buffer"
require "socket"
require "java"
require "json"
require "time"

root_dir = File.expand_path(File.join(File.dirname(__FILE__), ".."))
LogStash::Environment.load_runtime_jars! File.join(root_dir, "vendor")

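# This plugin ships Logstash events to Alibaba Cloud Log Service (SLS) through
# the Aliyun log producer Java library, which batches and retries uploads.
#
# Example pipeline configuration (endpoint, project, and credential values are
# illustrative placeholders):
#
#   output {
#     logservice {
#       endpoint          => "cn-hangzhou.log.aliyuncs.com"
#       project           => "my-project"
#       logstore          => "my-logstore"
#       topic             => ""
#       source            => ""
#       access_key_id     => "your_access_key_id"
#       access_key_secret => "your_access_key_secret"
#     }
#   }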
class LogStash::Outputs::LogService < LogStash::Outputs::Base
  include Stud::Buffer

  config_name "logservice"

  # Log Service endpoint, see https://help.aliyun.com/document_detail/sls/api/endpoints.html
  config :endpoint, :validate => :string, :required => true
  config :project, :validate => :string, :required => true
  config :logstore, :validate => :string, :required => true
  config :topic, :validate => :string, :required => false, :default => ""
  # log source; if it is empty, the host IP is used by default
  config :source, :validate => :string, :required => true

  # access_key_id/access_key_secret of an aliyun.com account
  config :access_key_id, :validate => :string, :required => true
  config :access_key_secret, :validate => :string, :required => true

  # maximum number of logs in a logGroup for batch send (default 4096)
  config :max_buffer_items, :validate => :number, :required => false, :default => 4096
  # maximum size in bytes of a logGroup for batch send (default 2*1024*1024)
  config :max_buffer_bytes, :validate => :number, :required => false, :default => 2097152
  # a logGroup is flushed after at most this many seconds (default 3)
  config :max_buffer_seconds, :validate => :number, :required => false, :default => 3
  # maximum amount of log data a single producer instance may cache (default 100 MB)
  config :total_size_in_bytes, :validate => :number, :required => false, :default => 104857600

  # a logGroup is retried when sending to Log Service fails, and is discarded once the retry limit is exceeded
  config :max_send_retry, :validate => :number, :required => true, :default => 10
  # wait this many milliseconds (default 200) before retrying a failed send
  config :send_retry_interval, :validate => :number, :required => false, :default => 200

  config :to_json, :validate => :boolean, :required => false, :default => true
  # event field used as the log time
  config :time_key, :validate => :string, :required => false, :default => "@timestamp"

  # JRuby references to the Aliyun log producer Java packages
  LP = com.aliyun.openservices.aliyun.log.producer
  LogCommon = com.shade.aliyun.openservices.log.common

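  # Build the Java producer configuration from the plugin settings and start a
  # LogProducer bound to the configured project, endpoint, and credentials.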
  public
  def register
    @producer_config = LP.ProducerConfig.new
    @producer_config.setBatchCountThreshold(@max_buffer_items)
    @producer_config.setBatchSizeThresholdInBytes(@max_buffer_bytes)
    @producer_config.setLingerMs(@max_buffer_seconds * 1000)
    @producer_config.setRetries(@max_send_retry)
    @producer_config.setBaseRetryBackoffMs(@send_retry_interval)
    @producer_config.setTotalSizeInBytes(@total_size_in_bytes)
    @producer = LP.LogProducer.new(@producer_config)
    @producer.putProjectConfig(LP.ProjectConfig.new(@project, @endpoint, @access_key_id, @access_key_secret))

    @logger.info("init logstash-output-logservice plugin", :endpoint => @endpoint, :project => @project, :logstore => @logstore, :topic => @topic, :source => @source, :max_buffer_bytes => @max_buffer_bytes)
  end # def register

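  # Convert each event into a LogItem: the log time is taken from the configured
  # time_key field (falling back to @timestamp), every remaining field becomes a
  # key/value pair, and the item is handed to the producer for batched delivery.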
  public
  def receive(event)
    begin
      event_map = event.to_hash
      return if event_map.empty?
      logitem = LogCommon.LogItem.new
      # @timestamp looks like 2016-02-18T03:23:11.053Z
      time_value = event_map[@time_key]
      if time_value.nil?
        time_value = event_map['@timestamp']
        @logger.warn("time_key field is missing, falling back to @timestamp")
      end
      time_s = Time.parse(time_value.to_s)
      logitem.SetTime(time_s.to_i)
      # keep the millisecond part as the nanosecond component of the log time
      time_ms_part = time_s.strftime('%L').to_i
      logitem.SetTimeNsPart(time_ms_part * 1000000)
      event_map.each do |key, value|
        key_str = key.to_s
        # __time__ is reserved by Log Service, skip it
        if key_str == '__time__'
          next
        end
        # nested hashes are serialized as JSON strings
        if value.instance_of? Hash
          value_str = value.to_json
        else
          value_str = value.to_s
        end
        logitem.PushBack(key_str, value_str)
      end
      @producer.send(@project, @logstore, @topic, @source, logitem)
    rescue => e
      @logger.warn("failed to send log data", :exception => e)
    end
  end # def receive

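  # Batching is delegated to the Java producer, so the Stud::Buffer flush
  # callback has nothing to do here.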
  def flush(events, close=false)
  end

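  # Closing the producer flushes any logGroups it is still buffering before shutdown.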
  public
  def close
    @producer.close
  end # def close

end # class LogStash::Outputs::LogService