public GroupedStream min()

in core/src/main/java/org/apache/rocketmq/streams/core/rstream/GroupedStreamImpl.java [96:124]


    public GroupedStream<K, V> min(SelectAction<? extends Number, V> selectAction) {
        String name = OperatorNameMaker.makeName(MIN_PREFIX, pipeline.getJobId());

        Supplier<Processor<V>> supplier = new AggregateSupplier<>(name, parent.getName(), () -> null, (AggregateAction<K, V, V>) (key, value, accumulator) -> {
            Number number = selectAction.select(value);
            if (accumulator == null) {
                return value;
            } else {
                Number storedMin = selectAction.select(accumulator);
                double newValue = number.doubleValue();
                double oldValue = storedMin.doubleValue();

                if (newValue < oldValue) {
                    return value;
                } else {
                    return accumulator;
                }
            }
        });

        GraphNode graphNode;
        if (this.parent.shuffleNode()) {
            graphNode = new ShuffleProcessorNode<>(name, parent.getName(), supplier);
        } else {
            graphNode = new ProcessorNode<>(name, parent.getName(), supplier);
        }

        return this.pipeline.addGroupedStreamVirtualNode(graphNode, parent);
    }