in analytics/src/main/java/com/amazonaws/services/kinesisanalytics/operators/JsonToTimestreamPayloadFn.java [30:65]
public Collection<TimestreamPoint> map(String jsonString) {
HashMap<String, String> map = new Gson().fromJson(jsonString,
new TypeToken<HashMap<String, String>>() {
}.getType());
TimestreamPoint basePoint = new TimestreamPoint();
LOG.info("will map entity {}", map);
Map<String, String> measures = new HashMap<>(map.size());
for (Map.Entry<String, String> entry : map.entrySet()) {
String key = entry.getKey();
String value = entry.getValue();
// assuming these fields are present in every JSON record
if (key.toLowerCase().endsWith("_measure")) {
measures.put(key, value);
continue;
}
switch (key.toLowerCase()) {
case "time":
basePoint.setTime(Long.parseLong(value));
break;
case "timeunit":
basePoint.setTimeUnit(value);
break;
default:
basePoint.addDimension(key, value);
}
}
LOG.info("mapped to point {}", basePoint);
return measures.entrySet().stream()
.map(measure -> new TimestreamPoint(
basePoint, measure.getKey(), measure.getValue(),
MeasureValueType.DOUBLE))
.collect(Collectors.toList());
}