in src/main/java/com/amazonaws/services/kinesis/io/StringDataExtractor.java [106:183]
/**
 * Turns one input event into a list of {@link AggregateData} items, one per
 * non-null line returned by {@code serialiser.toClass(event)}.
 *
 * <p>For every line this method:
 * <ul>
 * <li>builds a {@code LabelSet} carrying {@code labelAttributeAlias}, with one
 * entry per configured label index (key = the index rendered as a string,
 * value = the field found at that index);</li>
 * <li>picks the unique id: the event's partition key if
 * {@code usePartitionKeyForUnique} is set, otherwise the event's sequence
 * number if {@code useSequenceForUnique} is set, otherwise the field at
 * {@code uniqueIdIndex} (or {@code null} when that index is -1);</li>
 * <li>picks the date: the field at {@code dateValueIndex}, parsed with
 * {@code dateFormatter} when {@code dateFormat} is set and as a plain long
 * otherwise; the current time is used when {@code dateValueIndex} is -1;</li>
 * <li>when the aggregator type is {@code SUM}, parses the field at every
 * configured summary index as a double into a fresh {@code sumUpdates} map
 * keyed by the index rendered as a string.</li>
 * </ul>
 *
 * @param event the event to extract data from
 * @return the extracted items, in line order; empty when there are no
 *         non-null lines
 * @throws SerializationException if a date or summary field cannot be parsed,
 *         or if any other exception is raised while processing the event (the
 *         original exception is attached as the cause)
 */
public List<AggregateData> getData(InputEvent event) throws SerializationException {
    try {
        List<AggregateData> data = new ArrayList<>();
        List<List<String>> content = serialiser.toClass(event);

        for (List<String> line : content) {
            // null lines are skipped rather than treated as an error
            if (line == null) {
                continue;
            }

            LabelSet labels = new LabelSet();
            labels.withAlias(this.labelAttributeAlias);
            for (Integer key : this.labelIndicies) {
                labels.put("" + key, line.get(key));
            }

            // get the unique index; stays null when no source is configured
            String uniqueId = null;
            if (this.usePartitionKeyForUnique) {
                uniqueId = event.getPartitionKey();
            } else if (this.useSequenceForUnique) {
                uniqueId = event.getSequenceNumber();
            } else if (this.uniqueIdIndex != -1) {
                uniqueId = line.get(this.uniqueIdIndex);
            }

            // get the date value from the line
            Date dateValue;
            if (this.dateValueIndex != -1) {
                String dateString = line.get(dateValueIndex);
                if (this.dateFormat != null) {
                    dateValue = dateFormatter.parse(dateString);
                } else {
                    // No formatter: the field is parsed as a long and handed to
                    // Date(long), which interprets it as milliseconds since the
                    // epoch. NOTE(review): the log message below speaks of
                    // "Epoch Seconds" - confirm which unit producers send.
                    try {
                        dateValue = new Date(Long.parseLong(dateString));
                    } catch (NumberFormatException e) {
                        LOG.error(String.format(
                                "Unable to create Date Value element from item '%s' due to invalid format as Epoch Seconds",
                                dateValueIndex));
                        throw new SerializationException(e);
                    }
                }
            } else {
                // no date column configured, so stamp with the current time
                dateValue = new Date(System.currentTimeMillis());
            }

            // get the summed values
            // NOTE(review): sumUpdates is not declared in this method; for any
            // aggregator type other than SUM the value it already holds is
            // passed to AggregateData unchanged - confirm that is intended.
            if (this.aggregatorType.equals(AggregatorType.SUM)) {
                sumUpdates = new HashMap<>();
                // get the positional sum items
                for (int i = 0; i < summaryIndicies.size(); i++) {
                    int summaryIndex = summaryIndicies.get(i);
                    try {
                        sumUpdates.put("" + summaryIndex,
                                Double.parseDouble(line.get(summaryIndex)));
                    } catch (NumberFormatException nfe) {
                        // the value logged is the position within
                        // summaryIndicies, not the column index itself
                        LOG.error(String.format(
                                "Unable to deserialise Summary '%s' due to NumberFormatException",
                                i));
                        throw new SerializationException(nfe);
                    }
                }
            }

            data.add(new AggregateData(uniqueId, labels, dateValue, sumUpdates));
        }
        return data;
    } catch (SerializationException se) {
        // already the exception type callers expect: rethrow as-is instead of
        // letting the generic handler below wrap it a second time
        throw se;
    } catch (Exception e) {
        throw new SerializationException(e);
    }
}