in odps/df/expr/expressions.py [0:0]
def filter_parts(self, predicate='', exclude=True):
    """
    Filter the data by partition string. A partition string looks like `pt1=1,pt2=2/pt1=2,pt2=1`, where
    comma (,) denotes 'and', while (/) denotes 'or'.

    Values may be quoted with single or double quotes; one surrounding pair of quotes is
    removed and backslash escapes inside are decoded. Values of integer / float partition
    columns are converted to numbers before comparison.

    :param str|Partition predicate: predicate string of partition filter. A ``PartitionSpec``,
        a ``Partition``, or a list / tuple of strings, ``PartitionSpec`` or ``Partition``
        objects (combined with 'or') are also accepted.
    :param bool exclude: True if you want to exclude partition fields, otherwise False. True for default.
    :return: new collection
    :rtype: :class:`odps.df.expr.expressions.CollectionExpr`
    :raises ExpressionError: if the collection is not an ODPS data source with partition
        columns, the predicate has an unsupported type, or a predicate item is malformed
        or refers to a non-partition column.
    """
    from ...models.partition import Partition
    from ...types import PartitionSpec

    source = self._source_data
    if source is None:
        raise ExpressionError('Can only filter on data sources.')
    # Validate the source before touching the predicate: an empty predicate used to
    # reach ``source.table_schema`` unchecked and fail with a bare AttributeError.
    if not hasattr(source, 'table_schema'):
        raise ExpressionError('filter_partition can only be applied on ODPS DataFrames')
    if not source.table_schema.partitions:
        raise ExpressionError('No partition columns in the collection.')
    partition_schema = source.table_schema._partition_schema

    def _to_predicate_string(p):
        # Normalize a Partition / PartitionSpec into the `k='v',k2='v2'` textual form;
        # anything else is returned unchanged.
        if isinstance(p, Partition):
            p = p.partition_spec
        if isinstance(p, PartitionSpec):
            return ','.join("%s='%s'" % (k, v) for k, v in six.iteritems(p.kv))
        return p

    def _unquote(value):
        # Remove exactly one surrounding quote pair (not every quote char at the ends,
        # which would eat quotes that belong to the value), then decode escapes.
        quote = value[0]
        value = value[1:]
        if value.endswith(quote):
            value = value[:-1]
        if six.PY2:
            return to_binary(value).decode('string-escape')
        # 'unicode-escape' reads raw bytes as latin-1, so UTF-8 encoding first would
        # garble non-ASCII text. Encoding to latin-1 with backslashreplace keeps other
        # characters as \uXXXX escapes, which 'unicode-escape' then restores.
        return value.encode('latin-1', 'backslashreplace').decode('unicode-escape')

    def _parse_partition_predicate(p):
        # Turn a single `name=value` item into an equality expression on the column.
        if '=' not in p:
            raise ExpressionError('Illegal partition predicate.')
        field_name, field_value = [s.strip() for s in p.split('=', 1)]
        if field_name not in source.table_schema:
            raise ExpressionError('Column `%s` not exists in input collection' % field_name)
        if field_name not in partition_schema:
            raise ExpressionError('`%s` is not a partition column' % field_name)
        part_col = self[field_name]
        if field_value.startswith(("'", '"')):
            field_value = _unquote(field_value)
        if isinstance(part_col.data_type, types.Integer):
            field_value = int(field_value)
        elif isinstance(part_col.data_type, types.Float):
            field_value = float(field_value)
        return part_col == field_value

    def _parse_conjunction(part):
        # Comma-separated items inside one '/'-separated group are AND-ed together.
        return reduce(operator.and_, map(_parse_partition_predicate, part.split(',')))

    if isinstance(predicate, (list, tuple)):
        # Each element is one alternative; alternatives are OR-ed via '/'.
        items = (_to_predicate_string(s) for s in predicate)
        predicate = '/'.join(s if isinstance(s, six.string_types) else str(s) for s in items)
    else:
        predicate = _to_predicate_string(predicate)
    if not isinstance(predicate, six.string_types):
        raise ExpressionError('Only accept string predicates.')

    if predicate:
        predicate_obj = reduce(operator.or_, map(_parse_conjunction, predicate.split('/')))
    else:
        predicate_obj = None

    if exclude:
        # Drop partition columns from the resulting schema.
        columns = [c for c in self.schema if c.name not in partition_schema]
        new_schema = types.TableSchema.from_lists([c.name for c in columns], [c.type for c in columns])
        return FilterPartitionCollectionExpr(self, predicate_obj, _schema=new_schema, _predicate_string=predicate)
    return self.filter(predicate_obj)