def _merge_joined_fields()

in odps/df/expr/merge.py [0:0]


    def _merge_joined_fields(self, merge_columns):
        if not merge_columns:
            return self

        predicate_fields = self._get_predicate_fields()
        if not predicate_fields:
            raise ValueError('No fields in predicate. Cannot merge columns.')

        src_map = self._column_origins
        rename_map = dict()
        for name, src in six.iteritems(src_map):
            src_id, src_name = src
            if src_name not in predicate_fields:
                continue
            if src_name not in rename_map:
                rename_map[src_name] = [None, None]
            rename_map[src_name][src_id] = name

        if merge_columns in ('auto', 'left', 'right') or (isinstance(merge_columns, bool) and merge_columns):
            merge_columns = dict((k, merge_columns) for k in six.iterkeys(rename_map))
        if isinstance(merge_columns, six.string_types):
            merge_columns = {merge_columns: 'auto'}
        if isinstance(merge_columns, list):
            merge_columns = dict((k, 'auto') for k in merge_columns)

        excludes = set()
        for col, action in six.iteritems(merge_columns):
            if col not in rename_map:
                raise ValueError('Column {0} not exists in join predicate.'.format(col))
            if isinstance(action, bool) and action:
                merge_columns[col] = 'auto'
            else:
                merge_columns[col] = action.lower()
            excludes.update(rename_map[col])

        selects = []
        merged = set()
        for col in self.schema.names:
            if col not in excludes:
                selects.append(self[col])
            else:
                src_name = src_map[col][1]
                if src_name in merged:
                    continue

                merged.add(src_name)

                left_name, right_name = rename_map[src_name]
                left_col = self[left_name]
                right_col = self[right_name]

                if merge_columns[src_name] == 'auto':
                    selects.append(left_col.isnull().ifelse(right_col, left_col).rename(src_name))
                elif merge_columns[src_name] == 'left':
                    selects.append(left_col.rename(src_name))
                elif merge_columns[src_name] == 'right':
                    selects.append(right_col.rename(src_name))
        selected = self.select(*selects)
        return JoinFieldMergedCollectionExpr(_input=self, _fields=selected._fields,
                                             _schema=selected._schema, _rename_map=rename_map)