in flink-python/pyflink/datastream/data_stream.py [0:0]
def _accumulate(self, position: Union[int, str], acc_type: AccumulateType):
    """
    The base method is used for operators such as min, max, min_by, max_by, sum.

    :param position: The field index (int) or field name (str) of the value to aggregate.
                     For input records of a basic type (not tuple/list/Row), min, max and
                     sum only accept position 0, which selects the entire value.
    :param acc_type: The aggregation to apply, a member of KeyedStream.AccumulateType.
    :return: The result of ``self.reduce`` with the generated ReduceFunction.
    :raises TypeError: If ``position`` is neither an int nor a str.
    """
    if not isinstance(position, int) and not isinstance(position, str):
        # The adjacent literals are concatenated before ``%`` is applied, so the space
        # separating the two sentences must be part of the literal itself.
        raise TypeError("The field position must be of int or str type to locate the value to "
                        "calculate for min, max, min_by, max_by and sum. "
                        "The given type is: %s" % type(position))

    class AccumulateReduceFunction(ReduceFunction):

        def __init__(self, position, agg_type):
            self._pos = position
            self._agg_type = agg_type
            # Built lazily on the first reduce() call, because the concrete logic
            # depends on the runtime type of the records flowing through the stream.
            self._reduce_func = None

        def reduce(self, value1, value2):
            if self._reduce_func is None:
                self._reduce_func = self._create_reduce_func(value2)
            return self._reduce_func(value1, value2)

        def _create_reduce_func(self, value_to_check):
            """Pick the reduce function for the aggregation type and the record type."""
            if self._agg_type in (KeyedStream.AccumulateType.MIN_BY,
                                  KeyedStream.AccumulateType.MAX_BY):
                return self._create_select_by_func(value_to_check)

            # for MIN / MAX / SUM
            combine = self._create_combine_func()
            pos = self._pos
            if isinstance(value_to_check, tuple):
                # Tuples are immutable: build a new tuple in which only the selected
                # field is replaced; all other fields come from v1.
                def reduce_func(v1, v2):
                    v1_list = list(v1)
                    v1_list[pos] = combine(v1[pos], v2[pos])
                    return tuple(v1_list)
            elif isinstance(value_to_check, (list, Row)):
                # Lists and Rows are updated in place; all other fields of v1 are kept.
                def reduce_func(v1, v2):
                    v1[pos] = combine(v1[pos], v2[pos])
                    return v1
            else:
                if pos != 0:
                    raise TypeError(
                        "The %s field selected on a basic type. A field expression "
                        "on a basic type can only select the 0th field (which means "
                        "selecting the entire basic type)." % pos)
                # A basic value is aggregated as a whole.
                reduce_func = combine
            return reduce_func

        def _create_combine_func(self):
            """Return a function merging two field values for MIN, MAX or SUM."""
            if self._agg_type == KeyedStream.AccumulateType.MIN:
                # On ties the accumulated value (a) is kept.
                return lambda a, b: b if b < a else a
            elif self._agg_type == KeyedStream.AccumulateType.MAX:
                return lambda a, b: b if b > a else a
            else:
                # Any remaining aggregation type is treated as SUM.
                return lambda a, b: a + b

        def _create_select_by_func(self, value_to_check):
            """Return a function keeping the whole record holding the min/max value."""
            pos = self._pos
            if isinstance(value_to_check, (tuple, list, Row)):
                def key(v):
                    return v[pos]
            else:
                # Basic types are compared as a whole; the position is not consulted.
                def key(v):
                    return v
            if self._agg_type == KeyedStream.AccumulateType.MIN_BY:
                # On ties the earlier record (v1) is kept.
                return lambda v1, v2: v2 if key(v2) < key(v1) else v1
            # Only MAX_BY reaches this point (see _create_reduce_func).
            return lambda v1, v2: v2 if key(v2) > key(v1) else v1

    return self.reduce(AccumulateReduceFunction(position, acc_type))