# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
import math
from abc import ABC, abstractmethod
from functools import singledispatch
from typing import (
    Any,
    Callable,
    Dict,
    Generic,
    List,
    Set,
    SupportsFloat,
    Tuple,
    TypeVar,
    Union,
)

from pyiceberg.conversions import from_bytes
from pyiceberg.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    BoundEqualTo,
    BoundGreaterThan,
    BoundGreaterThanOrEqual,
    BoundIn,
    BoundIsNaN,
    BoundIsNull,
    BoundLessThan,
    BoundLessThanOrEqual,
    BoundLiteralPredicate,
    BoundNotEqualTo,
    BoundNotIn,
    BoundNotNaN,
    BoundNotNull,
    BoundNotStartsWith,
    BoundPredicate,
    BoundSetPredicate,
    BoundStartsWith,
    BoundTerm,
    BoundUnaryPredicate,
    Not,
    Or,
    UnboundPredicate,
)
from pyiceberg.expressions.literals import Literal
from pyiceberg.manifest import DataFile, ManifestFile, PartitionFieldSummary
from pyiceberg.partitioning import UNPARTITIONED_PARTITION_SPEC, PartitionSpec
from pyiceberg.schema import Schema
from pyiceberg.typedef import EMPTY_DICT, L, Record, StructProtocol
from pyiceberg.types import (
    DoubleType,
    FloatType,
    IcebergType,
    NestedField,
    PrimitiveType,
    StructType,
    TimestampType,
    TimestamptzType,
)
from pyiceberg.utils.datetime import micros_to_timestamp, micros_to_timestamptz

T = TypeVar("T")


class BooleanExpressionVisitor(Generic[T], ABC):
    @abstractmethod
    def visit_true(self) -> T:
        """Visit method for an AlwaysTrue boolean expression.

        Note: This visit method has no arguments since AlwaysTrue instances have no context.
        """

    @abstractmethod
    def visit_false(self) -> T:
        """Visit method for an AlwaysFalse boolean expression.

        Note: This visit method has no arguments since AlwaysFalse instances have no context.
        """

    @abstractmethod
    def visit_not(self, child_result: T) -> T:
        """Visit method for a Not boolean expression.

        Args:
            child_result (T): The result of visiting the child of the Not boolean expression.
        """

    @abstractmethod
    def visit_and(self, left_result: T, right_result: T) -> T:
        """Visit method for an And boolean expression.

        Args:
            left_result (T): The result of visiting the left side of the expression.
            right_result (T): The result of visiting the right side of the expression.
        """

    @abstractmethod
    def visit_or(self, left_result: T, right_result: T) -> T:
        """Visit method for an Or boolean expression.

        Args:
            left_result (T): The result of visiting the left side of the expression.
            right_result (T): The result of visiting the right side of the expression.
        """

    @abstractmethod
    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> T:
        """Visit method for an unbound predicate in an expression tree.

        Args:
            predicate (UnboundPredicate[L): An instance of an UnboundPredicate.
        """

    @abstractmethod
    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> T:
        """Visit method for a bound predicate in an expression tree.

        Args:
            predicate (BoundPredicate[L]): An instance of a BoundPredicate.
        """


@singledispatch
def visit(obj: BooleanExpression, visitor: BooleanExpressionVisitor[T]) -> T:
    """Apply a boolean expression visitor to any point within an expression.

    The function traverses the expression in post-order fashion.

    Args:
        obj (BooleanExpression): An instance of a BooleanExpression.
        visitor (BooleanExpressionVisitor[T]): An instance of an implementation of the generic BooleanExpressionVisitor base class.

    Raises:
        NotImplementedError: If attempting to visit an unsupported expression.
    """
    raise NotImplementedError(f"Cannot visit unsupported expression: {obj}")


@visit.register(AlwaysTrue)
def _(_: AlwaysTrue, visitor: BooleanExpressionVisitor[T]) -> T:
    """Visit an AlwaysTrue boolean expression with a concrete BooleanExpressionVisitor."""
    return visitor.visit_true()


@visit.register(AlwaysFalse)
def _(_: AlwaysFalse, visitor: BooleanExpressionVisitor[T]) -> T:
    """Visit an AlwaysFalse boolean expression with a concrete BooleanExpressionVisitor."""
    return visitor.visit_false()


@visit.register(Not)
def _(obj: Not, visitor: BooleanExpressionVisitor[T]) -> T:
    """Visit a Not boolean expression with a concrete BooleanExpressionVisitor."""
    child_result: T = visit(obj.child, visitor=visitor)
    return visitor.visit_not(child_result=child_result)


@visit.register(And)
def _(obj: And, visitor: BooleanExpressionVisitor[T]) -> T:
    """Visit an And boolean expression with a concrete BooleanExpressionVisitor."""
    left_result: T = visit(obj.left, visitor=visitor)
    right_result: T = visit(obj.right, visitor=visitor)
    return visitor.visit_and(left_result=left_result, right_result=right_result)


@visit.register(UnboundPredicate)
def _(obj: UnboundPredicate[L], visitor: BooleanExpressionVisitor[T]) -> T:
    """Visit an unbound boolean expression with a concrete BooleanExpressionVisitor."""
    return visitor.visit_unbound_predicate(predicate=obj)


@visit.register(BoundPredicate)
def _(obj: BoundPredicate[L], visitor: BooleanExpressionVisitor[T]) -> T:
    """Visit a bound boolean expression with a concrete BooleanExpressionVisitor."""
    return visitor.visit_bound_predicate(predicate=obj)


@visit.register(Or)
def _(obj: Or, visitor: BooleanExpressionVisitor[T]) -> T:
    """Visit an Or boolean expression with a concrete BooleanExpressionVisitor."""
    left_result: T = visit(obj.left, visitor=visitor)
    right_result: T = visit(obj.right, visitor=visitor)
    return visitor.visit_or(left_result=left_result, right_result=right_result)


def bind(schema: Schema, expression: BooleanExpression, case_sensitive: bool) -> BooleanExpression:
    """Travers over an expression to bind the predicates to the schema.

    Args:
      schema (Schema): A schema to use when binding the expression.
      expression (BooleanExpression): An expression containing UnboundPredicates that can be bound.
      case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True.

    Raises:
        TypeError: In the case a predicate is already bound.
    """
    return visit(expression, BindVisitor(schema, case_sensitive))


