java/KinesisAggregator/src/main/java/com/amazonaws/kinesis/agg/RecordAggregator.java [182:258]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	public AggRecord addUserRecord(String partitionKey, byte[] data) throws Exception {
		// this overload carries no explicit hash key, so null is forwarded in
		// that position to the three-argument variant
		final String explicitHashKey = null;
		return addUserRecord(partitionKey, explicitHashKey, data);
	}

	/**
	 * Append a user record to the in-flight aggregated record. If the
	 * in-flight record does not accept it, that record is treated as complete:
	 * every registered listener has recordComplete invoked with it through the
	 * listener's executor, clearRecord() is called, and the user record is
	 * added again to the in-flight record.
	 * 
	 * @param partitionKey
	 *            The partition key of the user record
	 * @param explicitHashKey
	 *            The explicit hash key of the user record
	 * @param data
	 *            The payload bytes of the user record
	 * @return null when the user record was accepted on the first attempt;
	 *         otherwise the aggregated record that was in flight when the
	 *         first attempt was refused.
	 * @throws Exception
	 *             If the user record is refused again after clearRecord()
	 */
	public AggRecord addUserRecord(String partitionKey, String explicitHashKey, byte[] data) throws Exception {
		if (this.currentRecord.addUserRecord(partitionKey, explicitHashKey, data)) {
			// accepted by the in-flight record; nothing is ready to hand out
			return null;
		}

		// the in-flight record refused the user record, so treat it as
		// complete and schedule a notification on each listener's executor
		final AggRecord fullRecord = this.currentRecord;
		for (ListenerExecutorPair entry : this.listeners) {
			entry.getExecutor().execute(() -> entry.getListener().recordComplete(fullRecord));
		}

		// reset the in-flight record, then retry the same user record
		clearRecord();
		if (!this.currentRecord.addUserRecord(partitionKey, explicitHashKey, data)) {
			throw new Exception(String.format("Unable to add User Record %s, %s with data length %s", partitionKey,
					explicitHashKey, data.length));
		}

		return fullRecord;
	}

	/**
	 * A helper class for tracking callbacks that contains a listener for
	 * callbacks and the executor to execute the callback with.
	 */
	private class ListenerExecutorPair {
		/** The listener to use for making a callback. */
		private RecordCompleteListener listener;
		/** The executor to execute the listener callback on. */
		private Executor executor;

		/**
		 * Create a new listener/executor pair.
		 */
		public ListenerExecutorPair(RecordCompleteListener listener, Executor executor) {
			this.listener = listener;
			this.executor = executor;
		}

		/**
		 * @return Get the listener object.
		 */
		public RecordCompleteListener getListener() {
			return this.listener;
		}

		/**
		 * @return Get the executor associated with the listener.
		 */
		public Executor getExecutor() {
			return this.executor;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



java/KinesisAggregatorV2/src/main/java/com/amazonaws/kinesis/agg/RecordAggregator.java [176:249]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	public AggRecord addUserRecord(String partitionKey, byte[] data) throws Exception {
		// this overload carries no explicit hash key, so null is forwarded in
		// that position to the three-argument variant
		final String explicitHashKey = null;
		return addUserRecord(partitionKey, explicitHashKey, data);
	}

	/**
	 * Append a user record to the in-flight aggregated record. If the in-flight
	 * record does not accept it, that record is treated as complete: every
	 * registered listener has recordComplete invoked with it through the
	 * listener's executor, clearRecord() is called, and the user record is added
	 * again to the in-flight record.
	 * 
	 * @param partitionKey    The partition key of the user record
	 * @param explicitHashKey The explicit hash key of the user record
	 * @param data            The payload bytes of the user record
	 * @return null when the user record was accepted on the first attempt;
	 *         otherwise the aggregated record that was in flight when the first
	 *         attempt was refused.
	 * @throws Exception If the user record is refused again after clearRecord()
	 */
	public AggRecord addUserRecord(String partitionKey, String explicitHashKey, byte[] data) throws Exception {
		if (this.currentRecord.addUserRecord(partitionKey, explicitHashKey, data)) {
			// accepted by the in-flight record; nothing is ready to hand out
			return null;
		}

		// the in-flight record refused the user record, so treat it as
		// complete and schedule a notification on each listener's executor
		final AggRecord fullRecord = this.currentRecord;
		for (ListenerExecutorPair entry : this.listeners) {
			entry.getExecutor().execute(() -> entry.getListener().recordComplete(fullRecord));
		}

		// reset the in-flight record, then retry the same user record
		clearRecord();
		if (!this.currentRecord.addUserRecord(partitionKey, explicitHashKey, data)) {
			throw new Exception(String.format("Unable to add User Record %s, %s with data length %s", partitionKey,
					explicitHashKey, data.length));
		}

		return fullRecord;
	}

	/**
	 * A helper class for tracking callbacks that contains a listener for callbacks
	 * and the executor to execute the callback with.
	 */
	private class ListenerExecutorPair {
		/** The listener to use for making a callback. */
		private RecordCompleteListener listener;
		/** The executor to execute the listener callback on. */
		private Executor executor;

		/**
		 * Create a new listener/executor pair.
		 */
		public ListenerExecutorPair(RecordCompleteListener listener, Executor executor) {
			this.listener = listener;
			this.executor = executor;
		}

		/**
		 * @return Get the listener object.
		 */
		public RecordCompleteListener getListener() {
			return this.listener;
		}

		/**
		 * @return Get the executor associated with the listener.
		 */
		public Executor getExecutor() {
			return this.executor;
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



