java/KinesisAggregator/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java [53:326]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@NotThreadSafe
public class AggRecord {
	// Serialization protocol constants via the specification at
	// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
	/**
	 * Four-byte marker written at the very start of every serialized aggregated
	 * record (see {@link #toRecordBytes()}).
	 */
	private static final byte[] AGGREGATED_RECORD_MAGIC = new byte[] { (byte) 0xf3, (byte) 0x89, (byte) 0x9a,
			(byte) 0xc2 };
	/** Algorithm name handed to {@code MessageDigest.getInstance} in the constructor. */
	protected static final String MESSAGE_DIGEST_NAME = "MD5";
	/**
	 * Value parsed from sixteen "FF" hex pairs, i.e. 2^128 - 1 assuming
	 * {@code StringUtils.repeat} concatenates its argument the given number of
	 * times. NOTE(review): not referenced in the visible part of this class;
	 * presumably the upper bound for explicit hash key validation - confirm.
	 */
	private static final BigInteger UINT_128_MAX = new BigInteger(StringUtils.repeat("FF", 16), 16);

	// Kinesis Limits
	// (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html)
	/** Size limit of 1024 * 1024 bytes for a single record. */
	protected static final int MAX_BYTES_PER_RECORD = 1024 * 1024; // 1 MB
	/**
	 * NOTE(review): presumably headroom reserved for aggregation framing when
	 * checking against the record size limit; its use is not visible here - confirm.
	 */
	protected static final int AGGREGATION_OVERHEAD_BYTES = 256;
	/**
	 * Lower bound for a partition key's length. NOTE(review): presumably enforced
	 * by validatePartitionKey, which is not visible here - confirm.
	 */
	protected static final int PARTITION_KEY_MIN_LENGTH = 1;
	/**
	 * Upper bound for a partition key's length. NOTE(review): presumably enforced
	 * by validatePartitionKey, which is not visible here - confirm.
	 */
	protected static final int PARTITION_KEY_MAX_LENGTH = 256;

	/**
	 * The current size of the aggregated protobuf message. This excludes the magic
	 * prefix and the digest trailer, which {@link #getSizeBytes()} adds on top.
	 */
	private int aggregatedMessageSizeBytes;
	/** The set of unique explicit hash keys in the protocol buffer message. */
	private final KeySet explicitHashKeys;
	/** The set of unique partition keys in the protocol buffer message. */
	private final KeySet partitionKeys;
	/**
	 * The current builder object by which we're actively constructing a record.
	 * Not final: {@link #clear()} swaps in a fresh builder.
	 */
	private AggregatedRecord.Builder aggregatedRecordBuilder;
	/**
	 * The message digest to use for calculating MD5 checksums per the protocol
	 * specification.
	 */
	private final MessageDigest md5;
	/**
	 * The partition key for the entire aggregated record. Starts out as the empty
	 * string and is reset to it by {@link #clear()}.
	 */
	private String aggPartitionKey;
	/**
	 * The explicit hash key for the entire aggregated record. Starts out as the
	 * empty string and is reset to it by {@link #clear()}.
	 */
	private String aggExplicitHashKey;

	/**
	 * Create an empty aggregated record: empty key tables, blank aggregate-level
	 * keys, a zero message size, a fresh protobuf builder and an MD5 digest.
	 *
	 * @throws IllegalStateException if no MD5 message digest implementation can be
	 *                               obtained
	 */
	public AggRecord() {
		// start with empty lookup tables for both kinds of keys
		this.partitionKeys = new KeySet();
		this.explicitHashKeys = new KeySet();

		// no aggregate-level keys until user records are added
		this.aggPartitionKey = "";
		this.aggExplicitHashKey = "";

		// nothing has been serialized into the message yet
		this.aggregatedMessageSizeBytes = 0;
		this.aggregatedRecordBuilder = AggregatedRecord.newBuilder();

		try {
			this.md5 = MessageDigest.getInstance(MESSAGE_DIGEST_NAME);
		} catch (NoSuchAlgorithmException missingAlgorithm) {
			throw new IllegalStateException("Could not create an MD5 message digest.", missingAlgorithm);
		}
	}

	/**
	 * Report how many user records this aggregated record currently holds.
	 *
	 * @return The record count reported by the underlying protobuf builder.
	 */
	public int getNumUserRecords() {
		final int userRecordCount = this.aggregatedRecordBuilder.getRecordsCount();
		return userRecordCount;
	}

