in src/main/java/com/amazonaws/services/kinesis/io/JsonDataExtractor.java [77:188]
/**
 * Extracts one {@link AggregateData} element for every JSON item that the
 * serialiser produces from the supplied event.
 *
 * <p>
 * For each item the method reads the configured label attributes, resolves
 * the unique ID (the event's partition key, its sequence number, or a JSON
 * attribute), determines the date value (parsed from the configured date
 * attribute, or the current time when none is configured) and, when the
 * aggregator type is SUM, reads the configured summary attributes as doubles.
 * </p>
 *
 * <p>
 * Every returned element is given its own map of summary values, so values
 * read from one item are never visible on the element built for another.
 * </p>
 *
 * @param event the input event to deserialise and extract data from
 * @return the extracted elements in the order the serialiser returned the
 *         items; an empty list when the serialiser returned null or no items
 * @throws SerializationException if the date attribute is missing or empty,
 *         if the date or a summary value cannot be parsed, or if any other
 *         exception is raised during extraction (that exception is attached
 *         as the cause)
 */
public List<AggregateData> getData(InputEvent event)
        throws SerializationException {
    try {
        List<AggregateData> aggregateData = new ArrayList<>();

        // sumUpdates is declared outside this method; reset it on entry so
        // that it never carries values over from a previous call, even when
        // this event yields no items
        sumUpdates = new HashMap<>();

        List<String> items = (List<String>) serialiser.toClass(event);

        // log a warning if we didn't get anything back from the serialiser
        // - this could be OK, but probably isn't. There is nothing to
        // process, so hand back the empty result rather than iterating a
        // possibly-null list.
        if (items == null || items.isEmpty()) {
            LOG.warn(String
                    .format("Failed to deserialise any content for Record (Partition Key %s, Sequence %s)",
                            event.getPartitionKey(),
                            event.getSequenceNumber()));
            return aggregateData;
        }

        // process all the items returned by the serialiser
        for (String item : items) {
            // fresh map per item: sharing a single map between items would
            // let summary values from an earlier item leak into later ones
            sumUpdates = new HashMap<>();

            // Convert the string to a Jackson JsonNode for navigation
            JsonNode jsonContent = StreamAggregatorUtils.asJsonNode(item);

            // collect the value of every configured label attribute
            LabelSet labels = new LabelSet();
            for (String key : this.labelAttributes) {
                labels.put(key, StreamAggregatorUtils.readValueAsString(
                        jsonContent, key));
            }

            // get the unique ID for the event; stays null when no unique ID
            // attribute is configured
            String uniqueId = null;
            if (this.uniqueIdAttribute != null) {
                switch (this.uniqueIdAttribute) {
                case StreamAggregator.REF_PARTITION_KEY:
                    uniqueId = event.getPartitionKey();
                    break;
                case StreamAggregator.REF_SEQUENCE:
                    uniqueId = event.getSequenceNumber();
                    break;
                default:
                    uniqueId = StreamAggregatorUtils.readValueAsString(
                            jsonContent, uniqueIdAttribute);
                    break;
                }
            }

            // get the date value from the item
            Date dateValue;
            if (dateValueAttribute != null) {
                String dateString = StreamAggregatorUtils.readValueAsString(
                        jsonContent, dateValueAttribute);

                // bail on no date returned
                if (dateString == null || dateString.equals("")) {
                    throw new SerializationException(
                            String.format(
                                    "Unable to read date value attribute %s from JSON Content %s",
                                    dateValueAttribute, item));
                }

                // turn date as long or string into Date
                if (this.dateFormat != null) {
                    dateValue = dateFormatter.parse(dateString);
                } else {
                    // no formatter, so the value is parsed as a long and
                    // passed to Date(long), which interprets it as
                    // milliseconds since the epoch.
                    // NOTE(review): the log message below says "Epoch
                    // Seconds" - confirm which unit producers actually send
                    try {
                        dateValue = new Date(Long.parseLong(dateString));
                    } catch (Exception e) {
                        LOG.error(String
                                .format("Unable to create Date Value element from item '%s' due to invalid format as Epoch Seconds",
                                        dateValueAttribute));
                        throw new SerializationException(e);
                    }
                }
            } else {
                // no date value attribute configured, so use now
                dateValue = new Date(System.currentTimeMillis());
            }

            // get the summed values
            if (this.aggregatorType.equals(AggregatorType.SUM)) {
                for (String s : summaryConfig.getItemSet()) {
                    try {
                        String summary = StreamAggregatorUtils
                                .readValueAsString(jsonContent, s);

                        // if a summary is not found in the data element,
                        // then we simply continue without it (an absent
                        // attribute is expected to come back as "")
                        if (summary != null && !summary.equals("")) {
                            sumUpdates.put(s, Double.parseDouble(summary));
                        }
                    } catch (NumberFormatException nfe) {
                        LOG.error(String
                                .format("Unable to deserialise Summary '%s' due to NumberFormatException",
                                        s));
                        throw new SerializationException(nfe);
                    }
                }
            }

            aggregateData.add(new AggregateData(uniqueId, labels,
                    dateValue, sumUpdates));
        }

        return aggregateData;
    } catch (SerializationException se) {
        // already the exception type callers expect - rethrow unchanged
        // instead of wrapping it in a second SerializationException
        throw se;
    } catch (Exception e) {
        throw new SerializationException(e);
    }
}