in analytics-kotlin/src/main/kotlin/services/kinesisanalytics/operators/OffsetFutureTimestreamPoints.kt [16:27]
override fun processElement(
points: Collection<TimestreamPoint>, ctx: Context,
out: Collector<Collection<TimestreamPoint>>
) {
points.asSequence()
.filter { pointTimestamp(it) > System.currentTimeMillis() + TIMESTREAM_FUTURE_THRESHOLD }
.forEach {
it.time = ctx.timestamp()
it.timeUnit = TimeUnit.MILLISECONDS.name
}
out.collect(points)
}