private def average()

in pekko-sample-sharding-scala/killrweather/src/main/scala/sample/killrweather/WeatherStation.scala [91:142]


  /**
   * Arithmetic mean of the given readings.
   *
   * @param values the numbers to average
   * @return the sum divided by the element count, or `Double.NaN` when `values` is empty
   */
  private def average(values: Vector[Double]): Double =
    values.size match {
      case 0     => Double.NaN
      case count => values.sum / count
    }

  /**
   * Behavior of a weather station that keeps every recorded reading in memory.
   *
   * The readings are carried in the `values` argument: each `Record` message produces a new
   * behavior holding the extended vector, so no mutable state is involved.
   *
   * @param context actor context, used here only for logging
   * @param wsid    station identifier, echoed in replies and log output
   * @param values  readings recorded so far, in arrival order
   * @return the behavior handling `Record` and `Query` messages
   */
  private def running(context: ActorContext[Command], wsid: String, values: Vector[Data]): Behavior[Command] =
    Behaviors.receiveMessage[Command] {
      case Record(data, received, replyTo) =>
        val updated = values :+ data
        // The per-type average is computed only for the debug line, so skip it unless enabled.
        if (context.log.isDebugEnabled) {
          val averageForSameType = average(updated.filter(_.dataType == data.dataType).map(_.value))
          context.log.debugN(
            "{} total readings from station {}, type {}, average {}, diff: processingTime - eventTime: {} ms",
            updated.size,
            wsid,
            data.dataType,
            averageForSameType,
            received - data.eventTime)
        }
        replyTo ! DataRecorded(wsid)
        running(context, wsid, updated) // store

      case Query(dataType, func, replyTo) =>
        val valuesForType = values.filter(_.dataType == dataType)
        val queryResult: Vector[TimeWindow] =
          if (valuesForType.isEmpty) Vector.empty
          else
            func match {
              case Function.Average =>
                // Window bounds are the event times of the first and last readings in arrival order.
                val start: Long = valuesForType.head.eventTime
                val end: Long = valuesForType.last.eventTime
                Vector(TimeWindow(start, end, average(valuesForType.map(_.value))))

              case Function.HighLow =>
                // Compute the time span and the value extremes independently. Taking min/max of
                // (eventTime, value) pairs orders by eventTime first and would return the values of
                // the earliest and latest readings rather than the lowest and highest ones.
                val eventTimes = valuesForType.map(_.eventTime)
                val readings = valuesForType.map(_.value)
                val start = eventTimes.min
                val end = eventTimes.max
                Vector(TimeWindow(start, end, readings.min), TimeWindow(start, end, readings.max))

              case Function.Current =>
                // Non-empty is guaranteed by the isEmpty check above, so `last` cannot throw.
                val latest = valuesForType.last
                Vector(TimeWindow(latest.eventTime, latest.eventTime, latest.value))

            }
        replyTo ! QueryResult(wsid, dataType, func, valuesForType.size, queryResult)
        Behaviors.same

    }.receiveSignal {
      case (_, PostStop) =>
        // State lives only in the behavior's arguments, so it is gone once the actor stops.
        context.log.info("Stopping, losing all recorded state for station {}", wsid)
        Behaviors.same
    }