public List getData()

in src/main/java/com/amazonaws/services/kinesis/io/StringDataExtractor.java [106:183]


    /**
     * Builds one {@link AggregateData} element for every non-null line that the
     * serialiser produces from the supplied event.
     * <p>
     * For each line this method:
     * <ul>
     * <li>collects the configured label columns into a {@code LabelSet}, keyed by
     * the column index rendered as a string;</li>
     * <li>chooses the unique id from the event partition key, the event sequence
     * number, or the configured unique id column (in that order of precedence),
     * leaving it {@code null} when none is configured;</li>
     * <li>resolves the event date from the configured date column, using the date
     * formatter when a date format is set and otherwise parsing the column as a
     * {@code long}; when no date column is configured the current time is used;</li>
     * <li>when the aggregator type is {@code SUM}, parses every configured summary
     * column as a {@code double} into a fresh {@code sumUpdates} map.</li>
     * </ul>
     * Note that {@code sumUpdates} is not declared in this method and is only
     * reassigned for {@code SUM} aggregators; for any other aggregator type
     * whatever it currently references is handed to {@code AggregateData}.
     *
     * @param event the input event to deserialise and extract data from
     * @return the extracted elements, in line order; empty when the event yields no
     *         non-null lines
     * @throws SerializationException if a date or summary column cannot be parsed,
     *         or if any other failure occurs during extraction (the original
     *         failure is attached as the cause)
     */
    public List<AggregateData> getData(InputEvent event) throws SerializationException {
        try {
            List<AggregateData> data = new ArrayList<>();

            List<List<String>> content = serialiser.toClass(event);
            for (List<String> line : content) {
                // null lines carry no data and are skipped
                if (line == null) {
                    continue;
                }

                LabelSet labels = new LabelSet();
                labels.withAlias(this.labelAttributeAlias);

                for (Integer key : this.labelIndicies) {
                    labels.put(String.valueOf(key), line.get(key));
                }

                // get the unique index: partition key wins over sequence number,
                // which wins over a positional column
                String uniqueId = null;
                if (this.usePartitionKeyForUnique) {
                    uniqueId = event.getPartitionKey();
                } else if (this.useSequenceForUnique) {
                    uniqueId = event.getSequenceNumber();
                } else if (this.uniqueIdIndex != -1) {
                    uniqueId = line.get(this.uniqueIdIndex);
                }

                // get the date value from the line
                Date dateValue;
                if (this.dateValueIndex != -1) {
                    String dateString = line.get(dateValueIndex);
                    if (this.dateFormat != null) {
                        dateValue = dateFormatter.parse(dateString);
                    } else {
                        // no formatter, so treat the column as a numeric epoch value.
                        // NOTE(review): the log message below says "Epoch Seconds", but
                        // new Date(long) interprets its argument as milliseconds -
                        // confirm which unit producers actually send.
                        try {
                            dateValue = new Date(Long.parseLong(dateString));
                        } catch (NumberFormatException e) {
                            LOG.error(String.format(
                                    "Unable to create Date Value element from item '%s' due to invalid format as Epoch Seconds",
                                    dateValueIndex));
                            throw new SerializationException(e);
                        }
                    }
                } else {
                    // no date column configured, so stamp the element with "now"
                    dateValue = new Date(System.currentTimeMillis());
                }

                // get the summed values
                if (this.aggregatorType.equals(AggregatorType.SUM)) {
                    sumUpdates = new HashMap<>();

                    // get the positional sum items
                    for (int i = 0; i < summaryIndicies.size(); i++) {
                        int summaryIndex = summaryIndicies.get(i);
                        try {
                            sumUpdates.put(String.valueOf(summaryIndex),
                                    Double.parseDouble(line.get(summaryIndex)));
                        } catch (NumberFormatException nfe) {
                            // report the column index, matching the key used in
                            // sumUpdates and the date error above
                            LOG.error(String.format(
                                    "Unable to deserialise Summary '%s' due to NumberFormatException",
                                    summaryIndex));
                            throw new SerializationException(nfe);
                        }
                    }
                }

                data.add(new AggregateData(uniqueId, labels, dateValue, sumUpdates));
            }

            return data;
        } catch (SerializationException se) {
            // already the declared exception type: rethrow as-is so the root
            // cause is not buried under a second SerializationException
            throw se;
        } catch (Exception e) {
            throw new SerializationException(e);
        }

    }