in src/fluentd/src/fluent-plugin-pgjson/lib/fluent/plugin/out_pgjson.rb [231:273]
# Writes one buffered chunk to the database by dispatching each record to
# the matching insert helper.
#
# Each record is routed by its (objectSnapshotTrigger, objectSnapshot.kind)
# pair:
#   OnFrameworkRetry / Framework -> insert_framework
#   OnTaskRetry      / Task      -> insert_task (..., "retry", ...)
#   OnTaskDeletion   / Task      -> insert_task (..., "deletion", ...)
#   OnPodDeletion    / Pod       -> insert_pod
# Any other record, including one that carries no "objectSnapshot" hash at
# all, is logged together with its "logMessage" field and skipped.
#
# The connection is kept per thread in Thread.current[:conn] and is created
# through init_connection the first time a thread calls this method.
#
# @param chunk an object responding to #unique_id and #msgpack_each
#   (presumably a Fluentd buffer chunk -- confirm against the plugin API)
# @raise [RuntimeError] when no connection is available for this thread, or
#   when a PG::Error other than a connection failure occurs while inserting
def write(chunk)
  hex_id = dump_unique_id_hex chunk.unique_id
  log.info "[pgjson] chunk #{hex_id} received"

  thread = Thread.current
  init_connection unless thread.key?(:conn)
  raise "Cannot connect to db host." if thread[:conn].nil?

  begin
    chunk.msgpack_each do |time, record|
      # Records without a snapshot hash must not abort the whole chunk with
      # a NoMethodError; they fall through to the "ignored" branch below.
      snapshot = record["objectSnapshot"]
      kind = snapshot.is_a?(Hash) ? snapshot["kind"] : nil
      trigger = record['objectSnapshotTrigger']
      log.info "[pgjson] object type #{kind} triggered by #{trigger} found in chunk #{hex_id}"

      if trigger == "OnFrameworkRetry" && kind == "Framework"
        insert_framework hex_id, time, record
      elsif trigger == "OnTaskRetry" && kind == "Task"
        insert_task hex_id, time, "retry", record
      elsif trigger == "OnTaskDeletion" && kind == "Task"
        insert_task hex_id, time, "deletion", record
      elsif trigger == "OnPodDeletion" && kind == "Pod"
        insert_pod hex_id, time, record
      else
        log_message = record['logMessage']
        log.info "[pgjson] object type #{kind} triggered by #{trigger} in chunk #{hex_id} ignored. log message: #{log_message}"
      end
    end
  rescue PG::ConnectionBad, PG::UnableToSend => err
    reset_connection
    log.warn "[pgjson] connection error happens for chunk #{hex_id}. message: #{err.message}"
    # `retry` restarts the begin block, so the chunk is replayed from its
    # first record.
    # NOTE(review): this retry is unbounded, and records inserted before the
    # failure are inserted again -- confirm that reset_connection and the
    # insert_* helpers make that safe.
    retry
  rescue PG::Error => err
    log.warn "[pgjson] PG::Error happens for chunk #{hex_id}. message: #{err.message}"
    errmsg = "error class %s happens, message: %s" % [ err.class.name, err.message ]
    raise errmsg
  else
    # Reached only when every record was processed without an exception.
    log.info "[pgjson] chunk #{hex_id} writes successfully"
  end
end