def fetch_data_source_size()

in odps/df/backends/utils.py [0:0]


def _predicate_to_partition_spec(predicate):
    """Build a partition spec string from a filter predicate.

    Handles ``Equal`` nodes of the form ``<partition column> == <scalar>``
    and ``And`` nodes combining them.  Conjuncts of an ``And`` that cannot be
    translated are dropped, and the remaining ones are joined with ``','``.

    :param predicate: predicate expression node
    :return: a string such as ``'pt=1,hh=2'``, or ``None`` when no part of the
        predicate can be translated.
    """
    if isinstance(predicate, Equal):
        column = predicate.lhs
        if (
            isinstance(column, Column)
            and is_source_partition(column, next(column.data_source()))
            and isinstance(predicate.rhs, Scalar)
        ):
            return '%s=%s' % (column.source_name, predicate.rhs.value)
        return None

    if isinstance(predicate, And):
        # Both sides are always evaluated; only the translatable ones survive.
        specs = [
            spec
            for spec in (
                _predicate_to_partition_spec(predicate.lhs),
                _predicate_to_partition_spec(predicate.rhs),
            )
            if spec
        ]
        return ','.join(specs) if specs else None

    return None


def _estimate_predicate_size(predicate, table):
    """Estimate the size of the partitions of ``table`` selected by a predicate.

    :param predicate: predicate expression node (``Or`` / ``And`` / ``Equal``
        are understood; anything else is treated as unknown)
    :param table: table object passed through to ``_fetch_partition_size``
    :return: the summed size, or ``None`` when the size cannot be determined.
    """
    if isinstance(predicate, Or):
        left_size = _estimate_predicate_size(predicate._lhs, table)
        if left_size is None:
            # An unknown branch makes the whole disjunction unknown: counting
            # it as 0 would under-report the amount of data selected.
            return None
        right_size = _estimate_predicate_size(predicate._rhs, table)
        if right_size is None:
            return None
        return left_size + right_size

    if isinstance(predicate, (And, Equal)):
        partition_spec = _predicate_to_partition_spec(predicate)
        if partition_spec is None:
            return None
        return _fetch_partition_size(partition_spec, table)

    return None


def fetch_data_source_size(expr_dag, node, table):
    """Estimate how much data of ``table`` is read by the consumers of ``node``.

    For a table without partitions this is simply ``table.size``.  For a
    partitioned table, every successor of ``node`` in ``expr_dag`` is
    inspected:

    * ``FilterPartitionCollectionExpr``: its ``_predicate_string`` is split on
      ``'/'`` and the size of each piece is fetched and summed.
    * ``FilterCollectionExpr``: its ``_predicate`` is translated into partition
      specs where possible and the matching sizes are summed.
    * any other successor contributes nothing to the total.

    :param expr_dag: expression DAG providing ``successors(node)``
    :param node: the data source node within ``expr_dag``
    :param table: table object exposing ``table_schema`` and ``size``
    :return: the estimated size, or ``None`` when a filter's size is unknown
        or falsy (a zero size is treated the same as unknown here).
    """
    schema = table.table_schema

    if not schema.partitions:
        # not partitioned table
        return table.size

    size = 0
    for parent in expr_dag.successors(node):
        if isinstance(parent, FilterPartitionCollectionExpr):
            for partition_predicate in parent._predicate_string.split('/'):
                curr_size = _fetch_partition_size(partition_predicate, table)
                if not curr_size:
                    return None
                size += curr_size
        elif isinstance(parent, FilterCollectionExpr):
            filter_size = _estimate_predicate_size(parent._predicate, table)
            if not filter_size:
                return None
            size += filter_size

    return size