pyiceberg/expressions/parser.py (211 lines of code) (raw):

# Licensed to the Apache Software Foundation (ASF) under one
# or more contributor license agreements.  See the NOTICE file
# distributed with this work for additional information
# regarding copyright ownership.  The ASF licenses this file
# to you under the Apache License, Version 2.0 (the
# "License"); you may not use this file except in compliance
# with the License.  You may obtain a copy of the License at
#
#   http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing,
# software distributed under the License is distributed on an
# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
# KIND, either express or implied.  See the License for the
# specific language governing permissions and limitations
# under the License.
"""Parse SQL-like filter strings into ``BooleanExpression`` trees.

The grammar is assembled from pyparsing elements at import time. Each grammar
element carries a parse action that converts the matched tokens into the
corresponding expression or literal object; ``parse`` is the entry point.
"""

import re
from decimal import Decimal

from pyparsing import (
    CaselessKeyword,
    DelimitedList,
    Group,
    MatchFirst,
    OpAssoc,
    ParseException,
    ParserElement,
    ParseResults,
    QuotedString,
    Suppress,
    Word,
    alphanums,
    alphas,
    infix_notation,
    one_of,
    sgl_quoted_string,
)
from pyparsing.common import pyparsing_common as common

from pyiceberg.expressions import (
    AlwaysFalse,
    AlwaysTrue,
    And,
    BooleanExpression,
    EqualTo,
    GreaterThan,
    GreaterThanOrEqual,
    In,
    IsNaN,
    IsNull,
    LessThan,
    LessThanOrEqual,
    Not,
    NotEqualTo,
    NotIn,
    NotNaN,
    NotNull,
    Or,
    Reference,
    StartsWith,
)
from pyiceberg.expressions.literals import (
    BooleanLiteral,
    DecimalLiteral,
    Literal,
    LongLiteral,
    StringLiteral,
)
from pyiceberg.typedef import L
from pyiceberg.types import strtobool

# Class-level switch: turns on packrat parsing for all pyparsing elements.
ParserElement.enable_packrat()

AND = CaselessKeyword("and")
OR = CaselessKeyword("or")
NOT = CaselessKeyword("not")
IS = CaselessKeyword("is")
IN = CaselessKeyword("in")
NULL = CaselessKeyword("null")
NAN = CaselessKeyword("nan")
LIKE = CaselessKeyword("like")

unquoted_identifier = Word(alphas + "_", alphanums + "_$")
quoted_identifier = QuotedString('"', escChar="\\", unquoteResults=True)


@quoted_identifier.set_parse_action
def validate_quoted_identifier(result: ParseResults) -> str:
    """Reject double-quoted identifiers that contain a dot.

    Args:
        result: Tokens matched by ``quoted_identifier``; the unquoted text is at index 0.

    Returns:
        The unquoted identifier text, unchanged.

    Raises:
        ParseException: If the identifier text contains ``"."``.
    """
    if "." in result[0]:
        raise ParseException("Expected '\"', found '.'")
    return result[0]


identifier = MatchFirst([unquoted_identifier, quoted_identifier]).set_results_name("identifier")
column = DelimitedList(identifier, delim=".", combine=False).set_results_name("column")

# Two named groups: an unescaped '%' at the very end of the pattern (valid), or an
# unescaped '%' anywhere else (invalid). A '%' preceded by a backslash matches neither.
like_regex = r"(?P<valid_wildcard>(?<!\\)%$)|(?P<invalid_wildcard>(?<!\\)%)"


@column.set_parse_action
def _(result: ParseResults) -> Reference:
    """Join the dot-separated identifier parts back into a single ``Reference``."""
    return Reference(".".join(result.column))


boolean = one_of(["true", "false"], caseless=True).set_results_name("boolean")
string = sgl_quoted_string.set_results_name("raw_quoted_string")
decimal = common.real().set_results_name("decimal")
integer = common.signed_integer().set_results_name("integer")
literal = Group(string | decimal | integer | boolean).set_results_name("literal")
# Each alternative is a list of a single literal kind.
literal_set = Group(
    DelimitedList(string) | DelimitedList(decimal) | DelimitedList(integer) | DelimitedList(boolean)
).set_results_name("literal_set")


