# odps/ml/expr/mixin.py
# encoding: utf-8
# Copyright 1999-2022 Alibaba Group Holding Ltd.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import itertools
import uuid
from collections import namedtuple, OrderedDict
from . import op
from .core import AlgoExprMixin
from ..enums import FieldRole
from ..utils import KVConfig, MLField
from ...compat import six, reduce
from ...df import DataFrame
from ...df.backends.odpssql.types import odps_schema_to_df_schema, df_schema_to_odps_schema, \
df_type_to_odps_type
from ...df.expr.collections import Node, CollectionExpr, SequenceExpr, Column
from ...df.expr.dynamic import DynamicMixin, DynamicCollectionExpr, DynamicSchema
from ...models.table import TableSchema
try:
from collections.abc import Iterable
except ImportError:
from collections import Iterable
class DynamicDataFrame(DynamicMixin, DataFrame):
def __init__(self, *args, **kw):
DynamicMixin.__init__(self)
DataFrame.__init__(self, *args, **kw)
_project = DynamicCollectionExpr._project
def copy_df(df):
    # copy the expression while preserving its ML field metadata and lineage links
    if isinstance(df, AlgoExprMixin):
        new_df = df.copy(register_expr=True)
    else:
        new_df = df.copy()
    new_df._ml_fields = df._ml_fields
    new_df._ml_uplink = [df]
    return new_df
def _get_field_name(field):
if isinstance(field, SequenceExpr):
return field.name
else:
return field
def _render_field_set(fields):
    # normalize a field spec into a set of column names: accepts a single name,
    # a SequenceExpr or an iterable thereof; a string may pack several names
    # separated by spaces or commas, e.g. 'f1 f2' or 'f1,f2'
    if isinstance(fields, six.string_types) or not isinstance(fields, Iterable):
        fields = [fields, ]
    fields = [f.name if isinstance(f, SequenceExpr) else f for f in fields]
    field_arrays = map(lambda v: v.replace(',', ' ').split(' ') if isinstance(v, six.string_types) else v, fields)
    return reduce(lambda a, b: set(a) | set(b), field_arrays, set()) - set(['', ])
def _change_singleton_roles(df, role_map, clear_feature):
new_df = copy_df(df)
new_df._perform_operation(op.SingletonRoleOperation(role_map, clear_feature))
return new_df
def _batch_change_roles(df, fields, role, augment):
new_df = copy_df(df)
new_df._perform_operation(op.BatchRoleOperation(fields, role, augment))
return new_df
class MLCollectionMixin(Node):
"""
PyODPS ML Plugin for odps.df.core.CollectionExpr. This plugin is installed automatically,
you do not need to instantiate this class manually.
"""
    @property
    def _ml_fields(self):
        if getattr(self, '_ml_fields_cache', None) is None:
            fields = OrderedDict((col.name, MLField.from_column(col))
                                 for col in df_schema_to_odps_schema(self.schema))
            # trace renamed columns back to their source names so that roles can
            # be propagated across projections that rename fields
            to_be_fixed = dict((k, k) for k in six.iterkeys(fields))
            for n in self.traverse(top_down=True, unique=True,
                                   stop_cond=lambda it: getattr(it, '_ml_fields_cache', None) is not None):
                if isinstance(n, Column):
                    if n.source_name != n.name and n.name in to_be_fixed:
                        to_be_fixed[n.source_name] = to_be_fixed[n.name]
            # inherit roles from the nearest ancestor expressions that already
            # carry ML field metadata
            for n in self.traverse(top_down=True, unique=True,
                                   stop_cond=lambda it: getattr(it, '_ml_fields_cache', None) is not None):
                if isinstance(n, CollectionExpr) and getattr(n, '_ml_fields_cache', None) is not None:
                    for col in n.columns:
                        if col.name not in to_be_fixed:
                            continue
                        field_obj = [f for f in n._ml_fields_cache if f.name == col.name]
                        fields[to_be_fixed[col.name]].role = field_obj[0].role
            # fields without an inherited role default to FEATURE
            for f in six.itervalues(fields):
                if not f.role:
                    f.role = set([FieldRole.FEATURE])
            self._ml_fields_cache = list(six.itervalues(fields))
        return self._ml_fields_cache
@_ml_fields.setter
def _ml_fields(self, value):
self._ml_fields_cache = value
def _rebuild_df_schema(self, dynamic=False):
self._schema = odps_schema_to_df_schema(
TableSchema.from_dict(OrderedDict([(f.name, f.type) for f in self._ml_fields]))
)
if dynamic:
self._schema = DynamicSchema.from_schema(self._schema)
def _perform_operation(self, op):
op.execute(self._ml_uplink, self)
self._ml_operations.append(op)
    def _assert_ml_fields_valid(self, *fields):
        missing = [f for f in fields if f not in self.schema]
        if missing:
            raise ValueError('Column(s) not found in DataFrame: %s' % ', '.join(missing))
###################
# meta operations #
###################
def exclude_fields(self, *args):
"""
Exclude one or more fields from feature fields.
:rtype: DataFrame
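        :Example:
        >>> # a minimal usage sketch, assuming df has feature fields f1 and f2
        >>> new_df = df.exclude_fields('f1 f2')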
"""
if not args:
raise ValueError("Field list cannot be None.")
new_df = copy_df(self)
fields = _render_field_set(args)
self._assert_ml_fields_valid(*fields)
new_df._perform_operation(op.ExcludeFieldsOperation(fields))
return new_df
def select_features(self, *args, **kwargs):
"""
Select one or more fields as feature fields.
        :param bool add: if True, append the fields to existing features instead of replacing them
        :rtype: DataFrame
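        :Example:
        >>> # a minimal usage sketch, assuming df has fields f1, f2 and f3
        >>> new_df = df.select_features('f1 f2')
        >>> new_df = new_df.select_features('f3', add=True)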
"""
if not args:
raise ValueError("Field list cannot be empty.")
# generate selected set from args
augment = kwargs.get('add', False)
fields = _render_field_set(args)
self._assert_ml_fields_valid(*fields)
return _batch_change_roles(self, fields, FieldRole.FEATURE, augment)
def weight_field(self, f):
"""
Select one field as the weight field.
        Note that this field will be excluded from feature fields.
:param f: Selected weight field
:type f: str
:rtype: DataFrame
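        :Example:
        >>> # a minimal usage sketch, assuming df has a numeric field ``wt``
        >>> new_df = df.weight_field('wt')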
"""
if f is None:
raise ValueError("Field name cannot be None.")
self._assert_ml_fields_valid(f)
return _change_singleton_roles(self, {f: FieldRole.WEIGHT}, clear_feature=True)
def label_field(self, f):
"""
Select one field as the label field.
        Note that this field will be excluded from feature fields.
:param f: Selected label field
:type f: str
:rtype: DataFrame
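        :Example:
        >>> # a minimal usage sketch, assuming df has a field ``category``
        >>> new_df = df.label_field('category')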
"""
if f is None:
raise ValueError("Label field name cannot be None.")
self._assert_ml_fields_valid(f)
return _change_singleton_roles(self, {_get_field_name(f): FieldRole.LABEL}, clear_feature=True)
def continuous(self, *args):
"""
Set fields to be continuous.
:rtype: DataFrame
:Example:
>>> # Table schema is create table test(f1 double, f2 string)
>>> # Original continuity: f1=DISCRETE, f2=DISCRETE
        >>> # Now we want to set ``f1`` and ``f2`` to be continuous
>>> new_ds = df.continuous('f1 f2')
"""
new_df = copy_df(self)
fields = _render_field_set(args)
self._assert_ml_fields_valid(*fields)
new_df._perform_operation(op.FieldContinuityOperation(dict((_get_field_name(f), True) for f in fields)))
return new_df
def discrete(self, *args):
"""
Set fields to be discrete.
:rtype: DataFrame
:Example:
>>> # Table schema is create table test(f1 double, f2 string)
>>> # Original continuity: f1=CONTINUOUS, f2=CONTINUOUS
        >>> # Now we want to set ``f1`` and ``f2`` to be discrete
>>> new_ds = df.discrete('f1 f2')
"""
new_df = copy_df(self)
fields = _render_field_set(args)
self._assert_ml_fields_valid(*fields)
new_df._perform_operation(op.FieldContinuityOperation(dict((_get_field_name(f), False) for f in fields)))
return new_df
def key_value(self, *args, **kwargs):
"""
Set fields to be key-value represented.
:rtype: DataFrame
:Example:
>>> new_ds = df.key_value('f1 f2', kv=':', item=',')
"""
new_df = copy_df(self)
fields = _render_field_set(args)
self._assert_ml_fields_valid(*fields)
new_df._perform_operation(
op.FieldKVConfigOperation(dict((_get_field_name(f), KVConfig(**kwargs)) for f in fields)))
return new_df
def erase_key_value(self, *args):
"""
Erase key-value represented fields.
:rtype: DataFrame
:Example:
>>> new_ds = df.erase_key_value('f1 f2')
"""
new_df = copy_df(self)
fields = _render_field_set(args)
self._assert_ml_fields_valid(*fields)
new_df._perform_operation(op.FieldKVConfigOperation(dict((_get_field_name(f), None) for f in fields)))
return new_df
def roles(self, clear_features=True, **field_roles):
"""
        Set roles of fields.
        :param clear_features: whether to clear existing feature roles on the affected fields
        :param field_roles: mapping from role name to field name, e.g. ``label='col_name'``
        :rtype: DataFrame
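        :Example:
        >>> # a minimal usage sketch, assuming df has fields ``category`` and ``wt``
        >>> new_df = df.roles(label='category', weight='wt')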
"""
field_roles = dict((k, v.name if isinstance(v, SequenceExpr) else v)
for k, v in six.iteritems(field_roles))
self._assert_ml_fields_valid(*list(six.itervalues(field_roles)))
field_roles = dict((_get_field_name(f), MLField.translate_role_name(role))
for role, f in six.iteritems(field_roles))
if field_roles:
return _change_singleton_roles(self, field_roles, clear_features)
else:
return self
##########################
# simple transformations #
##########################
def split(self, frac):
"""
Split the DataFrame into two DataFrames with certain ratio.
:param frac: Split ratio
:type frac: float
:return: two split DataFrame objects
:rtype: list[DataFrame]
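        :Example:
        >>> # a minimal usage sketch: roughly 60% of rows go to the first output
        >>> train, test = df.split(0.6)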
"""
from .. import preprocess
split_obj = getattr(preprocess, '_Split')(fraction=frac)
return split_obj.transform(self)
def append_id(self, id_col_name='append_id', cols=None):
"""
Append an ID column to current DataFrame.
:param str id_col_name: name of appended ID field.
:param str cols: fields contained in output. All fields by default.
:return: DataFrame with ID field
:rtype: DataFrame
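        :Example:
        >>> # a minimal usage sketch: append an ID column named ``record_id``
        >>> new_df = df.append_id(id_col_name='record_id')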
"""
from .. import preprocess
if id_col_name in self.schema:
raise ValueError('ID column collides with existing columns.')
append_id_obj = getattr(preprocess, '_AppendID')(id_col=id_col_name, selected_cols=cols)
return append_id_obj.transform(self)
    def merge_with(self, *dfs, **kwargs):
        """
        Merge this DataFrame with other DataFrames by column. See :func:`merge_data`.
        :rtype: DataFrame
        """
        return merge_data(self, *dfs, **kwargs)
def _xflow_sample(self, columns=None, n=None, frac=None, replace=False, weights=None, strata=None,
random_state=None):
try:
import pandas as pd
except (ImportError, ValueError):
pd = None
if pd is not None and all(isinstance(df, pd.DataFrame) for df in self.data_source()):
sample_func = getattr(self, '__sample')
return sample_func(columns=columns, n=n, frac=frac, replace=replace,
weights=weights, strata=strata, random_state=random_state)
from .. import preprocess
if weights is not None:
if not isinstance(weights, (six.string_types, SequenceExpr)):
raise ValueError('weights should be the name of the weight column.')
algo_cls = getattr(preprocess, '_WeightedSample')
algo_obj = algo_cls(sample_size=n, sample_ratio=frac, prob_col=_get_field_name(weights),
replace=replace, random_seed=random_state)
elif strata is not None:
def dict_to_kv(d):
if not isinstance(d, dict):
return d
return ','.join('{0}:{1}'.format(k, v) for k, v in six.iteritems(d))
if replace:
raise ValueError('Stratified sampling with replacement is not supported.')
algo_cls = getattr(preprocess, '_StratifiedSample')
algo_obj = algo_cls(sample_size=dict_to_kv(n), sample_ratio=dict_to_kv(frac),
strata_col_name=_get_field_name(strata), random_seed=random_state)
else:
algo_cls = getattr(preprocess, '_RandomSample')
algo_obj = algo_cls(sample_size=n, sample_ratio=frac, replace=replace,
random_seed=random_state)
return algo_obj.transform(self)
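    # A minimal usage sketch of the sampling dispatch above (direct calls are
    # shown for illustration only; ``wt`` and ``grp`` are assumed column names):
    #   sampled = df._xflow_sample(frac=0.3)                          # random
    #   weighted = df._xflow_sample(n=100, weights='wt')              # weighted
    #   stratified = df._xflow_sample(frac={'a': 0.5}, strata='grp')  # stratified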
class MLSequenceMixin(Node):
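    """
    PyODPS ML Plugin for odps.df.core.SequenceExpr. This plugin is installed automatically,
    you do not need to instantiate this class manually.
    """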
def _perform_operation(self, op):
op.execute(self._ml_uplink, self)
self._ml_operations.append(op)
@property
def _ml_fields(self):
        if getattr(self, '_ml_fields_cache', None) is None:
            if hasattr(self, 'input'):
                # inherit the field definition from the input collection if present
                for fobj in self.input._ml_fields:
                    if fobj.name == self.name:
                        self._ml_fields_cache = [fobj.copy()]
                        break
            if getattr(self, '_ml_fields_cache', None) is None:
                # fall back to a fresh FEATURE field built from the sequence itself
                self._ml_fields_cache = [MLField(self.name, df_type_to_odps_type(self.dtype).name, FieldRole.FEATURE)]
        return self._ml_fields_cache
@_ml_fields.setter
def _ml_fields(self, value):
self._ml_fields_cache = value
def continuous(self):
"""
Set sequence to be continuous.
:rtype: Column
:Example:
        >>> # Table schema is create table test(f1 double, f2 string)
        >>> # Original continuity: f1=DISCRETE
        >>> # Now we want to set ``f1`` to be continuous
        >>> new_seq = df.f1.continuous()
"""
field_name = self.name
new_df = copy_df(self)
new_df._perform_operation(op.FieldContinuityOperation({field_name: True}))
return new_df
def discrete(self):
"""
Set sequence to be discrete.
:rtype: Column
:Example:
        >>> # Table schema is create table test(f1 double, f2 string)
        >>> # Original continuity: f1=CONTINUOUS
        >>> # Now we want to set ``f1`` to be discrete
        >>> new_seq = df.f1.discrete()
"""
field_name = self.name
new_df = copy_df(self)
new_df._perform_operation(op.FieldContinuityOperation({field_name: False}))
return new_df
def key_value(self, **kwargs):
"""
        Set sequence to be key-value represented.
        :rtype: Column
        :Example:
        >>> new_seq = df.f1.key_value(kv=':', item=',')
"""
field_name = self.name
new_df = copy_df(self)
new_df._perform_operation(op.FieldKVConfigOperation({field_name: KVConfig(**kwargs)}))
return new_df
def erase_key_value(self):
"""
        Erase key-value representation of the sequence.
        :rtype: Column
        :Example:
        >>> new_seq = df.f1.erase_key_value()
"""
field_name = self.name
new_df = copy_df(self)
new_df._perform_operation(op.FieldKVConfigOperation({field_name: None}))
return new_df
def role(self, role_name):
"""
        Set role of the current column.
        :param role_name: name of the role to be assigned.
        :rtype: Column
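        :Example:
        >>> # a minimal usage sketch, assuming df has a field ``f1``
        >>> new_seq = df.f1.role('label')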
"""
field_name = self.name
field_roles = {field_name: MLField.translate_role_name(role_name)}
if field_roles:
return _change_singleton_roles(self, field_roles, True)
else:
return self
##########################
# simple transformations #
##########################
def merge_with(self, *dfs, **kwargs):
return merge_data(self, *dfs, **kwargs)
def _xflow_split(expr, frac, seed=None):
try:
import pandas as pd
except (ImportError, ValueError):
pd = None
if pd is not None and all(isinstance(df, pd.DataFrame) for df in expr.data_source()):
split_func = getattr(expr, '_split')
return split_func(frac, seed=seed)
from .. import preprocess
split_obj = getattr(preprocess, '_Split')(fraction=frac, random_seed=seed)
return split_obj.transform(expr)
def _xflow_append_id(expr, id_col='append_id'):
try:
import pandas as pd
except (ImportError, ValueError):
pd = None
if pd is not None and all(isinstance(df, pd.DataFrame) for df in expr.data_source()):
append_id_func = getattr(expr, '_append_id')
return append_id_func(id_col)
from .. import preprocess
append_id_obj = getattr(preprocess, '_AppendID')(id_col=id_col)
return append_id_obj.transform(expr)
def _xflow_concat(left, rights):
try:
import pandas as pd
except (ImportError, ValueError):
pd = None
chained = itertools.chain((left, ), rights)
if pd is not None and all(isinstance(df, pd.DataFrame) for i in chained for df in i.data_source()):
concat_func = getattr(left, '__horz_concat')
return concat_func(rights)
return merge_data(left, *rights)
MLCollectionMixin._xflow_concat = _xflow_concat
MLCollectionMixin._xflow_append_id = _xflow_append_id
MLCollectionMixin._xflow_split = _xflow_split
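# A minimal usage sketch of the helpers installed above (column names, fraction
# and seed values are illustrative only):
#   train, test = df._xflow_split(0.75, seed=42)
#   with_id = df._xflow_append_id('row_id')
#   merged = df1._xflow_concat([df2])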
def ml_collection_mixin(cls):
def setter_generator(role, clear_feature=True):
def role_setter(self, field_name):
if field_name is None:
raise ValueError("Field name cannot be None.")
if isinstance(field_name, six.string_types):
return _change_singleton_roles(self, {field_name: role}, clear_feature=clear_feature)
else:
return _batch_change_roles(self, field_name, role, False)
return role_setter
if hasattr(cls, 'field_role_enum'):
field_role_enum = cls.field_role_enum
delattr(cls, 'field_role_enum')
MLField.register_roles(field_role_enum)
if hasattr(cls, 'non_feature_roles'):
non_feature_roles = set(cls.non_feature_roles)
delattr(cls, 'non_feature_roles')
else:
non_feature_roles = set()
for role in field_role_enum:
setter = setter_generator(role, role in non_feature_roles)
setter.__name__ = '%s_field' % role.name.lower()
setattr(cls, setter.__name__, setter)
if cls not in MLCollectionMixin.__bases__:
MLCollectionMixin.__bases__ += (cls, )
return cls
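# A minimal sketch of how the decorator above is applied (the enum below is
# hypothetical; real plugins supply their own role enums):
#   class MyRole(Enum):
#       GROUP = 'GROUP'
#
#   @ml_collection_mixin
#   class MyMixin(object):
#       field_role_enum = MyRole                 # generates a group_field() setter
#       non_feature_roles = set([MyRole.GROUP])  # group fields are not features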
def merge_data(*data_frames, **kwargs):
"""
Merge DataFrames by column. Number of rows in tables must be the same.
This method can be called both outside and as a DataFrame method.
:param list[DataFrame] data_frames: DataFrames to be merged.
:param bool auto_rename: if True, fields in source DataFrames will be renamed in the output.
:return: merged data frame.
:rtype: DataFrame
:Example:
>>> merged1 = merge_data(df1, df2)
    >>> merged2 = df1.merge_with(df2, auto_rename=True)
"""
from .specialized import build_merge_expr
from ..utils import ML_ARG_PREFIX
if len(data_frames) <= 1:
raise ValueError('Count of DataFrames should be at least 2.')
norm_data_pairs = []
df_tuple = namedtuple('MergeTuple', 'df cols exclude')
for pair in data_frames:
        if isinstance(pair, tuple):
            # (df, cols) selects the listed columns; (df, cols, True) excludes them instead
if len(pair) == 2:
df, cols = pair
exclude = False
else:
df, cols, exclude = pair
if isinstance(cols, six.string_types):
cols = cols.split(',')
else:
df, cols, exclude = pair, None, False
norm_data_pairs.append(df_tuple(df, cols, exclude))
auto_rename = kwargs.get('auto_rename', False)
sel_cols_dict = dict((idx, tp.cols) for idx, tp in enumerate(norm_data_pairs) if tp.cols and not tp.exclude)
ex_cols_dict = dict((idx, tp.cols) for idx, tp in enumerate(norm_data_pairs) if tp.cols and tp.exclude)
merge_expr = build_merge_expr(len(norm_data_pairs))
arg_dict = dict(_params={'autoRenameCol': str(auto_rename)},
selected_cols=sel_cols_dict, excluded_cols=ex_cols_dict)
for idx, dp in enumerate(norm_data_pairs):
arg_dict[ML_ARG_PREFIX + 'input%d' % (1 + idx)] = dp.df
out_df = merge_expr(register_expr=True, _exec_id=uuid.uuid4(), _output_name='output', **arg_dict)
out_df._ml_uplink = [dp.df for dp in norm_data_pairs]
out_df._perform_operation(op.MergeFieldsOperation(auto_rename, sel_cols_dict, ex_cols_dict))
out_df._rebuild_df_schema()
return out_df
class MLSchema(TableSchema):
__slots__ = '_collection',
class MLAttrCollection(object):
def __init__(self, collection=None):
if collection is not None:
self._ml_fields = dict((f.name, f) for f in collection._ml_fields)
else:
self._ml_fields = dict()
def __getitem__(self, item):
return self._ml_fields.get(item)
def __contains__(self, item):
return item in self._ml_fields
@property
def ml_attr(self):
if hasattr(self, '_collection'):
return self.MLAttrCollection(self._collection)
else:
return self.MLAttrCollection()
    def _repr(self, strip=True):
        # augment the plain schema repr with KV configs and field role annotations
        original_repr = super(MLSchema, self)._repr(strip=False)
sio = six.StringIO()
ml_attrs = self.ml_attr
for line in original_repr.splitlines():
if not line.startswith(' '):
sio.write(line + '\n')
continue
ftypes = [s.strip() for s in line.strip().rsplit(' ', 1)]
if ftypes[0].startswith('"') and ftypes[0].endswith('"'):
if isinstance(ftypes[0], bytes):
ftypes[0] = ftypes[0][1:-1].decode('unicode-escape').encode('utf-8')
else:
ftypes[0] = ftypes[0][1:-1].decode('unicode-escape')
if ftypes[0] not in ml_attrs:
sio.write(line + '\n')
continue
ml_attr = ml_attrs[ftypes[0]]
if ml_attr.kv_config:
kv_repr = ml_attr._repr_type_()
slen = len(line) - line.find(ftypes[1])
if len(kv_repr) > slen:
line = line.replace(ftypes[1] + ' ' * (slen - len(ftypes[1])), kv_repr)
elif len(kv_repr) > len(ftypes[1]):
line = line.replace(ftypes[1] + ' ' * (len(kv_repr) - len(ftypes[1])), kv_repr)
else:
line = line.replace(ftypes[1], kv_repr + ' ' * (len(ftypes[1]) - len(kv_repr)))
            for s in (line, ' ', ml_attr._repr_role_(), '\n'):
                sio.write(s)
return sio.getvalue()
def collection_dtypes_wrapper(self):
    # expose an MLSchema (carrying role annotations) once ML fields are attached
    if getattr(self, '_ml_fields_cache', None) is None:
        return self.schema
else:
schema = MLSchema(columns=self.schema.columns, partitions=self.schema.partitions)
schema._collection = self
return schema
def install_mixin():
    # attach the ML mixins to the PyODPS DataFrame expression classes
    CollectionExpr.__bases__ += (MLCollectionMixin, )
    SequenceExpr.__bases__ += (MLSequenceMixin, )
    CollectionExpr.dtypes = property(fget=collection_dtypes_wrapper)
install_mixin()