in odps/df/backends/pd/compiler.py [0:0]
def visit_composite_op(self, expr):
def handle(kw):
def _zip_args(fields):
zip_args = []
seq_index = None
for it in fields:
if isinstance(it, SequenceExpr):
zip_args.append(kw[it])
seq_index = kw[it].index
else:
zip_args.append(itertools.repeat(kw[it]))
return seq_index, zip_args
children_vals = self._get_children_vals(kw, expr)
_input = children_vals[0]
if isinstance(expr, composites.ListDictLength):
return _input.apply(lambda v: len(v) if v is not None else None)
elif isinstance(expr, composites.ListDictGetItem):
def _get_list_item(l, x):
try:
return l[x] if l is not None else None
except IndexError:
return None
_value = children_vals[1]
if isinstance(expr.input.dtype, df_types.List):
item_fun = _get_list_item
else:
item_fun = lambda s, k: s.get(k) if s is not None else None
if isinstance(expr, Scalar):
return item_fun(_input, _value)
else:
if isinstance(expr.input, Scalar):
return _value.apply(lambda v: item_fun(_input, v))
if isinstance(expr._key, Scalar):
return _input.apply(lambda v: item_fun(v, _value))
seq_values = [item_fun(k, v) for k, v in compat.izip(_input, _value)]
return pd.Series(seq_values, index=_input.index, name=expr.name)
elif isinstance(expr, composites.ListContains):
_value = children_vals[1]
contains_fun = lambda s, k: k in s if s is not None else None
if isinstance(expr, Scalar):
return contains_fun(_input, _value)
else:
if isinstance(expr.input, Scalar):
return _value.apply(lambda v: contains_fun(_input, v))
if isinstance(expr._value, Scalar):
return _input.apply(lambda v: contains_fun(v, _value))
seq_values = [contains_fun(k, v) for k, v in compat.izip(_input, _value)]
return pd.Series(seq_values, index=_input.index, name=expr.name)
elif isinstance(expr, composites.ListSort):
return _input.apply(lambda l: sorted(l) if l is not None else None)
elif isinstance(expr, composites.DictKeys):
return _input.apply(lambda d: list(six.iterkeys(d)) if d is not None else None)
elif isinstance(expr, composites.DictValues):
return _input.apply(lambda d: list(six.itervalues(d)) if d is not None else None)
elif isinstance(expr, composites.ListBuilder):
if isinstance(expr, Scalar):
return [kw[v] for v in expr._values]
else:
seq_index, zip_args = _zip_args(expr._values)
seq_values = []
for r in compat.izip(*zip_args):
seq_values.append(list(r))
return pd.Series(seq_values, index=seq_index, name=expr.name)
elif isinstance(expr, composites.DictBuilder):
if isinstance(expr, Scalar):
return OrderedDict((kw[k], kw[v]) for k, v in compat.izip(expr._keys, expr._values))
else:
seq_index, zip_args = _zip_args(expr._keys + expr._values)
seq_values = []
dict_len = len(expr._values)
for r in zip(*zip_args):
seq_values.append(OrderedDict((k, v) for k, v in compat.izip(r[:dict_len], r[dict_len:])))
return pd.Series(seq_values, index=seq_index, name=expr.name)
else:
raise NotImplementedError
self._add_node(expr, handle)