public List deaggregate()

in amazon-kinesis-client/src/main/java/software/amazon/kinesis/retrieval/AggregatorUtil.java [94:196]


    /**
     * Expands aggregated records into their individual sub-records, passing every
     * non-aggregated record through unchanged.
     *
     * <p>A record is treated as aggregated only when all of the following hold:
     * <ul>
     *   <li>its data starts with {@code AGGREGATED_RECORD_MAGIC};</li>
     *   <li>more than {@code DIGEST_SIZE} bytes follow the magic prefix;</li>
     *   <li>the trailing {@code DIGEST_SIZE} bytes equal {@code calculateTailCheck} of the
     *       bytes between the magic prefix and that trailer;</li>
     *   <li>those bytes parse as a {@code Messages.AggregatedRecord}.</li>
     * </ul>
     * If any check fails, the record's buffer is rewound and the record itself is added
     * to the result.
     *
     * <p>For an aggregated record, each sub-record's effective hash key is computed from its
     * partition key and optional explicit hash key. If any sub-record falls outside
     * {@code [startingHashKey, endingHashKey]} (both bounds inclusive), every sub-record of
     * that aggregated record is removed from the result and the aggregated record contributes
     * nothing.
     *
     * <p>Any other exception raised while expanding sub-records is logged together with a dump
     * of the aggregated record. Sub-records emitted before the failure stay in the result and
     * the original record is not added.
     *
     * @param records        records to inspect; the position of each record's data buffer is
     *                       modified while it is being read
     * @param startingHashKey lowest effective hash key (inclusive) a sub-record may have
     * @param endingHashKey  highest effective hash key (inclusive) a sub-record may have
     * @return a new list holding pass-through records and expanded sub-records in input order
     */
    public List<KinesisClientRecord> deaggregate(List<KinesisClientRecord> records,
                                                        BigInteger startingHashKey,
                                                        BigInteger endingHashKey) {
        List<KinesisClientRecord> result = new ArrayList<>();
        // Scratch buffers reused for every record; each is fully overwritten before it is compared.
        byte[] magic = new byte[AGGREGATED_RECORD_MAGIC.length];
        byte[] digest = new byte[DIGEST_SIZE];

        for (KinesisClientRecord r : records) {
            long subSeqNum = 0;
            ByteBuffer bb = r.data();

            // Only compare the magic prefix when it was actually read from this record, so the
            // decision never depends on bytes left in the scratch buffer by an earlier record.
            boolean isAggregated = false;
            if (bb.remaining() >= magic.length) {
                bb.get(magic);
                isAggregated = Arrays.equals(AGGREGATED_RECORD_MAGIC, magic)
                        && bb.remaining() > DIGEST_SIZE;
            }

            if (isAggregated) {
                // Temporarily shrink the limit so the bulk read stops right before the digest,
                // then restore it to read the digest itself.
                int oldLimit = bb.limit();
                bb.limit(oldLimit - DIGEST_SIZE);
                byte[] messageData = new byte[bb.remaining()];
                bb.get(messageData);
                bb.limit(oldLimit);
                bb.get(digest);

                if (!Arrays.equals(digest, calculateTailCheck(messageData))) {
                    isAggregated = false;
                } else {
                    try {
                        Messages.AggregatedRecord ar = Messages.AggregatedRecord.parseFrom(messageData);
                        List<String> pks = ar.getPartitionKeyTableList();
                        List<String> ehks = ar.getExplicitHashKeyTableList();
                        // Everything appended to result from this index on belongs to this record.
                        int firstSubRecordIndex = result.size();
                        try {
                            for (Messages.Record mr : ar.getRecordsList()) {
                                String explicitHashKey = null;
                                String partitionKey = pks.get((int) mr.getPartitionKeyIndex());
                                if (mr.hasExplicitHashKeyIndex()) {
                                    explicitHashKey = ehks.get((int) mr.getExplicitHashKeyIndex());
                                }

                                BigInteger effectiveHashKey = effectiveHashKey(partitionKey, explicitHashKey);

                                if (effectiveHashKey.compareTo(startingHashKey) < 0
                                        || effectiveHashKey.compareTo(endingHashKey) > 0) {
                                    // One sub-record out of range discards the whole aggregated
                                    // record: roll back what was already emitted for it and stop.
                                    result.subList(firstSubRecordIndex, result.size()).clear();
                                    break;
                                }

                                KinesisClientRecord record = r.toBuilder()
                                        .data(ByteBuffer.wrap(mr.getData().toByteArray()))
                                        .partitionKey(partitionKey)
                                        .explicitHashKey(explicitHashKey)
                                        .build();
                                result.add(convertRecordToKinesisClientRecord(record, true, subSeqNum++, explicitHashKey));
                            }
                        } catch (Exception e) {
                            // Best effort: log full diagnostics and continue with the next record.
                            // Sub-records already emitted for this record are left in the result.
                            log.error(describeDeaggregationFailure(r, ar, pks, ehks, messageData), e);
                        }
                    } catch (InvalidProtocolBufferException e) {
                        // Magic and digest matched but the payload is not a valid message;
                        // fall back to treating the record as a plain one.
                        isAggregated = false;
                    }
                }
            }

            if (!isAggregated) {
                // rewind() puts the position back to zero, undoing the reads performed above.
                bb.rewind();
                result.add(r);
            }
        }
        return result;
    }

    /** Builds the multi-line diagnostic message logged when expanding an aggregated record fails. */
    private static String describeDeaggregationFailure(KinesisClientRecord r,
                                                       Messages.AggregatedRecord ar,
                                                       List<String> pks,
                                                       List<String> ehks,
                                                       byte[] messageData) {
        StringBuilder sb = new StringBuilder();
        sb.append("Unexpected exception during deaggregation, record was:\n");
        sb.append("PKS:\n");
        for (String s : pks) {
            sb.append(s).append("\n");
        }
        sb.append("EHKS: \n");
        for (String s : ehks) {
            sb.append(s).append("\n");
        }
        for (Messages.Record mr : ar.getRecordsList()) {
            sb.append("Record: [hasEhk=").append(mr.hasExplicitHashKeyIndex()).append(", ")
                    .append("ehkIdx=").append(mr.getExplicitHashKeyIndex()).append(", ")
                    .append("pkIdx=").append(mr.getPartitionKeyIndex()).append(", ")
                    .append("dataLen=").append(mr.getData().toByteArray().length).append("]\n");
        }
        // java.util.Base64 replaces javax.xml.bind.DatatypeConverter, which is no longer part of
        // the JDK from Java 11 onwards; the encoded output is the same.
        sb.append("Sequence number: ").append(r.sequenceNumber()).append("\n")
                .append("Raw data: ")
                .append(java.util.Base64.getEncoder().encodeToString(messageData)).append("\n");
        return sb.toString();
    }