in odps/df/backends/utils.py [0:0]
def _partition_spec_from_predicate(predicate):
    """Translate an equality/conjunction predicate into a partition spec string.

    Returns a string such as ``'pt=1'`` or ``'pt=1,hh=2'`` built from the
    ``Equal`` nodes that compare a source partition column with a ``Scalar``.
    Returns ``None`` when no such comparison is found or when ``predicate``
    is neither ``Equal`` nor ``And``.
    """
    if isinstance(predicate, Equal):
        column = predicate.lhs
        if (
            isinstance(column, Column)
            and is_source_partition(column, next(column.data_source()))
            and isinstance(predicate.rhs, Scalar)
        ):
            return '%s=%s' % (column.source_name, predicate.rhs.value)
        return None

    if isinstance(predicate, And):
        # Keep whichever sides resolve to a spec; a side that is not a
        # partition comparison is simply dropped from the spec.
        specs = [
            spec
            for spec in (
                _partition_spec_from_predicate(predicate.lhs),
                _partition_spec_from_predicate(predicate.rhs),
            )
            if spec
        ]
        return ','.join(specs) if specs else None

    return None


def _filter_predicate_size(predicate, table):
    """Return the size selected by a filter predicate, or ``None`` if unknown.

    ``Or`` nodes add up the sizes of both operands.  If either operand cannot
    be resolved the whole disjunction is unknown: treating the unresolved side
    as zero would under-report the data the filter can touch.
    """
    if isinstance(predicate, Or):
        left_size = _filter_predicate_size(predicate._lhs, table)
        if left_size is None:
            # No point in looking up the right side; the sum is already unknown.
            return None
        right_size = _filter_predicate_size(predicate._rhs, table)
        if right_size is None:
            return None
        return left_size + right_size

    if isinstance(predicate, (And, Equal)):
        partition_spec = _partition_spec_from_predicate(predicate)
        if partition_spec is not None:
            return _fetch_partition_size(partition_spec, table)

    return None


def fetch_data_source_size(expr_dag, node, table):
    """Compute the size of the table data read by the successors of ``node``.

    :param expr_dag: expression DAG; only ``successors(node)`` is used.
    :param node: the data source node inside ``expr_dag``.
    :param table: table object exposing ``table_schema`` and ``size``.
    :return: ``table.size`` for a table without partitions; otherwise the sum
        of the partition sizes selected by the partition filters and
        collection filters that directly follow ``node``.  ``None`` is
        returned as soon as one of those filters yields an unknown or zero
        size.  Successors of any other type contribute nothing to the sum.
    """
    schema = table.table_schema
    if not schema.partitions:
        # not partitioned table
        return table.size

    size = 0
    for parent in expr_dag.successors(node):
        if isinstance(parent, FilterPartitionCollectionExpr):
            # The predicate string holds several partition specs joined by '/'.
            for partition_spec in parent._predicate_string.split('/'):
                curr_size = _fetch_partition_size(partition_spec, table)
                if not curr_size:
                    # A falsy size (None or 0) aborts the whole computation.
                    return None
                size += curr_size
        elif isinstance(parent, FilterCollectionExpr):
            filter_size = _filter_predicate_size(parent._predicate, table)
            if not filter_size:
                return None
            size += filter_size

    return size