public List getData()

in src/main/java/com/amazonaws/services/kinesis/io/JsonDataExtractor.java [77:188]


	public List<AggregateData> getData(InputEvent event)
			throws SerializationException {
		try {
			List<AggregateData> aggregateData = new ArrayList<>();
			Date dateValue = null;
			JsonNode jsonContent = null;
			String dateString, summary = null;
			sumUpdates = new HashMap<>();

			List<String> items = (List<String>) serialiser.toClass(event);

			// log a warning if we didn't get anything back from the serialiser
			// - this could be OK, but probably isn't
			if (items == null || items.size() == 0)
				LOG.warn(String
						.format("Failed to deserialise any content for Record (Partition Key %s, Sequence %s",
								event.getPartitionKey(),
								event.getSequenceNumber()));

			// process all the items returned by the serialiser
			for (String item : items) {
				// Convert the string to a Jackson JsonNode for navigation
				jsonContent = StreamAggregatorUtils.asJsonNode(item);

				LabelSet labels = new LabelSet();
				for (String key : this.labelAttributes) {
					labels.put(key, StreamAggregatorUtils.readValueAsString(
							jsonContent, key));
				}

				// get the unique ID for the event
				String uniqueId = null;
				if (this.uniqueIdAttribute != null) {
					switch (this.uniqueIdAttribute) {
					case StreamAggregator.REF_PARTITION_KEY:
						uniqueId = event.getPartitionKey();
						break;
					case StreamAggregator.REF_SEQUENCE:
						uniqueId = event.getSequenceNumber();
						break;
					default:
						uniqueId = StreamAggregatorUtils.readValueAsString(
								jsonContent, uniqueIdAttribute);
						break;
					}
				}

				// get the date value from the line
				if (dateValueAttribute != null) {
					dateString = StreamAggregatorUtils.readValueAsString(
							jsonContent, dateValueAttribute);

					// bail on no date returned
					if (dateString == null || dateString.equals(""))
						throw new SerializationException(
								String.format(
										"Unable to read date value attribute %s from JSON Content %s",
										dateValueAttribute, item));

					// turn date as long or string into Date
					if (this.dateFormat != null) {
						dateValue = dateFormatter.parse(dateString);
					} else {
						// no formatter, so treat as epoch seconds
						try {
							dateValue = new Date(Long.parseLong(dateString));
						} catch (Exception e) {
							LOG.error(String
									.format("Unable to create Date Value element from item '%s' due to invalid format as Epoch Seconds",
											dateValueAttribute));
							throw new SerializationException(e);
						}
					}
				} else {
					// no date value attribute configured, so use now
					dateValue = new Date(System.currentTimeMillis());
				}

				// get the summed values
				if (this.aggregatorType.equals(AggregatorType.SUM)) {
					// get the positional sum items
					for (String s : summaryConfig.getItemSet()) {
						try {
							summary = StreamAggregatorUtils.readValueAsString(
									jsonContent, s);

							// if a summary is not found in the data element,
							// then we simply continue without it
							// StreamAggregatorUtils.readValueAsString returns
							// "" if
							// attribute is not found.
							if (summary != null && !summary.equals("")) {
								sumUpdates.put(s, Double.parseDouble(summary));
							}
						} catch (NumberFormatException nfe) {
							LOG.error(String
									.format("Unable to deserialise Summary '%s' due to NumberFormatException",
											s));
							throw new SerializationException(nfe);
						}
					}
				}

				aggregateData.add(new AggregateData(uniqueId, labels,
						dateValue, sumUpdates));
			}

			return aggregateData;
		} catch (Exception e) {
			throw new SerializationException(e);
		}
	}