in lib/fluent/plugin/kinesis.rb [93:119]
def data_formatter_create(conf)
formatter = formatter_create
compressor = compressor_create
if @data_key.nil?
if @chomp_record
->(tag, time, record) {
record = inject_values_to_record(tag, time, record)
compressor.call(formatter.format(tag, time, record).chomp.b)
}
else
->(tag, time, record) {
record = inject_values_to_record(tag, time, record)
compressor.call(formatter.format(tag, time, record).b)
}
end
else
->(tag, time, record) {
raise InvalidRecordError, record unless record.is_a? Hash
raise KeyNotFoundError.new(@data_key, record) if record[@data_key].nil?
compressor.call(record[@data_key].to_s.b)
}
end
end