in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AggregatorUtil.java [94:196]
public List<KinesisClientRecord> deaggregate(List<KinesisClientRecord> records,
BigInteger startingHashKey,
BigInteger endingHashKey) {
List<KinesisClientRecord> result = new ArrayList<>();
byte[] magic = new byte[AGGREGATED_RECORD_MAGIC.length];
byte[] digest = new byte[DIGEST_SIZE];
for (KinesisClientRecord r : records) {
boolean isAggregated = true;
long subSeqNum = 0;
ByteBuffer bb = r.data();
if (bb.remaining() >= magic.length) {
bb.get(magic);
} else {
isAggregated = false;
}
if (!Arrays.equals(AGGREGATED_RECORD_MAGIC, magic) || bb.remaining() <= DIGEST_SIZE) {
isAggregated = false;
}
if (isAggregated) {
int oldLimit = bb.limit();
bb.limit(oldLimit - DIGEST_SIZE);
byte[] messageData = new byte[bb.remaining()];
bb.get(messageData);
bb.limit(oldLimit);
bb.get(digest);
byte[] calculatedDigest = calculateTailCheck(messageData);
if (!Arrays.equals(digest, calculatedDigest)) {
isAggregated = false;
} else {
try {
Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
List<String> pks = ar.getPartitionKeyTableList();
List<String> ehks = ar.getExplicitHashKeyTableList();
long aat = r.approximateArrivalTimestamp() == null
? -1 : r.approximateArrivalTimestamp().toEpochMilli();
try {
int recordsInCurrRecord = 0;
for (Messages.Record mr : ar.getRecordsList()) {
String explicitHashKey = null;
String partitionKey = pks.get((int) mr.getPartitionKeyIndex());
if (mr.hasExplicitHashKeyIndex()) {
explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex());
}
BigInteger effectiveHashKey = effectiveHashKey(partitionKey, explicitHashKey);
if (effectiveHashKey.compareTo(startingHashKey) < 0
|| effectiveHashKey.compareTo(endingHashKey) > 0) {
for (int toRemove = 0; toRemove < recordsInCurrRecord; ++toRemove) {
result.remove(result.size() - 1);
}
break;
}
++recordsInCurrRecord;
KinesisClientRecord record = r.toBuilder()
.data(ByteBuffer.wrap(mr.getData().toByteArray()))
.partitionKey(partitionKey)
.explicitHashKey(explicitHashKey)
.build();
result.add(convertRecordToKinesisClientRecord(record, true, subSeqNum++, explicitHashKey));
}
} catch (Exception e) {
StringBuilder sb = new StringBuilder();
sb.append("Unexpected exception during deaggregation, record was:\n");
sb.append("PKS:\n");
for (String s : pks) {
sb.append(s).append("\n");
}
sb.append("EHKS: \n");
for (String s : ehks) {
sb.append(s).append("\n");
}
for (Messages.Record mr : ar.getRecordsList()) {
sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
.append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
.append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
.append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
}
sb.append("Sequence number: ").append(r.sequenceNumber()).append("\n")
.append("Raw data: ")
.append(javax.xml.bind.DatatypeConverter.printBase64Binary(messageData)).append("\n");
log.error(sb.toString(), e);
}
} catch (InvalidProtocolBufferException e) {
isAggregated = false;
}
}
}
if (!isAggregated) {
bb.rewind();
result.add(r);
}
}
return result;
}