def _build_collection()

in odps/ml/algolib/base_algo.py [0:0]


    def _build_collection(self, input_obj_dict, schema_def, coll_type=None, **expr_kw):
        coll_type = coll_type or self._collection_expr
        if schema_def and schema_def.dynamic:
            coll_type = type(coll_type)('Dynamic' + coll_type.__name__, (DynamicMixin, coll_type), {})

        if schema_def is None:
            schema_def = SchemaDef()
            schema_def.copy_input = six.next(k for k, v in six.iteritems(input_obj_dict) if is_df_object(v))

        kw = dict((ML_ARG_PREFIX + k, v) for k, v in six.iteritems(input_obj_dict))
        kw.update(expr_kw)
        if schema_def.copy_input is not None:
            src_df = coll_type(register_expr=True, _schema=input_obj_dict[schema_def.copy_input]._schema, **kw)
            src_df._ml_uplink = [input_obj_dict[schema_def.copy_input]]
        else:
            src_df = coll_type(register_expr=True, _schema=TableSchema(_columns=[]), **kw)
            src_df._ml_uplink = [o for o in six.itervalues(input_obj_dict) if is_df_object(o)]

        out_schemas = dict((pname, dict((f.name, f.type) for f in df._ml_fields))
                           for pname, df in six.iteritems(input_obj_dict) if is_df_object(df))
        for model_name, model in ((nm, m) for nm, m in six.iteritems(input_obj_dict) if isinstance(m, ODPSModelExpr)):
            if not getattr(model, '_model_collections', None):
                continue
            out_schemas.update(dict(('%s.%s' % (model_name, ds_name), dict((f.name, f.type) for f in df._ml_fields))
                                    for ds_name, df in six.iteritems(model._model_collections)))

        if schema_def.programmatic:
            generator = import_class_member(schema_def.schema)
        else:
            generator = _static_ml_fields_generator
        func_args = get_function_args(generator)

        kw = OrderedDict()
        if 'params' in func_args:
            kw['params'] = src_df.convert_params()
        if 'fields' in func_args:
            kw['fields'] = out_schemas
        if 'algorithm' in func_args:
            kw['algorithm'] = self
        if 'schema' in func_args:
            kw['schema'] = schema_def.schema

        op = ProgrammaticFieldChangeOperation(functools.partial(generator, **kw), schema_def.copy_input is not None)
        src_df._perform_operation(op)
        src_df._rebuild_df_schema(schema_def.dynamic)
        return src_df