def visit_reduction()

in odps/df/backends/odpssql/compiler.py [0:0]


    def visit_reduction(self, expr):
        is_unique = getattr(expr, '_unique', False)

        if isinstance(expr, (Count, GroupedCount)) and isinstance(expr.input, CollectionExpr):
            compiled = 'COUNT(1)'
            self._ctx.add_expr_compiled(expr, compiled)
            return

        if isinstance(expr, (Std, GroupedStd)):
            if expr._ddof not in (0, 1):
                raise CompileError('Does not support %s with ddof=%s' % (
                    expr.node_name, expr._ddof))

        compiled = None

        if isinstance(expr, (Mean, GroupedMean)):
            node_name = 'avg'
        elif isinstance(expr, (Std, GroupedStd)):
            node_name = 'stddev' if expr._ddof == 0 else 'stddev_samp'
        elif isinstance(expr, (Sum, GroupedSum)) and expr.input.dtype == df_types.string:
            if is_unique:
                compiled = 'WM_CONCAT(DISTINCT \'\', %s)' % self._ctx.get_expr_compiled(expr.input)
            else:
                compiled = 'WM_CONCAT(\'\', %s)' % self._ctx.get_expr_compiled(expr.input)
        elif isinstance(expr, (Sum, GroupedSum)) and expr.input.dtype == df_types.boolean:
            if getattr(expr, '_unique', False):
                compiled = 'SUM(DISTINCT IF(%s, 1, 0))' % self._ctx.get_expr_compiled(expr.input)
            else:
                compiled = 'SUM(IF(%s, 1, 0))' % self._ctx.get_expr_compiled(expr.input)
        elif isinstance(expr, (Max, GroupedMax, Min, GroupedMin)) and \
                expr.input.dtype == df_types.boolean:
            compiled = '%s(IF(%s, 1, 0)) == 1' % (
                expr.node_name, self._ctx.get_expr_compiled(expr.input))
        elif isinstance(expr, (Any, GroupedAny)):
            compiled = 'MAX(IF(%s, 1, 0)) == 1' % self._ctx.get_expr_compiled(expr.args[0])
        elif isinstance(expr, (All, GroupedAll)):
            compiled = 'MIN(IF(%s, 1, 0)) == 1' % self._ctx.get_expr_compiled(expr.args[0])
        elif isinstance(expr, (NUnique, GroupedNUnique)):
            compiled = 'COUNT(DISTINCT %s)' % ', '.join(
                self._ctx.get_expr_compiled(c) for c in expr.inputs)
        elif isinstance(expr, (Cat, GroupedCat)):
            if is_unique:
                compiled = 'WM_CONCAT(DISTINCT %s, %s)' % (self._ctx.get_expr_compiled(expr._sep),
                                                           self._ctx.get_expr_compiled(expr.input))
            else:
                compiled = 'WM_CONCAT(%s, %s)' % (self._ctx.get_expr_compiled(expr._sep),
                                                  self._ctx.get_expr_compiled(expr.input))
        elif isinstance(expr, (Quantile, GroupedQuantile)):
            if not isinstance(expr._prob, (list, set)):
                probs_expr = expr._prob
            else:
                probs_expr = 'ARRAY(' + ', '.join(str(p) for p in expr._prob) + ')'

            if expr.input.data_type in (df_types.float32, df_types.float64) and types.get_local_use_odps2_types():
                func_name = 'PERCENTILE_APPROX'
            else:
                func_name = 'PERCENTILE'

            if is_unique:
                compiled = '%s(DISTINCT %s, %s)' % (func_name, self._ctx.get_expr_compiled(expr.input), probs_expr)
            else:
                compiled = '%s(%s, %s)' % (func_name, self._ctx.get_expr_compiled(expr.input), probs_expr)
        elif isinstance(expr, (ToList, GroupedToList)):
            func_name = 'COLLECT_SET' if expr._unique else 'COLLECT_LIST'
            compiled = '%s(%s)' % (func_name, self._ctx.get_expr_compiled(expr.input))
        else:
            node_name = expr.node_name

        if compiled is None:
            if is_unique:
                compiled = '{0}(DISTINCT {1})'.format(
                    node_name.upper(), self._ctx.get_expr_compiled(expr.args[0]))
            else:
                compiled = '{0}({1})'.format(
                    node_name.upper(), self._ctx.get_expr_compiled(expr.args[0]))

        self._ctx.add_expr_compiled(expr, compiled)