def _accumulate()

in flink-python/pyflink/datastream/data_stream.py [0:0]


    def _accumulate(self, position: Union[int, str], acc_type: AccumulateType):
        """
        The base method is used for operators such as min, max, min_by, max_by, sum.

        The concrete reduce logic is selected lazily, on the first call to ``reduce``, because
        it depends on the runtime kind of the elements being reduced:

        - tuple: a new tuple is returned in which only the field at ``position`` is replaced
          by the aggregated value (min, max, sum).
        - list / Row: the field at ``position`` of the first argument is updated in place and
          that same object is returned (min, max, sum).
        - any other (basic) type: the values themselves are aggregated; ``position`` must be 0.
        - min_by / max_by return one of the two input elements unchanged, chosen by comparing
          the field at ``position`` (tuple, list, Row) or the values themselves (basic types).

        :param position: The int or str used to index into each element in order to locate the
                         value to aggregate.
        :param acc_type: The KeyedStream.AccumulateType member selecting the aggregation. Any
                         value other than MIN_BY, MAX_BY, MIN and MAX is handled as a sum.
        :return: The result of calling ``self.reduce`` with the generated reduce function.
        :raises TypeError: If ``position`` is neither an int nor a str. A TypeError is also
                           raised from the generated function, on its first invocation, when a
                           non-zero ``position`` is used for min, max or sum on a basic type.
        """
        if not isinstance(position, (int, str)):
            raise TypeError("The field position must be of int or str type to locate the value to "
                            "calculate for min, max, min_by, max_by and sum. "
                            "The given type is: %s" % type(position))

        class AccumulateReduceFunction(ReduceFunction):

            def __init__(self, position, agg_type):
                self._pos = position
                self._agg_type = agg_type
                # Built on the first reduce() call, once the kind of the elements is known.
                self._reduce_func = None

            def reduce(self, value1, value2):
                if self._reduce_func is None:
                    self._reduce_func = self._create_reduce_func(value2)
                return self._reduce_func(value1, value2)

            def _create_reduce_func(self, value_to_check):
                """
                Builds the two-argument reduce callable for the configured aggregation type
                and the kind of ``value_to_check``. All type and aggregation dispatch happens
                here once, so the returned callable does no per-element branching on them.
                """
                pos = self._pos
                agg_type = self._agg_type
                accumulate_type = KeyedStream.AccumulateType

                if agg_type == accumulate_type.MIN_BY:
                    # Logic for min_by operator: keep the whole element owning the smaller value.
                    if isinstance(value_to_check, (tuple, list, Row)):
                        def reduce_func(v1, v2):
                            return v2 if v2[pos] < v1[pos] else v1
                    else:
                        def reduce_func(v1, v2):
                            return v2 if v2 < v1 else v1
                    return reduce_func

                if agg_type == accumulate_type.MAX_BY:
                    # Logic for max_by operator: keep the whole element owning the larger value.
                    if isinstance(value_to_check, (tuple, list, Row)):
                        def reduce_func(v1, v2):
                            return v2 if v2[pos] > v1[pos] else v1
                    else:
                        def reduce_func(v1, v2):
                            return v2 if v2 > v1 else v1
                    return reduce_func

                # for MIN / MAX / SUM: first pick how two plain values are combined ...
                if agg_type == accumulate_type.MIN:
                    def combine(current, new):
                        return new if new < current else current
                elif agg_type == accumulate_type.MAX:
                    def combine(current, new):
                        return new if new > current else current
                else:
                    def combine(current, new):
                        return current + new

                # ... then pick how the combined value is written back into the element.
                if isinstance(value_to_check, tuple):
                    def reduce_func(v1, v2):
                        # Tuples are immutable, so a copy with the updated field is returned.
                        v1_list = list(v1)
                        v1_list[pos] = combine(v1[pos], v2[pos])
                        return tuple(v1_list)

                elif isinstance(value_to_check, (list, Row)):
                    def reduce_func(v1, v2):
                        # Lists and Rows are updated in place; v1 itself is returned.
                        v1[pos] = combine(v1[pos], v2[pos])
                        return v1

                else:
                    if pos != 0:
                        raise TypeError(
                            "The %s field selected on a basic type. A field expression "
                            "on a basic type can only select the 0th field (which means "
                            "selecting the entire basic type)." % pos)

                    # Basic types are aggregated directly.
                    reduce_func = combine

                return reduce_func

        return self.reduce(AccumulateReduceFunction(position, acc_type))