def filter_parts()

in odps/df/expr/expressions.py [0:0]


    def filter_parts(self, predicate='', exclude=True):
        """
        Filter the data by partition string. A partition string looks like `pt1=1,pt2=2/pt1=2,pt2=1`, where
        comma (,) denotes 'and', while (/) denotes 'or'.

        :param str|Partition predicate: predicate string of partition filter
        :param bool exclude: True if you want to exclude partition fields, otherwise False. True for default.
        :return: new collection
        :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
        """
        source = self._source_data
        if source is None:
            raise ExpressionError('Can only filter on data sources.')

        def _parse_partition_predicate(p):
            if '=' not in p:
                raise ExpressionError('Illegal partition predicate.')
            field_name, field_value = [s.strip() for s in p.split('=', 1)]
            if not hasattr(source, 'table_schema'):
                raise ExpressionError('filter_partition can only be applied on ODPS DataFrames')
            if field_name not in source.table_schema:
                raise ExpressionError('Column `%s` not exists in input collection' % field_name)
            if field_name not in source.table_schema._partition_schema:
                raise ExpressionError('`%s` is not a partition column' % field_name)
            part_col = self[field_name]
            if field_value.startswith('\'') or field_value.startswith('\"'):
                encoding = 'string-escape' if six.PY2 else 'unicode-escape'
                field_value = to_binary(field_value.strip('"\'')).decode(encoding)

            if isinstance(part_col.data_type, types.Integer):
                field_value = int(field_value)
            elif isinstance(part_col.data_type, types.Float):
                field_value = float(field_value)
            return part_col == field_value

        from ...models.partition import Partition
        from ...types import PartitionSpec

        if isinstance(predicate, Partition):
            predicate = predicate.partition_spec
        if isinstance(predicate, PartitionSpec):
            predicate = ','.join("%s='%s'" % (k, v) for k, v in six.iteritems(predicate.kv))

        if isinstance(predicate, list):
            predicate = '/'.join(str(s) for s in predicate)
        elif not isinstance(predicate, six.string_types):
            raise ExpressionError('Only accept string predicates.')

        if not predicate:
            predicate_obj = None
        else:
            part_formatter = lambda p: reduce(operator.and_, map(_parse_partition_predicate, p.split(',')))
            predicate_obj = reduce(operator.or_, map(part_formatter, predicate.split('/')))

        if not source.table_schema.partitions:
            raise ExpressionError('No partition columns in the collection.')
        if exclude:
            columns = [
                c for c in self.schema if c.name not in source.table_schema._partition_schema
            ]
            new_schema = types.TableSchema.from_lists([c.name for c in columns], [c.type for c in columns])
            return FilterPartitionCollectionExpr(self, predicate_obj, _schema=new_schema, _predicate_string=predicate)
        else:
            return self.filter(predicate_obj)