private Behavior onQuery()

in pekko-sample-sharding-java/killrweather/src/main/java/sample/killrweather/WeatherStation.java [197:228]


  /**
   * Handles a {@code Query}: aggregates the recorded readings whose data type matches the
   * query and sends a {@code QueryResult} to {@code query.replyTo}.
   *
   * <p>The result list depends on {@code query.func}:
   * <ul>
   *   <li>{@code Average} - one window spanning the first to the last matching reading,
   *       carrying the average value;</li>
   *   <li>{@code HighLow} - two windows spanning the same first-to-last range, carrying the
   *       lowest and the highest value respectively;</li>
   *   <li>{@code Current} - one window at the last matching reading's event time, carrying
   *       its value.</li>
   * </ul>
   * When no reading matches, the result list is empty.
   *
   * @param query the query to answer
   * @return this behavior, unchanged
   * @throws IllegalArgumentException if {@code query.func} is not one of the handled functions
   */
  private Behavior<Command> onQuery(Query query) {
    List<Data> dataForType = values.stream().filter(d -> d.dataType == query.dataType).collect(Collectors.toList());
    final List<TimeWindow> queryResult;
    if (dataForType.isEmpty()) {
      queryResult = Collections.emptyList();
    } else {
      // The list is non-empty in this branch, so the first/last lookups cannot fail.
      Data first = dataForType.get(0);
      Data last = dataForType.get(dataForType.size() - 1);
      switch (query.func) {
        case Average:
          List<Double> valuesForType = dataForType.stream().map(d -> d.value).collect(Collectors.toList());
          queryResult = Collections.singletonList(
              new TimeWindow(first.eventTime, last.eventTime, average(valuesForType)));
          break;
        case HighLow:
          Data min = dataForType.stream().reduce((a, b) -> a.value < b.value ? a : b).get();
          Data max = dataForType.stream().reduce((a, b) -> a.value > b.value ? a : b).get();
          // The window is the range of the matching readings (first to last), as for Average.
          // Using the timestamps of the min/max readings themselves would give start > end
          // whenever the minimum was recorded after the maximum.
          queryResult = Arrays.asList(
              new TimeWindow(first.eventTime, last.eventTime, min.value),
              new TimeWindow(first.eventTime, last.eventTime, max.value));
          break;
        case Current:
          // The most recent matching reading is reported as a zero-length window.
          queryResult = Collections.singletonList(new TimeWindow(last.eventTime, last.eventTime, last.value));
          break;
        default:
          throw new IllegalArgumentException("Unknown operation " + query.func);
      }
    }
    query.replyTo.tell(new QueryResult(wsid, query.dataType, query.func, dataForType.size(), queryResult));
    return this;
  }