@boolean.set_parse_action
def _(result: ParseResults) -> Literal[bool]:
    """Convert a matched ``true``/``false`` token into a ``BooleanLiteral``."""
    if strtobool(result.boolean):
        return BooleanLiteral(True)
    else:
        return BooleanLiteral(False)


@string.set_parse_action
def _(result: ParseResults) -> Literal[str]:
    """Strip the surrounding single quotes and collapse doubled quotes (``''`` -> ``'``)."""
    return StringLiteral(result.raw_quoted_string[1:-1].replace("''", "'"))


@decimal.set_parse_action
def _(result: ParseResults) -> Literal[Decimal]:
    """Wrap the matched real number in a ``DecimalLiteral``."""
    return DecimalLiteral(Decimal(result.decimal))


@integer.set_parse_action
def _(result: ParseResults) -> Literal[int]:
    """Wrap the matched signed integer in a ``LongLiteral``."""
    return LongLiteral(int(result.integer))


@literal.set_parse_action
def _(result: ParseResults) -> Literal[L]:
    """Unwrap the single literal from its enclosing ``Group``."""
    return result[0][0]


@literal_set.set_parse_action
def _(result: ParseResults) -> ParseResults:
    """Unwrap the grouped literals; the result is the group's contents, not a single literal."""
    return result[0]


comparison_op = one_of(["<", "<=", ">", ">=", "=", "==", "!=", "<>"], caseless=True).set_results_name("op")
left_ref = column + comparison_op + literal
right_ref = literal + comparison_op + column
comparison = left_ref | right_ref

# Operator -> predicate class when the column is on the left: ``col <op> literal``.
_LEFT_REF_OPERATIONS = {
    "<": LessThan,
    "<=": LessThanOrEqual,
    ">": GreaterThan,
    ">=": GreaterThanOrEqual,
    "=": EqualTo,
    "==": EqualTo,
    "!=": NotEqualTo,
    "<>": NotEqualTo,
}

# Operator -> predicate class when the column is on the right: ``literal <op> col``.
# Predicates are always built as (column, literal), so ordering operators are mirrored.
_RIGHT_REF_OPERATIONS = {
    "<": GreaterThan,
    "<=": GreaterThanOrEqual,
    ">": LessThan,
    ">=": LessThanOrEqual,
    "=": EqualTo,
    "==": EqualTo,
    "!=": NotEqualTo,
    "<>": NotEqualTo,
}


def _build_comparison(result: ParseResults, operations: dict) -> BooleanExpression:  # type: ignore[type-arg]
    """Build the comparison predicate for ``result.op`` using the given operator table.

    Args:
        result: Tokens of a comparison carrying the ``op``, ``column`` and ``literal`` result names.
        operations: Mapping from operator text to the predicate class to instantiate.

    Returns:
        The predicate constructed as ``predicate(result.column, result.literal)``.

    Raises:
        ValueError: If ``result.op`` is not a key of ``operations``.
    """
    try:
        predicate = operations[result.op]
    except KeyError:
        raise ValueError(f"Unsupported operation type: {result.op}") from None
    return predicate(result.column, result.literal)


@left_ref.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build the predicate for ``column <op> literal``."""
    return _build_comparison(result, _LEFT_REF_OPERATIONS)


@right_ref.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build the predicate for ``literal <op> column`` (ordering operators are mirrored)."""
    return _build_comparison(result, _RIGHT_REF_OPERATIONS)


is_null = column + IS + NULL
not_null = column + IS + NOT + NULL
null_check = is_null | not_null


@is_null.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build ``IsNull`` for ``column IS NULL``."""
    return IsNull(result.column)


@not_null.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build ``NotNull`` for ``column IS NOT NULL``."""
    return NotNull(result.column)


is_nan = column + IS + NAN
not_nan = column + IS + NOT + NAN
nan_check = is_nan | not_nan


@is_nan.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build ``IsNaN`` for ``column IS NAN``."""
    return IsNaN(result.column)


@not_nan.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build ``NotNaN`` for ``column IS NOT NAN``."""
    return NotNaN(result.column)


is_in = column + IN + "(" + literal_set + ")"
not_in = column + NOT + IN + "(" + literal_set + ")"
in_check = is_in | not_in


