public void aggregateEvents(List<InputEvent> events)

in src/main/java/com/amazonaws/services/kinesis/aggregators/StreamAggregator.java [677:791]


	public void aggregateEvents(List<InputEvent> events) throws Exception {
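		// track the batch start time and reset the per-batch counters used in
		// the completion log message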
		start = System.currentTimeMillis();
		int aggregatedEventCount = 0;
		int aggregatedElementCount = 0;

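		// the aggregator must have been initialised and be online before any
		// events can be processed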
		if (!online) {
			throw new Exception("Aggregator Not Initialised");
		}

		BigInteger thisSequence;
		List<AggregateData> extractedItems = null;
		Date eventDate = null;

		try {
			for (InputEvent event : events) {
				// reset extracted items
				extractedItems = null;

				if (event.getSequenceNumber() != null) {
					thisSequence = new BigInteger(event.getSequenceNumber());
					// ignore any records at or below the current high water
					// mark (hwm), since they are going backwards
					if (highSeq != null
							&& highSeq.compareTo(thisSequence) != -1) {
						ignoredRecordsBelowHWM++;
						continue;
					}
				}

				// set the low sequence if this is the first record received
				// after a flush
				if (lowSeq == null)
					lowSeq = event.getSequenceNumber();

				// high sequence is always the latest value
				highSeq = new BigInteger(event.getSequenceNumber());

				// extract the data from the input event
				try {
					extractedItems = dataExtractor.getData(event);
				} catch (SerializationException se) {
					// the customer may have elected to suppress serialisation
					// errors if the stream is expected to have heterogeneous
					// data on it
					if (this.raiseExceptionOnDataExtractionErrors) {
						throw se;
					} else {
						logWarn(String.format(
								"Serialisation Exception Sequence %s Partition Key %s",
								event.getSequenceNumber(),
								event.getPartitionKey()), se);
					}
				}

				// the data extractor may have returned multiple data elements,
				// or may be null if a serialisation problem was suppressed
				if (extractedItems != null) {
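					// count this input event as aggregated; the individual
					// extracted elements are counted separately below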
					aggregatedEventCount++;

					for (AggregateData data : extractedItems) {
						// run the idempotency check
						if (!this.idempotencyCheck.doProcess(
								event.getPartitionKey(),
								event.getSequenceNumber(), data,
								event.getData())) {
							logInfo(String
									.format("Ignoring Event %s as it failed Idempotency Check",
											event.getPartitionKey()));
							continue;
						}

						aggregatedElementCount++;

						// if the data extractor didn't have a date value to
						// extract, then use the current time
						eventDate = data.getDate();
						if (eventDate == null) {
							eventDate = new Date(System.currentTimeMillis());
						}

						// generate the local updates, one per time horizon that
						// is requested
						for (TimeHorizon h : timeHorizons) {
							// atomically update the aggregate table with event
							// count or count + summaries
							cache.update(aggregatorType, data.getLabels(),
									(timeHorizons.size() > 1
											? h.getItemWithMultiValueFormat(eventDate)
											: h.getValue(eventDate)),
									h, event.getSequenceNumber(), 1,
									data.getSummaries(),
									dataExtractor.getSummaryConfig());
						}
					}
				}
			}

			logInfo(String
					.format("Aggregation Complete - %s Records and %s Elements in %s ms",
							aggregatedEventCount, aggregatedElementCount,
							(System.currentTimeMillis() - start)));
		} catch (SerializationException se) {
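			// an unsuppressed serialisation failure is fatal: record the error
			// state, shut the aggregator down, and rethrow to the caller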
			shutdown(true, InventoryModel.STATE.SERIALISATION_ERROR);
			LOG.error(se);
			throw se;
		} catch (Exception e) {
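			// any other failure shuts the aggregator down in an unknown error
			// state before rethrowing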
			shutdown(true, InventoryModel.STATE.UNKNOWN_ERROR);
			LOG.error(e);
			throw e;
		}
	}
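
A hedged sketch of how a caller might drive this method follows: a record
processor maps the raw Kinesis records in a batch into InputEvent objects and
passes the whole list to aggregateEvents(), letting the aggregator handle the
high water mark check, idempotency, and the per-TimeHorizon cache updates. The
InputEvent(Record) constructor, the InputEvent import path, and the wrapper
class shown here are assumptions made for illustration and may not match the
actual amazon-kinesis-aggregators API.

	import java.util.ArrayList;
	import java.util.List;

	import com.amazonaws.services.kinesis.aggregators.InputEvent;
	import com.amazonaws.services.kinesis.aggregators.StreamAggregator;
	import com.amazonaws.services.kinesis.model.Record;

	// Hypothetical caller - not part of StreamAggregator. Assumes an
	// InputEvent(Record) constructor exists; verify against the real library.
	public class AggregatorCallerSketch {

		private final StreamAggregator aggregator;

		public AggregatorCallerSketch(StreamAggregator aggregator) {
			// the aggregator is assumed to have been initialised (online)
			this.aggregator = aggregator;
		}

		public void processBatch(List<Record> records) throws Exception {
			List<InputEvent> events = new ArrayList<>();
			for (Record r : records) {
				// wrap each Kinesis record so the aggregator can read its
				// sequence number, partition key and payload
				events.add(new InputEvent(r));
			}

			// aggregate the whole batch in one call; on fatal errors the
			// aggregator shuts itself down and rethrows to this caller
			aggregator.aggregateEvents(events);
		}
	}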