	/**
	 * Report the size in bytes of this record in its fully serialized form: the
	 * magic prefix, followed by the protobuf message, followed by the digest.
	 *
	 * @return 0 when no user records have been added; otherwise the sum of the
	 *         magic prefix length, the tracked protobuf message size and the
	 *         digest length.
	 */
	public int getSizeBytes() {
		final int prefixLength = AGGREGATED_RECORD_MAGIC.length;
		final int digestLength = this.md5.getDigestLength();

		// an empty record serializes to nothing at all, not even the framing
		return getNumUserRecords() == 0 ? 0 : prefixLength + this.aggregatedMessageSizeBytes + digestLength;
	}

	/**
	 * Serialize this record into the Kinesis aggregated record format: the magic
	 * prefix, then the protobuf-encoded message, then the MD5 digest of that
	 * message. The user records held by this object are left untouched.
	 *
	 * @return A byte array containing a Kinesis aggregated format-compatible
	 *         Kinesis record, or an empty array if no user records have been
	 *         added.
	 */
	public byte[] toRecordBytes() {
		if (getNumUserRecords() == 0) {
			return new byte[0];
		}

		final byte[] protobufPayload = this.aggregatedRecordBuilder.build().toByteArray();

		// start from a clean digest state before hashing the payload
		this.md5.reset();
		final byte[] payloadChecksum = this.md5.digest(protobufPayload);

		// write(byte[], int, int) is used because, unlike write(byte[]), it does
		// not declare IOException on a ByteArrayOutputStream
		final ByteArrayOutputStream framed = new ByteArrayOutputStream(getSizeBytes());
		final byte[][] segmentsInWireOrder = { AGGREGATED_RECORD_MAGIC, protobufPayload, payloadChecksum };
		for (byte[] segment : segmentsInWireOrder) {
			framed.write(segment, 0, segment.length);
		}

		return framed.toByteArray();
	}

	/**
	 * Reset this object to the state of a freshly constructed instance so it can
	 * be reused: drops all user records, empties both key tables, blanks the
	 * aggregate-level keys and zeroes the tracked message size.
	 */
	public void clear() {
		// discard accumulated records and the key tables that index them
		this.aggregatedRecordBuilder = AggregatedRecord.newBuilder();
		this.partitionKeys.clear();
		this.explicitHashKeys.clear();
		this.aggregatedMessageSizeBytes = 0;

		// forget the aggregate-level routing keys
		this.aggPartitionKey = "";
		this.aggExplicitHashKey = "";

		this.md5.reset();
	}

	/**
	 * Get the partition key that applies to the aggregated record as a whole.
	 *
	 * @return The aggregate-level partition key, or null if this aggregated
	 *         record contains no user records.
	 */
	public String getPartitionKey() {
		final boolean hasRecords = getNumUserRecords() != 0;
		return hasRecords ? this.aggPartitionKey : null;
	}

	/**
	 * Get the explicit hash key that applies to the aggregated record as a whole.
	 *
	 * @return The aggregate-level explicit hash key, or null if this aggregated
	 *         record contains no user records.
	 */
	public String getExplicitHashKey() {
		final boolean hasRecords = getNumUserRecords() != 0;
		return hasRecords ? this.aggExplicitHashKey : null;
	}

