def visit_composite_op()

in odps/df/backends/pd/compiler.py [0:0]


    def visit_composite_op(self, expr):
        """Register the evaluation handler for a composite (list/dict) expression.

        Nothing is evaluated here: a ``handle`` closure is built for ``expr``
        and handed to ``self._add_node``.  When the closure is later called
        with ``kw`` (looked up by expression, yielding that expression's
        value), it dispatches on the concrete ``composites`` type of ``expr``:

        * ``ListDictLength``  -- ``len`` of each element.
        * ``ListDictGetItem`` -- item lookup; an out-of-range list index or a
          missing dict key yields ``None``.
        * ``ListContains``    -- membership test.
        * ``ListSort``        -- ``sorted`` copy of each element.
        * ``DictKeys`` / ``DictValues`` -- keys / values of each dict as a list.
        * ``ListBuilder`` / ``DictBuilder`` -- build a list / ``OrderedDict``
          per row (or a single one when ``expr`` is a ``Scalar``).

        In the element-wise operations a ``None`` container produces ``None``.

        :param expr: composite expression node to compile
        :raises NotImplementedError: raised by the handler (not by this
            method) when ``expr`` is none of the composite types listed above
        """
        def handle(kw):
            def _zip_args(fields):
                """Return ``(index, iterables)`` for row-wise zipping of ``fields``.

                Sequence fields contribute their value directly; any other
                field is repeated endlessly so that zipping is bounded by the
                sequences.  ``index`` is taken from the last sequence field
                seen, or is ``None`` when there is none.
                """
                zip_args = []
                seq_index = None
                for it in fields:
                    val = kw[it]
                    if isinstance(it, SequenceExpr):
                        zip_args.append(val)
                        seq_index = val.index
                    else:
                        zip_args.append(itertools.repeat(val))
                return seq_index, zip_args

            children_vals = self._get_children_vals(kw, expr)
            # The first child value is the container (or first operand).
            _input = children_vals[0]
            if isinstance(expr, composites.ListDictLength):
                return _input.apply(lambda v: len(v) if v is not None else None)
            elif isinstance(expr, composites.ListDictGetItem):
                def _get_list_item(l, x):
                    # Out-of-range indexes give None instead of raising.
                    try:
                        return l[x] if l is not None else None
                    except IndexError:
                        return None

                def _get_dict_item(s, k):
                    # Missing keys give None through ``get``.
                    return s.get(k) if s is not None else None

                _value = children_vals[1]
                if isinstance(expr.input.dtype, df_types.List):
                    item_fun = _get_list_item
                else:
                    item_fun = _get_dict_item
                if isinstance(expr, Scalar):
                    return item_fun(_input, _value)
                else:
                    # Only one side is a sequence: map over that side.
                    if isinstance(expr.input, Scalar):
                        return _value.apply(lambda v: item_fun(_input, v))
                    if isinstance(expr._key, Scalar):
                        return _input.apply(lambda v: item_fun(v, _value))
                    # Both sides are sequences: pair them up row by row.
                    seq_values = [item_fun(k, v) for k, v in compat.izip(_input, _value)]
                    return pd.Series(seq_values, index=_input.index, name=expr.name)
            elif isinstance(expr, composites.ListContains):
                def contains_fun(s, k):
                    return k in s if s is not None else None

                _value = children_vals[1]
                if isinstance(expr, Scalar):
                    return contains_fun(_input, _value)
                else:
                    # Only one side is a sequence: map over that side.
                    if isinstance(expr.input, Scalar):
                        return _value.apply(lambda v: contains_fun(_input, v))
                    if isinstance(expr._value, Scalar):
                        return _input.apply(lambda v: contains_fun(v, _value))
                    # Both sides are sequences: pair them up row by row.
                    seq_values = [contains_fun(k, v) for k, v in compat.izip(_input, _value)]
                    return pd.Series(seq_values, index=_input.index, name=expr.name)
            elif isinstance(expr, composites.ListSort):
                return _input.apply(lambda l: sorted(l) if l is not None else None)
            elif isinstance(expr, composites.DictKeys):
                return _input.apply(lambda d: list(six.iterkeys(d)) if d is not None else None)
            elif isinstance(expr, composites.DictValues):
                return _input.apply(lambda d: list(six.itervalues(d)) if d is not None else None)
            elif isinstance(expr, composites.ListBuilder):
                if isinstance(expr, Scalar):
                    return [kw[v] for v in expr._values]
                else:
                    seq_index, zip_args = _zip_args(expr._values)
                    seq_values = [list(r) for r in compat.izip(*zip_args)]
                    return pd.Series(seq_values, index=seq_index, name=expr.name)
            elif isinstance(expr, composites.DictBuilder):
                if isinstance(expr, Scalar):
                    return OrderedDict((kw[k], kw[v]) for k, v in compat.izip(expr._keys, expr._values))
                else:
                    # Keys and values are zipped together in one pass; each
                    # row holds the keys first, followed by the values.
                    seq_index, zip_args = _zip_args(expr._keys + expr._values)
                    dict_len = len(expr._values)
                    # ``compat.izip`` (as in the ListBuilder branch) keeps the
                    # row iteration lazy; the builtin ``zip`` would build the
                    # full row list first on Python 2.
                    seq_values = [
                        OrderedDict(compat.izip(r[:dict_len], r[dict_len:]))
                        for r in compat.izip(*zip_args)
                    ]
                    return pd.Series(seq_values, index=seq_index, name=expr.name)
            else:
                raise NotImplementedError

        self._add_node(expr, handle)