in odps/ml/algolib/base_algo.py [0:0]
def _build_collection(self, input_obj_dict, schema_def, coll_type=None, **expr_kw):
    """Create the output collection expression for this algorithm.

    :param input_obj_dict: mapping of input names to input objects. Values
        accepted by ``is_df_object`` contribute their ``_ml_fields``; values
        that are ``ODPSModelExpr`` instances contribute the fields of every
        entry in their ``_model_collections`` (when that attribute is set
        and non-empty).
    :param schema_def: schema definition for the output. When ``None``, a
        fresh ``SchemaDef`` is created whose ``copy_input`` is the first
        input name (in dict iteration order) whose value is a DataFrame
        object.
    :param coll_type: expression class to instantiate; falls back to
        ``self._collection_expr`` when falsy.
    :param expr_kw: extra keyword arguments forwarded to the expression
        constructor; they override the prefixed input arguments on clash.
    :return: the newly built collection expression.

    NOTE(review): when ``schema_def`` is ``None`` and no input is a
    DataFrame object, ``six.next`` is called on an exhausted generator
    without a default, which presumably surfaces as ``StopIteration`` --
    confirm whether callers rely on that.
    """
    def _field_type_map(df_obj):
        # Map every ML field name of ``df_obj`` to that field's type.
        return dict((fld.name, fld.type) for fld in df_obj._ml_fields)

    coll_type = coll_type or self._collection_expr
    if schema_def and schema_def.dynamic:
        # Derive a subclass on the fly (using the same metaclass) that
        # mixes DynamicMixin into the requested collection type.
        metaclass = type(coll_type)
        dynamic_name = 'Dynamic' + coll_type.__name__
        coll_type = metaclass(dynamic_name, (DynamicMixin, coll_type), {})

    if schema_def is None:
        schema_def = SchemaDef()
        df_input_names = (
            name for name, value in six.iteritems(input_obj_dict)
            if is_df_object(value)
        )
        schema_def.copy_input = six.next(df_input_names)

    # Constructor arguments: every input under its prefixed name, then the
    # caller-supplied extras (which win on key collisions).
    ctor_kwargs = dict(
        (ML_ARG_PREFIX + name, value)
        for name, value in six.iteritems(input_obj_dict)
    )
    ctor_kwargs.update(expr_kw)

    copy_key = schema_def.copy_input
    if copy_key is None:
        # No input to copy from: start with an empty schema.
        initial_schema = TableSchema(_columns=[])
    else:
        initial_schema = input_obj_dict[copy_key]._schema
    src_df = coll_type(register_expr=True, _schema=initial_schema, **ctor_kwargs)
    if copy_key is None:
        src_df._ml_uplink = [
            obj for obj in six.itervalues(input_obj_dict) if is_df_object(obj)
        ]
    else:
        src_df._ml_uplink = [input_obj_dict[copy_key]]

    # Field types of every DataFrame input, keyed by input name ...
    out_schemas = {}
    for input_name, input_obj in six.iteritems(input_obj_dict):
        if is_df_object(input_obj):
            out_schemas[input_name] = _field_type_map(input_obj)
    # ... followed by the collections attached to model inputs, keyed as
    # "<model name>.<collection name>".
    for input_name, input_obj in six.iteritems(input_obj_dict):
        if not isinstance(input_obj, ODPSModelExpr):
            continue
        model_collections = getattr(input_obj, '_model_collections', None)
        if not model_collections:
            continue
        for ds_name, ds_df in six.iteritems(model_collections):
            out_schemas['%s.%s' % (input_name, ds_name)] = _field_type_map(ds_df)

    if schema_def.programmatic:
        generator = import_class_member(schema_def.schema)
    else:
        generator = _static_ml_fields_generator

    # Pass the generator only the arguments its signature declares. The
    # values are wrapped in callables so each one is computed only when it
    # is actually requested.
    accepted_args = get_function_args(generator)
    arg_suppliers = (
        ('params', lambda: src_df.convert_params()),
        ('fields', lambda: out_schemas),
        ('algorithm', lambda: self),
        ('schema', lambda: schema_def.schema),
    )
    generator_kwargs = OrderedDict()
    for arg_name, supplier in arg_suppliers:
        if arg_name in accepted_args:
            generator_kwargs[arg_name] = supplier()

    bound_generator = functools.partial(generator, **generator_kwargs)
    op = ProgrammaticFieldChangeOperation(bound_generator, copy_key is not None)
    src_df._perform_operation(op)
    src_df._rebuild_df_schema(schema_def.dynamic)
    return src_df