	/**
	 * Calculate by how many bytes the serialized protobuf message would grow if a
	 * user record with the specified parameters were added (used to determine
	 * when this aggregated record is full and can't accept any more user records).
	 * The result is the increment only: it starts from zero and includes neither
	 * the current message size nor the magic prefix and digest trailer. This
	 * calculation is highly dependent on the Kinesis aggregated message format.
	 *
	 * @param partitionKey    The partition key of the new record to simulate adding
	 * @param explicitHashKey The explicit hash key of the new record to simulate
	 *                        adding; may be null, in which case neither an EHK
	 *                        table entry nor an EHK index is counted
	 * @param data            The raw data of the new record to simulate adding
	 * @return The number of bytes the protobuf message would grow by if a new user
	 *         record with the specified parameters was added.
	 * @see <a href=
	 *      "https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md">Kinesis
	 *      aggregation format</a>
	 */
	private int calculateRecordSize(String partitionKey, String explicitHashKey, byte[] data) {
		int messageSize = 0;

		// has the partition key been added to the table of known PKs yet?
		if (!this.partitionKeys.contains(partitionKey)) {
			// protobuf serializes string fields as UTF-8, so measure the key in
			// UTF-8 instead of whatever the platform default charset happens to be
			int pkLength = partitionKey.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
			messageSize += 1; // (message index + wire type for PK table)
			messageSize += calculateVarintSize(pkLength); // size of pk length value
			messageSize += pkLength; // actual pk length
		}

		// the explicit hash key is optional; apply the same null check to the
		// table entry that is applied to the per-record index below
		final boolean hasExplicitHashKey = explicitHashKey != null;

		// has the explicit hash key been added to the table of known EHKs yet?
		if (hasExplicitHashKey && !this.explicitHashKeys.contains(explicitHashKey)) {
			int ehkLength = explicitHashKey.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
			messageSize += 1; // (message index + wire type for EHK table)
			messageSize += calculateVarintSize(ehkLength); // size of ehk length value
			messageSize += ehkLength; // actual ehk length
		}

		// remaining calculations are for adding the new record to the list of
		// records

		long innerRecordSize = 0;

		// partition key field
		innerRecordSize += 1; // (message index + wire type for PK index)
		innerRecordSize += calculateVarintSize(
				this.partitionKeys.getPotentialIndex(partitionKey)); // size of pk index value

		// explicit hash key field (this is optional)
		if (hasExplicitHashKey) {
			innerRecordSize += 1; // (message index + wire type for EHK index)
			innerRecordSize += calculateVarintSize(
					this.explicitHashKeys.getPotentialIndex(explicitHashKey)); // size of ehk index value
		}

		// data field
		innerRecordSize += 1; // (message index + wire type for record data)
		innerRecordSize += calculateVarintSize(data.length); // size of data length value
		innerRecordSize += data.length; // actual data length

		messageSize += 1; // (message index + wire type for record)
		messageSize += calculateVarintSize(innerRecordSize); // size of entire record length value
		messageSize += innerRecordSize; // actual entire record length (compound assignment narrows to int)

		return messageSize;
	}

	/**
	 * Work out how many bytes a non-negative integral value occupies when encoded
	 * as a protobuf varint.
	 *
	 * @param value The value whose varint size will be calculated
	 * @return The number of bytes necessary to represent the input value as a
	 *         varint.
	 * @throws IllegalArgumentException if the value is negative
	 * @see <a href=
	 *      "https://developers.google.com/protocol-buffers/docs/encoding#varints">Protocol
	 *      Buffers varint encoding</a>
	 */
	private int calculateVarintSize(long value) {
		if (value < 0) {
			throw new IllegalArgumentException("Size values should not be negative.");
		}

		// position of the highest set bit gives the bit count; zero has no set
		// bits but is still counted as needing one
		final int significantBits = Math.max(1, Long.SIZE - Long.numberOfLeadingZeros(value));

		// varints only use 7 bits of each byte for the actual value, so divide
		// by 7 rounding up
		return (significantBits + 6) / 7;
	}

