in odps/df/backends/odpssql/compiler.py [0:0]
def visit_reduction(self, expr):
    """Compile a reduction expression into an ODPS SQL aggregate fragment.

    The fragment is registered on ``self._ctx`` through
    ``add_expr_compiled``; nothing is returned.

    Most reductions use the generic ``FUNC([DISTINCT ]arg)`` shape, where
    FUNC is the upper-cased node name (``avg`` for means, ``stddev`` /
    ``stddev_samp`` for standard deviations).  The remaining cases build
    their fragment explicitly:

    * string sums become ``WM_CONCAT``;
    * boolean sums count the true values;
    * boolean min/max and any/all compare an aggregate over 1/0 with 1;
    * nunique becomes ``COUNT(DISTINCT ...)``;
    * cat becomes ``WM_CONCAT``;
    * quantile becomes ``PERCENTILE`` / ``PERCENTILE_APPROX``;
    * tolist becomes ``COLLECT_LIST`` / ``COLLECT_SET``.

    :param expr: the reduction expression to compile
    :raises CompileError: if a Std/GroupedStd has a ``_ddof`` other than 0 or 1
    """
    ctx = self._ctx
    unique = getattr(expr, '_unique', False)
    # Prefix spliced into aggregates that honour the unique flag.
    distinct = 'DISTINCT ' if unique else ''

    def arg_sql(node):
        # Compiled SQL for a sub-expression already registered in the context.
        return ctx.get_expr_compiled(node)

    def bool_agg(agg_name, node):
        # Aggregate a boolean as 1/0, then turn the result back into a boolean.
        return '%s(IF(%s, 1, 0)) == 1' % (agg_name, arg_sql(node))

    # Counting the rows of a whole collection does not reference any column.
    if isinstance(expr, (Count, GroupedCount)) and \
            isinstance(expr.input, CollectionExpr):
        ctx.add_expr_compiled(expr, 'COUNT(1)')
        return

    std_like = isinstance(expr, (Std, GroupedStd))
    if std_like and expr._ddof not in (0, 1):
        raise CompileError('Does not support %s with ddof=%s' % (
            expr.node_name, expr._ddof))

    func = None  # aggregate name for the generic FUNC([DISTINCT ]arg) form
    sql = None   # fully built fragment for the special-cased forms

    if isinstance(expr, (Mean, GroupedMean)):
        func = 'avg'
    elif std_like:
        # ddof is 0 or 1 here (checked above).
        func = 'stddev' if expr._ddof == 0 else 'stddev_samp'
    elif isinstance(expr, (Sum, GroupedSum)) and \
            expr.input.dtype == df_types.string:
        # Summing strings concatenates them with an empty separator.
        sql = "WM_CONCAT(%s'', %s)" % (distinct, arg_sql(expr.input))
    elif isinstance(expr, (Sum, GroupedSum)) and \
            expr.input.dtype == df_types.boolean:
        # Summing booleans adds 1 for every true value.
        sql = 'SUM(%sIF(%s, 1, 0))' % (distinct, arg_sql(expr.input))
    elif isinstance(expr, (Max, GroupedMax, Min, GroupedMin)) and \
            expr.input.dtype == df_types.boolean:
        sql = bool_agg(expr.node_name, expr.input)
    elif isinstance(expr, (Any, GroupedAny, All, GroupedAll)):
        agg_name = 'MAX' if isinstance(expr, (Any, GroupedAny)) else 'MIN'
        sql = bool_agg(agg_name, expr.args[0])
    elif isinstance(expr, (NUnique, GroupedNUnique)):
        sql = 'COUNT(DISTINCT %s)' % ', '.join(
            arg_sql(col) for col in expr.inputs)
    elif isinstance(expr, (Cat, GroupedCat)):
        sep_sql = arg_sql(expr._sep)
        sql = 'WM_CONCAT(%s%s, %s)' % (distinct, sep_sql, arg_sql(expr.input))
    elif isinstance(expr, (Quantile, GroupedQuantile)):
        prob = expr._prob
        if isinstance(prob, (list, set)):
            # Several probabilities are passed to SQL as an array literal.
            probs_sql = 'ARRAY(%s)' % ', '.join(str(p) for p in prob)
        else:
            probs_sql = prob
        use_approx = expr.input.data_type in (df_types.float32, df_types.float64) and \
            types.get_local_use_odps2_types()
        quantile_func = 'PERCENTILE_APPROX' if use_approx else 'PERCENTILE'
        sql = '%s(%s%s, %s)' % (
            quantile_func, distinct, arg_sql(expr.input), probs_sql)
    elif isinstance(expr, (ToList, GroupedToList)):
        collect = 'COLLECT_SET' if expr._unique else 'COLLECT_LIST'
        sql = '%s(%s)' % (collect, arg_sql(expr.input))
    else:
        func = expr.node_name

    if sql is None:
        sql = '{0}({1}{2})'.format(
            func.upper(), distinct, arg_sql(expr.args[0]))
    ctx.add_expr_compiled(expr, sql)