in pekko-sample-sharding-java/killrweather/src/main/java/sample/killrweather/WeatherStation.java [197:228]
/**
 * Handles a {@code Query}: aggregates the recorded values whose data type matches
 * {@code query.dataType} and sends a {@code QueryResult} to {@code query.replyTo}.
 *
 * <p>If no recorded value matches, the reply carries an empty window list and
 * {@code query.func} is not inspected at all.
 *
 * @param query the request; its {@code dataType}, {@code func} and {@code replyTo} are read
 * @return this behavior, unchanged
 * @throws IllegalArgumentException if matching data exists and {@code query.func} is not one
 *     of the functions handled below
 */
private Behavior<Command> onQuery(Query query) {
  List<Data> matching =
      values.stream()
          .filter(sample -> sample.dataType == query.dataType)
          .collect(Collectors.toList());
  int count = matching.size();

  final List<TimeWindow> windows;
  if (count > 0) {
    // First and last elements in list order; used as window bounds below.
    // NOTE(review): this presumes `values` is kept in event-time order - confirm.
    Data oldest = matching.get(0);
    Data newest = matching.get(count - 1);

    switch (query.func) {
      case Average: {
        List<Double> samples =
            matching.stream().map(sample -> sample.value).collect(Collectors.toList());
        windows =
            Collections.singletonList(
                new TimeWindow(oldest.eventTime, newest.eventTime, average(samples)));
        break;
      }
      case HighLow: {
        Data lowest = oldest;
        Data highest = oldest;
        for (int i = 1; i < count; i++) {
          Data candidate = matching.get(i);
          // On equal values the later sample replaces the earlier one, for both extremes.
          lowest = lowest.value < candidate.value ? lowest : candidate;
          highest = highest.value > candidate.value ? highest : candidate;
        }
        // Both windows span from the time of the lowest sample to the time of the highest.
        long from = lowest.eventTime;
        long to = highest.eventTime;
        windows =
            Arrays.asList(
                new TimeWindow(from, to, lowest.value),
                new TimeWindow(from, to, highest.value));
        break;
      }
      case Current: {
        // The most recent matching sample forms a zero-length window.
        windows =
            Collections.singletonList(
                new TimeWindow(newest.eventTime, newest.eventTime, newest.value));
        break;
      }
      default:
        throw new IllegalArgumentException("Unknown operation " + query.func);
    }
  } else {
    windows = Collections.emptyList();
  }

  QueryResult reply = new QueryResult(wsid, query.dataType, query.func, count, windows);
  query.replyTo.tell(reply);
  return this;
}