in src/main/java/com/amazonaws/services/kinesis/aggregators/StreamAggregator.java [677:791]
/**
 * Aggregates a batch of input events into the local cache, producing one
 * cache update per extracted data element and per configured time horizon.
 *
 * <p>
 * Events whose sequence number is not strictly greater than the current high
 * water mark ({@code highSeq}) are counted in {@code ignoredRecordsBelowHWM}
 * and skipped. Events without a sequence number bypass the high water mark
 * check and leave {@code highSeq} unchanged.
 *
 * @param events the input events to aggregate
 * @throws Exception if the aggregator is not online, or if processing fails;
 *         in the latter case {@code shutdown(true, ...)} is invoked with
 *         {@code SERIALISATION_ERROR} or {@code UNKNOWN_ERROR} before the
 *         original exception is rethrown
 */
public void aggregateEvents(List<InputEvent> events) throws Exception {
    start = System.currentTimeMillis();
    int aggregatedEventCount = 0;
    int aggregatedElementCount = 0;

    if (!online) {
        throw new Exception("Aggregator Not Initialised");
    }

    try {
        for (InputEvent event : events) {
            // parse the sequence number once; it stays null when the event
            // does not carry one
            BigInteger thisSequence = null;
            if (event.getSequenceNumber() != null) {
                thisSequence = new BigInteger(event.getSequenceNumber());

                // ignore any records which are going backward (or standing
                // still) with regard to the current high water mark
                if (highSeq != null && highSeq.compareTo(thisSequence) >= 0) {
                    ignoredRecordsBelowHWM++;
                    continue;
                }
            }

            // set the low sequence if this is the first record received
            // after a flush
            if (lowSeq == null) {
                lowSeq = event.getSequenceNumber();
            }

            // high sequence is always the latest value; only advance it when
            // the event actually has a sequence number, otherwise
            // new BigInteger(null) would throw a NullPointerException
            if (thisSequence != null) {
                highSeq = thisSequence;
            }

            // extract the data from the input event
            List<AggregateData> extractedItems = null;
            try {
                extractedItems = dataExtractor.getData(event);
            } catch (SerializationException se) {
                // customer may have elected to suppress serialisation
                // errors if the stream is expected to have heterogeneous
                // data on it
                if (this.raiseExceptionOnDataExtractionErrors) {
                    throw se;
                }
                logWarn(String.format(
                        "Serialisation Exception Sequence %s Partition Key %s",
                        event.getSequenceNumber(),
                        event.getPartitionKey()), se);
            }

            // extractedItems is still null when a serialisation problem was
            // suppressed above; such events are not counted
            if (extractedItems == null) {
                continue;
            }
            aggregatedEventCount++;

            for (AggregateData data : extractedItems) {
                // run the idempotency check
                if (!this.idempotencyCheck.doProcess(
                        event.getPartitionKey(),
                        event.getSequenceNumber(), data, event.getData())) {
                    logInfo(String.format(
                            "Ignoring Event %s as it failed Idempotency Check",
                            event.getPartitionKey()));
                    continue;
                }
                aggregatedElementCount++;

                // if the data extractor didn't have a date value to
                // extract, then use the current time
                Date eventDate = data.getDate();
                if (eventDate == null) {
                    eventDate = new Date(System.currentTimeMillis());
                }

                // with more than one time horizon configured the multi-value
                // format of the date item is used
                final boolean multipleHorizons = timeHorizons.size() > 1;

                // generate the local updates, one per time horizon that is
                // requested
                for (TimeHorizon h : timeHorizons) {
                    // update the aggregate table with event count or
                    // count + summaries
                    cache.update(
                            aggregatorType,
                            data.getLabels(),
                            (multipleHorizons
                                    ? h.getItemWithMultiValueFormat(eventDate)
                                    : h.getValue(eventDate)),
                            h,
                            event.getSequenceNumber(),
                            1,
                            data.getSummaries(),
                            dataExtractor.getSummaryConfig());
                }
            }
        }

        logInfo(String.format(
                "Aggregation Complete - %s Records and %s Elements in %s ms",
                aggregatedEventCount, aggregatedElementCount,
                (System.currentTimeMillis() - start)));
    } catch (SerializationException se) {
        shutdown(true, InventoryModel.STATE.SERIALISATION_ERROR);
        LOG.error(se);
        throw se;
    } catch (Exception e) {
        shutdown(true, InventoryModel.STATE.UNKNOWN_ERROR);
        LOG.error(e);
        throw e;
    }
}