	/**
	 * Add a new user record to this existing aggregated record if there is enough
	 * space (based on the defined Kinesis limits for a PutRecord call).
	 * 
	 * @param partitionKey    The partition key of the new user record to add
	 * @param explicitHashKey The explicit hash key of the new user record to add
	 * @param data            The raw data of the new user record to add
	 * @return True if the new user record was successfully added to this aggregated
	 *         record or false if this aggregated record is too full.
	 */
	public boolean addUserRecord(String partitionKey, String explicitHashKey, byte[] data) {
		// set the explicit hash key for the message to the partition key -
		// required for encoding
		explicitHashKey = explicitHashKey != null ? explicitHashKey : createExplicitHashKey(partitionKey);

		// validate values from the provided message
		validatePartitionKey(partitionKey);
		validateExplicitHashKey(explicitHashKey);
		validateData(data);

		// Validate new record size won't overflow max size for a
		// PutRecordRequest
		int sizeOfNewRecord = calculateRecordSize(partitionKey, explicitHashKey, data);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



java/KinesisAggregatorV2/src/main/java/com/amazonaws/kinesis/agg/AggRecord.java [56:329]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
@NotThreadSafe
public class AggRecord {
	// Serialization protocol constants via the specification at
	// https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md
	/**
	 * Four-byte marker written at the very start of every serialized aggregated
	 * record (see {@link #toRecordBytes()}).
	 */
	private static final byte[] AGGREGATED_RECORD_MAGIC = new byte[] { (byte) 0xf3, (byte) 0x89, (byte) 0x9a,
			(byte) 0xc2 };
	/** Algorithm name handed to {@code MessageDigest.getInstance} in the constructor. */
	protected static final String MESSAGE_DIGEST_NAME = "MD5";
	/**
	 * Value parsed from sixteen "FF" hex pairs, i.e. 2^128 - 1 assuming
	 * {@code StringUtils.repeat} concatenates its argument the given number of
	 * times. NOTE(review): not referenced in the visible part of this class;
	 * presumably the upper bound for explicit hash key validation - confirm.
	 */
	private static final BigInteger UINT_128_MAX = new BigInteger(StringUtils.repeat("FF", 16), 16);

	// Kinesis Limits
	// (https://docs.aws.amazon.com/kinesis/latest/APIReference/API_PutRecord.html)
	/** Size limit of 1024 * 1024 bytes for a single record. */
	protected static final int MAX_BYTES_PER_RECORD = 1024 * 1024; // 1 MB
	/**
	 * NOTE(review): presumably headroom reserved for aggregation framing when
	 * checking against the record size limit; its use is not visible here - confirm.
	 */
	protected static final int AGGREGATION_OVERHEAD_BYTES = 256;
	/**
	 * Lower bound for a partition key's length. NOTE(review): presumably enforced
	 * by validatePartitionKey, which is not visible here - confirm.
	 */
	protected static final int PARTITION_KEY_MIN_LENGTH = 1;
	/**
	 * Upper bound for a partition key's length. NOTE(review): presumably enforced
	 * by validatePartitionKey, which is not visible here - confirm.
	 */
	protected static final int PARTITION_KEY_MAX_LENGTH = 256;

	/**
	 * The current size of the aggregated protobuf message. This excludes the magic
	 * prefix and the digest trailer, which {@link #getSizeBytes()} adds on top.
	 */
	private int aggregatedMessageSizeBytes;
	/** The set of unique explicit hash keys in the protocol buffer message. */
	private final KeySet explicitHashKeys;
	/** The set of unique partition keys in the protocol buffer message. */
	private final KeySet partitionKeys;
	/**
	 * The current builder object by which we're actively constructing a record.
	 * Not final: {@link #clear()} swaps in a fresh builder.
	 */
	private AggregatedRecord.Builder aggregatedRecordBuilder;
	/**
	 * The message digest to use for calculating MD5 checksums per the protocol
	 * specification.
	 */
	private final MessageDigest md5;
	/**
	 * The partition key for the entire aggregated record. Starts out as the empty
	 * string and is reset to it by {@link #clear()}.
	 */
	private String aggPartitionKey;
	/**
	 * The explicit hash key for the entire aggregated record. Starts out as the
	 * empty string and is reset to it by {@link #clear()}.
	 */
	private String aggExplicitHashKey;

	/**
	 * Create an empty aggregated record: empty key tables, blank aggregate-level
	 * keys, a zero message size, a fresh protobuf builder and an MD5 digest.
	 *
	 * @throws IllegalStateException if no MD5 message digest implementation can be
	 *                               obtained
	 */
	public AggRecord() {
		// start with empty lookup tables for both kinds of keys
		this.partitionKeys = new KeySet();
		this.explicitHashKeys = new KeySet();

		// no aggregate-level keys until user records are added
		this.aggPartitionKey = "";
		this.aggExplicitHashKey = "";

		// nothing has been serialized into the message yet
		this.aggregatedMessageSizeBytes = 0;
		this.aggregatedRecordBuilder = AggregatedRecord.newBuilder();

		try {
			this.md5 = MessageDigest.getInstance(MESSAGE_DIGEST_NAME);
		} catch (NoSuchAlgorithmException missingAlgorithm) {
			throw new IllegalStateException("Could not create an MD5 message digest.", missingAlgorithm);
		}
	}

	/**
	 * Report how many user records this aggregated record currently holds.
	 *
	 * @return The record count reported by the underlying protobuf builder.
	 */
	public int getNumUserRecords() {
		final int userRecordCount = this.aggregatedRecordBuilder.getRecordsCount();
		return userRecordCount;
	}

	/**
	 * Report the size in bytes of this record in its fully serialized form: the
	 * magic prefix, followed by the protobuf message, followed by the digest.
	 *
	 * @return 0 when no user records have been added; otherwise the sum of the
	 *         magic prefix length, the tracked protobuf message size and the
	 *         digest length.
	 */
	public int getSizeBytes() {
		final int prefixLength = AGGREGATED_RECORD_MAGIC.length;
		final int digestLength = this.md5.getDigestLength();

		// an empty record serializes to nothing at all, not even the framing
		return getNumUserRecords() == 0 ? 0 : prefixLength + this.aggregatedMessageSizeBytes + digestLength;
	}

	/**
	 * Serialize this record into the Kinesis aggregated record format: the magic
	 * prefix, then the protobuf-encoded message, then the MD5 digest of that
	 * message. The user records held by this object are left untouched.
	 *
	 * @return A byte array containing a Kinesis aggregated format-compatible
	 *         Kinesis record, or an empty array if no user records have been
	 *         added.
	 */
	public byte[] toRecordBytes() {
		if (getNumUserRecords() == 0) {
			return new byte[0];
		}

		final byte[] protobufPayload = this.aggregatedRecordBuilder.build().toByteArray();

		// start from a clean digest state before hashing the payload
		this.md5.reset();
		final byte[] payloadChecksum = this.md5.digest(protobufPayload);

		// write(byte[], int, int) is used because, unlike write(byte[]), it does
		// not declare IOException on a ByteArrayOutputStream
		final ByteArrayOutputStream framed = new ByteArrayOutputStream(getSizeBytes());
		final byte[][] segmentsInWireOrder = { AGGREGATED_RECORD_MAGIC, protobufPayload, payloadChecksum };
		for (byte[] segment : segmentsInWireOrder) {
			framed.write(segment, 0, segment.length);
		}

		return framed.toByteArray();
	}

	/**
	 * Reset this object to the state of a freshly constructed instance so it can
	 * be reused: drops all user records, empties both key tables, blanks the
	 * aggregate-level keys and zeroes the tracked message size.
	 */
	public void clear() {
		// discard accumulated records and the key tables that index them
		this.aggregatedRecordBuilder = AggregatedRecord.newBuilder();
		this.partitionKeys.clear();
		this.explicitHashKeys.clear();
		this.aggregatedMessageSizeBytes = 0;

		// forget the aggregate-level routing keys
		this.aggPartitionKey = "";
		this.aggExplicitHashKey = "";

		this.md5.reset();
	}

	/**
	 * Get the partition key that applies to the aggregated record as a whole.
	 *
	 * @return The aggregate-level partition key, or null if this aggregated
	 *         record contains no user records.
	 */
	public String getPartitionKey() {
		final boolean hasRecords = getNumUserRecords() != 0;
		return hasRecords ? this.aggPartitionKey : null;
	}

	/**
	 * Get the explicit hash key that applies to the aggregated record as a whole.
	 *
	 * @return The aggregate-level explicit hash key, or null if this aggregated
	 *         record contains no user records.
	 */
	public String getExplicitHashKey() {
		final boolean hasRecords = getNumUserRecords() != 0;
		return hasRecords ? this.aggExplicitHashKey : null;
	}

	/**
	 * Calculate by how many bytes the serialized protobuf message would grow if a
	 * user record with the specified parameters were added (used to determine
	 * when this aggregated record is full and can't accept any more user records).
	 * The result is the increment only: it starts from zero and includes neither
	 * the current message size nor the magic prefix and digest trailer. This
	 * calculation is highly dependent on the Kinesis aggregated message format.
	 *
	 * @param partitionKey    The partition key of the new record to simulate adding
	 * @param explicitHashKey The explicit hash key of the new record to simulate
	 *                        adding; may be null, in which case neither an EHK
	 *                        table entry nor an EHK index is counted
	 * @param data            The raw data of the new record to simulate adding
	 * @return The number of bytes the protobuf message would grow by if a new user
	 *         record with the specified parameters was added.
	 * @see <a href=
	 *      "https://github.com/awslabs/amazon-kinesis-producer/blob/master/aggregation-format.md">Kinesis
	 *      aggregation format</a>
	 */
	private int calculateRecordSize(String partitionKey, String explicitHashKey, byte[] data) {
		int messageSize = 0;

		// has the partition key been added to the table of known PKs yet?
		if (!this.partitionKeys.contains(partitionKey)) {
			// protobuf serializes string fields as UTF-8, so measure the key in
			// UTF-8 instead of whatever the platform default charset happens to be
			int pkLength = partitionKey.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
			messageSize += 1; // (message index + wire type for PK table)
			messageSize += calculateVarintSize(pkLength); // size of pk length value
			messageSize += pkLength; // actual pk length
		}

		// the explicit hash key is optional; apply the same null check to the
		// table entry that is applied to the per-record index below
		final boolean hasExplicitHashKey = explicitHashKey != null;

		// has the explicit hash key been added to the table of known EHKs yet?
		if (hasExplicitHashKey && !this.explicitHashKeys.contains(explicitHashKey)) {
			int ehkLength = explicitHashKey.getBytes(java.nio.charset.StandardCharsets.UTF_8).length;
			messageSize += 1; // (message index + wire type for EHK table)
			messageSize += calculateVarintSize(ehkLength); // size of ehk length value
			messageSize += ehkLength; // actual ehk length
		}

		// remaining calculations are for adding the new record to the list of
		// records

		long innerRecordSize = 0;

		// partition key field
		innerRecordSize += 1; // (message index + wire type for PK index)
		innerRecordSize += calculateVarintSize(
				this.partitionKeys.getPotentialIndex(partitionKey)); // size of pk index value

		// explicit hash key field (this is optional)
		if (hasExplicitHashKey) {
			innerRecordSize += 1; // (message index + wire type for EHK index)
			innerRecordSize += calculateVarintSize(
					this.explicitHashKeys.getPotentialIndex(explicitHashKey)); // size of ehk index value
		}

		// data field
		innerRecordSize += 1; // (message index + wire type for record data)
		innerRecordSize += calculateVarintSize(data.length); // size of data length value
		innerRecordSize += data.length; // actual data length

		messageSize += 1; // (message index + wire type for record)
		messageSize += calculateVarintSize(innerRecordSize); // size of entire record length value
		messageSize += innerRecordSize; // actual entire record length (compound assignment narrows to int)

		return messageSize;
	}

	/**
	 * Work out how many bytes a non-negative integral value occupies when encoded
	 * as a protobuf varint.
	 *
	 * @param value The value whose varint size will be calculated
	 * @return The number of bytes necessary to represent the input value as a
	 *         varint.
	 * @throws IllegalArgumentException if the value is negative
	 * @see <a href=
	 *      "https://developers.google.com/protocol-buffers/docs/encoding#varints">Protocol
	 *      Buffers varint encoding</a>
	 */
	private int calculateVarintSize(long value) {
		if (value < 0) {
			throw new IllegalArgumentException("Size values should not be negative.");
		}

		// position of the highest set bit gives the bit count; zero has no set
		// bits but is still counted as needing one
		final int significantBits = Math.max(1, Long.SIZE - Long.numberOfLeadingZeros(value));

		// varints only use 7 bits of each byte for the actual value, so divide
		// by 7 rounding up
		return (significantBits + 6) / 7;
	}

	/**
	 * Add a new user record to this existing aggregated record if there is enough
	 * space (based on the defined Kinesis limits for a PutRecord call).
	 * 
	 * @param partitionKey    The partition key of the new user record to add
	 * @param explicitHashKey The explicit hash key of the new user record to add
	 * @param data            The raw data of the new user record to add
	 * @return True if the new user record was successfully added to this aggregated
	 *         record or false if this aggregated record is too full.
	 */
	public boolean addUserRecord(String partitionKey, String explicitHashKey, byte[] data) {
		// set the explicit hash key for the message to the partition key -
		// required for encoding
		explicitHashKey = explicitHashKey != null ? explicitHashKey : createExplicitHashKey(partitionKey);

		// validate values from the provided message
		validatePartitionKey(partitionKey);
		validateExplicitHashKey(explicitHashKey);
		validateData(data);

		// Validate new record size won't overflow max size for a
		// PutRecordRequest
		int sizeOfNewRecord = calculateRecordSize(partitionKey, explicitHashKey, data);
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



