in src/main/java/com/amazonaws/services/kinesis/io/ObjectExtractor.java [192:279]
public List<AggregateData> getData(InputEvent event) throws SerializationException {
if (!validated) {
try {
validate();
} catch (Exception e) {
throw new SerializationException(e);
}
}
try {
List<AggregateData> data = new ArrayList<>();
Object o = serialiser.toClass(event);
// get the value of the reflected labels
LabelSet labels = new LabelSet();
for (String key : this.aggregateLabelMethods) {
labels.put(key, aggregateLabelMethodMap.get(key).invoke(o).toString());
}
// get the unique ID value from the object
String uniqueId = null;
if (this.uniqueIdMethodName != null) {
switch (this.uniqueIdMethodName) {
case StreamAggregator.REF_PARTITION_KEY:
uniqueId = event.getPartitionKey();
break;
case StreamAggregator.REF_SEQUENCE:
uniqueId = event.getSequenceNumber();
break;
default:
Object id = uniqueIdMethod.invoke(o);
if (id != null) {
uniqueId = id.toString();
}
break;
}
}
// get the date value from the object
if (this.dateMethod != null) {
eventDate = dateMethod.invoke(o);
if (eventDate == null) {
dateValue = new Date(System.currentTimeMillis());
} else {
if (eventDate instanceof Date) {
dateValue = (Date) eventDate;
} else if (eventDate instanceof Long) {
dateValue = new Date((Long) eventDate);
} else {
throw new Exception(String.format(
"Cannot use data type %s for date value on event",
eventDate.getClass().getSimpleName()));
}
}
}
// extract all summed values from the serialised object
if (this.aggregatorType.equals(AggregatorType.SUM)) {
// lift out the aggregated method value
for (String s : this.sumValueMap.keySet()) {
summaryValue = this.sumValueMap.get(s).invoke(o);
if (summaryValue != null) {
if (summaryValue instanceof Double) {
sumUpdates.put(s, (Double) summaryValue);
} else if (summaryValue instanceof Long) {
sumUpdates.put(s, ((Long) summaryValue).doubleValue());
} else if (summaryValue instanceof Integer) {
sumUpdates.put(s, ((Integer) summaryValue).doubleValue());
} else {
String msg = String.format(
"Unable to access Summary %s due to NumberFormatException", s);
LOG.error(msg);
throw new SerializationException(msg);
}
}
}
}
data.add(new AggregateData(uniqueId, labels, dateValue, sumUpdates));
return data;
} catch (Exception e) {
throw new SerializationException(e);
}
}