in lib/fluent/plugin/out_kinesis_streams_aggregated.rb [44:58]
# Writes one buffer chunk to the target stream.
#
# The configured stream name is resolved against the chunk via
# extract_placeholders. For every batch yielded by write_records_batch,
# a fresh partition key is generated and the whole batch is submitted
# as a single record whose data is the result of aggregator.aggregate.
#
# chunk - the buffer chunk handed to this output (passed through to
#         extract_placeholders and write_records_batch).
#
# Returns whatever write_records_batch returns.
def write(chunk)
  target_stream = extract_placeholders(@stream_name, chunk)

  write_records_batch(chunk, target_stream) do |entries|
    # One partition key per batch; the same key is handed to the aggregator.
    partition_key = @partition_key_generator.call

    # Keep only the leading element of each batch entry.
    # NOTE(review): presumably entries are [data, ...] tuples -- confirm
    # against write_records_batch.
    payloads = entries.map { |(payload)| payload }

    # The records array always holds exactly one (aggregated) record.
    client.put_records(
      stream_name: target_stream,
      records: [
        { partition_key: partition_key, data: aggregator.aggregate(payloads, partition_key) },
      ],
    )
  end
end