in core/maxframe/dataframe/reduction/core.py [0:0]
def _generate_function_idl(self, out_tileable: TileableType) -> Tuple[List, List]:
"""
Generate function IDL from tileable DAG
IDL Format: [
["in_var", "input_var_name"],
["op", "op_output_var", ["op_arg1", "op_arg2"], {"op_key1": "op_key2"}],
["out_var", "output_var_name"],
]
"""
from ...tensor.arithmetic.core import TensorBinOp, TensorUnaryOp
from ...tensor.datasource import Scalar
from ...tensor.misc import TensorWhere
from ..arithmetic.core import DataFrameBinOp, DataFrameUnaryOp
from ..datasource.dataframe import DataFrameDataSource
from ..datasource.series import SeriesDataSource
from ..indexing.where import DataFrameWhere
input_key_to_var = OrderedDict()
local_key_to_var = dict()
idl_lines = []
input_op_types = (
DataFrameDataSource,
SeriesDataSource,
DataFrameReductionOperator,
)
    def _gen_expr_str(t):
        # recursively emit IDL lines for tileable t and its inputs
if t.key in local_key_to_var:
return
if isinstance(t.op, input_op_types):
            # the tileable is an input argument; allocate a function input variable
if t.key not in input_key_to_var: # pragma: no branch
input_key_to_var[t.key] = local_key_to_var[
t.key
] = f"invar{len(input_key_to_var)}"
else:
for inp in t.inputs:
_gen_expr_str(inp)
var_name = local_key_to_var[t.key] = f"var{len(local_key_to_var)}"
keys_to_vars = {inp.key: local_key_to_var[inp.key] for inp in t.inputs}
def _interpret_var(v):
                # translate an argument into its IDL representation: tileables
                # map to generated variable names, primitive values pass
                # through as literals
                v = get_item_if_scalar(v)
if hasattr(v, "key"):
return keys_to_vars[v.key]
elif isinstance(v, _idl_primitive_types):
return v
else:
raise NotImplementedError(
f"Type {type(v)} currently not interpretable"
)
func_name = getattr(t.op, "_func_name", None)
if func_name is None:
func_name = getattr(t.op, "_bit_func_name", None)
# handle function name differences between numpy and pandas arithmetic ops
if func_name in _func_name_converts:
func_name = _func_name_converts[func_name]
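                # e.g. numpy-style names such as "true_divide" or "greater" are
                # assumed to map to their pandas counterparts ("truediv", "gt")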
            # build IDL statements according to the operator type
if isinstance(t.op, (DataFrameUnaryOp, TensorUnaryOp)):
val = _interpret_var(t.inputs[0])
statements = [
[UNARY_IDL_OP_PREFIX + func_name, var_name, [val], {}]
]
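                # e.g. abs(x) over invar0 might emit, assuming the prefix is
                # "unary_":
                #     ["unary_abs", "var0", ["invar0"], {}]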
elif isinstance(t.op, (DataFrameBinOp, TensorBinOp)):
                lhs, rhs = t.op.lhs, t.op.rhs
                # when a 2-d operand meets a 1-d one, broadcast along the axis
                # opposite to the reduction axis
                op_axis = (
                    1 - self._axis
                    if hasattr(lhs, "ndim")
                    and hasattr(rhs, "ndim")
                    and lhs.ndim != rhs.ndim
                    else None
                )
lhs = _interpret_var(lhs)
rhs = _interpret_var(rhs)
axis_arg = {"axis": op_axis} if op_axis is not None else {}
                statements = [
                    [
                        BINARY_IDL_OP_PREFIX + func_name,
                        var_name,
                        [lhs, rhs],
                        axis_arg,
                    ]
                ]
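                # e.g. subtracting a per-row reduction from a DataFrame might
                # emit, assuming the prefix is "binary_":
                #     ["binary_sub", "var1", ["invar0", "invar1"], {"axis": 0}]
                # the kwargs dict carries the broadcast axis only when the
                # operand dimensions differ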
elif isinstance(t.op, TensorWhere):
cond = _interpret_var(t.op.condition)
x = _interpret_var(t.op.x)
y = _interpret_var(t.op.y)
statements = [[WHERE_VAR_OP, var_name, [cond, x, y], {}]]
elif isinstance(t.op, DataFrameWhere):
func_name = MASK_VAR_OP if t.op.replace_true else WHERE_VAR_OP
inp = _interpret_var(t.op.input)
cond = _interpret_var(t.op.cond)
other = _interpret_var(t.op.other)
statements = [
[
func_name,
var_name,
[cond, inp, other],
{"axis": t.op.axis, "level": t.op.level},
]
]
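                # e.g. df.mask(cond) might emit something like, assuming
                # MASK_VAR_OP is "mask" and `other` defaults to NaN:
                #     ["mask", "var2", ["var1", "invar0", nan],
                #      {"axis": None, "level": None}]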
elif isinstance(t.op, Scalar):
# for scalar inputs of other operators
data = _interpret_var(t.op.data)
                statements = [[LET_VAR_OP, var_name, [data], {}]]
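                # e.g. a scalar literal 0 consumed by another operator might
                # emit, assuming LET_VAR_OP is "let":
                #     ["let", "var3", [0], {}]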
            else:  # pragma: no cover
                raise NotImplementedError(
                    f"Aggregation does not support operator {type(t.op)}"
                )
idl_lines.extend(statements)
_gen_expr_str(out_tileable)
input_idls = [
[IN_VAR_IDL_OP, var_name] for var_name in input_key_to_var.values()
]
output_idls = [[OUT_VAR_IDL_OP, local_key_to_var[out_tileable.key]]]
return input_idls + idl_lines + output_idls, list(input_key_to_var.keys())
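
# ---------------------------------------------------------------------------
# Illustrative sketch, not part of the original module: a minimal evaluator
# for the IDL emitted above. It assumes IN_VAR_IDL_OP, OUT_VAR_IDL_OP and
# LET_VAR_OP resolve to the plain strings "in_var", "out_var" and "let", that
# UNARY_IDL_OP_PREFIX / BINARY_IDL_OP_PREFIX are "unary_" / "binary_", and
# that every op line follows the documented [op, out_var, args, kwargs]
# layout; the "where"/"mask" ops are omitted for brevity. Binary ops dispatch
# to pandas methods (e.g. Series.add) so the "axis" kwarg emitted for
# mixed-dimension operands is honored.
def _run_idl_sketch(program, inputs):
    import operator

    env = dict(inputs)  # maps "invar*" names to concrete pandas objects
    result = None
    for op, *rest in program:
        if op == "in_var":
            assert rest[0] in env, f"missing input {rest[0]}"
        elif op == "out_var":
            result = env[rest[0]]
        else:
            out, args, kwargs = rest
            # substitute variable names with their current values
            vals = [env.get(a, a) if isinstance(a, str) else a for a in args]
            if op == "let":
                env[out] = vals[0]
            elif op.startswith("unary_"):
                env[out] = getattr(operator, op[len("unary_"):])(vals[0])
            elif op.startswith("binary_"):
                env[out] = getattr(vals[0], op[len("binary_"):])(vals[1], **kwargs)
            else:
                raise NotImplementedError(f"op {op} not covered by this sketch")
    return result


# e.g. with pandas imported as pd,
#     _run_idl_sketch(
#         [
#             ["in_var", "invar0"],
#             ["binary_add", "var0", ["invar0", 1], {}],
#             ["out_var", "var0"],
#         ],
#         {"invar0": pd.Series([1, 2])},
#     )
# returns pd.Series([2, 3]).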