public List getData()

in src/main/java/com/amazonaws/services/kinesis/io/ObjectExtractor.java [192:279]


    public List<AggregateData> getData(InputEvent event) throws SerializationException {
        if (!validated) {
            try {
                validate();
            } catch (Exception e) {
                throw new SerializationException(e);
            }
        }

        try {
            List<AggregateData> data = new ArrayList<>();

            Object o = serialiser.toClass(event);

            // get the value of the reflected labels
            LabelSet labels = new LabelSet();
            for (String key : this.aggregateLabelMethods) {
                labels.put(key, aggregateLabelMethodMap.get(key).invoke(o).toString());
            }

            // get the unique ID value from the object
            String uniqueId = null;
            if (this.uniqueIdMethodName != null) {
                switch (this.uniqueIdMethodName) {
                    case StreamAggregator.REF_PARTITION_KEY:
                        uniqueId = event.getPartitionKey();
                        break;
                    case StreamAggregator.REF_SEQUENCE:
                        uniqueId = event.getSequenceNumber();
                        break;
                    default:
                        Object id = uniqueIdMethod.invoke(o);
                        if (id != null) {
                            uniqueId = id.toString();
                        }
                        break;
                }
            }

            // get the date value from the object
            if (this.dateMethod != null) {
                eventDate = dateMethod.invoke(o);

                if (eventDate == null) {
                    dateValue = new Date(System.currentTimeMillis());
                } else {
                    if (eventDate instanceof Date) {
                        dateValue = (Date) eventDate;
                    } else if (eventDate instanceof Long) {
                        dateValue = new Date((Long) eventDate);
                    } else {
                        throw new Exception(String.format(
                                "Cannot use data type %s for date value on event",
                                eventDate.getClass().getSimpleName()));
                    }
                }
            }

            // extract all summed values from the serialised object
            if (this.aggregatorType.equals(AggregatorType.SUM)) {
                // lift out the aggregated method value
                for (String s : this.sumValueMap.keySet()) {
                    summaryValue = this.sumValueMap.get(s).invoke(o);

                    if (summaryValue != null) {
                        if (summaryValue instanceof Double) {
                            sumUpdates.put(s, (Double) summaryValue);
                        } else if (summaryValue instanceof Long) {
                            sumUpdates.put(s, ((Long) summaryValue).doubleValue());
                        } else if (summaryValue instanceof Integer) {
                            sumUpdates.put(s, ((Integer) summaryValue).doubleValue());
                        } else {
                            String msg = String.format(
                                    "Unable to access  Summary %s due to NumberFormatException", s);
                            LOG.error(msg);
                            throw new SerializationException(msg);
                        }
                    }
                }
            }

            data.add(new AggregateData(uniqueId, labels, dateValue, sumUpdates));

            return data;
        } catch (Exception e) {
            throw new SerializationException(e);
        }
    }