java/KinesisTestProducers/src/main/java/com/amazonaws/kinesis/producer/SampleAggregatorProducer.java [35:65]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	private static void sendRecord(AmazonKinesis producer, String streamName, AggRecord aggRecord) {
		// Sends one aggregated record to the named stream via putRecord and
		// logs the outcome. Nothing is sent when there is no record or the
		// record holds no user records.
		if (aggRecord == null || aggRecord.getNumUserRecords() == 0) {
			return;
		}

		System.out.println("Submitting record EHK=" + aggRecord.getExplicitHashKey() + " NumRecords="
				+ aggRecord.getNumUserRecords() + " NumBytes=" + aggRecord.getSizeBytes());
		try {
			producer.putRecord(aggRecord.toPutRecordRequest(streamName));
		} catch (Exception e) {
			// Best-effort send: report the failure and return without
			// rethrowing, but do NOT fall through to the "Completed" message,
			// which would misreport a failed transmission as successful.
			System.err.println("Failed to submit record EHK=" + aggRecord.getExplicitHashKey());
			e.printStackTrace();
			return;
		}
		// Reached only when putRecord returned without throwing.
		System.out.println("Completed record EHK=" + aggRecord.getExplicitHashKey());
	}

	/**
	 * Flush out and send any remaining records from the aggregator and then
	 * wait for all pending transmissions to finish.
	 */
	private static void flushAndFinish(AmazonKinesis producer, String streamName, RecordAggregator aggregator) {
		// Do one final flush & send to get any remaining records that haven't
		// triggered a callback yet
		AggRecord finalRecord = aggregator.clearAndGet();
		ForkJoinPool.commonPool().execute(() -> {
			sendRecord(producer, streamName, finalRecord);
		});

		// Wait up to 2 minutes for all the publisher threads to finish
		System.out.println("Waiting for all transmissions to complete...");
		ForkJoinPool.commonPool().awaitQuiescence(2, TimeUnit.MINUTES);
		System.out.println("Transmissions complete.");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



java/KinesisTestProducers/src/main/java/com/amazonaws/kinesis/producer/SampleAggregatorProducerKCLCompliant.java [35:65]:
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -
	private static void sendRecord(AmazonKinesis producer, String streamName, AggRecord aggRecord) {
		// Sends one aggregated record to the named stream via putRecord and
		// logs the outcome. Nothing is sent when there is no record or the
		// record holds no user records.
		if (aggRecord == null || aggRecord.getNumUserRecords() == 0) {
			return;
		}

		System.out.println("Submitting record EHK=" + aggRecord.getExplicitHashKey() + " NumRecords="
				+ aggRecord.getNumUserRecords() + " NumBytes=" + aggRecord.getSizeBytes());
		try {
			producer.putRecord(aggRecord.toPutRecordRequest(streamName));
		} catch (Exception e) {
			// Best-effort send: report the failure and return without
			// rethrowing, but do NOT fall through to the "Completed" message,
			// which would misreport a failed transmission as successful.
			System.err.println("Failed to submit record EHK=" + aggRecord.getExplicitHashKey());
			e.printStackTrace();
			return;
		}
		// Reached only when putRecord returned without throwing.
		System.out.println("Completed record EHK=" + aggRecord.getExplicitHashKey());
	}

	/**
	 * Flush out and send any remaining records from the aggregator and then
	 * wait for all pending transmissions to finish.
	 */
	private static void flushAndFinish(AmazonKinesis producer, String streamName, RecordAggregator aggregator) {
		// Do one final flush & send to get any remaining records that haven't
		// triggered a callback yet
		AggRecord finalRecord = aggregator.clearAndGet();
		ForkJoinPool.commonPool().execute(() -> {
			sendRecord(producer, streamName, finalRecord);
		});

		// Wait up to 2 minutes for all the publisher threads to finish
		System.out.println("Waiting for all transmissions to complete...");
		ForkJoinPool.commonPool().awaitQuiescence(2, TimeUnit.MINUTES);
		System.out.println("Transmissions complete.");
- - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - - -



