in pekko-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherStation.scala [91:142]
private def average(values: Vector[Double]): Double =
if (values.isEmpty) Double.NaN
else values.sum / values.size
private def running(context: ActorContext[Command], wsid: String, values: Vector[Data]): Behavior[Command] =
Behaviors.receiveMessage[Command] {
case Record(data, received, replyTo) =>
val updated = values :+ data
if (context.log.isDebugEnabled) {
val averageForSameType = average(updated.filter(_.dataType == data.dataType).map(_.value))
context.log.debugN(
"{} total readings from station {}, type {}, average {}, diff: processingTime - eventTime: {} ms",
updated.size,
wsid,
data.dataType,
averageForSameType,
received - data.eventTime)
}
replyTo ! DataRecorded(wsid)
running(context, wsid, updated) // store
case Query(dataType, func, replyTo) =>
val valuesForType = values.filter(_.dataType == dataType)
val queryResult: Vector[TimeWindow] =
if (valuesForType.isEmpty) Vector.empty
else
func match {
case Function.Average =>
val start: Long = valuesForType.head.eventTime
val end: Long = valuesForType.last.eventTime
Vector(TimeWindow(start, end, average(valuesForType.map(_.value))))
case Function.HighLow =>
val (start, min) = valuesForType.map(e => e.eventTime -> e.value).min
val (end, max) = valuesForType.map(e => e.eventTime -> e.value).max
Vector(TimeWindow(start, end, min), TimeWindow(start, end, max))
case Function.Current =>
// we know it is not empty from above
Vector(valuesForType.lastOption
.map(e => TimeWindow(e.eventTime, e.eventTime, e.value))
.get)
}
replyTo ! QueryResult(wsid, dataType, func, valuesForType.size, queryResult)
Behaviors.same
}.receiveSignal {
case (_, PostStop) =>
context.log.info("Stopping, losing all recorded state for station {}", wsid)
Behaviors.same
}