odps/df/backends/pd/compiler.py (1,371 lines of code) (raw):

#!/usr/bin/env python # -*- coding: utf-8 -*- # Copyright 1999-2022 Alibaba Group Holding Ltd. # # Licensed under the Apache License, Version 2.0 (the "License"); # you may not use this file except in compliance with the License. # You may obtain a copy of the License at # # http://www.apache.org/licenses/LICENSE-2.0 # # Unless required by applicable law or agreed to in writing, software # distributed under the License is distributed on an "AS IS" BASIS, # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. # See the License for the specific language governing permissions and # limitations under the License. import itertools import json import re import time import uuid import warnings from datetime import datetime from ..core import Backend from ...expr.expressions import * from ...expr.arithmetic import Power from ...expr.reduction import GroupedSequenceReduction, GroupedCount, Count, \ GroupedCat, Cat, NUnique, GroupedNUnique, ToList, GroupedToList, Quantile, \ GroupedQuantile from ...expr.merge import JoinCollectionExpr from ...expr.datetimes import DTScalar from ...expr.collections import PivotCollectionExpr from ...expr import arithmetic, element, composites from ...utils import traverse_until_source from ....dag import DAG from ..errors import CompileError from ..utils import refresh_dynamic from . import types from ... import types as df_types from ....models import FileResource, TableResource, TableSchema from .... import compat from ....lib import cloudpickle from ....lib.xnamedtuple import xnamedtuple from ....compat import lzip, Version try: import numpy as np import pandas as pd except (ImportError, ValueError): pd = None np = None if pd is not None: PD_APPLY_HAS_RESULT_TYPE = Version(pd.__version__) >= Version('0.23.0') else: PD_APPLY_HAS_RESULT_TYPE = False BINARY_OP_TO_PANDAS = { 'Add': operator.add, 'Substract': operator.sub, 'Multiply': operator.mul, 'Divide': operator.div if six.PY2 else operator.truediv, 'Mod': operator.mod, 'FloorDivide': operator.floordiv, 'Power': operator.pow, 'Greater': operator.gt, 'GreaterEqual': operator.ge, 'Less': operator.lt, 'LessEqual': operator.le, 'Equal': operator.eq, 'NotEqual': operator.ne, 'And': operator.and_, 'Or': operator.or_ } UNARY_OP_TO_PANDAS = { 'Negate': operator.neg, 'Invert': operator.inv, 'Abs': operator.abs } if pd: SORT_CUM_WINDOW_OP_TO_PANDAS = { 'CumSum': lambda s: s.expanding(min_periods=1).sum(), 'CumMean': lambda s: s.expanding(min_periods=1).mean(), 'CumMedian': lambda s: s.expanding(min_periods=1).median(), 'CumStd': lambda s: s.expanding(min_periods=1).std(), 'CumMin': lambda s: s.expanding(min_periods=1).min(), 'CumMax': lambda s: s.expanding(min_periods=1).max(), 'CumCount': lambda s: s.expanding(min_periods=1).count(), } if np: CUM_WINDOW_OP_TO_PANDAS = { 'CumSum': np.sum, 'CumMean': np.mean, 'CumMedian': np.median, 'CumStd': np.std, 'CumMin': np.min, 'CumMax': np.max, 'CumCount': lambda x: len(x), } JOIN_DICT = { 'INNER': 'inner', 'LEFT OUTER': 'left', 'RIGHT OUTER': 'right', 'FULL OUTER': 'outer' } def _explode(obj): if obj and isinstance(obj, tuple): obj = obj[0] if obj is None: return if isinstance(obj, dict): for k, v in six.iteritems(obj): yield k, v else: for v in obj: yield v def _pos_explode(obj): if obj and isinstance(obj, tuple): obj = obj[0] if obj is None: return for idx, v in enumerate(obj): yield idx, v def _filter_none(col): import numpy as np if hasattr(col, 'dropna'): col = col.dropna() else: try: col = col[~np.isnan(col)] except TypeError: col = col[np.fromiter((v is not None for v in col), np.bool_)] return col BUILTIN_FUNCS = { 'EXPLODE': _explode, 'POSEXPLODE': _pos_explode, } class PandasCompiler(Backend): """ PandasCompiler will compile an Expr into a DAG in which each node is a pair of <expr, function>. """ def __init__(self, expr_dag): self._dag = DAG() self._expr_to_dag_node = dict() self._expr_dag = expr_dag self._callbacks = list() def compile(self, expr): try: return self._compile(expr) finally: self._cleanup() def _cleanup(self): for callback in self._callbacks: callback() self._callbacks = list() def _compile(self, expr, traversed=None): if traversed is None: traversed = set() root = self._retrieve_until_find_root(expr) if root is not None and id(root) not in traversed: self._compile_join_node(root, traversed) traversed.add(id(root)) for node in traverse_until_source(expr): if id(node) not in traversed: node.accept(self) traversed.add(id(node)) return self._dag def _compile_join_node(self, expr, traversed): nodes = [] self._compile(expr._lhs, traversed) nodes.append(expr._lhs) self._compile(expr._rhs, traversed) nodes.append(expr._rhs) for node in expr._predicate: nodes.append(node._lhs) self._compile(node._lhs, traversed) nodes.append(node._rhs) self._compile(node._rhs, traversed) expr.accept(self) for node in nodes: self._dag.add_edge(self._expr_to_dag_node[node], self._expr_to_dag_node[expr]) cached_args = expr.args def cb(): for arg_name, arg in zip(expr._args, cached_args): setattr(expr, arg_name, arg) self._callbacks.append(cb) for arg_name in expr._args: setattr(expr, arg_name, None) @classmethod def _retrieve_until_find_root(cls, expr): for node in traverse_until_source(expr, top_down=True, unique=True): if isinstance(node, JoinCollectionExpr): return node def _add_node(self, expr, handle): children = expr.children() node = (expr, handle) self._dag.add_node(node) self._expr_to_dag_node[expr] = node # the dependencies do not exist in self._expr_to_dag_node predecessors = [self._expr_to_dag_node[child] for child in children if child in self._expr_to_dag_node] [self._dag.add_edge(p, node) for p in predecessors] @staticmethod def _attempt_pickle_unpickle(*args): try: cloudpickle.loads(cloudpickle.dumps(args)) except Exception as ex: # pragma: no cover warnings.warn( "Failed serializing user-defined function %r. Might get errors " "when running remotely. Error message: %s" % (args, ex), RuntimeWarning, ) def visit_source_collection(self, expr): df = next(expr.data_source()) if not isinstance(df, pd.DataFrame): raise ValueError('Expr data must be a pandas DataFrame.') def handle(_): copy_data = len(self._expr_dag.descendants(expr)) > 0 if not copy_data and list(df.columns) == expr.schema.names: # no need to copy when there is no descendants for the dataframe return df else: # make a copy to avoid modify return df.rename(columns=dict(zip(df.columns, expr.schema.names))) self._add_node(expr, handle) @classmethod def _get_children_vals(cls, kw, expr=None, children=None): children = children or expr.children() return [kw.get(child) for child in children] @classmethod def _merge_values(cls, exprs, kw): fields = [kw.get(expr) for expr in exprs] size = max(len(f) for f, e in zip(fields, exprs) if isinstance(e, SequenceExpr)) fields = [pd.Series([f] * size) if isinstance(e, Scalar) else f for f, e in zip(fields, exprs)] return pd.concat(fields, axis=1, keys=[e.name for e in exprs]) def visit_project_collection(self, expr): def handle(kw): children = expr.children() fields = self._get_children_vals(kw, children=children)[1:] names = expr.schema.names if isinstance(expr, Summary): size = 1 else: size = max(len(f) for f, e in zip(fields, expr._fields) if isinstance(e, SequenceExpr)) for i in range(len(fields)): if not isinstance(fields[i], pd.Series): fields[i] = pd.Series([fields[i]] * size) return pd.concat(fields, axis=1, keys=names) self._add_node(expr, handle) def visit_filter_partition_collection(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) df, predicate = children_vals[0:1] return df[predicate][expr.schema.names] self._add_node(expr, handle) def visit_filter_collection(self, expr): def handle(kw): df, predicate = tuple(self._get_children_vals(kw, expr)) return df[predicate] self._add_node(expr, handle) def visit_slice_collection(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) df = children_vals[0] start, end, step = expr.start, expr.stop, expr.step return df[start: end: step] self._add_node(expr, handle) def visit_element_op(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) input, args = children_vals[0], children_vals[1:] if isinstance(expr.input, Scalar): input = pd.Series([input]) def run(): if isinstance(expr, element.IsNull): return input.isnull() elif isinstance(expr, element.NotNull): return input.notnull() elif isinstance(expr, element.IsNa): return input.isna() elif isinstance(expr, element.NotNa): return input.notna() elif isinstance(expr, element.FillNa): return input.fillna(args[0]) elif isinstance(expr, element.IsIn): if isinstance(expr._values[0], SequenceExpr): return input.isin(list(args[0])) else: return input.isin(args) elif isinstance(expr, element.NotIn): if isinstance(expr._values[0], SequenceExpr): return ~input.isin(list(args[0])) else: return ~input.isin(args) elif isinstance(expr, element.IfElse): return pd.Series(np.where(input, args[0], args[1]), name=expr.name, index=input.index) elif isinstance(expr, element.Switch): case = None if expr.case is None else kw.get(expr.case) default = None if expr.default is None else kw.get(expr.default) conditions = [kw.get(it) for it in expr.conditions] thens = [kw.get(it) for it in expr.thens] if case is not None: conditions = [case == condition for condition in conditions] condition_exprs = [expr.case == cond for cond in expr.conditions] else: condition_exprs = expr.conditions size = max(len(val) for e, val in zip(condition_exprs + expr.thens, conditions + thens) if isinstance(e, SequenceExpr)) curr = pd.Series([None] * size) for condition, then in zip(conditions, thens): curr = curr.where(-condition, then) if default is not None: return curr.fillna(default) return curr elif isinstance(expr, element.Between): return input.between(*args) elif isinstance(expr, element.Cut): bins = [bin.value for bin in expr.bins] if expr.include_under: bins.insert(0, -float('inf')) if expr.include_over: bins.append(float('inf')) labels = [l.value for l in expr.labels] return pd.cut(input, bins, right=expr.right, labels=labels, include_lowest=expr.include_lowest) if isinstance(expr.input, Scalar): return run()[0] else: return run() self._add_node(expr, handle) def visit_binary_op(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) if expr.lhs.dtype == df_types.datetime and expr.rhs.dtype == df_types.datetime: return ((pd.to_datetime(children_vals[0]) - pd.to_datetime(children_vals[1])) / np.timedelta64(1, 'ms')).astype(np.int64) op = BINARY_OP_TO_PANDAS[expr.node_name] if isinstance(expr, Power) and isinstance(expr.dtype, df_types.Integer): return op(*children_vals).astype(types.df_type_to_np_type(expr.dtype)) return op(*children_vals) self._add_node(expr, handle) def visit_unary_op(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) op = UNARY_OP_TO_PANDAS[expr.node_name] return op(*children_vals) self._add_node(expr, handle) def visit_math(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) if isinstance(expr, math.Log) and expr._base is not None: base = expr._base.value return np.log(children_vals[0]) / np.log(base) elif isinstance(expr, math.Trunc): decimals = expr._decimals.value order = 10 ** decimals return np.trunc(children_vals[0] * order) / order else: op = getattr(np, expr.node_name.lower()) return op(*children_vals) self._add_node(expr, handle) def visit_string_op(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) input = children_vals[0] if isinstance(expr.input, Scalar): input = pd.Series([input]) assert len(expr._args) == len(expr.args) kv = dict((name.lstrip('_'), self._get(arg, kw)) for name, arg in zip(expr._args[1:], expr.args[1:])) op = expr.node_name.lower() if op == 'get': res = getattr(getattr(input, 'str'), op)(children_vals[1]) elif op == 'strptime': res = input.map(lambda x: datetime.strptime(x, children_vals[1])) elif op == 'extract': def extract(x, pat, flags, group): regex = re.compile(pat, flags=flags) m = regex.match(x) if m: return m.group(group) df = self._merge_values([expr.input, expr._pat, expr._flags, expr._group], kw) return pd.Series([extract(*r[1]) for r in df.iterrows()]) elif op == 'split': return input.apply(lambda v: v.split(kv['pat'], kv['n']) if v is not None else None) elif op == 'stringtodict': def _parse_dict(x): return dict(it.split(kv['kv_delim'], 1) for it in x.split(kv['item_delim'])) return input.apply(lambda v: _parse_dict(v) if v is not None else None) else: if op == 'slice': kv['stop'] = kv.pop('end', None) elif op == 'replace': assert 'regex' in kv if kv['regex']: kv.pop('regex') else: kv['pat'] = re.escape(kv['pat']) kv.pop('regex') res = getattr(getattr(input, 'str'), op)(**kv) if isinstance(expr.input, Scalar): return res[0] else: return res self._add_node(expr, handle) def visit_datetime_op(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) input = children_vals[0] if isinstance(expr.input, Scalar): input = pd.Series([input]) assert len(children_vals) == len(expr.args) kv = dict(zip([arg.lstrip('_') for arg in expr._args[1:]], children_vals[1:])) op = expr.node_name.lower() res = getattr(getattr(input, 'dt'), op) if not isinstance(res, pd.Series): res = res(**kv) if isinstance(expr.input, Scalar): return res[0] else: return res self._add_node(expr, handle) def visit_groupby(self, expr): def handle(kw): fields_exprs = expr._fields or expr._by + expr._aggregations fields = [[kw.get(field), ] if isinstance(field, Scalar) else kw.get(field) for field in fields_exprs] length = max(len(it) for it in fields) for i in range(len(fields)): bys = self._get_compiled_bys(kw, expr._by, length) if isinstance(fields_exprs[i], SequenceExpr): is_reduction = False for n in itertools.chain(*(fields_exprs[i].all_path(expr.input))): if isinstance(n, GroupedSequenceReduction): is_reduction = True break if not is_reduction: fields[i] = fields[i].groupby(bys).first() elif len(fields[i]) == 1: fields[i] = pd.Series(fields[i] * length, name=fields_exprs[i].name).groupby(bys).first() df = pd.concat(fields, axis=1) if expr._having is not None: having = kw.get(expr._having) if all(not isinstance(e, GroupedSequenceReduction) for e in itertools.chain(*expr._having.all_path(expr.input))): # the having comes from the by fields, we need to do Series.groupby explicitly. bys = self._get_compiled_bys(kw, expr._by, len(having)) having = having.groupby(bys).first() df = df[having] return pd.DataFrame( df.values, columns=[f.name for f in fields_exprs])[expr.schema.names] self._add_node(expr, handle) def visit_mutate(self, expr): def handle(kw): bys = self._get_compiled_bys(kw, expr._by, len(kw.get(expr.input))) bys = pd.concat(bys) bys.sort_values(inplace=True) wins = [kw.get(f) for f in expr._window_fields] return pd.DataFrame(pd.concat([bys] + wins, axis=1).values, columns=expr.schema.names) self._add_node(expr, handle) def visit_value_counts(self, expr): def handle(kw): by = kw.get(expr._by) sort = kw.get(expr._sort) ascending = kw.get(expr._ascending) dropna = kw.get(expr._dropna) df = by.value_counts(sort=sort, ascending=ascending, dropna=dropna).to_frame() df.reset_index(inplace=True) return pd.DataFrame(df.values, columns=expr.schema.names) self._add_node(expr, handle) def visit_sort(self, expr): def handle(kw): input = kw.get(expr.input) names = expr.schema.names sorted_columns = OrderedDict() for field in expr._sorted_fields: name = str(uuid.uuid4()) sorted_columns[name] = kw.get(field) input = input.assign(**sorted_columns) return input.sort_values(list(six.iterkeys(sorted_columns)), ascending=expr._ascending)[names] self._add_node(expr, handle) def visit_sort_column(self, expr): def handle(kw): input = kw.get(expr.input) if isinstance(expr.input, CollectionExpr): return input[expr._source_name] else: return input self._add_node(expr, handle) def visit_distinct(self, expr): def handle(kw): children_vals = self._get_children_vals(kw, expr) fields = children_vals[1:] ret = pd.concat(fields, axis=1, keys=expr.schema.names).drop_duplicates() ret.reset_index(drop=True, inplace=True) return ret self._add_node(expr, handle) def _get(self, item, kw): if item is None: return if isinstance(item, (list, tuple, set)): return type(item)(kw.get(it) for it in item) return kw.get(item) def visit_sample(self, expr): def handle(kw): input = self._get(expr.input, kw) parts = self._get(expr._parts, kw) i = self._get(expr._i, kw) n = self._get(expr._n, kw) frac = self._get(expr._frac, kw) replace = self._get(expr._replace, kw) weights = self._get(expr._weights, kw) strata = self._get(expr._strata, kw) random_state = self._get(expr._random_state, kw) if expr._sampled_fields: collection = pd.DataFrame( pd.concat([kw.get(e) for e in expr._sampled_fields], axis=1).values, columns=[str(uuid.uuid4()) for _ in expr._sampled_fields]) else: collection = input if parts is not None and frac is None: frac = 1 / float(parts) if i is not None and (len(i) != 1 or i[0] > 0): raise NotImplementedError if not strata: sampled = collection.sample(n=n, frac=frac, replace=replace, weights=weights, random_state=random_state) else: frames = [] frac = json.loads(frac) if expr._frac else dict() n = json.loads(n) if expr._n else dict() for val in itertools.chain(six.iterkeys(frac), six.iterkeys(n)): v_frac = frac.get(val) v_n = n.get(val) filtered = collection[collection[strata].astype(str) == val] sampled = filtered.sample(n=v_n, frac=v_frac, replace=replace, random_state=random_state) frames.append(sampled) if frames: sampled = pd.concat(frames) else: sampled = pd.DataFrame(columns=collection.columns) if expr._sampled_fields: return pd.concat([input, sampled], axis=1, join='inner')[ [n for n in input.columns.tolist()]] return sampled self._add_node(expr, handle) def _get_names(self, x, force_list=False): if x is None: return x res = [it.name for it in x] if not force_list and len(res) == 1: return res[0] return res def _get_pivot_handler(self, expr): def handle(kw): df = self._merge_values(expr._group + expr._columns + expr._values, kw) pivoted = df.pivot(index=self._get_names(expr._group), columns=self._get_names(expr._columns)) columns = pivoted.columns.levels pivoted.reset_index(inplace=True) names = self._get_names(expr._group, True) tps = [g.dtype for g in expr._group] if len(columns[0]) == 1: tp = expr._values[0].dtype for name in columns[1]: names.append(name) tps.append(tp) else: for value_name, value_col in zip(columns[0], expr._values): for name in columns[1]: names.append('{0}_{1}'.format(name, value_name)) tps.append(value_col.dtype) expr._schema = TableSchema.from_lists(names, tps) res = pd.DataFrame(pivoted.values, columns=names) to_sub = CollectionExpr(_source_data=res, _schema=expr._schema) self._expr_dag.substitute(expr, to_sub) # trigger refresh of dynamic operations def func(expr): for c in traverse_until_source(expr, unique=True): if c not in self._expr_to_dag_node: c.accept(self) refresh_dynamic(to_sub, self._expr_dag, func=func) return to_sub, res return handle def _get_pivot_table_handler(self, expr): from ...expr.query import ExprVisitor class WrappedNumpyFunction(object): def __init__(self, fun): self._fun = fun def __call__(self, *args, **kwargs): return self._fun(*args, **kwargs) class AggFuncVisitor(ExprVisitor): def __init__(self, np_object, env): super(AggFuncVisitor, self).__init__(env) self.np_object = np_object def get_named_object(self, obj_name): if obj_name == 'count': return WrappedNumpyFunction(np.size) elif obj_name == 'nunique': return WrappedNumpyFunction(lambda x: np.size(np.unique(x))) elif obj_name == 'quantile': return WrappedNumpyFunction(lambda x, prob: np.percentile(x, prob * 100)) else: return WrappedNumpyFunction(getattr(np, obj_name)) def visit_Call(self, node): func = self.visit(node.func) args = [self.visit(n) for n in node.args] if isinstance(func, WrappedNumpyFunction): args = [self.np_object] + args kwargs = OrderedDict([(kw.arg, self.visit(kw.value)) for kw in node.keywords]) return func(*args, **kwargs) def get_real_aggfunc(aggfunc): if isinstance(aggfunc, six.string_types): if aggfunc == 'count': return getattr(np, 'size') if aggfunc == 'nunique': return lambda x: np.size(np.unique(x)) if hasattr(np, aggfunc): return getattr(np, aggfunc) def agg_eval(x): visitor = AggFuncVisitor(x, {}) return visitor.eval(aggfunc, rewrite=False) return agg_eval if inspect.isclass(aggfunc): aggfunc = aggfunc() def func(x): buffer = aggfunc.buffer() for it in x: aggfunc(buffer, it) return aggfunc.getvalue(buffer) return func return aggfunc def handle(kw): columns = expr._columns if expr._columns else [] df = self._merge_values(expr._group + columns + expr._values, kw) pivoted = df.pivot_table(index=self._get_names(expr._group), columns=self._get_names(expr._columns), values=self._get_names(expr._values), aggfunc=[get_real_aggfunc(f) for f in expr._agg_func], fill_value=expr.fill_value) levels = pivoted.columns.levels if isinstance(pivoted.columns, pd.MultiIndex) \ else [pivoted.columns] pivoted.reset_index(inplace=True) names = self._get_names(expr._group, True) tps = [g.dtype for g in expr._group] columns_values = levels[-1] if expr._columns else [None, ] for agg_func_name in expr._agg_func_names: for value_col in expr._values: for col in columns_values: base = '{0}_'.format(col) if col is not None else '' name = '{0}{1}_{2}'.format(base, value_col.name, agg_func_name) names.append(name) tps.append(value_col.dtype) if expr._columns: expr._schema = TableSchema.from_lists(names, tps) res = pd.DataFrame(pivoted.values, columns=names) to_sub = CollectionExpr(_source_data=res, _schema=expr._schema) self._expr_dag.substitute(expr, to_sub) # trigger refresh of dynamic operations def func(expr): for c in traverse_until_source(expr, unique=True): if c not in self._expr_to_dag_node: c.accept(self) refresh_dynamic(to_sub, self._expr_dag, func=func) return to_sub, res return handle def visit_pivot(self, expr): if isinstance(expr, PivotCollectionExpr): handle = self._get_pivot_handler(expr) else: handle = self._get_pivot_table_handler(expr) self._add_node(expr, handle) def _get_compiled_bys(self, kw, by_exprs, length): bys = [[kw.get(by), ] if isinstance(by, Scalar) else kw.get(by) for by in by_exprs] if any(isinstance(e, SequenceExpr) for e in by_exprs): size = max(len(by) for by, e in zip(bys, by_exprs) if isinstance(e, SequenceExpr)) else: size = length return [(by * size if len(by) == 1 else by) for by in bys] def _compile_grouped_reduction(self, kw, expr): if isinstance(expr, GroupedCount) and isinstance(expr._input, CollectionExpr): df = kw.get(expr.input) bys = [[kw.get(by), ] if isinstance(by, Scalar) else kw.get(by) for by in expr._by] if any(isinstance(e, SequenceExpr) for e in expr._by): size = max(len(by) for by, e in zip(bys, expr._by) if isinstance(e, SequenceExpr)) else: size = len(df) bys = [(by * size if len(by) == 1 else by) for by in bys] return df.groupby(bys).size() if isinstance(expr, GroupedNUnique): input_df = pd.concat([kw.get(ip) for ip in expr.inputs], axis=1) bys = self._get_compiled_bys(kw, expr._by, len(input_df)) return input_df.groupby(bys).apply(lambda x: pd.Series([len(x.drop_duplicates())]))[0] series = kw.get(expr.input) if isinstance(expr.input, SequenceExpr) \ else pd.Series([kw.get(expr.input)], name=expr.input.name) bys = self._get_compiled_bys(kw, expr._by, len(series)) if isinstance(expr.input, Scalar): series = pd.Series(series.repeat(len(bys[0])).values, index=bys[0].index) if isinstance(expr, GroupedCat): return series.groupby(bys).apply(lambda x: kw.get(expr._sep).join(x)) if isinstance(expr, GroupedToList): if expr._unique: return series.groupby(bys).apply(lambda x: list(set(x))) else: return series.groupby(bys).apply(list) kv = dict() if hasattr(expr, '_ddof'): kv['ddof'] = expr._ddof op = expr.node_name.lower() op = 'size' if op == 'count' else op return getattr(series.groupby(bys), op)(**kv) def visit_reduction(self, expr): def handle(kw): if isinstance(expr, GroupedSequenceReduction): return self._compile_grouped_reduction(kw, expr) children_vals = self._get_children_vals(kw, expr) kv = dict() if hasattr(expr, '_ddof'): kv['ddof'] = expr._ddof op = expr.node_name.lower() op = 'size' if op == 'count' else op if isinstance(expr, NUnique): inputs = children_vals[:len(expr.inputs)] if len(expr.inputs) == 1: inputs[0] = _filter_none(inputs[0]) return len(pd.concat(inputs, axis=1).drop_duplicates()) input = children_vals[0] if getattr(expr, '_unique', False): input = input.unique() if isinstance(expr, Count): if isinstance(expr.input, CollectionExpr): return len(input) elif isinstance(expr.input, SequenceExpr): return len(_filter_none(input)) input = _filter_none(input) if isinstance(expr, (Cat, GroupedCat)): kv['sep'] = expr._sep.value if isinstance(expr._sep, Scalar) else expr._sep kv['na_rep'] = expr._na_rep.value \ if isinstance(expr._na_rep, Scalar) else expr._na_rep return getattr(getattr(input, 'str'), 'cat')(**kv) elif isinstance(expr, (ToList, GroupedToList)): return list(input) elif isinstance(expr, (Quantile, GroupedQuantile)): if isinstance(expr._prob, (list, set)): return [np.percentile(input, p * 100) for p in expr._prob] else: return np.percentile(input, expr._prob * 100) return getattr(input, op)(**kv) self._add_node(expr, handle) def visit_user_defined_aggregator(self, expr): def handle(kw): resources = self._get_resources(expr, kw) input = self._merge_values(expr._inputs, kw) func = expr._aggregator args = expr._func_args kwargs = expr._func_kwargs or dict() self._attempt_pickle_unpickle(func, args, kwargs) if resources: if not args and not kwargs: agg = func(resources) else: kwargs['resources'] = resources agg = func(*args, **kwargs) else: agg = func(*args, **kwargs) if isinstance(expr, GroupedSequenceReduction): bys = [[kw.get(by), ] if isinstance(by, Scalar) else kw.get(by) for by in expr._by] else: bys = [[1, ]] if expr._by and any(isinstance(e, SequenceExpr) for e in expr._by): size = max(len(by) for by, e in zip(bys, expr._by) if isinstance(e, SequenceExpr)) else: size = len(input) bys = [(by * size if len(by) == 1 else by) for by in bys] def iterrows(x): if getattr(expr, '_unique', False): vset = set() for it in x.iterrows(): if bytes(it[1].values.data) not in vset: yield it vset.add(bytes(it[1].values.data)) else: for it in x.iterrows(): yield it def f(x): buffer = agg.buffer() for it in iterrows(x): agg(buffer, *it[1]) ret = agg.getvalue(buffer) np_type = types.df_type_to_np_type(expr.dtype) return np.array([ret,], dtype=np_type)[0] res = input.groupby(bys).apply(f) if isinstance(expr, Scalar): return res.iloc[0] return res self._add_node(expr, handle) def visit_column(self, expr): def handle(kw): chidren_vals = self._get_children_vals(kw, expr) # FIXME: consider the name which is unicode return chidren_vals[0][expr._source_name] self._add_node(expr, handle) def _get_resources(self, expr, kw): if not expr._resources: return res = [] collection_idx = 0 for resource in expr._resources: if isinstance(resource, FileResource): res.append(resource.open()) elif isinstance(resource, TableResource): def gen(): table = resource.get_source_table() named_args = xnamedtuple('NamedArgs', table.table_schema.names) partition = resource.get_source_table_partition() with table.open_reader(partition=partition) as reader: for r in reader: yield named_args(*r.values) res.append(gen()) else: resource = expr._collection_resources[collection_idx] collection_idx += 1 df = kw.get(resource) def gen(): named_args = xnamedtuple('NamedArgs', resource.schema.names) for r in df.iterrows(): yield named_args(*r[1]) res.append(gen()) return res def visit_function(self, expr): def handle(kw): resources = self._get_resources(expr, kw) if not expr._multiple: input = self._get_children_vals(kw, expr)[0] if isinstance(expr.inputs[0], Scalar): input = pd.Series([input]) func = expr._func args = expr._func_args kwargs = expr._func_kwargs self._attempt_pickle_unpickle(func, args, kwargs) if args is not None and len(args) > 0: raise NotImplementedError if kwargs is not None and len(kwargs) > 0: raise NotImplementedError if inspect.isclass(func): if resources: func = func(resources) else: func = func() else: if resources: func = func(resources) res = input.map(func) if isinstance(expr.inputs[0], Scalar): return res[0] return res else: input = self._merge_values(expr.inputs, kw) def func(s): names = [f.name for f in expr.inputs] t = xnamedtuple('NamedArgs', names) row = t(*s.tolist()) if not inspect.isfunction(expr._func): if resources: f = expr._func(resources) else: f = expr._func() else: if resources: f = expr._func(resources) else: f = expr._func res = f(row, *expr._func_args, **expr._func_kwargs) if not inspect.isgeneratorfunction(f): return res return next(res) if PD_APPLY_HAS_RESULT_TYPE: return input.apply(func, axis=1, result_type='reduce', args=expr._func_args, **expr._func_kwargs) else: return input.apply(func, axis=1, reduce=True, args=expr._func_args, **expr._func_kwargs) self._add_node(expr, handle) def visit_reshuffle(self, expr): def handle(kw): if expr._sort_fields is not None: input = kw.get(expr._input) names = [] for sort in expr._sort_fields: name = str(uuid.uuid4()) input[name] = kw.get(sort) names.append(name) input = input.sort_values( names, ascending=[f._ascending for f in expr._sort_fields]) return input[expr.schema.names] return kw.get(expr._input) self._add_node(expr, handle) def _check_output_types(self, pd_df, expect_df_types): for field, expect_df_type in zip(pd_df.columns, expect_df_types): arr = pd_df[field].values try: df_type = types.np_type_to_df_type(pd_df[field].dtype, arr=arr) except TypeError: # all element is None continue if not expect_df_type.can_implicit_cast(df_type): raise TypeError('Field(%s) has wrong type, expect %s, got %s' % ( field, expect_df_type, df_type )) return pd_df def visit_apply_collection(self, expr): def conv(l): if isinstance(l, tuple): l = list(l) elif not isinstance(l, list): l = [l, ] return l def handle(kw): resources = self._get_resources(expr, kw) input = self._merge_values(expr.fields, kw) names = [f.name for f in expr.fields] t = xnamedtuple('NamedArgs', names) expr._func_args = expr._func_args or () expr._func_kwargs = expr._func_kwargs or {} func = expr._func self._attempt_pickle_unpickle(func, expr._func_args, expr._func_kwargs) if isinstance(func, six.string_types) and func.upper() in BUILTIN_FUNCS: func = BUILTIN_FUNCS[func.upper()] if inspect.isfunction(func): if resources: func = func(resources) is_generator_function = inspect.isgeneratorfunction(func) close_func = None is_close_generator_function = False elif hasattr(func, '__call__'): if resources: func = func(resources) else: func = func() is_generator_function = inspect.isgeneratorfunction(func.__call__) close_func = getattr(func, 'close', None) is_close_generator_function = inspect.isgeneratorfunction(close_func) else: raise NotImplementedError rows = [] indices = [] idx = 0 for s in input.iterrows(): row = t(*s[1]) res = func(row, *expr._func_args, **expr._func_kwargs) expand_num = 0 if is_generator_function: for l in res: rows.append(conv(l)) expand_num += 1 else: if res: rows.append(conv(res)) expand_num += 1 if expand_num == 0 and expr._keep_nulls: rows.append([None] * len(names)) expand_num += 1 indices.extend([s[0]] * expand_num) idx = max(idx, s[0] + 1) if close_func: expand_num = 0 if is_close_generator_function: for l in close_func(*expr._func_args, **expr._func_kwargs): rows.append(conv(l)) expand_num += 1 else: rows.append(close_func(*expr._func_args, **expr._func_kwargs)) expand_num += 1 indices.extend([idx] * expand_num) if expr._lateral_view: out_df = pd.DataFrame(rows, columns=expr.schema.names, index=pd.Int64Index(indices)) else: out_df = pd.DataFrame(rows, columns=expr.schema.names) return self._check_output_types(out_df, expr.schema.types) self._add_node(expr, handle) def visit_lateral_view(self, expr): def handle(kw): lv_sources = dict() for lv in expr.lateral_views: for col_name in lv.schema.names: lv_sources[col_name] = lv children = expr.children() fields = self._get_children_vals(kw, children=children)[1:len(expr._fields) + 1] names = expr.schema.names idx = reduce(operator.and_, (set(f.index.tolist()) for f, e in zip(fields, expr._fields) if isinstance(e, SequenceExpr))) idx = pd.Int64Index(sorted(idx)) result = pd.DataFrame(index=idx) lv_visited = set() for i in range(len(fields)): f = fields[i] if names[i] in lv_sources: lv_src = lv_sources[names[i]] if lv_src in lv_visited: continue lv_visited.add(lv_src) f = kw[lv_src] elif not isinstance(f, pd.Series): f = pd.Series([f] * len(idx), index=idx, name=names[i]) result = result.join(f) return result self._add_node(expr, handle) def visit_composite_op(self, expr): def handle(kw): def _zip_args(fields): zip_args = [] seq_index = None for it in fields: if isinstance(it, SequenceExpr): zip_args.append(kw[it]) seq_index = kw[it].index else: zip_args.append(itertools.repeat(kw[it])) return seq_index, zip_args children_vals = self._get_children_vals(kw, expr) _input = children_vals[0] if isinstance(expr, composites.ListDictLength): return _input.apply(lambda v: len(v) if v is not None else None) elif isinstance(expr, composites.ListDictGetItem): def _get_list_item(l, x): try: return l[x] if l is not None else None except IndexError: return None _value = children_vals[1] if isinstance(expr.input.dtype, df_types.List): item_fun = _get_list_item else: item_fun = lambda s, k: s.get(k) if s is not None else None if isinstance(expr, Scalar): return item_fun(_input, _value) else: if isinstance(expr.input, Scalar): return _value.apply(lambda v: item_fun(_input, v)) if isinstance(expr._key, Scalar): return _input.apply(lambda v: item_fun(v, _value)) seq_values = [item_fun(k, v) for k, v in compat.izip(_input, _value)] return pd.Series(seq_values, index=_input.index, name=expr.name) elif isinstance(expr, composites.ListContains): _value = children_vals[1] contains_fun = lambda s, k: k in s if s is not None else None if isinstance(expr, Scalar): return contains_fun(_input, _value) else: if isinstance(expr.input, Scalar): return _value.apply(lambda v: contains_fun(_input, v)) if isinstance(expr._value, Scalar): return _input.apply(lambda v: contains_fun(v, _value)) seq_values = [contains_fun(k, v) for k, v in compat.izip(_input, _value)] return pd.Series(seq_values, index=_input.index, name=expr.name) elif isinstance(expr, composites.ListSort): return _input.apply(lambda l: sorted(l) if l is not None else None) elif isinstance(expr, composites.DictKeys): return _input.apply(lambda d: list(six.iterkeys(d)) if d is not None else None) elif isinstance(expr, composites.DictValues): return _input.apply(lambda d: list(six.itervalues(d)) if d is not None else None) elif isinstance(expr, composites.ListBuilder): if isinstance(expr, Scalar): return [kw[v] for v in expr._values] else: seq_index, zip_args = _zip_args(expr._values) seq_values = [] for r in compat.izip(*zip_args): seq_values.append(list(r)) return pd.Series(seq_values, index=seq_index, name=expr.name) elif isinstance(expr, composites.DictBuilder): if isinstance(expr, Scalar): return OrderedDict((kw[k], kw[v]) for k, v in compat.izip(expr._keys, expr._values)) else: seq_index, zip_args = _zip_args(expr._keys + expr._values) seq_values = [] dict_len = len(expr._values) for r in zip(*zip_args): seq_values.append(OrderedDict((k, v) for k, v in compat.izip(r[:dict_len], r[dict_len:]))) return pd.Series(seq_values, index=seq_index, name=expr.name) else: raise NotImplementedError self._add_node(expr, handle) def visit_sequence(self, expr): raise NotImplementedError def visit_cum_window(self, expr): if expr.preceding is not None or expr.following is not None: raise NotImplementedError def handle(kw): input = kw.get(expr.input) bys = self._get_compiled_bys(kw, expr.partition_by, len(input)) grouped = input.groupby(bys) if expr.order_by: sort = [kw.get(e) for e in expr.order_by] ascendings = [e._ascending for e in expr.order_by] for s in sort: sort_name = str(uuid.uuid4()) s.name = sort_name else: sort = None ascendings = None def f(x): if sort: df = pd.concat([x] + sort, join='inner', axis=1) df.sort_values([s.name for s in sort], ascending=ascendings, inplace=True) series = df[x.name] if expr.node_name in SORT_CUM_WINDOW_OP_TO_PANDAS: return SORT_CUM_WINDOW_OP_TO_PANDAS[expr.node_name](series) elif expr.node_name == 'NthValue': values = [None] * len(series) if expr._skip_nulls: new_series = _filter_none(series) else: new_series = series if expr._nth < len(new_series): values[expr._nth:] = [new_series.iloc[expr._nth]] * (len(series) - expr._nth) return pd.Series(values, index=series.index) else: raise NotImplementedError else: if expr.distinct: new_x = x.drop_duplicates() else: new_x = x if expr.node_name in CUM_WINDOW_OP_TO_PANDAS: val = CUM_WINDOW_OP_TO_PANDAS[expr.node_name](new_x) elif expr.node_name == 'NthValue': if expr._skip_nulls: new_series = _filter_none(x) else: new_series = x if expr._nth < len(new_series): val = new_series.iloc[expr._nth] else: val = None else: raise NotImplementedError return pd.Series([val] * len(x), index=x.index) res = grouped.apply(f) if sort: for _ in bys: res = res.reset_index(level=0, drop=True) return res self._add_node(expr, handle) def visit_rank_window(self, expr): def handle(kw): input = kw.get(expr.input) sort = [kw.get(e) * (1 if e._ascending else -1) for e in expr.order_by] bys = self._get_compiled_bys(kw, expr.partition_by, len(input)) sort_names = [str(uuid.uuid4()) for _ in sort] by_names = [str(uuid.uuid4()) for _ in bys] input_names = [input.name] if isinstance(input, pd.Series) else input.columns.tolist() df = pd.DataFrame(pd.concat([input] + sort + [pd.Series(b) for b in bys], axis=1).values, columns=input_names + sort_names + by_names, index=input.index) df.sort_values(sort_names, inplace=True) grouped = df.groupby(by_names) try: pd_fast_zip = pd._libs.lib.fast_zip except AttributeError: pd_fast_zip = pd.lib.fast_zip def f(x): s_df = pd.Series(pd_fast_zip([x[s].values for s in sort_names]), index=x.index) if expr.node_name == 'Rank': return s_df.rank(method='min') elif expr.node_name == 'DenseRank': return s_df.rank(method='dense') elif expr.node_name == 'RowNumber': return pd.Series(compat.lrange(1, len(s_df) + 1), index=s_df.index) elif expr.node_name == 'PercentRank': if len(s_df) == 1: return pd.Series([0.0, ], index=s_df.index) return (s_df.rank(method='min') - 1) / (len(s_df) - 1) elif expr.node_name == 'CumeDist': return pd.Series([v * 1.0 / len(s_df) for v in compat.irange(1, len(s_df) + 1)], index=s_df.index) elif expr.node_name == 'QCut': if len(s_df) <= 1: return pd.Series([0] * len(s_df), index=s_df.index, dtype=np.int64) return pd.Series(pd.qcut(compat.irange(1, len(s_df) + 1), expr._bins, labels=False), index=s_df.index, dtype=np.int64) else: raise NotImplementedError res = grouped.apply(f) if isinstance(res, pd.DataFrame): res = res.iloc[0] else: for _ in bys: res = res.reset_index(level=0, drop=True) return res self._add_node(expr, handle) def visit_shift_window(self, expr): def handle(kw): input = kw.get(expr.input) bys = self._get_compiled_bys(kw, expr.partition_by, len(input)) grouped = input.groupby(bys) if expr.order_by: sort = [kw.get(e) for e in expr.order_by] ascendings = [e._ascending for e in expr.order_by] for s in sort: sort_name = str(uuid.uuid4()) s.name = sort_name else: sort = None ascendings = None if expr.node_name == 'Lag': shift = kw.get(expr.offset) else: assert expr.node_name == 'Lead' shift = -kw.get(expr.offset) default = kw.get(expr.default) def f(x): if sort: df = pd.concat([x] + sort, join='inner', axis=1) df.sort_values([s.name for s in sort], ascending=ascendings, inplace=True) series = df[x.name] else: series = x res = series.shift(shift) if default is not None: return res.fillna(default) return res res = grouped.apply(f) if sort: for _ in bys: res = res.reset_index(level=0, drop=True) return res self._add_node(expr, handle) def visit_scalar(self, expr): def handle(_): if isinstance(expr, DTScalar): arg_name = type(expr).__name__.lower()[:-6] + 's' value = expr.value if arg_name == 'milliseconds': arg_name = 'microseconds' value *= 1000 return pd.DateOffset(**{arg_name: value}) if expr.value is not None: return expr.value return None self._add_node(expr, handle) def visit_cast(self, expr): def handle(kw): dtype = types.df_type_to_np_type(expr.dtype) input = self._get_children_vals(kw, expr)[0] if isinstance(expr._input, Scalar): return pd.Series([input]).astype(dtype)[0] return input.astype(dtype) self._add_node(expr, handle) @classmethod def _find_all_equalizations(cls, predicate, lhs, rhs): return [eq for eq in traverse_until_source(predicate, top_down=True, unique=True) if isinstance(eq, arithmetic.Equal) and eq.is_ancestor(lhs) and eq.is_ancestor(rhs)] def visit_join(self, expr): def handle(kw): left = kw.get(expr._lhs) right = kw.get(expr._rhs) eqs = expr._predicate left_ons = [] right_ons = [] on_same_names = set() for eq in eqs: if isinstance(eq._lhs, Column) and isinstance(eq._rhs, Column) and \ eq._lhs.source_name == eq._rhs.source_name: left_ons.append(eq._lhs.source_name) right_ons.append(eq._rhs.source_name) on_same_names.add(eq._lhs.source_name) continue left_name = str(uuid.uuid4()) left[left_name] = kw.get(eq._lhs) left_ons.append(left_name) right_name = str(uuid.uuid4()) right[right_name] = kw.get(eq._rhs) right_ons.append(right_name) for idx, collection in enumerate([left, right]): collection_expr = (expr._lhs, expr._rhs)[idx] for field_name in collection_expr.schema.names: if field_name in expr._renamed_columns and field_name in on_same_names: new_name = expr._renamed_columns[field_name][idx] collection[new_name] = collection[field_name] merged = left.merge(right, how=JOIN_DICT[expr._how], left_on=left_ons, right_on=right_ons, suffixes=(expr._left_suffix, expr._right_suffix)) cols = [] for name in expr.schema.names: if name in merged: cols.append(merged[name]) else: cols.append(merged[expr._column_origins[name][1]]) return pd.concat(cols, axis=1, keys=expr.schema.names) # Just add node, shouldn't add edge here node = (expr, handle) self._dag.add_node(node) self._expr_to_dag_node[expr] = node def visit_extract_kv(self, expr): def handle(kw): from ... import types _input = kw.get(expr._input) columns = [getattr(_input, c.name) for c in expr._columns] kv_delim = kw.get(expr._kv_delimiter) item_delim = kw.get(expr._item_delimiter) default = kw.get(expr._default) kv_slot_map = dict() app_col_names = [] def validate_kv(v): parts = v.split(kv_delim) if len(parts) != 2: raise ValueError('Malformed KV pair: %s' % v) return parts[0] for col in columns: kv_slot_map[col.name] = dict() keys = col.apply(lambda s: [validate_kv(kv) for kv in s.split(item_delim)]) for k in sorted(compat.reduce(lambda a, b: set(a) | set(b), keys, set())): app_col_names.append('%s_%s' % (col.name, k)) kv_slot_map[col.name][k] = len(app_col_names) - 1 type_adapter = None if isinstance(expr._column_type, types.Float): type_adapter = float elif isinstance(expr._column_type, types.Integer): type_adapter = int append_grid = [[default] * len(app_col_names) for _ in compat.irange(len(_input))] for col in columns: series = getattr(_input, col.name) for idx, v in enumerate(series): for kv_item in v.split(item_delim): k, v = kv_item.split(kv_delim) if type_adapter: v = type_adapter(v) append_grid[idx][kv_slot_map[col.name][k]] = v intact_names = [c.name for c in expr._intact] intact_types = [c.dtype for c in expr._intact] intact_df = _input[intact_names] append_df = pd.DataFrame(append_grid, columns=app_col_names) expr._schema = TableSchema.from_lists( intact_names + app_col_names, intact_types + [expr._column_type] * len(app_col_names), ) res = pd.concat([intact_df, append_df], axis=1) to_sub = CollectionExpr(_source_data=res, _schema=expr._schema) self._expr_dag.substitute(expr, to_sub) # trigger refresh of dynamic operations def func(expr): for c in traverse_until_source(expr, unique=True): if c not in self._expr_to_dag_node: c.accept(self) refresh_dynamic(to_sub, self._expr_dag, func=func) return to_sub, res self._add_node(expr, handle) def visit_union(self, expr): if expr._distinct: raise CompileError("Distinct union is not supported here.") def handle(kw): left = kw.get(expr._lhs) right = kw.get(expr._rhs) merged = pd.concat([left, right]) return merged[expr.schema.names] self._add_node(expr, handle) def visit_concat(self, expr): def handle(kw): left = kw.get(expr._lhs) right = kw.get(expr._rhs) merged = pd.concat([left, right], axis=1) return merged[expr.schema.names] self._add_node(expr, handle) def visit_append_id(self, expr): def handle(kw): _input = kw.get(expr._input) id_col = kw.get(expr._id_col) id_seq = pd.DataFrame(compat.lrange(len(_input)), columns=[id_col]) return pd.concat([id_seq, _input], axis=1) self._add_node(expr, handle) def visit_split(self, expr): def handle(kw): _input = kw.get(expr._input) frac = kw.get(expr._frac) seed = kw.get(expr._seed) if expr._seed else None split_id = kw.get(expr._split_id) if seed is not None: np.random.seed(seed) cols = list(_input.columns) factor_col = 'rand_factor_%d' % int(time.time()) factor_df = pd.DataFrame(np.random.rand(len(_input)), columns=[factor_col]) concated_df = pd.concat([factor_df, _input], axis=1) if split_id == 0: return concated_df[concated_df[factor_col] <= frac][cols] else: return concated_df[concated_df[factor_col] > frac][cols] self._add_node(expr, handle)