write

in lib/fluent/plugin/out_kinesis_streams_aggregated.rb [44:58]


      # Sends the contents of one buffer chunk as aggregated records.
      #
      # The target stream name is resolved once per chunk by expanding the
      # placeholders in @stream_name. Each batch yielded by
      # write_records_batch is then collapsed by the aggregator into a single
      # blob and submitted through client.put_records as exactly one record.
      #
      # @param chunk the buffer chunk handed to this output; it is passed
      #   through to extract_placeholders and write_records_batch unchanged
      # @return whatever write_records_batch returns
      def write(chunk)
        target_stream = extract_placeholders(@stream_name, chunk)

        write_records_batch(chunk, target_stream) do |batch|
          # One freshly generated key per batch: it is handed to the
          # aggregator and also used as the partition key of the outer record.
          partition_key = @partition_key_generator.call

          # Keep only the leading element of every batch entry
          # (presumably the serialized payload; confirm against write_records_batch).
          payloads = batch.map { |(payload)| payload }

          kinesis = client
          aggregated_blob = aggregator.aggregate(payloads, partition_key)

          kinesis.put_records(
            stream_name: target_stream,
            records: [{ partition_key: partition_key, data: aggregated_blob }],
          )
        end
      end