write

in src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb [231:273]


    def write(chunk)
      hex_id = dump_unique_id_hex chunk.unique_id
      log.info "[pgjson] chunk #{hex_id} received"
      thread = Thread.current
      if ! thread.key?(:conn)
        init_connection
      end
      if ! thread[:conn].nil?
        begin
          chunk.msgpack_each do |time, record|
            kind = record["objectSnapshot"]["kind"]
            trigger = record['objectSnapshotTrigger']
            log.info "[pgjson] object type #{kind} triggered by #{trigger} found in chunk #{hex_id}"
            if trigger == "OnFrameworkRetry" && kind == "Framework"
              insert_framework hex_id, time, record
            elsif trigger == "OnTaskRetry" && kind == "Task"
              insert_task hex_id, time, "retry", record
            elsif trigger == "OnTaskDeletion" && kind == "Task"
              insert_task hex_id, time, "deletion", record
            elsif trigger == "OnPodDeletion" && kind == "Pod"
              insert_pod hex_id, time, record
            else
              logMessage = record['logMessage']
              log.info "[pgjson] object type #{kind} triggered by #{trigger} in chunk #{hex_id} ignored. log message: #{logMessage}"
            end
          end
        rescue PG::ConnectionBad, PG::UnableToSend => err
          
          reset_connection 
          log.warn "[pgjson] connection error happens for chunk #{hex_id}. message: #{err.message}"
          retry
        rescue PG::Error => err
          log.warn "[pgjson] PG::Error happens for chunk #{hex_id}. message: #{err.message}"
          errmsg = "error class %s happens, message: %s" % [ err.class.name, err.message ]
          raise errmsg
        else
          log.info "[pgjson] chunk #{hex_id} writes successfully"
        end
      else
        raise "Cannot connect to db host."
      end
    end