in analytics-kotlin/src/main/kotlin/services/kinesisanalytics/operators/JsonToTimestreamPayloadFn.kt [22:52]
/**
 * Converts one flat JSON object (string keys, string values) into Timestream points,
 * producing one point per measure found in the object.
 *
 * Keys are classified case-insensitively (using [Locale.ENGLISH]):
 * - keys ending in `_measure` become measures; the key, in its original case, is the measure name
 * - `time` is parsed as a [Long] and stored as the point's time
 * - `timeunit` is stored as the point's time unit
 * - every other key is added as a dimension
 *
 * Every returned point is a copy of the same base point (time, time unit, dimensions) with the
 * measure name and value filled in and the measure value type set to [MeasureValueType.DOUBLE].
 *
 * @param jsonString the JSON document to convert
 * @return one point per measure; empty when the input has no measures, is empty, or is a JSON `null`
 * @throws NumberFormatException if the `time` value is not a valid [Long]
 */
override fun map(jsonString: String): List<TimestreamPoint> {
    // Gson returns null (instead of throwing) for empty input or a JSON `null` literal;
    // treat that as "no points" rather than failing on the null map below.
    val fields: Map<String, String> = Gson().fromJson<HashMap<String, String>>(
        jsonString,
        object : TypeToken<HashMap<String, String>>() {}.type
    ) ?: run {
        LOG.warn("input contained no JSON object, emitting no points")
        return emptyList()
    }

    val basePoint = TimestreamPoint()
    val measures = HashMap<String, String>(fields.size)
    for ((key, value) in fields) {
        val normalizedKey = key.lowercase(Locale.ENGLISH)
        when {
            // Checked first so that a key such as "time_measure" is treated as a measure.
            normalizedKey.endsWith("_measure") -> measures[key] = value
            normalizedKey == "time" -> basePoint.time = value.toLong()
            normalizedKey == "timeunit" -> basePoint.timeUnit = value
            else -> basePoint.addDimension(key, value)
        }
    }
    LOG.trace("mapped to point {}", basePoint)

    // Copy only after the loop so every point carries the fully populated base fields.
    return measures.map { (name, value) ->
        basePoint.copy(
            measureName = name,
            measureValue = value,
            measureValueType = MeasureValueType.DOUBLE
        )
    }
}