class BindVisitor(BooleanExpressionVisitor[BooleanExpression]):
    """Rewrites a boolean expression by replacing unbound references with references to fields in a struct schema.

    Args:
      schema (Schema): A schema to use when binding the expression.
      case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True.

    Raises:
        TypeError: In the case a predicate is already bound.
    """

    schema: Schema
    case_sensitive: bool

    def __init__(self, schema: Schema, case_sensitive: bool) -> None:
        self.schema = schema
        self.case_sensitive = case_sensitive

    def visit_true(self) -> BooleanExpression:
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        return Not(child=child_result)

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return And(left=left_result, right=right_result)

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return Or(left=left_result, right=right_result)

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        return predicate.bind(self.schema, case_sensitive=self.case_sensitive)

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
        raise TypeError(f"Found already bound predicate: {predicate}")


class BoundBooleanExpressionVisitor(BooleanExpressionVisitor[T], ABC):
    @abstractmethod
    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> T:
        """Visit a bound In predicate."""

    @abstractmethod
    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> T:
        """Visit a bound NotIn predicate."""

    @abstractmethod
    def visit_is_nan(self, term: BoundTerm[L]) -> T:
        """Visit a bound IsNan predicate."""

    @abstractmethod
    def visit_not_nan(self, term: BoundTerm[L]) -> T:
        """Visit a bound NotNan predicate."""

    @abstractmethod
    def visit_is_null(self, term: BoundTerm[L]) -> T:
        """Visit a bound IsNull predicate."""

    @abstractmethod
    def visit_not_null(self, term: BoundTerm[L]) -> T:
        """Visit a bound NotNull predicate."""

    @abstractmethod
    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound Equal predicate."""

    @abstractmethod
    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound NotEqual predicate."""

    @abstractmethod
    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound GreaterThanOrEqual predicate."""

    @abstractmethod
    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound GreaterThan predicate."""

    @abstractmethod
    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound LessThan predicate."""

    @abstractmethod
    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit a bound LessThanOrEqual predicate."""

    @abstractmethod
    def visit_true(self) -> T:
        """Visit a bound True predicate."""

    @abstractmethod
    def visit_false(self) -> T:
        """Visit a bound False predicate."""

    @abstractmethod
    def visit_not(self, child_result: T) -> T:
        """Visit a bound Not predicate."""

    @abstractmethod
    def visit_and(self, left_result: T, right_result: T) -> T:
        """Visit a bound And predicate."""

    @abstractmethod
    def visit_or(self, left_result: T, right_result: T) -> T:
        """Visit a bound Or predicate."""

    @abstractmethod
    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit bound StartsWith predicate."""

    @abstractmethod
    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> T:
        """Visit bound NotStartsWith predicate."""

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> T:
        """Visit an unbound predicate.

        Args:
            predicate (UnboundPredicate[L]): An unbound predicate.
        Raises:
            TypeError: This always raises since an unbound predicate is not expected in a bound boolean expression.
        """
        raise TypeError(f"Not a bound predicate: {predicate}")

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> T:
        """Visit a bound predicate.

        Args:
            predicate (BoundPredicate[L]): A bound predicate.
        """
        return visit_bound_predicate(predicate, self)


@singledispatch
def visit_bound_predicate(expr: BoundPredicate[L], _: BooleanExpressionVisitor[T]) -> T:
    raise TypeError(f"Unknown predicate: {expr}")


@visit_bound_predicate.register(BoundIn)
def _(expr: BoundIn[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_in(term=expr.term, literals=expr.value_set)


@visit_bound_predicate.register(BoundNotIn)
def _(expr: BoundNotIn[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_not_in(term=expr.term, literals=expr.value_set)


@visit_bound_predicate.register(BoundIsNaN)
def _(expr: BoundIsNaN[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_is_nan(term=expr.term)


@visit_bound_predicate.register(BoundNotNaN)
def _(expr: BoundNotNaN[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_not_nan(term=expr.term)


@visit_bound_predicate.register(BoundIsNull)
def _(expr: BoundIsNull[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_is_null(term=expr.term)


@visit_bound_predicate.register(BoundNotNull)
def _(expr: BoundNotNull[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_not_null(term=expr.term)


@visit_bound_predicate.register(BoundEqualTo)
def _(expr: BoundEqualTo[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_equal(term=expr.term, literal=expr.literal)


@visit_bound_predicate.register(BoundNotEqualTo)
def _(expr: BoundNotEqualTo[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_not_equal(term=expr.term, literal=expr.literal)


@visit_bound_predicate.register(BoundGreaterThanOrEqual)
def _(expr: BoundGreaterThanOrEqual[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    """Visit a bound GreaterThanOrEqual predicate."""
    return visitor.visit_greater_than_or_equal(term=expr.term, literal=expr.literal)


@visit_bound_predicate.register(BoundGreaterThan)
def _(expr: BoundGreaterThan[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_greater_than(term=expr.term, literal=expr.literal)


@visit_bound_predicate.register(BoundLessThan)
def _(expr: BoundLessThan[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_less_than(term=expr.term, literal=expr.literal)


@visit_bound_predicate.register(BoundLessThanOrEqual)
def _(expr: BoundLessThanOrEqual[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_less_than_or_equal(term=expr.term, literal=expr.literal)


@visit_bound_predicate.register(BoundStartsWith)
def _(expr: BoundStartsWith[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_starts_with(term=expr.term, literal=expr.literal)


@visit_bound_predicate.register(BoundNotStartsWith)
def _(expr: BoundNotStartsWith[L], visitor: BoundBooleanExpressionVisitor[T]) -> T:
    return visitor.visit_not_starts_with(term=expr.term, literal=expr.literal)


def rewrite_not(expr: BooleanExpression) -> BooleanExpression:
    return visit(expr, _RewriteNotVisitor())


class _RewriteNotVisitor(BooleanExpressionVisitor[BooleanExpression]):
    """Inverts the negations."""

    def visit_true(self) -> BooleanExpression:
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        return ~child_result

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return And(left=left_result, right=right_result)

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return Or(left=left_result, right=right_result)

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        return predicate

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
        return predicate


def expression_evaluator(schema: Schema, unbound: BooleanExpression, case_sensitive: bool) -> Callable[[StructProtocol], bool]:
    return _ExpressionEvaluator(schema, unbound, case_sensitive).eval


class _ExpressionEvaluator(BoundBooleanExpressionVisitor[bool]):
    bound: BooleanExpression
    struct: StructProtocol

    def __init__(self, schema: Schema, unbound: BooleanExpression, case_sensitive: bool):
        self.bound = bind(schema, unbound, case_sensitive)

    def eval(self, struct: StructProtocol) -> bool:
        self.struct = struct
        return visit(self.bound, self)

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        return term.eval(self.struct) in literals

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        return term.eval(self.struct) not in literals

    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
        val = term.eval(self.struct)
        return val != val

    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
        val = term.eval(self.struct)
        return val == val

    def visit_is_null(self, term: BoundTerm[L]) -> bool:
        return term.eval(self.struct) is None

    def visit_not_null(self, term: BoundTerm[L]) -> bool:
        return term.eval(self.struct) is not None

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        return term.eval(self.struct) == literal.value

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        return term.eval(self.struct) != literal.value

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        value = term.eval(self.struct)
        return value is not None and value >= literal.value

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        value = term.eval(self.struct)
        return value is not None and value > literal.value

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        value = term.eval(self.struct)
        return value is not None and value < literal.value

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        value = term.eval(self.struct)
        return value is not None and value <= literal.value

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        eval_res = term.eval(self.struct)
        return eval_res is not None and str(eval_res).startswith(str(literal.value))

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        return not self.visit_starts_with(term, literal)

    def visit_true(self) -> bool:
        return True

    def visit_false(self) -> bool:
        return False

    def visit_not(self, child_result: bool) -> bool:
        return not child_result

    def visit_and(self, left_result: bool, right_result: bool) -> bool:
        return left_result and right_result

    def visit_or(self, left_result: bool, right_result: bool) -> bool:
        return left_result or right_result


ROWS_MIGHT_MATCH = True
ROWS_MUST_MATCH = True
ROWS_CANNOT_MATCH = False
ROWS_MIGHT_NOT_MATCH = False
IN_PREDICATE_LIMIT = 200


def _from_byte_buffer(field_type: IcebergType, val: bytes) -> Any:
    if not isinstance(field_type, PrimitiveType):
        raise ValueError(f"Expected a PrimitiveType, got: {type(field_type)}")
    return from_bytes(field_type, val)


class _ManifestEvalVisitor(BoundBooleanExpressionVisitor[bool]):
    partition_fields: List[PartitionFieldSummary]
    partition_filter: BooleanExpression

    def __init__(self, partition_struct_schema: Schema, partition_filter: BooleanExpression, case_sensitive: bool) -> None:
        self.partition_filter = bind(partition_struct_schema, rewrite_not(partition_filter), case_sensitive)

    def eval(self, manifest: ManifestFile) -> bool:
        if partitions := manifest.partitions:
            self.partition_fields = partitions
            return visit(self.partition_filter, self)

        # No partition information
        return ROWS_MIGHT_MATCH

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.lower_bound is None:
            return ROWS_CANNOT_MATCH

        if len(literals) > IN_PREDICATE_LIMIT:
            return ROWS_MIGHT_MATCH

        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)

        if all(lower > val for val in literals):
            return ROWS_CANNOT_MATCH

        if field.upper_bound is not None:
            upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
            if all(upper < val for val in literals):
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        # because the bounds are not necessarily a min or max value, this cannot be answered using
        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
        return ROWS_MIGHT_MATCH

    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.contains_nan is False:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.contains_nan is True and field.contains_null is False and field.lower_bound is None:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_is_null(self, term: BoundTerm[L]) -> bool:
        pos = term.ref().accessor.position

        if self.partition_fields[pos].contains_null is False:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_null(self, term: BoundTerm[L]) -> bool:
        pos = term.ref().accessor.position

        # contains_null encodes whether at least one partition value is null,
        # lowerBound is null if all partition values are null
        all_null = self.partition_fields[pos].contains_null is True and self.partition_fields[pos].lower_bound is None

        if all_null and isinstance(term.ref().field.field_type, (DoubleType, FloatType)):
            # floating point types may include NaN values, which we check separately.
            # In case bounds don't include NaN value, contains_nan needs to be checked against.
            all_null = self.partition_fields[pos].contains_nan is False

        if all_null:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.lower_bound is None or field.upper_bound is None:
            # values are all null and literal cannot contain null
            return ROWS_CANNOT_MATCH

        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)

        if lower > literal.value:
            return ROWS_CANNOT_MATCH

        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)

        if literal.value > upper:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        # because the bounds are not necessarily a min or max value, this cannot be answered using
        # them. notEq(col, X) with (X, Y) doesn't guarantee that X is a value in col.
        return ROWS_MIGHT_MATCH

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.upper_bound is None:
            return ROWS_CANNOT_MATCH

        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)

        if literal.value > upper:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.upper_bound is None:
            return ROWS_CANNOT_MATCH

        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)

        if literal.value >= upper:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.lower_bound is None:
            return ROWS_CANNOT_MATCH

        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)

        if literal.value <= lower:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]

        if field.lower_bound is None:
            return ROWS_CANNOT_MATCH

        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)

        if literal.value < lower:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]
        prefix = str(literal.value)
        len_prefix = len(prefix)

        if field.lower_bound is None:
            return ROWS_CANNOT_MATCH

        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
        # truncate lower bound so that its length is not greater than the length of prefix
        if lower is not None and lower[:len_prefix] > prefix:
            return ROWS_CANNOT_MATCH

        if field.upper_bound is None:
            return ROWS_CANNOT_MATCH

        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)
        # truncate upper bound so that its length is not greater than the length of prefix
        if upper is not None and upper[:len_prefix] < prefix:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        pos = term.ref().accessor.position
        field = self.partition_fields[pos]
        prefix = str(literal.value)
        len_prefix = len(prefix)

        if field.contains_null or field.lower_bound is None or field.upper_bound is None:
            return ROWS_MIGHT_MATCH

        # not_starts_with will match unless all values must start with the prefix. This happens when
        # the lower and upper bounds both start with the prefix.
        lower = _from_byte_buffer(term.ref().field.field_type, field.lower_bound)
        upper = _from_byte_buffer(term.ref().field.field_type, field.upper_bound)

        if lower is not None and upper is not None:
            # if lower is shorter than the prefix then lower doesn't start with the prefix
            if len(lower) < len_prefix:
                return ROWS_MIGHT_MATCH

            if lower[:len_prefix] == prefix:
                # if upper is shorter than the prefix then upper can't start with the prefix
                if len(upper) < len_prefix:
                    return ROWS_MIGHT_MATCH

                if upper[:len_prefix] == prefix:
                    return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_true(self) -> bool:
        return ROWS_MIGHT_MATCH

    def visit_false(self) -> bool:
        return ROWS_CANNOT_MATCH

    def visit_not(self, child_result: bool) -> bool:
        return not child_result

    def visit_and(self, left_result: bool, right_result: bool) -> bool:
        return left_result and right_result

    def visit_or(self, left_result: bool, right_result: bool) -> bool:
        return left_result or right_result


def manifest_evaluator(
    partition_spec: PartitionSpec, schema: Schema, partition_filter: BooleanExpression, case_sensitive: bool = True
) -> Callable[[ManifestFile], bool]:
    partition_type = partition_spec.partition_type(schema)
    partition_schema = Schema(*partition_type.fields)
    evaluator = _ManifestEvalVisitor(partition_schema, partition_filter, case_sensitive)
    return evaluator.eval


class ProjectionEvaluator(BooleanExpressionVisitor[BooleanExpression], ABC):
    schema: Schema
    spec: PartitionSpec
    case_sensitive: bool

    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool):
        self.schema = schema
        self.spec = spec
        self.case_sensitive = case_sensitive

    def project(self, expr: BooleanExpression) -> BooleanExpression:
        #  projections assume that there are no NOT nodes in the expression tree. to ensure that this
        #  is the case, the expression is rewritten to push all NOT nodes down to the expression
        #  leaf nodes.
        #  this is necessary to ensure that the default expression returned when a predicate can't be
        #  projected is correct.
        return visit(bind(self.schema, rewrite_not(expr), self.case_sensitive), self)

    def visit_true(self) -> BooleanExpression:
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        raise ValueError(f"Cannot project not expression, should be rewritten: {child_result}")

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return And(left_result, right_result)

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return Or(left_result, right_result)

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        raise ValueError(f"Cannot project unbound predicate: {predicate}")


class InclusiveProjection(ProjectionEvaluator):
    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
        parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)

        result: BooleanExpression = AlwaysTrue()
        for part in parts:
            # consider (d = 2019-01-01) with bucket(7, d) and bucket(5, d)
            # projections: b1 = bucket(7, '2019-01-01') = 5, b2 = bucket(5, '2019-01-01') = 0
            # any value where b1 != 5 or any value where b2 != 0 cannot be the '2019-01-01'
            #
            # similarly, if partitioning by day(ts) and hour(ts), the more restrictive
            # projection should be used. ts = 2019-01-01T01:00:00 produces day=2019-01-01 and
            # hour=2019-01-01-01. the value will be in 2019-01-01-01 and not in 2019-01-01-02.
            incl_projection = part.transform.project(name=part.name, pred=predicate)
            if incl_projection is not None:
                result = And(result, incl_projection)

        return result


def inclusive_projection(
    schema: Schema, spec: PartitionSpec, case_sensitive: bool = True
) -> Callable[[BooleanExpression], BooleanExpression]:
    return InclusiveProjection(schema, spec, case_sensitive).project


class _ColumnNameTranslator(BooleanExpressionVisitor[BooleanExpression]):
    """Converts the column names with the ones in the actual file.

    Args:
      file_schema (Schema): The schema of the file.
      case_sensitive (bool): Whether to consider case when binding a reference to a field in a schema, defaults to True.

    Raises:
        TypeError: In the case of an UnboundPredicate.
        ValueError: When a column name cannot be found.
    """

    file_schema: Schema
    case_sensitive: bool

    def __init__(self, file_schema: Schema, case_sensitive: bool) -> None:
        self.file_schema = file_schema
        self.case_sensitive = case_sensitive

    def visit_true(self) -> BooleanExpression:
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        return Not(child=child_result)

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return And(left=left_result, right=right_result)

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return Or(left=left_result, right=right_result)

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        raise TypeError(f"Expected Bound Predicate, got: {predicate.term}")

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> BooleanExpression:
        file_column_name = self.file_schema.find_column_name(predicate.term.ref().field.field_id)

        if file_column_name is None:
            # In the case of schema evolution, the column might not be present
            # in the file schema when reading older data
            if isinstance(predicate, BoundIsNull):
                return AlwaysTrue()
            else:
                return AlwaysFalse()

        if isinstance(predicate, BoundUnaryPredicate):
            return predicate.as_unbound(file_column_name)
        elif isinstance(predicate, BoundLiteralPredicate):
            return predicate.as_unbound(file_column_name, predicate.literal)
        elif isinstance(predicate, BoundSetPredicate):
            return predicate.as_unbound(file_column_name, predicate.literals)
        else:
            raise ValueError(f"Unsupported predicate: {predicate}")


def translate_column_names(expr: BooleanExpression, file_schema: Schema, case_sensitive: bool) -> BooleanExpression:
    return visit(expr, _ColumnNameTranslator(file_schema, case_sensitive))


class _ExpressionFieldIDs(BooleanExpressionVisitor[Set[int]]):
    """Extracts the field IDs used in the BooleanExpression."""

    def visit_true(self) -> Set[int]:
        return set()

    def visit_false(self) -> Set[int]:
        return set()

    def visit_not(self, child_result: Set[int]) -> Set[int]:
        return child_result

    def visit_and(self, left_result: Set[int], right_result: Set[int]) -> Set[int]:
        return left_result.union(right_result)

    def visit_or(self, left_result: Set[int], right_result: Set[int]) -> Set[int]:
        return left_result.union(right_result)

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> Set[int]:
        raise ValueError("Only works on bound records")

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> Set[int]:
        return {predicate.term.ref().field.field_id}


def extract_field_ids(expr: BooleanExpression) -> Set[int]:
    return visit(expr, _ExpressionFieldIDs())


class _RewriteToDNF(BooleanExpressionVisitor[Tuple[BooleanExpression, ...]]):
    def visit_true(self) -> Tuple[BooleanExpression, ...]:
        return (AlwaysTrue(),)

    def visit_false(self) -> Tuple[BooleanExpression, ...]:
        return (AlwaysFalse(),)

    def visit_not(self, child_result: Tuple[BooleanExpression, ...]) -> Tuple[BooleanExpression, ...]:
        raise ValueError(f"Not expressions are not allowed: {child_result}")

    def visit_and(
        self, left_result: Tuple[BooleanExpression, ...], right_result: Tuple[BooleanExpression, ...]
    ) -> Tuple[BooleanExpression, ...]:
        # Distributive law:
        # ((P OR Q) AND (R OR S)) AND (((P AND R) OR (P AND S)) OR ((Q AND R) OR ((Q AND S)))
        # A AND (B OR C) = (A AND B) OR (A AND C)
        # (A OR B) AND C = (A AND C) OR (B AND C)
        return tuple(And(le, re) for le in left_result for re in right_result)

    def visit_or(
        self, left_result: Tuple[BooleanExpression, ...], right_result: Tuple[BooleanExpression, ...]
    ) -> Tuple[BooleanExpression, ...]:
        return left_result + right_result

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> Tuple[BooleanExpression, ...]:
        return (predicate,)

    def visit_bound_predicate(self, predicate: BoundPredicate[L]) -> Tuple[BooleanExpression, ...]:
        return (predicate,)


def rewrite_to_dnf(expr: BooleanExpression) -> Tuple[BooleanExpression, ...]:
    # Rewrites an arbitrary boolean expression to disjunctive normal form (DNF):
    # (A AND NOT(B) AND C) OR (NOT(D) AND E AND F) OR (G)
    expr_without_not = rewrite_not(expr)
    return visit(expr_without_not, _RewriteToDNF())


class ExpressionToPlainFormat(BoundBooleanExpressionVisitor[List[Tuple[str, str, Any]]]):
    cast_int_to_date: bool

    def __init__(self, cast_int_to_date: bool = False) -> None:
        self.cast_int_to_date = cast_int_to_date

    def _cast_if_necessary(self, iceberg_type: IcebergType, literal: Union[L, Set[L]]) -> Union[L, Set[L]]:
        if self.cast_int_to_date:
            iceberg_type_class = type(iceberg_type)
            conversions = {TimestampType: micros_to_timestamp, TimestamptzType: micros_to_timestamptz}
            if iceberg_type_class in conversions:
                conversion_function = conversions[iceberg_type_class]
                if isinstance(literal, set):
                    return {conversion_function(lit) for lit in literal}  # type: ignore
                else:
                    return conversion_function(literal)  # type: ignore
        return literal

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> List[Tuple[str, str, Any]]:
        field = term.ref().field
        return [(term.ref().field.name, "in", self._cast_if_necessary(field.field_type, literals))]

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> List[Tuple[str, str, Any]]:
        field = term.ref().field
        return [(field.name, "not in", self._cast_if_necessary(field.field_type, literals))]

    def visit_is_nan(self, term: BoundTerm[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "==", float("nan"))]

    def visit_not_nan(self, term: BoundTerm[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "!=", float("nan"))]

    def visit_is_null(self, term: BoundTerm[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "==", None)]

    def visit_not_null(self, term: BoundTerm[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "!=", None)]

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "==", self._cast_if_necessary(term.ref().field.field_type, literal.value))]

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "!=", self._cast_if_necessary(term.ref().field.field_type, literal.value))]

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, ">=", self._cast_if_necessary(term.ref().field.field_type, literal.value))]

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, ">", self._cast_if_necessary(term.ref().field.field_type, literal.value))]

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "<", self._cast_if_necessary(term.ref().field.field_type, literal.value))]

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return [(term.ref().field.name, "<=", self._cast_if_necessary(term.ref().field.field_type, literal.value))]

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return []

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> List[Tuple[str, str, Any]]:
        return []

    def visit_true(self) -> List[Tuple[str, str, Any]]:
        return []  # Not supported

    def visit_false(self) -> List[Tuple[str, str, Any]]:
        raise ValueError("Not supported: AlwaysFalse")

    def visit_not(self, child_result: List[Tuple[str, str, Any]]) -> List[Tuple[str, str, Any]]:
        raise ValueError(f"Not allowed: {child_result}")

    def visit_and(
        self, left_result: List[Tuple[str, str, Any]], right_result: List[Tuple[str, str, Any]]
    ) -> List[Tuple[str, str, Any]]:
        return left_result + right_result

    def visit_or(
        self, left_result: List[Tuple[str, str, Any]], right_result: List[Tuple[str, str, Any]]
    ) -> List[Tuple[str, str, Any]]:
        raise ValueError(f"Not allowed: {left_result} || {right_result}")


def expression_to_plain_format(
    expressions: Tuple[BooleanExpression, ...], cast_int_to_datetime: bool = False
) -> List[List[Tuple[str, str, Any]]]:
    """Format a Disjunctive Normal Form expression.

    These are the formats that the expression can be fed into:

    - https://arrow.apache.org/docs/python/generated/pyarrow.parquet.read_table.html
    - https://docs.dask.org/en/stable/generated/dask.dataframe.read_parquet.html

    Contrary to normal DNF that may contain Not expressions, but here they should have
    been rewritten. This can be done using ``rewrite_not(...)``.

    Keep in mind that this is only used for page skipping, and still needs to filter
    on a row level.

    Args:
        expressions: Expression in Disjunctive Normal Form.

    Returns:
        Formatter filter compatible with Dask and PyArrow.
    """
    # In the form of expr1 ∨ expr2 ∨ ... ∨ exprN
    visitor = ExpressionToPlainFormat(cast_int_to_datetime)
    return [visit(expression, visitor) for expression in expressions]


class _MetricsEvaluator(BoundBooleanExpressionVisitor[bool], ABC):
    value_counts: Dict[int, int]
    null_counts: Dict[int, int]
    nan_counts: Dict[int, int]
    lower_bounds: Dict[int, bytes]
    upper_bounds: Dict[int, bytes]

    def visit_true(self) -> bool:
        # all rows match
        return ROWS_MIGHT_MATCH

    def visit_false(self) -> bool:
        # all rows fail
        return ROWS_CANNOT_MATCH

    def visit_not(self, child_result: bool) -> bool:
        raise ValueError(f"NOT should be rewritten: {child_result}")

    def visit_and(self, left_result: bool, right_result: bool) -> bool:
        return left_result and right_result

    def visit_or(self, left_result: bool, right_result: bool) -> bool:
        return left_result or right_result

    def _contains_nulls_only(self, field_id: int) -> bool:
        if (value_count := self.value_counts.get(field_id)) and (null_count := self.null_counts.get(field_id)):
            return value_count == null_count
        return False

    def _contains_nans_only(self, field_id: int) -> bool:
        if (nan_count := self.nan_counts.get(field_id)) and (value_count := self.value_counts.get(field_id)):
            return nan_count == value_count
        return False

    def _is_nan(self, val: Any) -> bool:
        try:
            return math.isnan(val)
        except TypeError:
            # In the case of None or other non-numeric types
            return False


class _InclusiveMetricsEvaluator(_MetricsEvaluator):
    struct: StructType
    expr: BooleanExpression

    def __init__(
        self, schema: Schema, expr: BooleanExpression, case_sensitive: bool = True, include_empty_files: bool = False
    ) -> None:
        self.struct = schema.as_struct()
        self.include_empty_files = include_empty_files
        self.expr = bind(schema, rewrite_not(expr), case_sensitive)

    def eval(self, file: DataFile) -> bool:
        """Test whether the file may contain records that match the expression."""
        if not self.include_empty_files and file.record_count == 0:
            return ROWS_CANNOT_MATCH

        if file.record_count < 0:
            # Older version don't correctly implement record count from avro file and thus
            # set record count -1 when importing avro tables to iceberg tables. This should
            # be updated once we implemented and set correct record count.
            return ROWS_MIGHT_MATCH

        self.value_counts = file.value_counts or EMPTY_DICT
        self.null_counts = file.null_value_counts or EMPTY_DICT
        self.nan_counts = file.nan_value_counts or EMPTY_DICT
        self.lower_bounds = file.lower_bounds or EMPTY_DICT
        self.upper_bounds = file.upper_bounds or EMPTY_DICT

        return visit(self.expr, self)

    def _may_contain_null(self, field_id: int) -> bool:
        return self.null_counts is None or (field_id in self.null_counts and self.null_counts.get(field_id) is not None)

    def _contains_nans_only(self, field_id: int) -> bool:
        if (nan_count := self.nan_counts.get(field_id)) and (value_count := self.value_counts.get(field_id)):
            return nan_count == value_count
        return False

    def visit_is_null(self, term: BoundTerm[L]) -> bool:
        field_id = term.ref().field.field_id

        if self.null_counts.get(field_id) == 0:
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_null(self, term: BoundTerm[L]) -> bool:
        # no need to check whether the field is required because binding evaluates that case
        # if the column has no non-null values, the expression cannot match
        field_id = term.ref().field.field_id

        if self._contains_nulls_only(field_id):
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
        field_id = term.ref().field.field_id

        if self.nan_counts.get(field_id) == 0:
            return ROWS_CANNOT_MATCH

        # when there's no nanCounts information, but we already know the column only contains null,
        # it's guaranteed that there's no NaN value
        if self._contains_nulls_only(field_id):
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
        field_id = term.ref().field.field_id

        if self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)

            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if lower_bound >= literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)
            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if lower_bound > literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            if upper_bound <= literal.value:  # type: ignore[operator]
                if self._is_nan(upper_bound):
                    # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                    return ROWS_MIGHT_MATCH

                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            if upper_bound < literal.value:  # type: ignore[operator]
                if self._is_nan(upper_bound):
                    # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                    return ROWS_MIGHT_MATCH

                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)
            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if lower_bound > literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            if self._is_nan(upper_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            if upper_bound < literal.value:  # type: ignore[operator]
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        return ROWS_MIGHT_MATCH

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        field = term.ref().field
        field_id = field.field_id

        if self._contains_nulls_only(field_id) or self._contains_nans_only(field_id):
            return ROWS_CANNOT_MATCH

        if len(literals) > IN_PREDICATE_LIMIT:
            # skip evaluating the predicate if the number of values is too big
            return ROWS_MIGHT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = from_bytes(field.field_type, lower_bound_bytes)
            if self._is_nan(lower_bound):
                # NaN indicates unreliable bounds. See the InclusiveMetricsEvaluator docs for more.
                return ROWS_MIGHT_MATCH

            literals = {lit for lit in literals if lower_bound <= lit}  # type: ignore[operator]
            if len(literals) == 0:
                return ROWS_CANNOT_MATCH

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = from_bytes(field.field_type, upper_bound_bytes)
            # this is different from Java, here NaN is always larger
            if self._is_nan(upper_bound):
                return ROWS_MIGHT_MATCH

            literals = {lit for lit in literals if upper_bound >= lit}  # type: ignore[operator]
            if len(literals) == 0:
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        # because the bounds are not necessarily a min or max value, this cannot be answered using
        # them. notIn(col, {X, ...}) with (X, Y) doesn't guarantee that X is a value in col.
        return ROWS_MIGHT_MATCH

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        field = term.ref().field
        field_id: int = field.field_id

        if self._contains_nulls_only(field_id):
            return ROWS_CANNOT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        prefix = str(literal.value)
        len_prefix = len(prefix)

        if lower_bound_bytes := self.lower_bounds.get(field_id):
            lower_bound = str(from_bytes(field.field_type, lower_bound_bytes))

            # truncate lower bound so that its length is not greater than the length of prefix
            if lower_bound and lower_bound[:len_prefix] > prefix:
                return ROWS_CANNOT_MATCH

        if upper_bound_bytes := self.upper_bounds.get(field_id):
            upper_bound = str(from_bytes(field.field_type, upper_bound_bytes))

            # truncate upper bound so that its length is not greater than the length of prefix
            if upper_bound is not None and upper_bound[:len_prefix] < prefix:
                return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        field = term.ref().field
        field_id: int = field.field_id

        if self._may_contain_null(field_id):
            return ROWS_MIGHT_MATCH

        if not isinstance(field.field_type, PrimitiveType):
            raise ValueError(f"Expected PrimitiveType: {field.field_type}")

        prefix = str(literal.value)
        len_prefix = len(prefix)

        # not_starts_with will match unless all values must start with the prefix. This happens when
        # the lower and upper bounds both start with the prefix.
        if (lower_bound_bytes := self.lower_bounds.get(field_id)) and (upper_bound_bytes := self.upper_bounds.get(field_id)):
            lower_bound = str(from_bytes(field.field_type, lower_bound_bytes))
            upper_bound = str(from_bytes(field.field_type, upper_bound_bytes))

            # if lower is shorter than the prefix then lower doesn't start with the prefix
            if len(lower_bound) < len_prefix:
                return ROWS_MIGHT_MATCH

            if lower_bound[:len_prefix] == prefix:
                # if upper is shorter than the prefix then upper can't start with the prefix
                if len(upper_bound) < len_prefix:
                    return ROWS_MIGHT_MATCH

                if upper_bound[:len_prefix] == prefix:
                    return ROWS_CANNOT_MATCH

        return ROWS_MIGHT_MATCH


def strict_projection(
    schema: Schema, spec: PartitionSpec, case_sensitive: bool = True
) -> Callable[[BooleanExpression], BooleanExpression]:
    return StrictProjection(schema, spec, case_sensitive).project


class StrictProjection(ProjectionEvaluator):
    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
        parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)

        result: BooleanExpression = AlwaysFalse()
        for part in parts:
            # consider (ts > 2019-01-01T01:00:00) with day(ts) and hour(ts)
            # projections: d >= 2019-01-02 and h >= 2019-01-01-02 (note the inclusive bounds).
            # any timestamp where either projection predicate is true must match the original
            # predicate. For example, ts = 2019-01-01T03:00:00 matches the hour projection but not
            # the day, but does match the original predicate.
            strict_projection = part.transform.strict_project(name=part.name, pred=predicate)
            if strict_projection is not None:
                result = Or(result, strict_projection)

        return result


class _StrictMetricsEvaluator(_MetricsEvaluator):
    struct: StructType
    expr: BooleanExpression

    def __init__(
        self, schema: Schema, expr: BooleanExpression, case_sensitive: bool = True, include_empty_files: bool = False
    ) -> None:
        self.struct = schema.as_struct()
        self.include_empty_files = include_empty_files
        self.expr = bind(schema, rewrite_not(expr), case_sensitive)

    def eval(self, file: DataFile) -> bool:
        """Test whether all records within the file match the expression.

        Args:
            file: A data file

        Returns: false if the file may contain any row that doesn't match
                    the expression, true otherwise.
        """
        if file.record_count <= 0:
            # Older version don't correctly implement record count from avro file and thus
            # set record count -1 when importing avro tables to iceberg tables. This should
            # be updated once we implemented and set correct record count.
            return ROWS_MUST_MATCH

        self.value_counts = file.value_counts or EMPTY_DICT
        self.null_counts = file.null_value_counts or EMPTY_DICT
        self.nan_counts = file.nan_value_counts or EMPTY_DICT
        self.lower_bounds = file.lower_bounds or EMPTY_DICT
        self.upper_bounds = file.upper_bounds or EMPTY_DICT

        return visit(self.expr, self)

    def visit_is_null(self, term: BoundTerm[L]) -> bool:
        # no need to check whether the field is required because binding evaluates that case
        # if the column has any non-null values, the expression does not match
        field_id = term.ref().field.field_id

        if self._contains_nulls_only(field_id):
            return ROWS_MUST_MATCH
        else:
            return ROWS_MIGHT_NOT_MATCH

    def visit_not_null(self, term: BoundTerm[L]) -> bool:
        # no need to check whether the field is required because binding evaluates that case
        # if the column has any non-null values, the expression does not match
        field_id = term.ref().field.field_id

        if (null_count := self.null_counts.get(field_id)) is not None and null_count == 0:
            return ROWS_MUST_MATCH
        else:
            return ROWS_MIGHT_NOT_MATCH

    def visit_is_nan(self, term: BoundTerm[L]) -> bool:
        field_id = term.ref().field.field_id

        if self._contains_nans_only(field_id):
            return ROWS_MUST_MATCH
        else:
            return ROWS_MIGHT_NOT_MATCH

    def visit_not_nan(self, term: BoundTerm[L]) -> bool:
        field_id = term.ref().field.field_id

        if (nan_count := self.nan_counts.get(field_id)) is not None and nan_count == 0:
            return ROWS_MUST_MATCH

        if self._contains_nulls_only(field_id):
            return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        # Rows must match when: <----------Min----Max---X------->

        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            field = self._get_field(field_id)
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if upper < literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        # Rows must match when: <----------Min----Max---X------->

        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            field = self._get_field(field_id)
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if upper <= literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        # Rows must match when: <-------X---Min----Max---------->

        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if lower_bytes := self.lower_bounds.get(field_id):
            field = self._get_field(field_id)
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the _StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            if lower > literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        # Rows must match when: <-------X---Min----Max---------->
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if lower_bytes := self.lower_bounds.get(field_id):
            field = self._get_field(field_id)
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the _StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            if lower >= literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        # Rows must match when Min == X == Max
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        if (lower_bytes := self.lower_bounds.get(field_id)) and (upper_bytes := self.upper_bounds.get(field_id)):
            field = self._get_field(field_id)
            lower = _from_byte_buffer(field.field_type, lower_bytes)
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if lower != literal.value or upper != literal.value:
                return ROWS_MIGHT_NOT_MATCH
            else:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        # Rows must match when X < Min or Max < X because it is not in the range
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MUST_MATCH

        field = self._get_field(field_id)

        if lower_bytes := self.lower_bounds.get(field_id):
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the _StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            if lower > literal.value:
                return ROWS_MUST_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            if upper < literal.value:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MIGHT_NOT_MATCH

        field = self._get_field(field_id)

        if (lower_bytes := self.lower_bounds.get(field_id)) and (upper_bytes := self.upper_bounds.get(field_id)):
            # similar to the implementation in eq, first check if the lower bound is in the set
            lower = _from_byte_buffer(field.field_type, lower_bytes)
            if lower not in literals:
                return ROWS_MIGHT_NOT_MATCH

            # check if the upper bound is in the set
            upper = _from_byte_buffer(field.field_type, upper_bytes)
            if upper not in literals:
                return ROWS_MIGHT_NOT_MATCH

            # finally check if the lower bound and the upper bound are equal
            if lower != upper:
                return ROWS_MIGHT_NOT_MATCH

            # All values must be in the set if the lower bound and the upper bound are
            # in the set and are equal.
            return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> bool:
        field_id = term.ref().field.field_id

        if self._can_contain_nulls(field_id) or self._can_contain_nans(field_id):
            return ROWS_MUST_MATCH

        field = self._get_field(field_id)

        if lower_bytes := self.lower_bounds.get(field_id):
            lower = _from_byte_buffer(field.field_type, lower_bytes)

            if self._is_nan(lower):
                # NaN indicates unreliable bounds.
                # See the StrictMetricsEvaluator docs for more.
                return ROWS_MIGHT_NOT_MATCH

            literals = {val for val in literals if lower <= val}
            if len(literals) == 0:
                return ROWS_MUST_MATCH

        if upper_bytes := self.upper_bounds.get(field_id):
            upper = _from_byte_buffer(field.field_type, upper_bytes)

            literals = {val for val in literals if upper >= val}

            if len(literals) == 0:
                return ROWS_MUST_MATCH

        return ROWS_MIGHT_NOT_MATCH

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        return ROWS_MIGHT_NOT_MATCH

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> bool:
        return ROWS_MIGHT_NOT_MATCH

    def _get_field(self, field_id: int) -> NestedField:
        field = self.struct.field(field_id=field_id)
        if field is None:
            raise ValueError(f"Cannot find field, might be nested or missing: {field_id}")

        return field

    def _can_contain_nulls(self, field_id: int) -> bool:
        return (null_count := self.null_counts.get(field_id)) is not None and null_count > 0

    def _can_contain_nans(self, field_id: int) -> bool:
        return (nan_count := self.nan_counts.get(field_id)) is not None and nan_count > 0


class ResidualVisitor(BoundBooleanExpressionVisitor[BooleanExpression], ABC):
    """Finds the residuals for an Expression the partitions in the given PartitionSpec.

    A residual expression is made by partially evaluating an expression using partition values.
    For example, if a table is partitioned by day(utc_timestamp) and is read with a filter expression
    utc_timestamp > a and utc_timestamp < b, then there are 4 possible residuals expressions
    for the partition data, d:


    1. If d > day(a) and d &lt; day(b), the residual is always true
    2. If d == day(a) and d != day(b), the residual is utc_timestamp > a
    3. if d == day(b) and d != day(a), the residual is utc_timestamp < b
    4. If d == day(a) == day(b), the residual is utc_timestamp > a and utc_timestamp < b
    Partition data is passed using StructLike. Residuals are returned by residualFor(StructLike).
    """

    schema: Schema
    spec: PartitionSpec
    case_sensitive: bool
    expr: BooleanExpression

    def __init__(self, schema: Schema, spec: PartitionSpec, case_sensitive: bool, expr: BooleanExpression) -> None:
        self.schema = schema
        self.spec = spec
        self.case_sensitive = case_sensitive
        self.expr = expr

    def eval(self, partition_data: Record) -> BooleanExpression:
        self.struct = partition_data
        return visit(self.expr, visitor=self)

    def visit_true(self) -> BooleanExpression:
        return AlwaysTrue()

    def visit_false(self) -> BooleanExpression:
        return AlwaysFalse()

    def visit_not(self, child_result: BooleanExpression) -> BooleanExpression:
        return Not(child_result)

    def visit_and(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return And(left_result, right_result)

    def visit_or(self, left_result: BooleanExpression, right_result: BooleanExpression) -> BooleanExpression:
        return Or(left_result, right_result)

    def visit_is_null(self, term: BoundTerm[L]) -> BooleanExpression:
        if term.eval(self.struct) is None:
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_not_null(self, term: BoundTerm[L]) -> BooleanExpression:
        if term.eval(self.struct) is not None:
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_is_nan(self, term: BoundTerm[L]) -> BooleanExpression:
        val = term.eval(self.struct)
        if isinstance(val, SupportsFloat) and math.isnan(val):
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_not_nan(self, term: BoundTerm[L]) -> BooleanExpression:
        val = term.eval(self.struct)
        if isinstance(val, SupportsFloat) and not math.isnan(val):
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_less_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) < literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_less_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) <= literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_greater_than(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) > literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_greater_than_or_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) >= literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) == literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_not_equal(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if term.eval(self.struct) != literal.value:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
        if term.eval(self.struct) in literals:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_not_in(self, term: BoundTerm[L], literals: Set[L]) -> BooleanExpression:
        if term.eval(self.struct) not in literals:
            return self.visit_true()
        else:
            return self.visit_false()

    def visit_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        eval_res = term.eval(self.struct)
        if eval_res is not None and str(eval_res).startswith(str(literal.value)):
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_not_starts_with(self, term: BoundTerm[L], literal: Literal[L]) -> BooleanExpression:
        if not self.visit_starts_with(term, literal):
            return AlwaysTrue()
        else:
            return AlwaysFalse()

    def visit_bound_predicate(self, predicate: BoundPredicate[Any]) -> BooleanExpression:
        """
        If there is no strict projection or if it evaluates to false, then return the predicate.

        Get the strict projection and inclusive projection of this predicate in partition data,
        then use them to determine whether to return the original predicate. The strict projection
        returns true iff the original predicate would have returned true, so the predicate can be
        eliminated if the strict projection evaluates to true. Similarly the inclusive projection
        returns false iff the original predicate would have returned false, so the predicate can
        also be eliminated if the inclusive projection evaluates to false.

        """
        parts = self.spec.fields_by_source_id(predicate.term.ref().field.field_id)
        if parts == []:
            return predicate

        def struct_to_schema(struct: StructType) -> Schema:
            return Schema(*struct.fields)

        for part in parts:
            strict_projection = part.transform.strict_project(part.name, predicate)
            strict_result = None

            if strict_projection is not None:
                bound = strict_projection.bind(
                    struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive
                )
                if isinstance(bound, BoundPredicate):
                    strict_result = super().visit_bound_predicate(bound)
                else:
                    # if the result is not a predicate, then it must be a constant like alwaysTrue or alwaysFalse
                    strict_result = bound

            if isinstance(strict_result, AlwaysTrue):
                return AlwaysTrue()

            inclusive_projection = part.transform.project(part.name, predicate)
            inclusive_result = None
            if inclusive_projection is not None:
                bound_inclusive = inclusive_projection.bind(
                    struct_to_schema(self.spec.partition_type(self.schema)), case_sensitive=self.case_sensitive
                )
                if isinstance(bound_inclusive, BoundPredicate):
                    # using predicate method specific to inclusive
                    inclusive_result = super().visit_bound_predicate(bound_inclusive)
                else:
                    # if the result is not a predicate, then it must be a constant like alwaysTrue or
                    # alwaysFalse
                    inclusive_result = bound_inclusive
            if isinstance(inclusive_result, AlwaysFalse):
                return AlwaysFalse()

        return predicate

    def visit_unbound_predicate(self, predicate: UnboundPredicate[L]) -> BooleanExpression:
        bound = predicate.bind(self.schema, case_sensitive=self.case_sensitive)

        if isinstance(bound, BoundPredicate):
            bound_residual = self.visit_bound_predicate(predicate=bound)
            if not isinstance(bound_residual, (AlwaysFalse, AlwaysTrue)):
                # replace inclusive original unbound predicate
                return predicate

            # use the non-predicate residual (e.g. alwaysTrue)
            return bound_residual

        # if binding didn't result in a Predicate, return the expression
        return bound


class ResidualEvaluator(ResidualVisitor):
    def residual_for(self, partition_data: Record) -> BooleanExpression:
        return self.eval(partition_data)


class UnpartitionedResidualEvaluator(ResidualEvaluator):
    # Finds the residuals for an Expression the partitions in the given PartitionSpec
    def __init__(self, schema: Schema, expr: BooleanExpression):
        super().__init__(schema=schema, spec=UNPARTITIONED_PARTITION_SPEC, expr=expr, case_sensitive=False)
        self.expr = expr

    def residual_for(self, partition_data: Record) -> BooleanExpression:
        return self.expr


def residual_evaluator_of(
    spec: PartitionSpec, expr: BooleanExpression, case_sensitive: bool, schema: Schema
) -> ResidualEvaluator:
    return (
        UnpartitionedResidualEvaluator(schema=schema, expr=expr)
        if spec.is_unpartitioned()
        else ResidualEvaluator(spec=spec, expr=expr, schema=schema, case_sensitive=case_sensitive)
    )
