in core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java [127:154]
public GroupedStream<K, V> max(SelectAction<? extends Number, V> selectAction) {
String name = OperatorNameMaker.makeName(MAX_PREFIX, pipeline.getJobId());
Supplier<Processor<V>> supplier = new AggregateSupplier<>(name, parent.getName(), () -> null, (AggregateAction<K, V, V>) (key, value, accumulator) -> {
Number number = selectAction.select(value);
if (accumulator == null) {
return value;
} else {
Number storedMax = selectAction.select(accumulator);
double newValue = number.doubleValue();
double oldValue = storedMax.doubleValue();
if (newValue > oldValue) {
return value;
} else {
return accumulator;
}
}
});
GraphNode graphNode;
if (this.parent.shuffleNode()) {
graphNode = new ShuffleProcessorNode<>(name, parent.getName(), supplier);
} else {
graphNode = new ProcessorNode<>(name, parent.getName(), supplier);
}
return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent);
}