@is_in.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build ``In`` for ``column IN (...)``."""
    return In(result.column, result.literal_set)


@not_in.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build ``NotIn`` for ``column NOT IN (...)``."""
    return NotIn(result.column, result.literal_set)


def _evaluate_like_statement(result: ParseResults) -> BooleanExpression:
    """Translate a ``LIKE`` pattern into ``StartsWith`` or ``EqualTo``.

    Args:
        result: Tokens of a ``LIKE`` clause; ``raw_quoted_string`` holds the pattern as a
            ``StringLiteral`` and ``column`` holds the referenced column.

    Returns:
        ``StartsWith`` on the pattern minus its trailing ``%`` when the pattern ends in an
        unescaped ``%``; otherwise ``EqualTo`` on the whole pattern. In both cases every
        escaped ``\\%`` is replaced by a plain ``%``.

    Raises:
        ValueError: If an unescaped ``%`` appears anywhere other than the end of the pattern.
    """
    literal_like: StringLiteral = result.raw_quoted_string

    match = re.search(like_regex, literal_like.value)

    if match is None:
        # No unescaped wildcard at all: the pattern is an exact match.
        return EqualTo(result.column, StringLiteral(literal_like.value.replace("\\%", "%")))

    if match.group("invalid_wildcard"):
        raise ValueError("LIKE expressions only supports wildcard, '%', at the end of a string")

    # Any match fills exactly one of the two named groups, so this is the trailing wildcard.
    return StartsWith(result.column, StringLiteral(literal_like.value[:-1].replace("\\%", "%")))


starts_with = column + LIKE + string
not_starts_with = column + NOT + LIKE + string
starts_check = starts_with | not_starts_with


@starts_with.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build the expression for ``column LIKE 'pattern'``."""
    return _evaluate_like_statement(result)


@not_starts_with.set_parse_action
def _(result: ParseResults) -> BooleanExpression:
    """Build the inverted expression for ``column NOT LIKE 'pattern'``."""
    return ~_evaluate_like_statement(result)


predicate = (comparison | in_check | null_check | nan_check | starts_check | boolean).set_results_name("predicate")


def handle_not(result: ParseResults) -> Not:
    """Wrap the single operand of a ``NOT`` in a ``Not`` expression."""
    return Not(result[0][0])


def handle_and(result: ParseResults) -> And:
    """Combine all operands of an ``AND`` group into one ``And`` expression."""
    return And(*result[0])


def handle_or(result: ParseResults) -> Or:
    """Combine all operands of an ``OR`` group into one ``Or`` expression."""
    return Or(*result[0])


def handle_always_expression(result: ParseResults) -> BooleanExpression:
    """Map a bare boolean literal to ``AlwaysTrue``/``AlwaysFalse``; pass anything else through.

    Args:
        result: Tokens produced by the top-level expression; the parsed value is at index 0.

    Returns:
        ``AlwaysTrue()`` or ``AlwaysFalse()`` when the whole expression is a ``BooleanLiteral``,
        otherwise the parsed value unchanged.
    """
    # If the entire result is "true" or "false", return AlwaysTrue or AlwaysFalse
    expr = result[0]
    if isinstance(expr, BooleanLiteral):
        if expr.value:
            return AlwaysTrue()
        else:
            return AlwaysFalse()
    return result[0]


boolean_expression = (
    infix_notation(
        predicate,
        [
            (Suppress(NOT), 1, OpAssoc.RIGHT, handle_not),
            (Suppress(AND), 2, OpAssoc.LEFT, handle_and),
            (Suppress(OR), 2, OpAssoc.LEFT, handle_or),
        ],
    )
    .set_name("expr")
    .add_parse_action(handle_always_expression)
)


def parse(expr: str) -> BooleanExpression:
    """Parse a boolean expression.

    Args:
        expr: The filter expression text, e.g. ``"x > 5 and y is not null"``.

    Returns:
        The expression object produced by the grammar for the whole input.

    Raises:
        ParseException: If the input cannot be matched in full (``parse_all=True``).
        ValueError: If a ``LIKE`` pattern has an unescaped ``%`` anywhere but its end.
    """
    return boolean_expression.parse_string(expr, parse_all=True)[0]