in lib/fluent/plugin/kinesis_helper/aggregator.rb [57:70]
# Unpacks an aggregated payload into its records and partition key.
#
# The payload is read as: a 4-byte prefix (checked by `aggregated?`), the
# serialized AggregatedRecord message, and a trailing 16-byte MD5 digest of
# that message.
#
# @param encoded [String] the raw aggregated payload
# @return [Array] a two-element array `[records, partition_key]`, where
#   `records` is the `data` of every decoded record and `partition_key` is
#   the entry at index 1 of the decoded partition key table
# @raise [InvalidEncodingError] if `aggregated?` rejects the payload or the
#   trailing digest does not match the message
def deaggregate(encoded)
  unless aggregated?(encoded)
    raise InvalidEncodingError, "Invalid MagicNumber #{encoded[0..3]}"
  end

  # Slice by bytes on a binary copy: the layout is defined in bytes, and
  # character-based indexing would misplace the boundaries (and make the
  # digest comparison fail) for a payload tagged with a multibyte encoding.
  payload = encoded.b
  message = payload.byteslice(4..-17)
  digest = payload.byteslice(-16..-1)
  if Digest::MD5.digest(message) != digest
    raise InvalidEncodingError, "Digest mismatch #{digest}"
  end

  decoded = AggregatedRecord.decode(message)
  records = decoded.records.map(&:data)
  # NOTE(review): index 1 presumably mirrors how the aggregating side lays out
  # the partition key table — confirm against the encoder.
  partition_key = decoded.partition_key_table[1]
  [records, partition_key]
end